Creating TPL-independent pipelines


  • Hi,

    To make my code more testable, I'm trying to do away with constructs that rely on the TPL. I want complete control over scheduling. This means no use of StartAsync, Create with async/await, and so on.

    But the thing is, I'm struggling to understand how I would rewrite some of my existing code to be "pure" Rx. Here's an example:

    public IObservable<Unit> ExecuteAsync(ExecutionContext context)
    {
        var remainingDelay = this.delay;
        // TODO: can I do this without async/await?
        return Observable
            .Create<Unit>(
                async observer =>
                {
                    while (remainingDelay > TimeSpan.Zero)
                    {
                        await context.WaitWhilePausedAsync();
                        var delayFor = MathExt.Min(remainingDelay, maximumDelayTime);
                        await this.delayService
                            .DelayAsync(delayFor, context.CancellationToken);
                        remainingDelay -= delayFor;
                    }
                });
    }

    The idea of this code is to delay until some amount of time has passed. Naturally, for the purposes of tests I want to be able to control time. But the while loop, which terminates on a condition, makes it really difficult for me to see how to do this without async/await.


    I've managed to get the behavior I want by using Generate and Zip:

    public IObservable<Unit> ExecuteAsync(ExecutionContext context)
    {
        var remaining = Observable
            .Generate(
                this.delay,
                r => r > TimeSpan.Zero,
                r =>
                {
                    var delayFor = MathExt.Min(r, maximumDelayTime);
                    return r - delayFor;
                },
                r => r);
        var nextRemaining = remaining
            .Skip(1)
            .Concat(Observable.Return(TimeSpan.Zero));
        var delays = remaining
            .Zip(
                nextRemaining,
                (current, next) => current - next);
        var result = delays
            .SelectMany(delay => context.WaitWhilePausedAsync().Select(_ => delay))
            .SelectMany(delay => this.delayService.DelayAsync(delay, context.CancellationToken).Select(_ => delay))
            .Do(delay => context.AddProgress(delay))
            .Select(_ => Unit.Default);
        return result;
    }

    However, I have this question:

    Is there an easier way for me to get an observable of delays from my observable of remaining time spans? Currently I'm using Zip to calculate the difference between each remaining time span and the one that follows it. Consequently, I have to Concat TimeSpan.Zero onto my remaining sequence. I just feel like it should be possible to do this entire step more cleanly.
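
    To make the Zip step concrete, here is a minimal sketch of the same arithmetic. It uses plain LINQ to Objects as a stand-in for the Rx operators (an assumption, made only so the snippet runs without the Rx package), and the names DelayChunks, Remaining and Delays are hypothetical, not part of my real code:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: LINQ to Objects stands in for the Rx operators here.
public static class DelayChunks
{
    // Yields the time remaining before each chunk (always > TimeSpan.Zero).
    // Assumes maximumDelayTime is positive; otherwise this never terminates.
    public static IEnumerable<TimeSpan> Remaining(TimeSpan total, TimeSpan maximumDelayTime)
    {
        for (var r = total; r > TimeSpan.Zero; r -= Min(r, maximumDelayTime))
        {
            yield return r;
        }
    }

    // Pairs each remaining value with its successor; TimeSpan.Zero is appended
    // so that the final chunk also has a "next" value to subtract.
    public static IEnumerable<TimeSpan> Delays(TimeSpan total, TimeSpan maximumDelayTime)
    {
        var remaining = Remaining(total, maximumDelayTime).ToList();
        var nextRemaining = remaining.Skip(1).Concat(new[] { TimeSpan.Zero });
        return remaining.Zip(nextRemaining, (current, next) => current - next);
    }

    private static TimeSpan Min(TimeSpan a, TimeSpan b) => a < b ? a : b;
}
```

    For a total of 25 seconds and a maximum of 10 seconds, the remaining values are 25s, 15s and 5s, so Delays yields 10s, 10s and 5s.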

    Can anyone offer any advice here?


    Saturday, December 19, 2015 2:21 AM