sliding window

    Question

  •  I’m trying to write a query that produces a 2-day sliding window of Ids over the following event stream:

    public class Source
    {
        public int Id;
        public DateTime Date;
    }

    var source = new[]
    {
        new Source { Id = 1, Date = new DateTime(2011, 11, 1) },
        new Source { Id = 2, Date = new DateTime(2011, 11, 1) },
        new Source { Id = 3, Date = new DateTime(2011, 11, 2) },
        new Source { Id = 4, Date = new DateTime(2011, 11, 2) },
        new Source { Id = 5, Date = new DateTime(2011, 11, 2) },
        new Source { Id = 6, Date = new DateTime(2011, 11, 2) },
        new Source { Id = 7, Date = new DateTime(2011, 11, 3) },
        new Source { Id = 8, Date = new DateTime(2011, 11, 3) },
        new Source { Id = 9, Date = new DateTime(2011, 11, 3) },
        new Source { Id = 10, Date = new DateTime(2011, 11, 4) },
        new Source { Id = 11, Date = new DateTime(2011, 11, 4) },
        new Source { Id = 12, Date = new DateTime(2011, 11, 5) },
    }.ToObservable();

    I expect to get the following result:

    var expectedResult =
                new[]
                {
                    new[] { 1, 2, 3, 4, 5, 6 }.ToObservable(),
                    new[] { 3, 4, 5, 6, 7, 8, 9 }.ToObservable(),
                    new[] { 7, 8, 9, 10, 11 }.ToObservable(),
                    new[] { 10, 11, 12 }.ToObservable(),
                }.ToObservable();
    

    I would think that the Window function should do the trick for me:

    public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(
    	this IObservable<TSource> source,
    	IObservable<TWindowOpening> windowOpenings,
    	Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector
    )
    

    Unfortunately, I was unable to build a correct query.

    I would also like to add that the whole "Programming Streams of Coincidence" concept (GroupJoin) seems very important, yet there is very little documentation and only a few samples.

    Thanks,
    Dmitry Morozov

     


    • Edited by mitekm Friday, September 16, 2011 8:32 PM .
    Friday, September 16, 2011 8:03 PM

All replies

  • Hi Dmitry,

    The problem with Window in your case is that when a data-based sliding window is closed, the value that closed it is also included in that window.  You need the value to be excluded.

    A GroupJoin query works the same way as Window, except that the particular value that opens a window is within the scope of the projection.

    Try the following query, which compiles to GroupJoin.  Note that I'm using the distinct variable in the final Where clause to filter out values in each window that were responsible for closing them.

    const int days = 2;
    
    var query = source.Publish(published =>
    	from distinct in published.Distinct(value => value.Date)
    	join current in published
    	on published.Where(value => value.Date == distinct.Date.AddDays(days))
    	equals Observable.Empty<object>()
    	into window
    	select window.Where(value => value.Date < distinct.Date.AddDays(days)));
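
    For readers who find the join syntax hard to follow, here is a sketch of the same query written as an explicit GroupJoin call inside a small console program. It assumes the System.Reactive package is referenced. The names GroupJoinSketch, SampleSource and SlidingWindows are hypothetical and only serve the illustration, and the sample data is the stream from the question, built in a more compact way.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public class Source
{
    public int Id;
    public DateTime Date;
}

public static class GroupJoinSketch
{
    // Same data as in the question: the day in November 2011 for Ids 1..12.
    public static IObservable<Source> SampleSource() =>
        new[] { 1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 5 }
            .Select((day, index) => new Source { Id = index + 1, Date = new DateTime(2011, 11, day) })
            .ToObservable();

    // Method-syntax form of the query: one window is opened per distinct date.
    public static IObservable<IObservable<Source>> SlidingWindows(IObservable<Source> source, int days) =>
        source.Publish(published =>
            published.Distinct(value => value.Date).GroupJoin(
                published,
                // Left duration: the window stays open until a value dated `days` later arrives.
                distinct => published.Where(value => value.Date == distinct.Date.AddDays(days)),
                // Right duration: empty, so a value only joins windows that are open when it arrives.
                current => Observable.Empty<object>(),
                // Projection: filter out the value that closed the window.
                (distinct, window) => window.Where(value => value.Date < distinct.Date.AddDays(days))));

    public static void Main()
    {
        SlidingWindows(SampleSource(), 2)
            // Subscribe to each window as soon as it is produced and collect its Ids.
            .SelectMany(window => window.Select(value => value.Id).ToList())
            .Subscribe(ids => Console.WriteLine(string.Join(", ", ids)));
    }
}
```

    Each window has to be subscribed to as soon as it is emitted, which is why the sketch flattens the windows with SelectMany rather than storing them for later. Note that a window is opened for every distinct date, including the last one.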
    

    A simpler alternative is to use Window instead of GroupJoin and then project each window into a new query that adds SkipLast(1) to each window; however, this has the unfortunate side-effect of dropping the last value of the input sequence from the last window that is created.
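
    The Window alternative could look roughly like the sketch below. It is only an illustration under a few assumptions: the System.Reactive package is referenced, the names WindowSketch, SampleSource and SlidingWindows are hypothetical, and the opening and closing conditions mirror the ones used in the GroupJoin query.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public class Source
{
    public int Id;
    public DateTime Date;
}

public static class WindowSketch
{
    // Same data as in the question: the day in November 2011 for Ids 1..12.
    public static IObservable<Source> SampleSource() =>
        new[] { 1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 5 }
            .Select((day, index) => new Source { Id = index + 1, Date = new DateTime(2011, 11, day) })
            .ToObservable();

    public static IObservable<IObservable<Source>> SlidingWindows(IObservable<Source> source, int days) =>
        source.Publish(published =>
            published
                .Window(
                    // Open a window for each distinct date.
                    published.Distinct(value => value.Date),
                    // Close it when a value dated `days` later arrives.
                    opening => published.Where(value => value.Date == opening.Date.AddDays(days)))
                // The closing value is part of the window, so drop the last element.
                .Select(window => window.SkipLast(1)));

    public static void Main()
    {
        SlidingWindows(SampleSource(), 2)
            // Subscribe to each window as soon as it is produced and collect its Ids.
            .SelectMany(window => window.Select(value => value.Id).ToList())
            .Subscribe(ids => Console.WriteLine(string.Join(", ", ids)));
    }
}
```

    Because SkipLast(1) cannot tell a closing value from a regular one, any window that ends only because the source completes loses its real last value, which is the side effect described above.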

    - Dave


    http://davesexton.com/blog
    Saturday, September 17, 2011 4:58 AM
  • Hi David,

    Thank you for the answer.
    I need some time to understand how it works.

     

    Dmitry. 

    Sunday, September 18, 2011 3:27 AM