私は、観察可能なものの最新の20個の値を取得し、それをブロッキングが発生しないプロパティとして公開しようとしています。現時点では、私のコードは次のようになります:
class Foo
{
private IObservable observable;
public Foo(IObservable bar)
{
this.observable = bar;
}
public IEnumerable MostRecentBars
{
get
{
return this.observable.TakeLast(20).ToEnumerable();
}
}
}
しかし、MostRecentBarsゲッターが呼び出されると、おそらくToEnumerableが観測値が20個以上になるまで戻りませんので、これはブロックされています。
ブロックせずに観測値の最新の値を最大20個まで公開する組み込みの方法はありますか?観測値が20未満の場合は、すべての値を返す必要があります。
私はあなたに2つの選択肢を与えます。 Rx Scan
演算子を使用していますが、読みやすくするためには少し複雑です。もう1つは標準の Queue
をロックして使用します。選んでいいですよ。
(1)
class Foo
{
private int[] bars = new int[] { };
public Foo(IObservable bar)
{
bar
.Scan(
new int[] { },
(ns, n) =>
ns
.Concat(new [] { n, })
.TakeLast(20)
.ToArray())
.Subscribe(ns => bars = ns);
}
public IEnumerable MostRecentBars
{
get
{
return bars;
}
}
}
(2)
class Foo
{
private Queue queue = new Queue();
public Foo(IObservable bar)
{
bar.Subscribe(n =>
{
lock (queue)
{
queue.Enqueue(n);
if (queue.Count > 20)
{
queue.Dequeue();
}
}
});
}
public IEnumerable MostRecentBars
{
get
{
lock (queue)
{
return queue.ToArray();
}
}
}
}
私はこれらの助けを願う。
私はリアクティブエクステンションを使ってビルドしたプロジェクトにアタッチする傾向があるいくつかの拡張があります。その1つはスライディングウィンドウです:
public static IObservable> SlidingWindow(this IObservable o, int length)
{
Queue window = new Queue();
return o.Scan>(new T[0], (a, b) =>
{
window.Enqueue(b);
if (window.Count > length)
window.Dequeue();
return window.ToArray();
});
}
これは、直近のN個のアイテムの配列を返します(まだN個のアイテムがない場合はそれよりも少なくなります)。
あなたの場合、あなたはできるはずです:
class Foo
{
private IObservable observable;
private int[] latestWindow = new int[0];
IDisposable slidingWindowSubscription;
public Foo(IObservable bar)
{
this.observable = bar;
slidingWindowSubscription = this.observable.SlidingWindow(20).Subscribe(a =>
{
latestWindow = a;
});
}
public IEnumerable MostRecentBars
{
get
{
return latestWindow;
}
}
}
私はあなたの要件に合った組み込みのRxオペレータを考えることはできません。このように実装することができます:
class Foo
{
private IObservable observable;
private Queue buffer = new Queue();
public Foo(IObservable bar)
{
this.observable = bar;
this.observable
.Subscribe(item =>
{
lock (buffer)
{
if (buffer.Count == 20) buffer.Dequeue();
buffer.Enqueue(item);
}
});
}
public IEnumerable MostRecentBars
{
get
{
lock (buffer)
{
return buffer.ToList(); //Create a copy.
}
}
}
}
あなたはすでにあなたの答えを得ていますが、私はバッファでReplay Subjectを使ってこれを解決しようと考えていて、次のようなものを考え出しました:
class Foo
{
private ReplaySubject replay = new ReplaySubject(20);
public Foo(IObservable bar)
{
bar.Subscribe(replay);
}
public IEnumerable MostRecentBars
{
get
{
var result = new List();
replay.Subscribe(result.Add); //Replay fill in the list with buffered items on same thread
return result;
}
}
}
これがあなたの問題に合っているかどうか教えてください。