none
Counting non-specific enter/exit

    Question

  • Hi,

     

    I am trying utilize Rx for counting the number of people in a room. We don't know the exact person entering or leaving, we just know that 'a' person entered or left. The challenge is also that we are not always counting accurate. In the light of that we should:

    1) automatically assume a person has left the room after a given time out period.

    2) Ignore a person exiting when we think the room is empty (no negative number of people).

    SO I set up two Subjects (for testing) named PersonInEvents and PersonOutEvents (Subject<Unit>). TimeOut is the time out (in ms) after which we assume a person exited. I got up to the following code.

    I am sure this not the prettiest ever (I am only just starting with Rx and it takes a little time to get used to it :)). The one problem I have left is that I really only want the PersonOutEvents to release at most one waiting customer. Right now, if 2 people are in the room, one person leaving causes the observed number of people to be zero. I know this is due to the event being sent to all subscribers, which is the expected behavior. But how do you ensure that only one subscriber receives the event? I played around with Take(1).Prune(), but that didn't give me the desired result.

     

     var exit = Observable.CreateWithDisposable<int>(o => {
    
        return PersonInEvents.Subscribe(x => {
    
         
    
         Observable.Range(-1, 1).Delay(new TimeSpan(0,0,0,0,TimeOut)).Select(y=>new Unit()).Amb(PersonOutEvents).Do(y => { o.OnNext(-1); }).Subscribe();
    
         
    
        });
    
         });
    
       
    
       return PersonInEvents.Select(x => 1).Merge(exit).Scan(0, (acum, item) => acum + item);
    
    
    Wednesday, March 16, 2011 5:01 AM

All replies

  • Hi,

    This is an interesting problem.  It seems like you have to build a SelectMany query dynamically as people are observed moving in and out of the room.  Not sure if there's a simpler way.

    Here's an example using a stack.  Be careful about scheduling though because I didn't consider it.

    return Observable.Defer(() =>
    {
    	var assumeLeft = Observable
    		.Timer(TimeSpan.FromMilliseconds(TimeOut))
    		.Select(_ => new Unit());
    
    	var whenLeft = assumeLeft.Amb(PersonOutEvents.Take(1));
    
    	var waitUntilLeft = new Stack<IObservable<Unit>>();
    
    	waitUntilLeft.Push(whenLeft);
    
    	return
    		(from _ in PersonInEvents
    			from __ in waitUntilLeft.Peek().Take(1)
    			select -1)
    		.Merge(PersonInEvents.Select(_ => 1))
    		.Scan(0, (count, direction) =>
    		{
    			if (direction > 0)
    			{
    				waitUntilLeft.Push(waitUntilLeft.Peek().SelectMany(whenLeft));
    			}
    			else if (count > 0)
    			{
    				waitUntilLeft.Pop();
    			}
    
    			return count == 0 && direction < 0
    				? 0
    				: count + direction;
    		});
    });
    

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Wednesday, March 16, 2011 9:32 AM Removed Do operators
    Wednesday, March 16, 2011 9:31 AM
  • Hi,

     

    Another approach is to create a chain of dependencies, as each person cannot leave the room until the previous person has left the room, so if you wait for the previous person to leave, you can then start listening for the exit event for the next person. The code below shows an example:

    var testScheduler = new TestScheduler();
    var scheduler = testScheduler;
    
    var inEvents = scheduler.CreateHotObservable(
                    new Recorded<Notification<Unit>>(201, new Notification<Unit>.OnNext(new Unit())),
                    new Recorded<Notification<Unit>>(205, new Notification<Unit>.OnNext(new Unit())),
                    new Recorded<Notification<Unit>>(210, new Notification<Unit>.OnNext(new Unit())),
                    new Recorded<Notification<Unit>>(213, new Notification<Unit>.OnNext(new Unit())),
                    new Recorded<Notification<Unit>>(220, new Notification<Unit>.OnNext(new Unit())),
                    new Recorded<Notification<Unit>>(221, new Notification<Unit>.OnCompleted()));
    
    var outEvents = scheduler.CreateHotObservable(
                    new Recorded<Notification<Unit>>(202, new Notification<Unit>.OnNext(new Unit())),
                    new Recorded<Notification<Unit>>(203, new Notification<Unit>.OnNext(new Unit())),
                    new Recorded<Notification<Unit>>(206, new Notification<Unit>.OnNext(new Unit())),
                    new Recorded<Notification<Unit>>(217, new Notification<Unit>.OnNext(new Unit())),
                    new Recorded<Notification<Unit>>(224, new Notification<Unit>.OnCompleted()));
    
    var timeout = TimeSpan.FromTicks(5);
    
    // Make sure inEvents and outEvents are hot, publish if needed
    var timedOrderedOuts = 
        inEvents.Timestamp(scheduler).Scan(Observable.Return(new Unit()),
            (a, ev) => a.Concat(outEvents.Timeout(ev.Timestamp + timeout, Observable.Return(new Unit()), scheduler))
                        .Skip(1) // Ignore output from previous person
                        .Take(1) // Take first timeout/out event
                        .Prune().RefCount() // Allow use by next person and output
                        ).Merge();
    
    var final =
        inEvents.Select(i => 1)
                .Merge(timedOrderedOuts.Select(i => -1))
                .Scan(0, (a, i) => a + i >= 0 ? a + i : 0) // May not need conditional here
                .Concat(Observable.Return(0)); // For final OnCompleted of out events
                    
    var results = testScheduler.Run(() => final, 100, 200, 1000).ToList();
    
    var expectedResults = new[] { 
                new Recorded<Notification<int>>(201, new Notification<int>.OnNext(1)),
                new Recorded<Notification<int>>(202, new Notification<int>.OnNext(0)),
                new Recorded<Notification<int>>(205, new Notification<int>.OnNext(1)),
                new Recorded<Notification<int>>(206, new Notification<int>.OnNext(0)),
                new Recorded<Notification<int>>(210, new Notification<int>.OnNext(1)),
                new Recorded<Notification<int>>(213, new Notification<int>.OnNext(2)),
                new Recorded<Notification<int>>(215, new Notification<int>.OnNext(1)),
                new Recorded<Notification<int>>(217, new Notification<int>.OnNext(0)),
                new Recorded<Notification<int>>(220, new Notification<int>.OnNext(1)),
                new Recorded<Notification<int>>(224, new Notification<int>.OnNext(0)),
                new Recorded<Notification<int>>(224, new Notification<int>.OnCompleted())}.ToList();
    
    Assert.AreEqual(expectedResults, results);
    
    

    As Dave pointed out, you will need to be careful about synchronization of in & out events, otherwise you will get difficult to debug race conditions.

     

    Yours

    Martin

    Wednesday, March 16, 2011 12:14 PM
  • Wow ....

     

    You guys rock! I obviously still have a lot to learn about Rx :). Note to self is learning about Concat, Prune and RefCount. Again thanks, without a great user community like this, the learning curve would probably be too steep for most.

    Wednesday, March 16, 2011 7:23 PM
  • There are several blog posts out there now that should help you to grok all the Rx info out there.

    I have a series out there

    http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html

    Bart DeSmet (on the team that Writes Rx) has a bunch of great posts out there. I would read the "More LINQ with System.Interactive..." blog post found here

    http://community.bartdesmet.net/blogs/bart/archive/tags/Rx/default.aspx?PageIndex=2

    http://community.bartdesmet.net/blogs/bart/archive/tags/Rx/default.aspx

    and then when you are feeling brave hit this post

    http://community.bartdesmet.net/blogs/bart/archive/2010/01/01/the-essence-of-linq-minlinq.aspx

    Once you have read all of that, you will be a complete Linq and Rx ninja.

     

    Hope that helps

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com
    Saturday, March 26, 2011 12:56 PM