過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
- Reactive Extensions再入門 その41「どんどん合成するよ」
- Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」
- Reactive Extensions再入門 その43「GroupJoinメソッド」
- Reactive Extensions再入門 その44「And, Then, Whenメソッド」
- Reactive Extensions再入門 その45「Scheduler」
- Reactive Extensions再入門 その46「 Reactive Extensions 入門 」
- Reactive Extensions再入門 その47「 Reactive Extensions 入門 ソースコードスニペット集」
はじめに
しばらく間があいたRx関連の記事ですがぼっちぼっち思い出したようにやっていこうと思います。Rx2.0の正式版が出たら、また加速するかも?ということで、今回はRxを使ったら簡単にこんな処理かけるよ!というのをやってみようと思います。Rx使わない版はだれか書いてくれるとうれしいな♪
連続して発生する値の監視
下記のようなケースを考えます。
0〜1秒間隔で0〜9の値を発行するものがあります。発行された値を1秒間隔で10秒単位にまとめて発行された値の総和が100を超えた場合は異常と表示して、100より小さい場合は平常と表示しなさい。
非常に単純な例ですが、時間がからんでくるので非常に厄介です。Reactive Extensionsは、このような時間とともに発生する値を処理するのが非常に得意です。コード例を下記に示します。
// ダミーデータ作成メソッド static IEnumerable<int> GetSource() { var r = new Random(); while (true) { yield return r.Next(10); Thread.Sleep(r.Next(1000)); } }
まずは、要件を満たす値を生成するメソッドを定義しました。これをベースに処理を書いていきます。
var subscriber = GetSource() // 別スレッドで値を発行するIObservableのシーケンスにする .ToObservable(Scheduler.NewThread) // 1秒間隔で10秒間値をためる .Window(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1)) // 10秒ためた値の合計をとって .SelectMany(o => o.Sum()) // 異常か正常か判断 .Select(i => new { Value = i, Message = i < 100 ? "正常" : "異常" }) // 結果を表示 .Subscribe(v => Console.WriteLine("{0:HH:mm:ss.fff} {1}", DateTime.Now, v)); // Enterを押すまで待機 Console.ReadLine(); // 購読解除 subscriber.Dispose();
1秒間隔で10秒間の値を抜き出すという厄介な処理は、Reactive ExtensionsではWindowメソッドの時間を指定するメソッドで一発で行えます。そのあとSelectManyで合計に変換して異常か正常か判定して表示しています。実行結果を以下に示します。
22:00:06.758 { Value = 84, Message = 正常 } 22:00:07.771 { Value = 85, Message = 正常 } 22:00:08.785 { Value = 96, Message = 正常 } 22:00:09.799 { Value = 99, Message = 正常 } 22:00:10.813 { Value = 95, Message = 正常 } 22:00:11.827 { Value = 102, Message = 異常 } 22:00:12.841 { Value = 106, Message = 異常 } 22:00:13.855 { Value = 88, Message = 正常 } 22:00:14.869 { Value = 92, Message = 正常 } 22:00:15.883 { Value = 109, Message = 異常 }
期待通りに動いていることが確認できます。Reactive Extensionsを使うと受け取った値に対してリアルタイムに集計して判定を行い結果を表示するという処理がとてもシンプルに記述できます。