# 19. Rx ________________________________________ 連載:Reactive Extensions(Rx)入門 https://atmarkit.itmedia.co.jp/fdotnet/introrx/index/index.html foreach - C# によるプログラミング入門 | ++C++; // 未確認飛行 C https://ufcpp.net/study/csharp/sp_foreach.html?key=foreach#foreach Reactive Extensions再入門 その3「IObservableのファクトリメソッド」 https://blog.okazuki.jp/entry/20111104/1320409976 Reactive Extensions入門 + メソッド早見解説表 http://neue.cc/2010/07/28_269.html Rxでのイベント変換まとめ - FromEvent vs FromEventPattern http://neue.cc/2011/07/06_332.html ________________________________________ ## 1. その前に、foreachを理解 ________________________________________ C#のforeach ```text 中身はIteratorパターンそのもの IEnumerable : IEnumerable // IEnumerator GetEnumerator() IEnumerator GetEnumerator() IEnumerator : IEnumerator // void Reset() // object Current T Current bool MoveNext() C#2.0以降は、GetEnumeratorさえ具象実装があればOK C#8.0以降は、GetEnumeratorが拡張メソッドで提供されてもOK ``` ________________________________________ ## 2. Rx ________________________________________ Rx概要 ```text IObservable IDisposable Subscribe(IObserver) IObserver void OnNext(T value) void OnError(Exception) void OnCompleted() foreachは、 Enumerableに対応して生成されたEnumeratorにより、 Enumerator(イテレータ)がEnumerable(具象コレクション)を調べて要素を取り出す(Pull) Rxは、 ObservableをSubscribeするObserverを登録することで、 Observable(イベントソース)がObserver(観察者/イベントハンドラ)へ通知する(Push) ※ 典型的なpub/subやget/setはそうなのだが、主語と目的語が逆向き mqttClient.publish(msg) : 別にブローカー(mqttClient)がpublishしているわけではない mqttClient.subscribe(matchPattern) : 別にブローカー(mqttClient)がsubscribeしているわけではない src.Subscrive(observer)は、observerがsrcを購読(観察)する、というニュアンス。 ``` 実際の使い方のイメージ ```cs using System; using System.Diagnostics; using System.Net.Http; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; using System.Windows.Forms; // ... // ... int[] numArray = new int[3] { 1, 5, 10 }; Button btn = this.button1; HttpClient hc = new HttpClient(); string url = "https://example.com/"; // 例1:従来のコレクションをObservableにラップした場合 var srcObs1 = numArray.ToObservable(); srcObs1.Subscribe(onNextI => { // 実質foreachと同じ Trace.WriteLine(onNextI); }); // 例2:従来のイベントハンドラをObservableにラップした場合 var srcObs2 = Observable.FromEvent( h => (sender, e) => h(e), h => btn.Click += h, h => btn.Click -= h); srcObs2.Subscribe(onNextE => { // 実質イベントハンドラの記述と同じ。 Trace.WriteLine($"button clicked : {onNextE}"); }); // 例3:Httpリクエスト非同期オブジェクトをObservableに変換した場合 var srcObs3 = hc.GetAsync(url).ToObservable(); srcObs3.Subscribe(onNextResponse => { // 実質非同期メソッドに対するawait後の記述と同じ var str = onNextResponse.Content.ReadAsStringAsync().Result; Trace.WriteLine($"response : {str}"); }); ``` ________________________________________ ## 3. 頻出演算子(RxJSの場合) ________________________________________ 前提:C# の注意点 - System.Reactive.LinqのThrottleは、Debounceしている(後続のRxライブラリはThrottleとDebounceが明確に別操作である) pipeの中で使う形(Completeに関わらない) 演算子 |要約 ------------|------------------------------- map |受け取った値を任意の関数で加工して返す mergeMap |受け取った値を元に任意のobservableソースを用意して返す。全てのobservableソースは合成される switchMap |受け取った値を元に任意のobservableソースを用意して返す。直近のobservableソースだけ維持され、それ以前のobservableソースは無くなる retry |throwされてきた時、指定した回数だけ再度初めからやり直す。v7.3以降はdelayオプションが可能 scan |reduceのような動きをするが、各結果を流す - pipeの中で使う形(Completeに関わる) 演算子 |要約 ------------|------------------------------- take |指定した回数だけ受け取ったら強制的にCompleteする reduce |Completeするまで受け取った値をreduceする。Completeするまで後続に流れない - Observableチェーン 演算子 |要約 ------------|------------------------------- pipe |pipeの中で使う演算子のために使う bufferTime |一定時間ごとに配列にまとめたものを流す。(その間に入力がなければ空の配列が流れる) auditTime |間引き系。待機状態から値を受け取ったら、一定時間待ってから最後に受け取った値だけを流す debounceTime|間引き系。一定時間新たな値を受け取らなかったら、最後に受け取った値だけ流す(連打する限り後続に流れない) throttleTime|間引き系。待機状態から値を受け取ったら、それは流してその後一定時間受け取った値は捨てる - 合成 演算子 |要約 ------------|------------------------------- merge |複数のobservableを1つにまとめたobservableを作る -