Throttle by count and time, with results returned immediately

  • Sunday, June 3, 2012 11:15
     

    My requirement is to process no more than 3 messages per minute.

    Say I have the following sequence of messages:

    1. 00:10 "Message"

    2. 00:14 "Message"

    3. 00:25 "Message"

    4. 00:35 "Message" // <-- should be discarded

    5. 00:55 "Message" // <-- should be discarded

    6. 00:56 "Message" // <-- should be discarded

    7. 01:01 "New message"


    I want the output sequence to contain only messages 1, 2, 3 and 7, but I do not want to buffer messages: each one should appear in the output as soon as it arrives.
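
    The rule above can be sketched without Rx as a plain fixed-window counter. This is only an illustration under stated assumptions: the class name FixedWindowLimiter and its TryAccept method are hypothetical (not part of Rx), and windows are assumed to be aligned to the start of the stream (00:00, 01:00, ...), which is what lets message 7 at 01:01 through.

```csharp
using System;

// Hypothetical helper for illustration only; not part of Rx.
// Lets at most maxPerWindow messages through in each fixed window.
public sealed class FixedWindowLimiter
{
    private readonly TimeSpan _window;
    private readonly int _maxPerWindow;
    private long _currentWindow = -1; // index of the window the last message fell into
    private int _countInWindow;       // messages already accepted in that window

    public FixedWindowLimiter(TimeSpan window, int maxPerWindow)
    {
        _window = window;
        _maxPerWindow = maxPerWindow;
    }

    // offset is the arrival time measured from the start of the stream.
    // Returns true if the message should pass, false if it should be discarded.
    public bool TryAccept(TimeSpan offset)
    {
        // Assumption: windows start at offset zero, not at the first message.
        long windowIndex = offset.Ticks / _window.Ticks;
        if (windowIndex != _currentWindow)
        {
            _currentWindow = windowIndex;
            _countInWindow = 0;
        }

        if (_countInWindow >= _maxPerWindow)
        {
            return false;
        }

        _countInWindow++;
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        var limiter = new FixedWindowLimiter(TimeSpan.FromMinutes(1), 3);

        // Arrival times, in seconds from the start, of messages 1..7 in the question.
        int[] arrivals = { 10, 14, 25, 35, 55, 56, 61 };

        for (int i = 0; i < arrivals.Length; i++)
        {
            bool passed = limiter.TryAccept(TimeSpan.FromSeconds(arrivals[i]));
            Console.WriteLine("{0}. t={1}s {2}", i + 1, arrivals[i], passed ? "passed" : "discarded");
        }
    }
}
```

    Each decision is made as the message arrives, so nothing is buffered. If the windows were instead anchored at the first message (00:10 to 01:10), message 7 would fall into the first window and be discarded as well.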

    I tried to do this with Observable.Window, but I can't figure out how to skip the remaining items that exceed the limit.

    var qq = Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.NewThread)
                       .Window(TimeSpan.FromMinutes(1), 3);

    If I understand correctly how Window works, the marble diagram looks like this:

    x - x - x - x - x - x - x

    x - x - x |

               x - x - x |  <--- This window should be skipped         

                        x


    I have seen several discussions where a similar problem was solved, but I can't figure out how to implement this.

All replies

  • Monday, June 4, 2012 10:52
     

    Well, I was able to build an immediate throttle with help from the forum.

    I took the solution from this thread http://social.msdn.microsoft.com/Forums/en/rx/thread/ec692dce-7213-42a6-9dba-296938326328, modified it, and updated it for Rx 1.0.

    The extension:

    public static IObservable<T> IntervalCountThrottle<T>(this IObservable<T> source, TimeSpan interval, int count, IScheduler scheduler)
    {
        return Observable.Create<T>(o =>
        {
            // Ticks immediately, then once per interval; each tick opens a new window.
            var timer = Observable.Timer(TimeSpan.Zero, interval, scheduler).Publish();
            var src = source.Publish();
            // Wait for a tick, pass at most 'count' items, then start over and wait for the next tick.
            var main = src.SkipUntil(timer).Take(count).Repeat().Subscribe(o.OnNext, o.OnError, o.OnCompleted);
            return new CompositeDisposable(timer.Connect(), src.Connect(), main);
        });
    }

    The test sequence:

    var input = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();

    // Let at most 3 values through per 10-second interval.
    using (input.IntervalCountThrottle(TimeSpan.FromSeconds(10), 3, Scheduler.ThreadPool).Subscribe(
        x => Console.WriteLine("Value : {0:mm:ss} - {1}", x.Timestamp, x.Value)))
    {
        Console.WriteLine("Press any key to continue!");
        Console.ReadKey();
    }

    It produces the expected output:

    Value : 51:33 - 0
    Value : 51:34 - 1
    Value : 51:35 - 2
    Value : 51:43 - 10
    Value : 51:44 - 11
    Value : 51:45 - 12

    • Marked as answer by StouneUA, Monday, June 4, 2012 15:43