Looking for an operator like CombineLatest but which waits for both elements

  • Question

  • I'm looking for an operator. I know operators are generic, but I'm mentioning the types I use in both streams so that my use case is easier to understand.

    First, a rough marble diagram:

    I have a stream of messages to send and a stream of sockets to send them with. I want to combine them so that I get a stream of message/socket combinations. Other code is responsible for establishing the sockets and putting them in the stream of sockets once they are connected.

    Even though I initialize the socket first, I can't control when it will connect successfully, so messages may arrive on the message stream before any socket does. I want the operator to wait until a socket arrives on the socket stream before combining: A1 and B1 only happen when 1 arrives. I also want to be able to handle exceptions somehow: put the socket stream back into the state of knowing it hasn't gotten a working socket (as at the beginning) and, if possible, resubmit the message at the same position (when socket 1 breaks, D won't be accepted until socket 2 is connected). The X in the marble diagram doesn't mean the socket stream's OnError; it means the socket went bad. It will probably be replaced by a null value in the socket stream, pushed by the same code outside the operator that is responsible for opening a new socket and adding it to the stream.

    If I used CombineLatest, it would report [A null, B null]. Most of the joins and windowing operators I've investigated only work with time, whereas I want to work either with a predicate or by telling the operator explicitly when to move on. I have an inkling that Switch and a stream of streams would work somehow, but I don't know exactly how.

    I think I'm going to have to write this operator myself.
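The pairing rules described above can be modeled without Rx as a small synchronous state machine, which makes the intended behavior easy to test in isolation. This is a minimal sketch under stated assumptions, not an Rx operator: `PairingBuffer`, `OnMessage`, `OnSocket` and the `trySend` callback (returning `false` when the socket went bad) are hypothetical names, a `null` socket means "no working socket", and the class is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, Rx-free model of the requested semantics (not an Rx operator).
// Messages wait in FIFO order until a socket is available; a null socket means
// "no working socket"; a failed send keeps the message at the head of the queue.
// Not thread-safe: callers must serialize calls to OnMessage and OnSocket.
public sealed class PairingBuffer<TMessage, TSocket> where TSocket : class
{
    private readonly Queue<TMessage> _pending = new Queue<TMessage>();
    private readonly Func<TMessage, TSocket, bool> _trySend; // false = socket went bad
    private TSocket _socket; // null until a working socket arrives

    public PairingBuffer(Func<TMessage, TSocket, bool> trySend)
    {
        if (trySend == null) throw new ArgumentNullException("trySend");
        _trySend = trySend;
    }

    // A new message joins the back of the queue; then try to drain the queue.
    public void OnMessage(TMessage message)
    {
        _pending.Enqueue(message);
        Drain();
    }

    // A new socket (or null for "the socket went bad") replaces the current one.
    public void OnSocket(TSocket socket)
    {
        _socket = socket;
        Drain();
    }

    private void Drain()
    {
        while (_socket != null && _pending.Count > 0)
        {
            if (_trySend(_pending.Peek(), _socket))
            {
                _pending.Dequeue(); // sent: move on to the next message
            }
            else
            {
                _socket = null; // broken: keep the message and wait for the next socket
            }
        }
    }
}
```

Feeding `OnMessage` and `OnSocket` from two separate subscriptions gives roughly the behavior in the marble diagram: with socket 1 failing on C, the sequence A, B, 1, C, D, 2, E is sent as A1, B1, C2, D2, E2.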


    Tuesday, October 1, 2013 2:30 PM

Answers

  • Hi,

    You can still use a simple Join query, you'll just need to adjust the message duration so that a given message is paired with every socket until another observable notifies that it was sent successfully.  The other observable can be accepted as a parameter.  For instance, the caller can pass in a deferred observable (shown below) or a subject, if necessary.

    Example

    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RxLabs.Net45
    {
        class SocketMessageLab
        {
            internal static void Main()
            {
                var messages = Observable
                    .Interval(TimeSpan.FromSeconds(1))
                    .Scan((char)('A' - 1), (acc, i) => (char)((int)acc + 1))
                    .Take(5)
                    .Do(Console.WriteLine);
    
                var failureDuringSendExample = new CancellationTokenSource();
    
                var sockets = Observable.Create<bool>(
                    async observer =>
                    {
                        var available = false;
    
                        await Task.Delay(TimeSpan.FromSeconds(1.5));
    
                        for (var i = 0; i < 4; i++)
                        {
                            try
                            {
                                await Task.Delay(TimeSpan.FromSeconds(1), failureDuringSendExample.Token);
                            }
                            catch (OperationCanceledException)
                            {
                                failureDuringSendExample.Dispose();
                                failureDuringSendExample = new CancellationTokenSource();
                            }
    
                            available = !available;
                            observer.OnNext(available);
                        }
                    })
                    .Scan(
                        new { Socket = 0, HasSocket = false },
                        (acc, available) => !available
                                          ? new { Socket = acc.Socket, HasSocket = false }
                                          : new { Socket = acc.Socket + 1, HasSocket = true })
                    .Select(a => a.HasSocket ? (int?)a.Socket : null)
                    .Do(socket => Console.WriteLine(socket.HasValue ? socket.ToString() : "X"));
    
                IObservable<char> messagesSent = null;
    
                var send = Pair(messages, Observable.Defer(() => messagesSent), sockets)
                    .SelectMany(async (pair, cancel) =>
                    {
                        var socket = pair.Item1;
                        var message = pair.Item2;
    
                        // TODO: socket.SendAsync(message, cancel)
                        await Task.Delay(TimeSpan.FromSeconds(.2), cancel);
    
                        // Simulate failure
                        if (message == 'C' && socket == 1)
                        {
                            // You'll need a way to push null into the sockets sequence.
                            failureDuringSendExample.Cancel();
    
                            return Tuple.Create((int?)null, message);
                        }
                        else
                        {
                            return pair;
                        }
                    })
                    .Publish();
    
                messagesSent = from pair in send
                               where pair.Item1.HasValue
                               select pair.Item2;
    
                using (send.TimeInterval().Subscribe(WriteToConsole))
                using (send.Connect())
                {
                    Console.ReadLine();
                }
            }
    
            static void WriteToConsole(TimeInterval<Tuple<int?, char>> item)
            {
                var socket = item.Value.Item1;
                var message = item.Value.Item2;
    
                Console.WriteLine("{0}: {1}{2}", item.Interval, message, socket.HasValue ? socket.ToString() : "-Failed to send");
            }
    
            static IObservable<Tuple<int?, char>> Pair(IObservable<char> messagesIn, IObservable<char> messagesOut, IObservable<int?> sockets)
            {
                return sockets.Publish(publishedSockets =>
                {
                    var socketsOpened = (from socket in publishedSockets
                                         where socket.HasValue
                                         select socket);
    
                    var socketsClosed = (from socket in publishedSockets
                                         where !socket.HasValue
                                         select Unit.Default);
    
                    return from socket in socketsOpened
                           join message in messagesIn
                           on socketsClosed
                           equals messagesOut.Where(m => m == message)
                           select Tuple.Create(socket, message);
                });
            }
        }
    }

     

    Output

    A
    B
    1
    00:00:02.9907968: A1
    00:00:00.0369974: B1
    C
    X
    00:00:00.3774850: C-Failed to send
    D
    2
    00:00:01.2130269: C2
    00:00:00.0009348: D2
    E
    00:00:00.7946871: E2

     

    - Dave


    http://davesexton.com/blog

    • Marked as answer by JesperTreetop Wednesday, October 2, 2013 9:44 PM
    Wednesday, October 2, 2013 5:14 PM

All replies

  • I ended up implementing it by hand, but with code that clearly "falls off" most Rx abstractions instead of extending them, so it isn't an operator. There's probably a better, idiomatic solution using streams of streams, and if anyone has one handy I'd be happy to take a look, but the problem is reasonably solved for now.
    Tuesday, October 1, 2013 4:43 PM
  • Hi,

    It seems like you need to join by coincidence.  Try the Join operator, as shown in the following example.

    Note that, conceptually, CombineLatest is a specialization of joining by coincidence where the duration of each side of the join is self-defined; e.g.,

    var combineLatest = from a in A join b in B on A equals B select new { a, b };

    So it's no coincidence that you tried using CombineLatest first, though Join provides the flexibility that you actually need.
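Written in method syntax, that query is a call to Rx's `Join` whose two duration selectors simply return the sources themselves, so each value stays "live" until its own source emits again. A minimal sketch, assuming both sources are hot (a cold source would be re-subscribed for every window); `JoinExamples` and `CombineLatestViaJoin` are hypothetical names:

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper illustrating CombineLatest expressed as a join by coincidence.
public static class JoinExamples
{
    // Method-syntax form of:
    //   from a in left join b in right on left equals right select Tuple.Create(a, b)
    // Assumes both sources are hot.
    public static IObservable<Tuple<TLeft, TRight>> CombineLatestViaJoin<TLeft, TRight>(
        IObservable<TLeft> left, IObservable<TRight> right)
    {
        return left.Join(
            right,
            a => left,   // left duration: the window closes when `left` emits again
            b => right,  // right duration: the window closes when `right` emits again
            (a, b) => Tuple.Create(a, b));
    }
}
```

Pushing a1, b1, a2, b2 synchronously through two `Subject<string>` instances yields (a1, b1), (a2, b1), (a2, b2), the same pairs `CombineLatest` produces for that input.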

    Example

    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    
    namespace RxLabs.Net45
    {
        class SocketMessageLab
        {
            internal static void Main()
            {
                var messages = Observable
                    .Interval(TimeSpan.FromSeconds(1))
                    .Scan((char)('A' - 1), (acc, i) => (char)((int)acc + 1))
                    .Take(5)
                    .Do(Console.WriteLine);
    
                var sockets = Observable
                    .Timer(TimeSpan.FromSeconds(2.5))
                    .Concat(Observable
                    .Return(-1L)
                    .Delay(TimeSpan.FromSeconds(1)))
                    .Concat(Observable
                    .Timer(TimeSpan.FromSeconds(1)))
                    .Concat(Observable
                    .Return(-1L)
                    .Delay(TimeSpan.FromSeconds(1)))
                    .Scan(
                        new { Socket = 0, HasSocket = false },
                        (acc, i) => i == -1
                                  ? new { Socket = acc.Socket, HasSocket = false }
                                  : new { Socket = acc.Socket + 1, HasSocket = true })
                    .Select(a => a.HasSocket ? (int?)a.Socket : null)
                    .Do(socket => Console.WriteLine(socket.HasValue ? socket.ToString() : "X"));
    
                var query =
                    sockets.Publish(publishedSockets =>
                        {
                            var socketClosed = (from socket in publishedSockets
                                                where !socket.HasValue
                                                select Unit.Default);
    
                            return from socket in publishedSockets.Where(s => s.HasValue)
                                   join message in messages
                                   on socketClosed
                                   equals socketClosed
                                   select new { socket, message };
                        });
    
                using (query.TimeInterval().Subscribe(
                    item => Console.WriteLine("{0}: {1}{2}", item.Interval, item.Value.message, item.Value.socket)))
                {
                    Console.ReadLine();
                }
            }
        }
    }

    Output

    A
    B
    1
    00:00:02.8623601: A1
    00:00:00.0039662: B1
    C
    00:00:00.3795299: C1
    X
    D
    2
    00:00:01.7433086: D2
    E
    00:00:00.2808486: E2
    X

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Tuesday, October 1, 2013 5:47 PM Improved join query
    Tuesday, October 1, 2013 5:12 PM
  • Hi,

    I just updated the join query slightly to avoid a subtle race condition.  As a result, the query is also simpler.

    - Dave


    http://davesexton.com/blog

    Tuesday, October 1, 2013 5:48 PM
  • Interesting. I'll take a look. Thanks.

    Two things:

    1. The Join extension method I found in IntelliSense and the one you're linking to seemed to require me to provide time. Where does the query comprehension pick that up?
      Edit:
      Scratch that: the parameters confused me. They're just the join key selectors, the usual "on X equals Y". I thought I had to pass a lot more data than usual.
    2. In the partially imperative code I wrote, I was able to make it so that if the selector/action threw an exception, the socket was marked as dead and the message was held to be picked up again when a new socket came along. I know I probably didn't mention this in the post, and that once you've gotten an item from a subscription you obviously can't retract it, but is there a way of making the same sort of thing happen with the query (i.e., future enumeration/subscription is paused, and the next time around the same message is there again, as the next item and not as the last)?
      To be clear: if the socket goes away in the middle of trying to send C1, I'd want C to stick around for socket 2 and then appear as C2. That doesn't sound very pigeonhole-compliant (C magically reappearing out of order after being processed), so I'm not sure whether it's palatable to express it as a query.
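On point 1: the C# compiler translates `from a in A join b in B on X equals Y select R` into `A.Join(B, a => X, b => Y, (a, b) => R)`. The expressions after `on` and `equals`, which occupy the key-selector positions in LINQ to Objects, are passed to Rx's `Join` as the left and right duration selectors. A sketch of the earlier reply's query in method syntax, with `null` marking a closed socket; `SocketJoin` and `PairUntilClosed` are hypothetical names, and `Tuple.Create` stands in for the anonymous type so the result can be returned from a method:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical helper: the earlier reply's join, written in method syntax.
public static class SocketJoin
{
    // Method-syntax form of:
    //   from socket in publishedSockets.Where(s => s.HasValue)
    //   join message in messages
    //   on socketClosed equals socketClosed
    //   select Tuple.Create(socket, message)
    public static IObservable<Tuple<int?, char>> PairUntilClosed(
        IObservable<int?> sockets, IObservable<char> messages)
    {
        return sockets.Publish(publishedSockets =>
        {
            // A null socket closes every open window, on both sides of the join.
            var socketClosed = publishedSockets
                .Where(s => !s.HasValue)
                .Select(_ => Unit.Default);

            return publishedSockets
                .Where(s => s.HasValue)
                .Join(
                    messages,
                    socket => socketClosed,   // "on socketClosed": left (socket) duration
                    message => socketClosed,  // "equals socketClosed": right (message) duration
                    (socket, message) => Tuple.Create(socket, message));
        });
    }
}
```

Feeding A, socket 1, B, null, C, socket 2 produces A1, B1 and then C2: a message that arrives while no socket is open waits for the next one.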

    Actually, point 2 still sounds rather confusing. Here's how I solved it, falling off Rx as I did:

    public static void PairUpAndSend<TMessage, TSocket>(
        this IObservable<TMessage> messageObservable,
        IObservable<TSocket> connectedSocketsObservable,
        Action<TMessage, TSocket> sender,
        Action markCurrentSocketDead = null)
        where TSocket : class
    {
        // markCurrentSocketDead is filled in at the call site
        // with something that just makes sure the next item
        // in connectedSocketsObservable will be null and
        // that a new socket will be initiated and provided
        markCurrentSocketDead = markCurrentSocketDead ?? new Action(() => { });

        TSocket recentSocket = null;
        List<TMessage> piledUp = new List<TMessage>();

        connectedSocketsObservable.Subscribe(socket =>
        {
            if (recentSocket == socket)
                return;

            recentSocket = socket;

            // A null socket means "no working socket": keep the pile for the next one
            if (socket == null)
                return;

            foreach (var message in piledUp.ToArray())
            {
                try
                {
                    sender(message, socket);
                    piledUp.Remove(message);
                }
                catch (SpecificSocketFailureException ex)
                {
                    markCurrentSocketDead();
                    recentSocket = null;
                    // logging the error elided
                    break;
                }
            }
        });

        messageObservable.Subscribe(m =>
        {
            if (recentSocket == null)
            {
                piledUp.Add(m);
            }
            else
            {
                try
                {
                    sender(m, recentSocket);
                }
                catch (SpecificSocketFailureException ex)
                {
                    markCurrentSocketDead();
                    recentSocket = null;
                    // logging the error elided
                    piledUp.Add(m);
                }
            }
        });
    }

    There are any number of things I don't like about this solution, such as recentSocket possibly getting out of sync with connectedSocketsObservable.

    Tuesday, October 1, 2013 7:29 PM
  • Great, thanks for the help, I'll look into this. Marking as answer unless I run into trouble.
    Wednesday, October 2, 2013 9:44 PM
  • When I run this on my machine, I get B1 and then A1. Is there a way of maintaining FIFO order?
    Monday, October 14, 2013 2:00 PM
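The thread leaves this question open. One likely cause (an assumption, not confirmed here) is that `SelectMany` with an async selector starts every send as soon as its pair arrives and emits results in completion order, so A1 and B1, which the join produces together when socket 1 connects, can finish in either order. A minimal sketch of an order-preserving alternative, assuming one send at a time is acceptable: project each item to a deferred `Observable.FromAsync` and `Concat` the results, so the next send starts only after the previous one completes. `OrderedSendExtensions` and `SelectManySequential` are hypothetical names:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: like SelectMany with an async selector, but one item at a time.
public static class OrderedSendExtensions
{
    public static IObservable<TResult> SelectManySequential<T, TResult>(
        this IObservable<T> source,
        Func<T, CancellationToken, Task<TResult>> sendAsync)
    {
        return source
            // FromAsync does not start the task until it is subscribed to...
            .Select(item => Observable.FromAsync(cancel => sendAsync(item, cancel)))
            // ...and Concat subscribes to the next one only after the previous completes.
            .Concat();
    }
}
```

In the answer's code, `.SelectMany(async (pair, cancel) => ...)` would become `.SelectManySequential(async (pair, cancel) => ...)`. This serializes the sends; it does not change the order in which `Join` itself emits pairs.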