かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その48「連続して発生する値の監視」

過去記事インデックス

はじめに

しばらく間があいたRx関連の記事ですがぼっちぼっち思い出したようにやっていこうと思います。Rx2.0の正式版が出たら、また加速するかも?ということで、今回はRxを使ったら簡単にこんな処理かけるよ!というのをやってみようと思います。Rx使わない版はだれか書いてくれるとうれしいな♪

連続して発生する値の監視

下記のようなケースを考えます。

0〜1秒間隔で0〜9の値を発行するものがあります。発行された値を1秒間隔で10秒単位にまとめて発行された値の総和が100を超えた場合は異常と表示して、100より小さい場合は平常と表示しなさい。

非常に単純な例ですが、時間がからんでくるので非常に厄介です。Reactive Extensionsは、このような時間とともに発生する値を処理するのが非常に得意です。コード例を下記に示します。

// ダミーデータ作成メソッド
static IEnumerable<int> GetSource()
{
    var r = new Random();
    while (true)
    {
        yield return r.Next(10);
        Thread.Sleep(r.Next(1000));
    }
}

まずは、要件を満たす値を生成するメソッドを定義しました。これをベースに処理を書いていきます。

var subscriber = GetSource()
    // 別スレッドで値を発行するIObservableのシーケンスにする
    .ToObservable(Scheduler.NewThread)
    // 1秒間隔で10秒間値をためる
    .Window(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1))
    // 10秒ためた値の合計をとって
    .SelectMany(o => o.Sum())
    // 異常か正常か判断
    .Select(i => new { Value = i, Message = i < 100 ? "正常" : "異常" })
    // 結果を表示
    .Subscribe(v => 
        Console.WriteLine("{0:HH:mm:ss.fff} {1}", DateTime.Now, v));

// Enterを押すまで待機
Console.ReadLine();
// 購読解除
subscriber.Dispose();

1秒間隔で10秒間の値を抜き出すという厄介な処理は、Reactive ExtensionsではWindowメソッドの時間を指定するメソッドで一発で行えます。そのあとSelectManyで合計に変換して異常か正常か判定して表示しています。実行結果を以下に示します。

22:00:06.758 { Value = 84, Message = 正常 }
22:00:07.771 { Value = 85, Message = 正常 }
22:00:08.785 { Value = 96, Message = 正常 }
22:00:09.799 { Value = 99, Message = 正常 }
22:00:10.813 { Value = 95, Message = 正常 }
22:00:11.827 { Value = 102, Message = 異常 }
22:00:12.841 { Value = 106, Message = 異常 }
22:00:13.855 { Value = 88, Message = 正常 }
22:00:14.869 { Value = 92, Message = 正常 }
22:00:15.883 { Value = 109, Message = 異常 }

期待通りに動いていることが確認できます。Reactive Extensionsを使うと受け取った値に対してリアルタイムに集計して判定を行い結果を表示するという処理がとてもシンプルに記述できます。