Reactive Parser Combinators

    General discussion

  • I need to recognise patterns in a sequence of events and react by emitting new events. For example, recognise an event of type A followed by zero or more B’s followed by another A (i.e. “A B* A”). I think this is analogous to parsing a set of input symbols (characters) and emitting an abstract syntax tree, except that I can’t wait to consume the entire input before producing some output.

    Following the lead of Luke Hoban’s excellent post on building monadic parser combinators using LINQ syntax, I think it would be great to be able to express the patterns the same way – i.e. create a new monad for “reactive” parser combinators. However, I’m having some trouble figuring out the signature for this kind of monad.

    Consider the non-reactive parser combinator (from LukeH):

    delegate Tuple<TValue, IEnumerable<TInput>> Parser<TInput, TValue>(IEnumerable<TInput> input);

    This delegate takes a list of symbols, consumes a number of symbols, and returns a tuple containing the entire AST (TValue) and any remaining unconsumed symbols. The “dual” to this signature appears to be something like:

    delegate IObservable<Tuple<TValue, IObservable<TInput>>> Parser<TInput, TValue>(IObservable<TInput> input);

    However this formulation is not “lazy” enough, as the entire AST must be generated before producing the tuple. I want to begin producing parts of the result as the input is consumed.

    I have also considered a more “continuation passing style” approach where the parser is provided a continuation representing the “rest” of the parse:

    delegate IObservable<TValue> Parser<TInput, TValue, TCont>(TInput input, Func<TInput, IObservable<TCont>> continuation);

    However, I don’t think this is a monad or at least I haven’t been able to write SelectMany() for it.

    Has anyone else looked at this problem? Or perhaps someone might be able to suggest another avenue to try? I would think that this kind of event pattern recognizer would be a generally useful addition to Rx.

    Saturday, May 01, 2010 7:45 PM
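    For reference, the non-reactive delegate quoted in the question can be given a SelectMany so that LINQ query syntax composes parsers. The sketch below is an illustration under stated assumptions, not Luke Hoban's code: ParserCombinators, Sat and Many are hypothetical names, and a failed parse is assumed to return null. It recognizes "A B* A" over a complete input.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// The pull-based signature from the question: parse a prefix of the input and
// return the value plus the unconsumed rest, or null if the parse fails (assumed).
public delegate Tuple<TValue, IEnumerable<TInput>> Parser<TInput, TValue>(IEnumerable<TInput> input);

public static class ParserCombinators
{
    // Hypothetical primitive: match one symbol that satisfies the predicate.
    public static Parser<T, T> Sat<T>(Func<T, bool> predicate)
    {
        return input =>
        {
            var head = input.Take(1).ToList();
            return head.Count == 1 && predicate(head[0])
                ? Tuple.Create(head[0], input.Skip(1))
                : null;
        };
    }

    // Bind plus projection: run parser, pass its value to selector, then run the
    // resulting parser on the remaining input. Multi-"from" queries compile to this.
    public static Parser<TIn, TResult> SelectMany<TIn, T, TU, TResult>(
        this Parser<TIn, T> parser,
        Func<T, Parser<TIn, TU>> selector,
        Func<T, TU, TResult> projector)
    {
        return input =>
        {
            var first = parser(input);
            if (first == null) return null;
            var second = selector(first.Item1)(first.Item2);
            if (second == null) return null;
            return Tuple.Create(projector(first.Item1, second.Item1), second.Item2);
        };
    }

    // Zero or more repetitions. Assumes parser consumes input whenever it
    // succeeds; otherwise this loop would never terminate.
    public static Parser<TIn, List<T>> Many<TIn, T>(this Parser<TIn, T> parser)
    {
        return input =>
        {
            var values = new List<T>();
            var rest = input;
            Tuple<T, IEnumerable<TIn>> result;
            while ((result = parser(rest)) != null)
            {
                values.Add(result.Item1);
                rest = result.Item2;
            }
            return Tuple.Create(values, rest);
        };
    }
}
```

    With a = Sat<char>(c => c == 'A') and b = Sat<char>(c => c == 'B'), the query "from first in a from bs in b.Many() from last in a select bs.Count" applied to "ABBBAx" returns 3 with "x" left over. Because the whole input has to be pulled through the delegate, nothing is produced until the parse finishes; that is the limitation the question is trying to get around.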

All replies

  • Hi,

    Are you looking for more advanced capabilities of the basic Join calculus implementation that is currently provided by Rx?

    Join Patterns in Rx
    http://social.msdn.microsoft.com/Forums/en-US/rx/thread/3ca3a0d4-ac61-4325-9fa2-ed622830b518

    It seems your parser concept would be possible with the addition of a new join operator like Or and new operators that allow you to specify cardinality; e.g., zero or more, one or more, at least N, etc.

    - Dave


    http://davesexton.com/blog
    Saturday, May 01, 2010 8:38 PM
  • Dave,

    As I understand it, the Join Patterns are used to react to patterns in events across channels, whereas I am interested in looking for patterns of events on the same channel. An example would be finding patterns in a stream of stock trade events, say "multiple upticks followed by a significant downtick".

    Furthermore I'm looking to express the patterns to match and the resulting production using a LINQ comprehension. For example,

    var alerts = from ups in Many(Uptick)
           from down in Downtick
           where down.Change > 10.0
           select new Alert { UpCount = ups.Count(), Correction = down.Change };
    

    Now perhaps there is a way to do this kind of thing with Join Patterns. If so I'd be very interested.

    - Steve.

    Sunday, May 02, 2010 1:10 AM
  • Hi SC Taylor,

    Although you could probably use SelectMany() with a conversion function that has side effects, this would have one major drawback: you could only subscribe once to the resulting observable, because all subscriptions would share the state of the conversion function passed to SelectMany().

    Maybe the following thread could help you: Parsing a sequence, producing a sequence

    Two options are shown there: one by Dave using the "Builder pattern", which passes on (accumulates) state, and another by me that builds up a buffer. Both solutions use Observable.Create() or Observable.CreateWithDisposable().


    Andreas

    Monday, May 03, 2010 9:13 AM
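    The shared-state problem Andreas describes can be reproduced in miniature without Rx. In this sketch, AnonymousObservable<T> is a hypothetical, BCL-only stand-in for Observable.Create, and RunningTotal and ListObserver<T> are made-up names for illustration. State captured outside the subscribe function is shared by every subscription; state created inside it is private to each one, which is presumably why both solutions in the linked thread are built on Observable.Create().

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for Rx's Observable.Create: the subscribe
// function runs once per Subscribe call.
public sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) { this.subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) => subscribe(observer);
}

public sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

// Collects every value it observes.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class RunningTotal
{
    // State captured OUTSIDE the subscribe function: all subscriptions share it.
    public static IObservable<int> Shared(IEnumerable<int> source)
    {
        int total = 0;
        return new AnonymousObservable<int>(observer =>
        {
            foreach (var x in source) observer.OnNext(total += x);
            observer.OnCompleted();
            return new NoopDisposable();
        });
    }

    // State created INSIDE the subscribe function: each subscription starts fresh.
    public static IObservable<int> PerSubscription(IEnumerable<int> source)
    {
        return new AnonymousObservable<int>(observer =>
        {
            int total = 0;
            foreach (var x in source) observer.OnNext(total += x);
            observer.OnCompleted();
            return new NoopDisposable();
        });
    }
}
```

    Subscribing twice to RunningTotal.Shared over 1, 2, 3 gives 1, 3, 6 and then 7, 9, 12, because the second subscription picks up where the first one's total left off; RunningTotal.PerSubscription gives 1, 3, 6 both times.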
  • Hi,

    Good point, Andreas.  I think "parsing" is generally a catamorphism, thus some form of Aggregate is appropriate here.  I've used Scan in my example below and also a custom version of BufferUntil that is reactive to another observable.

    However, SC, it doesn't seem like "parsing" or "patterns" are the correct terms to use when describing your requirements.  I still think the Join calculus would be appropriate for a true reactive parsing/patterns implementation, but what you seem to need is a general query expression that uses some buffering (as Andreas points out).

    That said, I've interpreted the aforementioned alerts query as follows:

      "When observing at least two Upticks followed by a reversal greater than 10, return the ups and down."

    Let's include an additional rule for alerts:

      "When observing at least two Downticks followed by a reversal greater than 20, return the downs and up."

    The following program is a console application that displays random ticks and alerts based on the rules above.  Although SelectMany is not used, the alertDown and alertUp queries below look similar to the requested alerts query.  Hopefully the several prerequisite queries that I've included illustrate why the requested alerts query is actually too simplistic to express the requirements to Rx.  (Unless I've just missed something, of course :)

    Note: The following code uses different colors in the console window to highlight changes.  I recommend running the program to get its full effect.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    
    namespace ReactiveProgrammingConsole
    {
     class StockTickerLab
     {
     static void Main()
     {
      var random = new Random();
      var ticks = Observable
      .Interval(TimeSpan.FromSeconds(1))
      .Select(_ => random.Next(1, 50));
    
      var deltas = ticks.Scan(
      new { Change = 0, Current = 0 },
      (acc, cur) => new { Change = cur - acc.Current, Current = cur })
      .Publish();
    
      var neutralTicks = deltas.Where(tick => tick.Change == 0);
      var upTicks = deltas.Where(tick => tick.Change > 0);
      var downTicks = deltas.Where(tick => tick.Change < 0);
    
      var upOrNeutralTicks = upTicks.Amb(neutralTicks);
      var downOrNeutralTicks = downTicks.Amb(neutralTicks);
    
      var consecutiveUpTicks = upTicks
      .BufferUntil(downOrNeutralTicks)
      .Where(ups => ups.Count > 0)
      .Repeat();
      
      var consecutiveDownTicks = downTicks
      .BufferUntil(upOrNeutralTicks)
      .Where(downs => downs.Count > 0)
      .Repeat();
    
      var reversalUpTicks = upTicks
      .SkipUntil(downOrNeutralTicks)
      .TakeUntil(neutralTicks)
      .Take(1)
      .Repeat();
    
      var reversalDownTicks = downTicks
      .SkipUntil(upOrNeutralTicks)
      .TakeUntil(neutralTicks)
      .Take(1)
      .Repeat();
    
      var reversalFromConsecutiveUpTicks = consecutiveUpTicks
      .TakeUntil(neutralTicks)
      .Repeat()
      .Zip(reversalDownTicks, 
      (ups, down) => new
      {
       Prefix = ups.Select(tick => tick.Current).ToList(),
       Current = down.Current,
       Correction = down.Change
      });
    
      var reversalFromConsecutiveDownTicks = consecutiveDownTicks
      .TakeUntil(neutralTicks)
      .Repeat()
      .Zip(reversalUpTicks,
      (downs, up) => new
      {
       Prefix = downs.Select(tick => tick.Current).ToList(),
       Current = up.Current,
       Correction = up.Change
      });
    
      const int alertUpMinCount = 2, alertDownMinChange = 11;
      const int alertDownMinCount = 2, alertUpMinChange = 21;
    
      var alertDown = reversalFromConsecutiveUpTicks
      .Where(tick => tick.Prefix.Count >= alertUpMinCount)
      .Where(tick => -tick.Correction >= alertDownMinChange);
      
      var alertUp = reversalFromConsecutiveDownTicks
      .Where(tick => tick.Prefix.Count >= alertDownMinCount)
      .Where(tick => tick.Correction >= alertUpMinChange);
    
      var alerts = alertDown.Merge(alertUp);
      
      using (deltas.Subscribe(tick => WriteTick(tick.Current, tick.Change)))
      using (alerts.Subscribe(alert => WriteAlert(alert.Prefix, alert.Current, alert.Correction)))
      using (deltas.Connect())
      {
      Console.ReadKey();
      Console.ResetColor();
      }
     }
    
     static readonly object sync = new object();
    
     static void WriteTick(int tick, int delta)
     {
      lock (sync)
      {
      if (delta > 0)
       Console.ForegroundColor = ConsoleColor.Green;
      else if (delta < 0)
       Console.ForegroundColor = ConsoleColor.Yellow;
      else
       Console.ResetColor();
    
      Console.WriteLine("{0,2} {1:+####;-####;}", tick, delta);
      Console.ResetColor();
      }
     }
    
     static void WriteAlert(IEnumerable<int> prefix, int current, int correction)
     {
      string prefixValue = prefix.Aggregate(
      new StringBuilder(),
      (builder, value) => builder.Append(value).Append(','),
      builder => (builder.Length == 0) ? string.Empty : builder.ToString(0, builder.Length - 1));
    
      lock (sync)
      {
      Console.BackgroundColor = (correction > 0) ? ConsoleColor.Green : ConsoleColor.Yellow;
      Console.ForegroundColor = ConsoleColor.Red;
      Console.WriteLine("{{{0}}} -> {1,2} {2:+####;-####;}", prefixValue, current, correction);
      Console.ResetColor();
      }
     }
     }
    
     public static partial class ObservableEx
     {
     public static IObservable<IList<TSource>> BufferUntil<TSource, TOther>(
      this IObservable<TSource> source,
      IObservable<TOther> other)
     {
      return Observable.Defer<IList<TSource>>(() =>
      {
       var list = new List<TSource>();
    
       return
       from completeSignal in source
        .TakeUntil(other)
        .Do(value => { lock (list) list.Add(value); })
        .Select(_ => false)
        .Concat(Observable.Return(true))
       where completeSignal
       select list.AsReadOnly();
      });
     }
     }
    }
    

    - Dave


    http://davesexton.com/blog
    Monday, May 03, 2010 11:42 PM
  • Dave,

    Thanks for the sample code. I certainly agree that there will always be a way to approach such problems by operating directly over the input stream, as you have done. At the same time, I think your solution to this fairly simple use case illustrates just how awkward it currently is to tackle sequence matching problems.

    What is great about parser combinators is that they allow an expression to be built up that describes the pattern to be matched, similar to how EBNF describes a grammar. In my alerts example, Uptick, Downtick, the result of Many(), and the expression result are all parser combinators, not IObservables. Like IObservable, they are monads, so they have their own SelectMany function. In this case it combines two parsers into a new parser that matches a sequence. So the expression:

    Uptick.SelectMany(_ => Downtick)

    is a new parser that matches an Uptick event followed by a Downtick event. (In this case "SelectMany" is a poor name for the function: "Bind", or Haskell's ">>=", would be a better name.)

    Think of it as composing a "regular expression" for events. Once you have built the expression, it can then be applied to an input (an IObservable in this case). In my example, I would do something like:

    IObservable<Delta> msftTicks = GetDeltas(...);
    IObservable<Alert> msftAlerts = alerts(msftTicks);
    msftAlerts.Subscribe(...);
    

    It is the signature of the alerts method (the parser combinator) that I am struggling with.

    If you can read a bit of Haskell, I would recommend a paper co-authored by Erik Meijer on the subject of parser combinators. I have to think Erik has at least thought about the possibility of a "reactive parser combinator" applied to the problem of matching sequences of events. Erik??

    - Steve.

    Tuesday, May 04, 2010 5:39 PM
  • This sounds similar to Incremental Parsers, which can parse incomplete inputs and produce ASTs as they go. There are some implementations of Incremental Parsers in Haskell that might provide you with some inspiration.
    Tuesday, May 04, 2010 9:55 PM
  • Hi Steve,

    I have implemented a simple regular language that supports exact and wildcard matches, repetition of sub-expressions, and ORing of sub-expressions. To be clear: the implementation is absolutely *naive* and probably cannot be used in real-world scenarios due to its inefficiency. But at least for simple testing and experimenting, it works.

    Here are two examples of how patterns are created:

    static void Main(string[] args)
    {
      var inputSequence = "something abc2xx1yy1aaabcx more here".ToObservable().Slice();
    
      //abc((1|2)..){0,4}abc
      var xs = inputSequence.Match('a').Match('b').Match('c')
        .MatchTimes(x => RegRx.Or(x.Match('1'), x.Match('2')).MatchAny().MatchAny(), 0, 4)
        .Match('a').Match('b').Match('c');
    
      xs.Subscribe(v => Console.WriteLine("Found Match: {0}", new String(v.Item1.ToArray())));
    
      // bc.{0,10}bc
      var ys = inputSequence.Match('b').Match('c').MatchTimes(x => x.MatchAny(), 0, 10).Match('b').Match('c');
      ys.Subscribe(v => Console.WriteLine("Found Match: {0}", new String(v.Item1.ToArray())));
      Console.ReadKey();
    }

    The first operator, Slice(), generates for each incoming symbol a new observable sequence that begins with that symbol.

    If the input sequence were "abcde", the resulting observables would represent the sequences:

    abcde
    bcde
    cde
    de
    e

    That means many sub-sequences are created from one input sequence, each starting at a different position. (Actually, the same input sequence is always returned, just at different points in time, but I think seeing it the other way simplifies things a little.)

    Match()/MatchAny() simply steal (eat) one symbol and return the sequence without that symbol. If Match() gets a sequence whose next character does not match, it simply calls OnCompleted(), effectively abandoning that search thread.

    ORing sub-expressions is done simply by merging them (Observable.Merge).

    The most complicated part is probably the recursive sub-expression matching, which I used for repetition. As I said, it is a really naive implementation that can create many new search threads, especially when the minimum repetition count is lower than the maximum. I have not analyzed the runtime complexity, but it is surely not good. :-) For short, bounded repetitions (let's say up to 10) it should work. One problem with the repetition operator is that my implementation does not correctly determine when the sequence is complete, so currently those sequences never complete.
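    The "search thread" idea described above can be sketched without Rx as a small push-based matcher. This is a simplified, hypothetical illustration rather than Andreas's code: SliceMatcher<T> is a made-up name, and it handles only fixed-length patterns of exact or wildcard symbols, with no repetition or ORing. Each incoming symbol starts a new thread, every live thread tries to consume the symbol, and a thread that fails is dropped, much as Match() completes without a value.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical push-based matcher for a fixed-length pattern of per-symbol
// predicates (exact or wildcard). Each incoming symbol starts a new search
// thread; a thread that fails to match is dropped, and one that consumes the
// whole pattern reports its matched slice immediately.
public sealed class SliceMatcher<T> : IObserver<T>
{
    private readonly Func<T, bool>[] pattern;            // must be non-empty
    private readonly Action<IReadOnlyList<T>> onMatch;
    private List<List<T>> threads = new List<List<T>>(); // symbols matched so far, one list per live thread

    public SliceMatcher(Action<IReadOnlyList<T>> onMatch, params Func<T, bool>[] pattern)
    {
        if (pattern.Length == 0) throw new ArgumentException("Pattern must not be empty.", nameof(pattern));
        this.onMatch = onMatch;
        this.pattern = pattern;
    }

    public void OnNext(T symbol)
    {
        threads.Add(new List<T>()); // a new search thread starts at this symbol
        var survivors = new List<List<T>>();
        foreach (var matched in threads)
        {
            if (!pattern[matched.Count](symbol)) continue; // abandon this thread
            matched.Add(symbol);
            if (matched.Count == pattern.Length) onMatch(matched);
            else survivors.Add(matched);
        }
        threads = survivors;
    }

    public void OnError(Exception error) => threads.Clear();

    // Unfinished threads can never match once the input has completed.
    public void OnCompleted() => threads.Clear();
}
```

    With the pattern a.c, the input "abcaxcab" reports "abc" and then "axc", each as soon as its last symbol arrives; the trailing "ab" is still an open thread when the input completes and is discarded. Adding repetition would let one thread fork into several, which is where the cost described above comes from.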

    Here is the code for the used extension methods:

    public static class RegRx
    {
      public static IObservable<Tuple<IEnumerable<T>, IObservable<T>>> Slice<T>(
        this IObservable<T> input
      )
      {
        return input
          .Publish(s => s.StartWith(Scheduler.Immediate, default(T))
            .Select(x => Tuple.Create(Enumerable.Empty<T>(), s)));
      }
    
      private static IObservable<Tuple<IEnumerable<T>, IObservable<T>>> MatchPred<T>(
        this Tuple<IEnumerable<T>, IObservable<T>> source, 
        Func<T, bool> predicate
      )
      {
        return Observable.CreateWithDisposable<Tuple<IEnumerable<T>, IObservable<T>>>(o =>
        {
          bool first = true;
          var subscription = new MutableDisposable();
          subscription.Disposable = source.Item2.Subscribe(n =>
          {
            if (first)
            {
              first = false;
              if (predicate(n))
                o.OnNext(Tuple.Create(source.Item1.Concat(new T[] { n }), source.Item2));
              subscription.Dispose();
              o.OnCompleted();
            }
          }, o.OnError, o.OnCompleted);
          return subscription;
        });
      }
    
      public static IObservable<Tuple<IEnumerable<T>, IObservable<T>>> Match<T>(
        this IObservable<Tuple<IEnumerable<T>, IObservable<T>>> source, T value
      )
      {
        return source.SelectMany(t => t.MatchPred(n => n.Equals(value)));
      }
      
      public static IObservable<Tuple<IEnumerable<T>, IObservable<T>>> MatchAny<T>(
        this IObservable<Tuple<IEnumerable<T>, IObservable<T>>> source
      )
      {
        return source.SelectMany(t => t.MatchPred(n => true));
      }
    
      private static IDisposable MatchTimesOneR<T>(Tuple<IEnumerable<T>, IObservable<T>> source, 
        Func<IObservable<Tuple<IEnumerable<T>, IObservable<T>>>, IObservable<Tuple<IEnumerable<T>, IObservable<T>>>> subClause, 
        int minCount, 
        int maxCount, 
        int matchCount, 
        IObserver<Tuple<IEnumerable<T>, IObservable<T>>> observer
      )
      {
        var output = subClause(Observable.Return(source, Scheduler.Immediate));
    
        if (matchCount == minCount)
          observer.OnNext(source);
    
        return output.Subscribe(n =>
        {
          if (matchCount + 1 >= minCount)
            observer.OnNext(n);
          if (matchCount + 1 < maxCount)
            MatchTimesOneR(n, subClause, minCount, maxCount, matchCount + 1, observer);
        },
        observer.OnError,
        () => { });
      }
    
      private static IObservable<Tuple<IEnumerable<T>, IObservable<T>>> MatchTimesOne<T>(
        Tuple<IEnumerable<T>, IObservable<T>> source, 
        Func<IObservable<Tuple<IEnumerable<T>, IObservable<T>>>, IObservable<Tuple<IEnumerable<T>, IObservable<T>>>> subClause, 
        int minCount, int maxCount)
      {
        return Observable.CreateWithDisposable<Tuple<IEnumerable<T>, IObservable<T>>>(
          o => MatchTimesOneR<T>(source, subClause, minCount, maxCount, 0, o)
        );
      }
    
      public static IObservable<Tuple<IEnumerable<T>, IObservable<T>>> MatchTimes<T>(
        this IObservable<Tuple<IEnumerable<T>, IObservable<T>>> source, 
        Func<IObservable<Tuple<IEnumerable<T>, IObservable<T>>>, IObservable<Tuple<IEnumerable<T>, IObservable<T>>>> subClause, 
        int minCount,
        int maxCount
      )
      {
        return source.SelectMany(s => MatchTimesOne(s, subClause, minCount, maxCount));
      }
    
      public static IObservable<Tuple<IEnumerable<T>, IObservable<T>>> Or<T>(
        params IObservable<Tuple<IEnumerable<T>, IObservable<T>>>[] sources
      )
      {
        return Observable.Merge(sources);
      }
    }

    I have only done very basic testing so far. I have no idea if it really works for more complex patterns. Please let me know if you find problems.

    Andreas

    Wednesday, May 05, 2010 11:51 AM
  • Hi Steve,

    You've inspired me to do some research :)  After fiddling around with some test cases yesterday, I've actually come up with a working solution based largely on Luke Hoban’s code and Erik's papers (with some struggle; sadly, I don't know Haskell).  The code's been extended quite a bit, adding lots of infrastructure and several new combinators.

    I've created a new MSDN Code Gallery project and will publish it there once I work out a few major performance kinks.

    The project will include the complete .NET 4.0 source code, the binary (.dll) for download, and a sample console app.

    It also includes an interactive version that uses IEnumerable as the sequence of input instead of requiring concrete parser implementations to define "AnyChar".  My general cursor combinator is named: Next.

    Here's a teaser:

    class ObservableArithmeticParser : ObservableTextParser<int>
    {
     private readonly ParseObservable<char, char> Sign;
     private readonly ParseObservable<char, int> Number;
     private readonly ParseObservable<char, int> PaddedNumber;
     private readonly ParseObservable<char, char> Operator;
     private readonly ParseObservable<char, OpTerm> Operation;
    
     public ObservableArithmeticParser()
     {
      Sign = Char('-').Or(Char('+')).WithDefault('+');
    
      Number = Sign.And(Char(char.IsNumber).OneOrMore())
             .Join(value => int.Parse(value, NumberStyles.AllowLeadingSign));
    
      PaddedNumber = InsignificantWhitespace.IgnoreBefore(Number).IgnoreTrailing(InsignificantWhitespace);
    
      Operator = Char('+').Or(Char('-'));
    
      Operation = from left in PaddedNumber
            from op in Operator
            from right in PaddedNumber
            select new OpTerm(left, op, right);
     }
    
     private int Evaluate(OpTerm op)
     {
      switch (op.Operator)
      {
       case '+':
        return op.Left + op.Right;
       case '-':
        return op.Left - op.Right;
       default:
        throw new InvalidOperationException("Invalid operator: " + op.Operator);
      }
     }
    
     protected override ParseObservable<char, int> Start
     {
      get
      {
       return from result in Operation.Peek(Output, Evaluate).OneOrMore()
           from op in result
           select Evaluate(op);
      }
     }
    }

    - Dave


    http://davesexton.com/blog
    Wednesday, May 05, 2010 11:52 AM
  • Hi Dave,

    your code looks very clean. I like the way in which you build together the token-definitions.

    Just one question: does your parser allow ambiguous patterns to be defined, or can it always unambiguously determine whether the current input matches?

    Let's say the pattern to match is "[.*]" and the input is "[a[b]a]". The code I posted above tries to find all possible matches, e.g.:

    var inputSequence = "[a[b]a]".ToObservable().Slice();
    //[.{0,5}]
    var xs = inputSequence.Match('[').MatchTimes(x => x.MatchAny(), 0, 5).Match(']');
    xs.Subscribe(v => Console.WriteLine("Found Match: {0}", new String(v.Item1.ToArray())));

    prints:

    Found Match: [b]
    Found Match: [a[b]
    Found Match: [b]a]
    Found Match: [a[b]a]

    I am not sure whether that is good or bad, but at least it makes the parse process quite complicated, because all possible open cases have to be followed and matched for every incoming symbol. I think I will now take the time to really read through Luke Hoban's blog entry.

    Andreas

    Wednesday, May 05, 2010 2:25 PM
  • Hi Andreas,

    Wow, we posted nearly at the same time :)

    You pose a good question.  It might be possible somehow to describe ambiguous (recursive?) patterns with backtracking although I haven't researched it yet.

    I just whipped up an example to see if I could get it to work but I had no luck.  To be honest, even though I wrote these combinators I'm not sure I know how to use them properly yet ;)

    Here's the subscribing code that I executed for my test:

    using (
     "[a[b]a]"
     .ToObservable(Scheduler.ThreadPool)
     .Parse(new ObservableGroupParser())
     .Subscribe(Console.WriteLine, () => Console.WriteLine("Done")))
    {
     Console.ReadKey();
    }

    It takes a string and converts it to an IObservable<char> (this is possible since String implements IEnumerable<char>).  Then I called the Parse combinator and passed it a specific parser implementation.  That returns an IObservable<T>, where T depends upon the type of the specified parser class.  Last, a typical observable subscription is made.

    Here's the ObservableGroupParser implementation, in its entirety:

    class ObservableGroupParser : ObservableTextParser<string>
    {
     protected override ParseObservable<char, string> Start
     {
     get
     {
      return from results in Groups
        from grp in results
        select grp;
     }
     }
    
     private readonly ParseObservable<char, char> OpenBracket;
     private readonly ParseObservable<char, char> CloseBracket;
     private readonly ParseObservable<char, string> Word;
     private readonly ParseObservable<char, string> Group;
     private readonly ParseObservable<char, string> RecursiveGroup;
     private readonly ParseObservable<char, IObservable<string>> Groups;
    
     public ObservableGroupParser()
     {
     OpenBracket = Char('[');
     CloseBracket = Char(']');
     Word = Char(char.IsLetter).OneOrMore().Join();
     Group = OpenBracket.Group(Word, CloseBracket);
     RecursiveGroup = OpenBracket.Group(Group.Or(Word).OneOrMore(), CloseBracket);
     Groups = RecursiveGroup.OneOrMore();
     }
    }

    And here's the output:

    a
    b
    a
    Done

    Pretty neat, right?  It was actually a lot of fun writing the combinators.  The ones being used here are OneOrMore, Join, Group and Or.  There are several others.  The Char method is actually defined on the base class; there are a few of those as well that aren't actually combinators, but they return the parser delegate so they may be used when defining rules.

    However, it seems that backtracking isn't a supported operation yet.  Hopefully it's possible and I'm just missing a few combinators.

    Steve, here's the ParseObservable delegate that I'm using for my library:

    public delegate IObservable<ParseObservableResult<TSource, TResult>>
     ParseObservable<TSource, TResult>(IObservable<TSource> source);

    Something else that might be of interest: I've implemented a Peek combinator that allows the sequence to be explicitly partitioned so that it may be observed mid-parse.  It's still got some bugs, though (related to infrastructure bugs; Peek just makes them visible).

    Here's the required modification if we want to observe recursive groups as each is parsed.  I simply added the Peek combinator to the existing RecursiveGroup rule and passed it a reference to the base class's Output channel.  (Obviously, it's a Subject<T> :)

    RecursiveGroup = 
     OpenBracket.Group(
     Group.Or(Word).Peek(Output, value => "[" + value + "]").OneOrMore(), 
     CloseBracket);

    And here are the current (buggy) results:

    [a]
    [b]
    [a]
    [a]
    [b]
    [a]
    [a]
    a
    b
    a
    Done

    Note that the subscription doesn't have to change at all; it simply receives values as they are parsed, and still receives the final results (I'm also working on a Push operator that consumes while peeking).  Optionally, the final results can be hidden with Peek by using Take(0) on the outer query.

    It's that extra repetition (shown above) that's causing the performance issue and the weird Peek behavior.  If I can't figure something out by tonight, I'll probably just submit what I've got for now.  If you, or anybody else, would like to contribute to the library, please let me know.

    - Dave


    http://davesexton.com/blog
    Wednesday, May 05, 2010 5:15 PM
  • Dave, Andreas,

    I'm glad I was able to pique your interest! I'm amazed at the progress you've been able to make in such a short time!

    Dave, I'm interested in your implementation of OneOrMore, as this was an area I was struggling with. The tricky bit was how to avoid emitting the "rest" of the input until all the matching results are produced.

    I like the idea of the Peek operator to emit intermediate results. In practice many "grammars" will include a top level OneOrMore (like your example) and it is usually desirable to emit each result as it is matched.

    One thing I've been playing with is to remove the requirement for backtracking by performing a breadth-first (rather than depth-first) search of all alternatives. Basically in the definition of Or you feed the input to both parser alternatives and observe the result from both simultaneously using something like the Amb method - i.e. whichever one returns a positive result first wins. Perhaps there is a way to handle ambiguous patterns in this manner?

    - Steve

    Wednesday, May 05, 2010 9:00 PM
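    One way to explore the breadth-first idea is an incremental parser whose state is pushed one symbol at a time. In this sketch (hypothetical names Step<TIn, T> and Steps; BCL only, not part of Rx or of any library mentioned in this thread), a step holds the results of parses that end at the current position plus an optional continuation for the next symbol. Or feeds every symbol to both alternatives and keeps all of their results instead of letting the first responder win as Amb does, so an ambiguous pattern simply yields several results. The two SelectMany overloads give the type the bind operation that LINQ query syntax needs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One state of an incremental parser: the results of parses that end at the
// current position, plus a continuation that consumes the next input symbol
// (null when no parse can continue).
public sealed class Step<TIn, T>
{
    public readonly IReadOnlyList<T> Results;
    public readonly Func<TIn, Step<TIn, T>> Next;

    public Step(IReadOnlyList<T> results, Func<TIn, Step<TIn, T>> next)
    {
        Results = results;
        Next = next;
    }

    // Push one symbol; a step with no continuation just fails.
    public Step<TIn, T> Feed(TIn symbol) => Next == null ? Steps.Fail<TIn, T>() : Next(symbol);
}

public static class Steps
{
    public static Step<TIn, T> Fail<TIn, T>() => new Step<TIn, T>(new T[0], null);

    public static Step<TIn, T> Return<TIn, T>(T value) => new Step<TIn, T>(new[] { value }, null);

    public static Step<TIn, TIn> Sat<TIn>(Func<TIn, bool> predicate) =>
        new Step<TIn, TIn>(new TIn[0], x => predicate(x) ? Return<TIn, TIn>(x) : Fail<TIn, TIn>());

    // Breadth-first choice: both alternatives see every symbol and all of
    // their results are kept (unlike Amb, where the first responder wins).
    public static Step<TIn, T> Or<TIn, T>(this Step<TIn, T> a, Step<TIn, T> b)
    {
        Func<TIn, Step<TIn, T>> next =
            a.Next == null ? b.Next :
            b.Next == null ? a.Next :
            x => a.Next(x).Or(b.Next(x));
        return new Step<TIn, T>(a.Results.Concat(b.Results).ToList(), next);
    }

    // Bind: keep feeding parses that are still running, and start f on each
    // parse that has already produced a result.
    public static Step<TIn, TU> SelectMany<TIn, T, TU>(this Step<TIn, T> step, Func<T, Step<TIn, TU>> f)
    {
        var running = new Step<TIn, TU>(new TU[0],
            step.Next == null ? null : (Func<TIn, Step<TIn, TU>>)(x => step.Next(x).SelectMany(f)));
        return step.Results.Aggregate(running, (acc, r) => acc.Or(f(r)));
    }

    // Overload used by LINQ query syntax.
    public static Step<TIn, TR> SelectMany<TIn, T, TU, TR>(
        this Step<TIn, T> step, Func<T, Step<TIn, TU>> f, Func<T, TU, TR> project) =>
        step.SelectMany(t => f(t).SelectMany(u => Return<TIn, TR>(project(t, u))));

    // Zero or more. Assumes p needs at least one symbol before it yields a
    // result; otherwise building the recursion would not terminate.
    public static Step<TIn, List<T>> Many<TIn, T>(this Step<TIn, T> p) =>
        Return<TIn, List<T>>(new List<T>()).Or(
            from x in p
            from xs in p.Many()
            select new[] { x }.Concat(xs).ToList());
}
```

    Pushing A, B, B, A into the "A B* A" pattern yields the single result 2 immediately after the final A, with nothing produced before it. Splitting "BB" between two Many() parsers keeps all three readings (0+2, 1+1, 2+0) alive at once. The price is that every live alternative stays in memory, and Many() assumes its argument consumes at least one symbol before it produces a result.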
  • Hi Steve,

    Actually I used Amb as my first attempt but that quickly fell apart for me.  The issue was actually related to OneOrMore, so I guess it's no coincidence that you're having a similar issue.  Here's my Or operator as it stands now:

    public static ParseObservable<TSource, TResult> Or<TSource, TResult>(
     this ParseObservable<TSource, TResult> parser,
     ParseObservable<TSource, TResult> nextParser)
    {
     Contract.Requires(parser != null);
     Contract.Requires(nextParser != null);
     Contract.Ensures(Contract.Result<ParseObservable<TSource, TResult>>() != null);
    
     // The semantics of Amb are slightly different than what is required; e.g., NoneOrMore uses parser.Or(Success)
     // to return success when parser fails, but Amb will always see Success first and the parser will not be executed.
    
     return source => Observable.CreateWithDisposable<ParseObservableResult<TSource, TResult>>(
      observer =>
      {
       var disposables = new CompositeDisposable();
       bool usingLeft = false, usingInnerSubscription = false;
    
       // This is a weird one, but there are two possible ways that the observable can be short-circuited;
       // thus, Or needs to account for both. The first possible situation is that the source observable
       // has no more values, in which case OnCompleted will be called. In that case, nextParser must 
       // still get a chance to examine the source - it could conceivably return a result even on an empty
       // source; e.g., the ZeroOrMore operator returns SuccessMany(). The second possibility is when source 
       // does have another value but the left parser completes without any values. This is typically how 
       // the Or operator is envisioned - in this case it switches to nextParser, much like Observable.Concat.
    
       // The Next() combinator uses Replay(...) so Take(1) acts like a peek function in all uses below.
    
       disposables.Add(source.Take(1).Subscribe(
        value =>
        {
         var innerSource = parser(source);
    
         usingInnerSubscription = true;
    
         disposables.Add(innerSource.Take(1).Subscribe(
          leftValue =>
          {
           usingLeft = true;
           disposables.Add(innerSource.Subscribe(observer));
          },
          ex => observer.OnError(ex),
          () =>
          {
           if (!usingLeft)
            disposables.Add(nextParser(source).Subscribe(observer));
          }));
        },
        ex => observer.OnError(ex),
        () =>
        {
         if (!usingLeft && !usingInnerSubscription)
          disposables.Add(nextParser(source).Subscribe(observer));
        }));
    
       return disposables;
      });
    }

    And here are all of my OneOrMore / NoneOrMore combinator overloads.  Note that they support parsing out separators as well, based on an idea I found in one of Erik's papers.

    public static ParseObservable<TSource, IObservable<TResult>> NoneOrMore<TSource, TResult>(
     this ParseObservable<TSource, TResult> parser)
    {
     Contract.Requires(parser != null);
     Contract.Ensures(Contract.Result<ParseObservable<TSource, IObservable<TResult>>>() != null);
    
     return parser.OneOrMore().Or(SuccessMany<TSource, TResult>());
    }
    
    public static ParseObservable<TSource, IObservable<TResult>> NoneOrMore<TSource, TResult>(
     this ParseObservable<TSource, TResult> parser,
     ParseObservable<TSource, TResult> separator)
    {
     Contract.Requires(parser != null);
     Contract.Requires(separator != null);
     Contract.Ensures(Contract.Result<ParseObservable<TSource, IObservable<TResult>>>() != null);
    
     return parser.OneOrMore(separator).Or(SuccessMany<TSource, TResult>());
    }
    
    public static ParseObservable<TSource, IObservable<TResult>> OneOrMore<TSource, TResult>(
     this ParseObservable<TSource, TResult> parser)
    {
     Contract.Requires(parser != null);
     Contract.Ensures(Contract.Result<ParseObservable<TSource, IObservable<TResult>>>() != null);
    
     return
      from first in parser
      from remainder in parser.NoneOrMore()
      select Concat(first, remainder);
    }
    
    public static ParseObservable<TSource, IObservable<TResult>> OneOrMore<TSource, TResult>(
     this ParseObservable<TSource, TResult> parser,
     ParseObservable<TSource, TResult> separator)
    {
     Contract.Requires(parser != null);
     Contract.Requires(separator != null);
     Contract.Ensures(Contract.Result<ParseObservable<TSource, IObservable<TResult>>>() != null);
    
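     // p (separator p)*: every item after the first must be preceded by a separator.
     return
      from first in parser
      from remainder in (from _ in separator
                         from next in parser
                         select next).NoneOrMore()
      select Concat(first, remainder);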
    }
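    To see the separated-list shape p (separator p)* on its own, here is a minimal, non-reactive sketch. The Parser<T> delegate (string and position in; value and next position out; null on failure) and the SeparatedList class are hypothetical simplifications for illustration. They are not the Rx Parsers API or Luke Hoban's signature. The key point is that every item after the first is parsed together with the separator that precedes it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified parser type (illustration only, not the Rx Parsers API):
// given the input and a start position, return the parsed value and the position
// just after it, or null when the parser does not match there.
public delegate Tuple<T, int> Parser<T>(string input, int pos);

public static class SeparatedList
{
    // Matches one character that satisfies the predicate.
    public static Parser<char> Sat(Func<char, bool> predicate) =>
        (input, pos) => pos < input.Length && predicate(input[pos])
            ? Tuple.Create(input[pos], pos + 1)
            : null;

    // p* : applies p as long as it matches and makes progress; always succeeds.
    public static Parser<List<T>> NoneOrMore<T>(this Parser<T> p) =>
        (input, pos) =>
        {
            var items = new List<T>();
            for (var r = p(input, pos); r != null && r.Item2 > pos; r = p(input, pos))
            {
                items.Add(r.Item1);
                pos = r.Item2;
            }
            return Tuple.Create(items, pos);
        };

    // p (separator p)* : one item, then zero or more "separator, item" pairs.
    // The separator's value is discarded; a trailing separator is left unconsumed.
    public static Parser<List<T>> OneOrMore<T, TSep>(this Parser<T> p, Parser<TSep> separator)
    {
        Parser<T> separatorThenItem = (input, pos) =>
        {
            var s = separator(input, pos);
            return s == null ? null : p(input, s.Item2);
        };
        var rest = separatorThenItem.NoneOrMore();

        return (input, pos) =>
        {
            var first = p(input, pos);
            if (first == null) return null;
            var remainder = rest(input, first.Item2);   // NoneOrMore never fails
            var items = new List<T> { first.Item1 };
            items.AddRange(remainder.Item1);
            return Tuple.Create(items, remainder.Item2);
        };
    }
}
```

    With a letter parser and a comma parser, "a,b,c" yields three items, "a" alone yields one, and in "a,b," the trailing comma is left for whatever parser comes next.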

    As for the current repetition and performance issues, I had the same problem with my first attempt at writing the interactive version of the parsers using IEnumerable, based on Luke's code and Erik's papers.  That was actually the first thing I did before writing the reactive version.  (I'll be including the interactive version in the library when I deploy to my MSDN Code Gallery project.)

    The solution for the interactive version was to make each operator block, i.e. evaluate eagerly and return an ICollection instead of a lazy IEnumerable.  The performance gain was tremendous.  I suspect this may cause a problem with backtracking, but so far I haven't needed it; the MiniML sample and a few custom ones work great now and are much faster than before.
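    The interactive parser code isn't shown here, but the effect described is the usual one with deferred execution: a lazy IEnumerable re-runs its whole pipeline every time it is enumerated, while a materialized ICollection (here a List<T> produced by ToList()) runs it once. The sketch below is a hypothetical stand-in that uses a counter in place of real parsing work; MaterializeSketch and its members are illustrative names only.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class MaterializeSketch
{
    // Counts how many times the (stand-in) parsing work runs.
    public static int Evaluations;

    // A deferred query: nothing runs until it is enumerated, and every
    // enumeration runs the Select projection again.
    static IEnumerable<int> ParseDigits(string input) =>
        input.Where(char.IsDigit).Select(c => { Evaluations++; return c - '0'; });

    // Enumerates the lazy sequence `passes` times, re-running the parse each time.
    public static int LazyTotal(string input, int passes)
    {
        IEnumerable<int> digits = ParseDigits(input);
        var total = 0;
        for (var i = 0; i < passes; i++) total += digits.Sum();
        return total;
    }

    // Materializes once with ToList(), then enumerates the cached results.
    public static int MaterializedTotal(string input, int passes)
    {
        ICollection<int> digits = ParseDigits(input).ToList();
        var total = 0;
        for (var i = 0; i < passes; i++) total += digits.Sum();
        return total;
    }
}
```

    With three passes over "1a2b3", the lazy version runs the projection nine times and the materialized one three times. The trade-off is that materializing gives up streaming: an operator's results only become available once it has finished.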

    I tried to avoid doing something like this with the reactive version, but I guess I have no choice now.  I'll try some things out and publish the results ASAP, regardless of whether I solve the current problems.  I'm sure you guys will help if I can't figure it out myself :)

    - Dave


    http://davesexton.com/blog
    Thursday, May 06, 2010 2:08 AM
  • Hi guys,

    Rx Parsers is live:
    http://code.msdn.microsoft.com/RxParsers

    Side note: I mentioned on the home page that it's an F-coalgebra implementation.  I'm trying to sound smart ;)  Actually, I'm pretty sure that it is, but maybe Erik can comment on that?

    I fixed all of the issues that I mentioned above, as well as several others.  That said, I wouldn't call it "stable" yet.  It still requires lots of testing, plus some real-world use cases to see how well the theory holds up, simply because I don't fully understand it yet.

    A few recommendations if you're interested:

    1. Download the source code and check out the labs.  In particular, the ObservableParseLiteLab class was really helpful for me to understand the complexity of the queries and how all of the combinators fit together at runtime - I solved several bugs using this tiny lab.  Also check out ObservableArithmeticLab - I think you'll find that one quite interesting :)
    2. Play around with the Peek and Push combinators.  They're optional, but they allow reactive queries to produce output mid-stream.  ObservableGroupLab provides a working example.
    3. Write your own combinators.  It's tons of fun.

    - Dave


    http://davesexton.com/blog
    Friday, May 07, 2010 6:12 PM
  • Dave,

    Fantastic work! I'm going to spend some time this week seeing if I can adapt your project to my situation.

    BTW, while samples illustrating the parsing of a grammar (e.g. MiniML) are instructive, they aren't very realistic. Typically all the input is available at the start of such an exercise. I think an example showing how the same combinators could be used to find and react to patterns in an event stream would be more illustrative of the power of the approach.

    When I have further comments, I'll add them to the project discussion board.

    Thanks again.

    Steve.

    Monday, May 10, 2010 12:44 PM
  • Hi Steve,

    Thanks for the feedback.

    Did you check out ObservableArithmeticLab?  It shows how to write a parser that reacts to a stream of lazy expressions.  Is this what you had in mind?

    If you have a better real-world example please let me know.  I'd like to include a lab for it.

    Next release: I've already fixed a few bugs and added several new operators.  I just finished writing a basic XML grammar for an interactive parser and it seems to be working properly.  The next step is a context-sensitive grammar (XML schema), and finally the reactive XML versions of each.  I'm also working on how to control ambiguity.  I discovered that in my XML parser, if I simply call OneOrMore() twice in a row on the element rule, I get an ambiguous XML parser: it outputs every possible combination of nested elements!  I've added some new operators that I might call Ambiguous and AmbiguousMaybe, but I'm still testing to see exactly how they should work.
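    The XML rule itself isn't shown, but the ambiguity is easy to reproduce. In a parser that returns every possible match (the classic "list of successes" technique), applying OneOrMore twice lets any run of n items be grouped in 2^(n-1) different ways. This is a hypothetical sketch: AmbParser and AmbiguitySketch are illustration names, not Rx Parsers APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical "list of successes" parser (illustration only, not the Rx Parsers API):
// it yields every way it can match at pos, each as (value, position after the match).
public delegate IEnumerable<Tuple<T, int>> AmbParser<T>(string input, int pos);

public static class AmbiguitySketch
{
    // Matches one character that satisfies the predicate (zero or one result).
    public static AmbParser<char> Sat(Func<char, bool> predicate) =>
        (input, pos) => pos < input.Length && predicate(input[pos])
            ? new[] { Tuple.Create(input[pos], pos + 1) }
            : new Tuple<char, int>[0];

    // p+ : yields every non-empty run of p, not just the longest one.
    public static AmbParser<List<T>> OneOrMore<T>(this AmbParser<T> p) =>
        (input, pos) => OneOrMoreFrom(p, input, pos);

    static IEnumerable<Tuple<List<T>, int>> OneOrMoreFrom<T>(AmbParser<T> p, string input, int pos)
    {
        foreach (var first in p(input, pos))
        {
            // Either stop after this item...
            yield return Tuple.Create(new List<T> { first.Item1 }, first.Item2);

            // ...or continue, prepending this item to every longer run.
            foreach (var rest in OneOrMoreFrom(p, input, first.Item2))
            {
                var items = new List<T> { first.Item1 };
                items.AddRange(rest.Item1);
                yield return Tuple.Create(items, rest.Item2);
            }
        }
    }

    // Applies OneOrMore twice to a single-character rule and keeps only the parses
    // that consume the whole input.
    public static List<List<List<char>>> CompleteParses(string input)
    {
        var a = Sat(c => c == 'a');
        return a.OneOrMore().OneOrMore()(input, 0)
            .Where(r => r.Item2 == input.Length)
            .Select(r => r.Item1)
            .ToList();
    }
}
```

    For "aaa" the four complete parses are [aaa], [a][aa], [aa][a] and [a][a][a], whereas a single OneOrMore yields just one parse that consumes the whole input.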

    - Dave


    http://davesexton.com/blog
    Monday, May 10, 2010 1:16 PM
  • Dave,

    The arithmetic lab is also about parsing strings. I am thinking that the library can be used in places where the word "parsing" doesn't really apply. Any pattern of events that can be described with the phrase "followed by" can be matched by a combinator. My "stock ticker" use case would be an example. Another might be looking for patterns in mouse gestures. Say, for instance, we'd like to react to a user drawing an approximate square (maybe we'll replace it with a real square, select a region, etc.). So we'd like to match a pattern of:

    from u in OneOrMore(MouseUp)
    from r in OneOrMore(MouseRight)
    from d in OneOrMore(MouseDown)
    from l in OneOrMore(MouseLeft)
    where <test limits of u,r,d,l to roughly match a square>
    select new SquareGesture { TopLeft = ..., BottomRight = ... };
    

    This is a very rough sketch. But imagine how you might write this logic without a combinator approach and you should have some idea of how much more succinct this approach could be.
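    To make the comparison concrete, here is roughly what the same check looks like written by hand, without combinators. It is a hypothetical sketch under stated assumptions: the mouse stream has already been reduced to unit moves (the illustrative Move enum), it works over a finished sequence rather than reacting to each event, and "roughly square" is taken to mean that the longest and shortest sides differ by at most a given tolerance.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical unit moves; a real version would derive these from mouse coordinates.
public enum Move { Up, Right, Down, Left }

public static class SquareGestureSketch
{
    // Recognizes Up+ Right+ Down+ Left+ by hand and checks that the four side
    // lengths differ by at most `tolerance` moves.
    public static bool IsRoughSquare(IEnumerable<Move> moves, int tolerance)
    {
        var expected = new[] { Move.Up, Move.Right, Move.Down, Move.Left };
        var lengths = new int[4];
        var side = 0;   // index of the run currently being matched

        foreach (var move in moves)
        {
            if (move == expected[side])
            {
                lengths[side]++;                       // extend the current run
            }
            else if (side < 3 && lengths[side] > 0 && move == expected[side + 1])
            {
                side++;                                // current run ended, next side begins
                lengths[side] = 1;
            }
            else
            {
                return false;                          // out-of-order move: not this gesture
            }
        }

        if (side != 3) return false;                   // never reached the Left run
        var longest = Math.Max(Math.Max(lengths[0], lengths[1]), Math.Max(lengths[2], lengths[3]));
        var shortest = Math.Min(Math.Min(lengths[0], lengths[1]), Math.Min(lengths[2], lengths[3]));
        return longest - shortest <= tolerance;
    }
}
```

    Even this simplified version needs explicit state (the current side and four counters) and hand-written transitions, which is exactly the bookkeeping a combinator query would hide.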

    Monday, May 10, 2010 2:12 PM
  • Hi Steve,

    Hmm, but I still think Observable.Join is suitable for your last example.

    Edit: The stock ticker idea is something to follow-up on though - I'll see if I can come up with a solution using Rx Parsers.

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Monday, May 10, 2010 2:28 PM Stock ticker comments
    Monday, May 10, 2010 2:27 PM
  • Hi Steve,

    Never mind, you're correct.  It's the cardinality (OneOrMore) that makes your square gesture query difficult to express, even using Joins, as I understand them.

    - Dave


    http://davesexton.com/blog
    Monday, May 10, 2010 3:10 PM
  • Hi everyone,

    I've just uploaded the next release, Rx Parsers v1.1.  It provides several bug fixes and lots of new combinators, features and labs (see the Release Notes).

    Andreas, I've added a basic ambiguity combinator; e.g., a simple grammar that matches two consecutive characters yields "ab", "bc" and "cd" for the input "abcd".  There are reactive and interactive labs for this.  I've started working on the more advanced group-delimiter-counting-ambiguity-combinators for the next release.  ;)
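    The difference between ordinary and ambiguous matching in that example can be sketched without the library. A consuming matcher resumes right after each match, whereas an ambiguous one tries the grammar at every start position, so matches may overlap. This is a hypothetical stand-in written as plain functions (OverlappingMatchSketch and its members are illustrative names); it is not how the Rx Parsers ambiguity combinator is implemented.

```csharp
using System;
using System.Collections.Generic;

public static class OverlappingMatchSketch
{
    // The "two consecutive characters" grammar as a plain function:
    // returns the matched text at pos, or null if fewer than two characters remain.
    public static string TwoChars(string input, int pos) =>
        pos + 2 <= input.Length ? input.Substring(pos, 2) : null;

    // Ordinary matching: after a match, resume right after it, so matches never overlap.
    public static List<string> ConsumingMatches(string input, Func<string, int, string> match)
    {
        var results = new List<string>();
        var pos = 0;
        while (pos < input.Length)
        {
            var m = match(input, pos);
            if (m == null) { pos++; continue; }        // skip input the grammar cannot start at
            results.Add(m);
            pos += Math.Max(m.Length, 1);              // always make progress
        }
        return results;
    }

    // Ambiguous matching: try the grammar at every start position, so matches may overlap.
    public static List<string> AllMatches(string input, Func<string, int, string> match)
    {
        var results = new List<string>();
        for (var start = 0; start < input.Length; start++)
        {
            var m = match(input, start);
            if (m != null) results.Add(m);
        }
        return results;
    }
}
```

    On "abcd", ConsumingMatches returns "ab" and "cd", while AllMatches returns "ab", "bc" and "cd".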

    Steve, I've recreated my stock ticker implementation from this thread as an Rx Parser lab.  It uses the new in-line parser feature, as you can see from the code below.  Please let me know if this is what you had in mind.

    static void Main()
    {
      var random = new Random();
    
      // One random price between 1 and 49 every second.
      IObservable<int> values = Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Select(_ => random.Next(1, 50));
    
      // Turn each price into a tick that also records the change from the previous price.
      // Publish() shares a single subscription to the source among all subscribers.
      IConnectableObservable<StockTick> ticks = values
        .Scan(StockTick.Empty, (acc, cur) => new StockTick(value: cur, change: cur - acc.Value))
        .Publish();
    
      // In-line parser: alert on two or more rising ticks followed by a drop of 11 or more,
      // or on two or more falling ticks followed by a rise of 21 or more.
      IObservable<StockAlert> alerts = ticks.Parse(parser =>
        from next in parser
        let ups = next.Where(tick => tick.Change > 0)
        let downs = next.Where(tick => tick.Change < 0)
        select (from up in ups.AtLeast(2)
                from down in downs
                where down.Change <= -11
                select new StockAlert(up.ToEnumerable(), down))
               .Or
               (from down in downs.AtLeast(2)
                from up in ups
                where up.Change >= 21
                select new StockAlert(down.ToEnumerable(), up)));
    
      Console.WriteLine("Press any key to stop...");
      Console.WriteLine();
    
      // Subscribe both consumers before Connect() so that neither misses early ticks.
      using (ticks.Subscribe(WriteTick))
      using (alerts.Subscribe(WriteAlert))
      using (ticks.Connect())
      {
        Console.ReadKey();
        Console.ResetColor();
      }
    }
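    For readers without the lab, here is the alert rule from the query restated as a plain loop over tick-to-tick changes. It is a hypothetical sketch and the assumptions are mine, so they may not match the lab's exact semantics: a zero change breaks any run, both runs reset after an alert, and a tick that misses the threshold simply starts or extends its own run. StockAlertRuleSketch is an illustrative name, not part of Rx Parsers.

```csharp
using System.Collections.Generic;

public static class StockAlertRuleSketch
{
    // Returns one description per alert, in the order the alerts fire.
    public static List<string> Alerts(IEnumerable<int> changes)
    {
        var alerts = new List<string>();
        int ups = 0, downs = 0;   // lengths of the current runs of rising / falling ticks

        foreach (var change in changes)
        {
            if (ups >= 2 && change <= -11)
            {
                // Two or more rises followed by a drop of 11 or more.
                alerts.Add($"{ups} ups then drop of {change}");
                ups = downs = 0;
            }
            else if (downs >= 2 && change >= 21)
            {
                // Two or more drops followed by a rise of 21 or more.
                alerts.Add($"{downs} downs then rise of +{change}");
                ups = downs = 0;
            }
            else if (change > 0) { ups++; downs = 0; }
            else if (change < 0) { downs++; ups = 0; }
            else { ups = downs = 0; }   // no change: breaks either run
        }
        return alerts;
    }
}
```

    Writing the rule out this way makes its hidden state visible (two run counters and their reset rules), which is what the declarative AtLeast(2) ... Or ... query expresses without any explicit bookkeeping.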

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Friday, May 21, 2010 2:04 AM Fixed code formatting
    Friday, May 21, 2010 1:58 AM