Filling gaps in increasing sequence

    Question

  • Let's say the input sequence looks like this

     

    IObservable<int> src = new[] { 1, 2, 4, 5, 8 }.ToObservable();
    

     

    I need to transform it into 1,2,3,4,5,6,7,8

    What is the "Rx" way of doing this?

    Below is what I could come up with

     

    private static IObservable<int> FillGaps(IObservable<int> source)
    {
        int expected = -1;

        return source.SelectMany(x =>
        {
            if (expected < 0)
                expected = x;

            var e = Observable.Range(expected, x - expected + 1);

            expected = x + 1;

            return e;
        });
    }
    

    Can it be improved?

    Thanks!

     

     

     


    Serge
    Thursday, February 10, 2011 5:25 PM

Answers

  • Hi Serge,

    The new Expand operator seems appropriate, although it's not as natural a fit as the other solutions posted already.  I think this solution requires some type of metamorphism, although not just cata -> ana, but instead para -> apo.  Can the Rx team confirm whether Expand is dual to Scan?  And also whether Scan is in fact a paramorphism and Expand is an apomorphism?

    Here's a working example.  I've assumed that your actual sequence might not start from 1.

    using System;
    using System.Concurrency;
    using System.Linq;
    
    namespace ReactiveProgrammingConsole
    {
    	class MetamorphismLab
    	{
    		static void Main()
    		{
    			var xs = new[] { 4, 5, 7, 8, 11 }.ToObservable();
    
    			var ys = xs.Metamorph(
    				// fold
    				new { Start = (int?) null, End = (int?) null },
    				(acc, cur) => new { Start = acc.End, End = (int?) cur },
    
    				// adapt
    				acc => Tuple.Create(acc.Start + 1, acc.End),
    
    				// unfold
    				gen => (!gen.Item1.HasValue || gen.Item1 == gen.Item2)
    					? Observable.Empty<Tuple<int?, int?>>()
    					: Observable.Return(Tuple.Create(gen.Item1 + 1, gen.Item2)),
    
    				// project
    				gen => (gen.Item1 ?? gen.Item2).Value
    			);
    
    			using (ys.Subscribe(Console.WriteLine))
    			{
    				Console.ReadKey();
    			}
    		}
    	}
    
    	public static partial class ObservableEx2
    	{
    		public static IObservable<TResult> Metamorph<TSource, TAccumulate, TIntermediate, TResult>(
    			this IObservable<TSource> source,
    			TAccumulate seed,
    			Func<TAccumulate, TSource, TAccumulate> accumulator,
    			Func<TAccumulate, TIntermediate> intermediateSelector,
    			Func<TIntermediate, IObservable<TIntermediate>> generator,
    			Func<TIntermediate, TResult> selector)
    		{
    			return source.Scan(seed, accumulator)
    					.Select(intermediateSelector)
    					.Expand(generator, Scheduler.Immediate)
    					.Select(selector);
    		}
    	}
    }
    
    

    - Dave


    http://davesexton.com/blog
    Sunday, February 13, 2011 6:15 AM

All replies

  • My variant:

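    // -1 is a sentinel for "no previous value", so this assumes non-negative input.
    return source.StartWith(-1).Zip(source, (l, r) => l == -1 ? EnumerableEx.Return(r) : Enumerable.Range(l + 1, r - l))
      .SelectMany(list => list);
    

    Another one using Scan instead of Zip:

    return source.Scan(new List<int>(), (list, value) =>
      {
        // First element: emit it as-is.
        if (list.Count == 0)
          return new List<int> { value };
        // Otherwise emit everything after the previous value, up to and including the new one.
        return Enumerable.Range(list.Last() + 1, value - list.Last()).ToList();
      }).SelectMany(o => o);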
    

    Probably both can be improved further

    Thursday, February 10, 2011 6:32 PM
  • If you do it with SelectMany, you need to wrap it in Defer; otherwise the "expected" variable will be shared between multiple subscriptions.

    Btw, if you use SelectMany, remember that the outputs are merged together.  So you should either use the immediate scheduler for the Observable.Range or use the new Concat operator.

     

    private static IObservable<int> FillGaps(IObservable<int> source)
    {
        // Defer creates fresh state for every subscription.
        return Observable.Defer(() =>
        {
            int expected = -1;

            return source.SelectMany(x =>
            {
                if (expected < 0)
                    expected = x;

                // The immediate scheduler keeps each range's values together and in order.
                var e = Observable.Range(expected, x - expected + 1, Scheduler.Immediate);

                expected = x + 1;

                return e;
            });
        });
    }
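
    As for the Concat alternative mentioned above, here is a minimal sketch. It assumes the current System.Reactive package (namespace System.Reactive.Linq) rather than the 2011 build used in this thread, and the class name GapFiller is only illustrative. Select projects each value to its own range and Concat subscribes to those ranges one after another, so ordering does not depend on the scheduler.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class GapFiller
{
    // Assumes a strictly increasing source; GapFiller is a hypothetical name.
    public static IObservable<int> FillGaps(IObservable<int> source)
    {
        // Defer gives every subscription its own "expected" variable.
        return Observable.Defer(() =>
        {
            int? expected = null;

            return source
                .Select(x =>
                {
                    // The first value starts the output; later values fill from "expected".
                    int start = expected ?? x;
                    expected = x + 1;
                    return Observable.Range(start, x - start + 1);
                })
                // Concat subscribes to each inner range only after the previous one completes.
                .Concat();
        });
    }
}
```

    With this sketch, GapFiller.FillGaps(new[] { 4, 5, 7 }.ToObservable()) should produce 4, 5, 6, 7.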
    

    Out of curiosity...I expect that the important thing you are trying to do is retain the timing information otherwise you could simply just get the first element and the last element and make a range.  So I guess you want the timing information perhaps for liveliness.

     

    Thursday, February 10, 2011 9:53 PM
  • Thanks, Wes! I did not realize that the "expected" variable would get shared between subscribers. Is there a cleaner way of implementing something like this? How would you do this with Concat?

    Out of curiosity...I expect that the important thing you are trying to do is retain the timing information otherwise you could simply just get the first element and the last element and make a range.  So I guess you want the timing information perhaps for liveliness.

    I don't care about timing. The source data comes already timestamped. However, I can't apply Range, because the data is not just a stream of ints.

     


    Serge
    Thursday, February 10, 2011 10:16 PM
  • The following are some of the ways to generate a sequence given the two end-points:

          var src = new[] { 1, 2, 4, 5, 8 }.ToObservable();
    
          var results = src.Scan(new { Previous = 0, Current = 0 }, (a, value) => new { Previous = a.Current, Current = value })
                   .SelectMany(p => Observable.Range(p.Previous+1, p.Current - p.Previous));
    
    
          var results2 = src.Scan(new { Previous = 0, Current = 0 }, (a, value) => new { Previous = a.Current, Current = value })
                   .SelectMany(p => Observable.Generate(p.Previous+1, i => i <= p.Current, i => i + 1, i => i));
    
          var results3 = src.Scan(new { Previous = 0, Current = 0 }, (a, value) => new { Previous = a.Current, Current = value })
                   .SelectMany(p => MethodReturningIEnumerableOfItems(p.Previous, p.Current));
    
    

    with MethodReturningIEnumerableOfItems being:

     private IEnumerable<int> MethodReturningIEnumerableOfItems(int previous, int current)
        {
          for (int i = previous+1; i <= current; i++)
          {
            yield return i;
          }
        }
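
    Note that the seed { Previous = 0, Current = 0 } in the snippets above fits a sequence that starts at 1; a source starting at 4 would also get 1, 2, 3 prepended. A possible variation, sketched here under the assumption of the current System.Reactive package and with the illustrative class name ScanGapFiller, is to use a nullable seed so the first value is emitted as-is. Because all state lives inside Scan, no Defer is needed.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class ScanGapFiller
{
    // Assumes a strictly increasing source; ScanGapFiller is a hypothetical name.
    public static IObservable<int> FillGaps(IObservable<int> source)
    {
        return source
            // Pair each value with its predecessor; null means "no predecessor yet".
            .Scan(
                new { Previous = (int?)null, Current = (int?)null },
                (acc, value) => new { Previous = acc.Current, Current = (int?)value })
            .Select(p =>
            {
                // Scan never emits the seed, so Current is always set here.
                int current = p.Current.Value;
                int start = p.Previous.HasValue ? p.Previous.Value + 1 : current;
                return Observable.Range(start, current - start + 1);
            })
            // Concat keeps the ranges in source order.
            .Concat();
    }
}
```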
    

    Martin

     

    Friday, February 11, 2011 4:16 AM
  • What about:

    (Enumerable)

    from tuple in source.BufferWithCount(2,1)
    from x in tuple.Count == 1
    	? tuple
    	: Enumerable.Range(tuple[0], tuple[1] - tuple[0])
    select x;
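
    If BufferWithCount for enumerables is not available, the same pairwise idea can be sketched with a plain iterator that needs only the base class library. The class name EnumerableGapFiller is an assumption made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EnumerableGapFiller
{
    // Assumes a strictly increasing source; EnumerableGapFiller is a hypothetical name.
    public static IEnumerable<int> FillGaps(this IEnumerable<int> source)
    {
        int? previous = null;

        foreach (int x in source)
        {
            // The first value is yielded as-is; later values are preceded by the missing ones.
            int start = previous.HasValue ? previous.Value + 1 : x;
            for (int i = start; i <= x; i++)
                yield return i;

            previous = x;
        }
    }
}
```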
    

     (Observable)

    from tuple in source.ToObservable().BufferWithCount(2,1)
    from x in tuple.Count == 1
    	? tuple.ToObservable(Scheduler.Immediate)
    	: Observable.Range(tuple[0], tuple[1] - tuple[0], Scheduler.Immediate)
    select x;
    

    James Miles http://enumeratethis.com
    Saturday, February 12, 2011 11:39 AM