Throttle by count and time, with results returned immediately
-
Sunday, June 3, 2012 11:15
My requirement is:
process no more than 3 messages per minute.
Suppose I have the following sequence of messages:
1. 00:10 "Message"
2. 00:14 "Message"
3. 00:25 "Message"
4. 00:35 "Message" // <-- should be discarded
5. 00:55 "Message" // <-- should be discarded
6. 00:56 "Message" // <-- should be discarded
7. 01:01 "New message"
I want to receive only messages 1, 2, 3 and 7 in the output sequence, but I do not want to buffer messages: they should appear in the output immediately, as they arrive.
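To make the rule concrete without Rx, here is a minimal sketch of the filtering I have in mind. It rests on assumptions that are not stated above: windows are aligned to whole multiples of the interval counted from time 00:00, arrival times are given as offsets from that start, and the names `FixedWindowDemo` and `Filter` are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class FixedWindowDemo
{
    // Returns the 1-based positions of the messages that pass.
    // Assumption: windows are [0, window), [window, 2*window), ...
    public static List<int> Filter(IList<TimeSpan> arrivals, TimeSpan window, int maxPerWindow)
    {
        var passed = new List<int>();
        long currentWindow = -1;
        var countInWindow = 0;

        for (var i = 0; i < arrivals.Count; i++)
        {
            // Index of the window this arrival falls into.
            long windowIndex = arrivals[i].Ticks / window.Ticks;
            if (windowIndex != currentWindow)
            {
                // A new window has started: reset the counter.
                currentWindow = windowIndex;
                countInWindow = 0;
            }

            // Emit immediately if the window still has room, otherwise discard.
            if (countInWindow < maxPerWindow)
            {
                countInWindow++;
                passed.Add(i + 1);
            }
        }
        return passed;
    }

    public static void Main()
    {
        // Arrival times of messages 1..7 from the list above.
        var arrivals = new[]
        {
            new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 14), new TimeSpan(0, 0, 25),
            new TimeSpan(0, 0, 35), new TimeSpan(0, 0, 55), new TimeSpan(0, 0, 56),
            new TimeSpan(0, 1, 1)
        };
        Console.WriteLine(string.Join(", ", Filter(arrivals, TimeSpan.FromMinutes(1), 3)));
        // prints "1, 2, 3, 7"
    }
}
```

Nothing is buffered here: each message is either passed on or dropped at the moment it arrives.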
I have tried to do this with Observable.Window, but I can't figure out how to skip the remaining items that exceed the limit:
var qq = Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.NewThread).Window(TimeSpan.FromMinutes(1), 3);
If I understand correctly how Window works (marble diagram):
x - x - x - x - x - x - x
x - x - x |
            x - x - x |   <--- This window should be skipped
                        x
I have seen several discussions where a similar problem was solved, but I can't figure out how to implement this.
All replies
-
Monday, June 4, 2012 10:52
Well, I was able to build an immediate throttle with help from the forum.
I took the solution from this thread http://social.msdn.microsoft.com/Forums/en/rx/thread/ec692dce-7213-42a6-9dba-296938326328, modified it, and updated it for Rx 1.0.
The extension:
public static IObservable<T> IntervalCountThrottle<T>(this IObservable<T> source, TimeSpan interval, int count, IScheduler scheduler)
{
    return Observable.Create<T>(o =>
    {
        // Hot timer that ticks at the start of every interval, on the given scheduler.
        var timer = Observable.Timer(TimeSpan.Zero, interval, scheduler).Publish();
        var src = source.Publish();
        // Wait for a tick, pass `count` items, then wait for the next tick.
        var main = src.SkipUntil(timer).Take(count).Repeat().Subscribe(o.OnNext, o.OnError, o.OnCompleted);
        return new CompositeDisposable(timer.Connect(), src.Connect(), main);
    });
}

The test sequence:

var input = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();
// Any IScheduler can be passed here; Scheduler.ThreadPool is used as an example.
using (input.IntervalCountThrottle(TimeSpan.FromSeconds(10), 3, Scheduler.ThreadPool).Subscribe(
    x => Console.WriteLine("Value : {0:mm:ss} - {1}", x.Timestamp, x.Value)))
{
    Console.WriteLine("Press any key to continue!");
    Console.ReadKey();
}

It produces the expected output:

Value : 51:33 - 0
Value : 51:34 - 1
Value : 51:35 - 2
Value : 51:43 - 10
Value : 51:44 - 11
Value : 51:45 - 12
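For anyone who wants to check the gating logic without running Rx, the SkipUntil(timer).Take(count).Repeat() chain can be modelled as a small state machine. This is only a sketch under assumptions: the names `GateModel` and `Simulate` are made up for illustration, timer ticks are assumed to fall at exact multiples of the interval counted from subscription, and arrival times are given as offsets from subscription. Fed the message times from the question, it lets through messages 1, 2, 3 and 7.

```csharp
using System;
using System.Collections.Generic;

public static class GateModel
{
    // Returns the 1-based positions of the messages that pass the gate.
    public static List<int> Simulate(IList<TimeSpan> arrivals, TimeSpan interval, int count)
    {
        var passed = new List<int>();
        var gateOpen = false;                   // models SkipUntil: closed until a tick is seen
        var taken = 0;                          // models Take(count)
        var closedAt = TimeSpan.FromTicks(-1);  // time the gate last closed (before the first tick)

        for (var i = 0; i < arrivals.Count; i++)
        {
            var t = arrivals[i];
            if (!gateOpen)
            {
                // Latest tick at or before t; it opens the gate only if it
                // happened after the gate was last closed.
                var latestTick = TimeSpan.FromTicks(t.Ticks / interval.Ticks * interval.Ticks);
                gateOpen = latestTick > closedAt;
            }
            if (!gateOpen) continue;            // item is discarded

            passed.Add(i + 1);
            if (++taken == count)
            {
                // Take(count) completed; Repeat resubscribes and waits for the next tick.
                gateOpen = false;
                taken = 0;
                closedAt = t;
            }
        }
        return passed;
    }

    public static void Main()
    {
        var arrivals = new[]
        {
            new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 14), new TimeSpan(0, 0, 25),
            new TimeSpan(0, 0, 35), new TimeSpan(0, 0, 55), new TimeSpan(0, 0, 56),
            new TimeSpan(0, 1, 1)
        };
        Console.WriteLine(string.Join(", ", Simulate(arrivals, TimeSpan.FromMinutes(1), 3)));
        // prints "1, 2, 3, 7"
    }
}
```

One thing the model makes visible: as far as I can tell from the operator semantics, a tick only reopens the gate and does not reset the count. If fewer than `count` items arrive in one interval, the remaining allowance carries over into the next one, and the gate then stays closed until the following tick.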
- Marked as answer by StouneUA Monday, June 4, 2012 15:43

