Creating TPL-independent pipelines

    Question

  • Hi,

    In an effort to make my code more testable, I'm trying to do away with constructs that rely on the TPL. I want complete control over scheduling, which means no StartAsync, no Create with async/await, and so on.

    But the thing is, I'm struggling to understand how I would rewrite some of my existing code to be "pure" Rx. Here's an example:

    public IObservable<Unit> ExecuteAsync(ExecutionContext context)
    {
        var remainingDelay = this.delay;
    
        // TODO: can I do this without async/await?
        return Observable
            .Create<Unit>(
                async observer =>
                {
                    while (remainingDelay > TimeSpan.Zero)
                    {
                        context.CancellationToken.ThrowIfCancellationRequested();
    
                        await context
                            .WaitWhilePausedAsync();
    
                        var delayFor = MathExt.Min(remainingDelay, maximumDelayTime);
    
                        await this
                            .delayService
                            .DelayAsync(delayFor, context.CancellationToken);
    
                        remainingDelay -= delayFor;
                        context.AddProgress(delayFor);
                    }
    
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                })
            .RunAsync(context.CancellationToken);
    }


    The idea of this code is to delay until a given amount of time has passed, waiting in chunks of at most maximumDelayTime so that cancellation and pausing are honored between chunks. Naturally, for the purposes of tests I want to be able to control time. But the while loop that terminates on a condition makes it really difficult for me to see how to do this without async/await.
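
    For the time-control part, one common Rx approach is to express each wait as a timer on an injected IScheduler, so that a test can substitute a virtual-time scheduler. The following is a minimal sketch, assuming the System.Reactive package; SchedulerDelay and DelayFor are hypothetical names used for illustration, not part of the code above or of Rx itself:

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SchedulerDelay
{
    // Hypothetical helper: emits a single Unit and completes once `delay`
    // has elapsed, as measured by the supplied scheduler's clock.
    public static IObservable<Unit> DelayFor(TimeSpan delay, IScheduler scheduler)
    {
        return Observable
            .Timer(delay, scheduler)
            .Select(_ => Unit.Default);
    }
}
```

    In a test, a TestScheduler from the Microsoft.Reactive.Testing package could be passed in and moved forward with AdvanceBy(delay.Ticks), so no real time needs to elapse.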

    UPDATE

    I've managed to get the behavior I want by using Generate and Zip:

    public IObservable<Unit> ExecuteAsync(ExecutionContext context)
    {
        // Same starting value as in the first version.
        var remainingDelay = this.delay;
        var remaining = Observable
            .Generate(
                remainingDelay,
                r => r > TimeSpan.Zero,
                r =>
                {
                    // Clamp against the current state, mirroring the original loop.
                    var delayFor = MathExt.Min(r, maximumDelayTime);
                    return r - delayFor;
                },
                r => r)
            .Publish();
        var nextRemaining = remaining
            .Skip(1)
            .Concat(Observable.Return(TimeSpan.Zero));
        var delays = remaining
            .Zip(
                nextRemaining,
                (current, next) => current - next);
        var result = delays
            .SelectMany(delay => context.WaitWhilePausedAsync().Select(_ => delay))
            .SelectMany(delay => this.delayService.DelayAsync(delay, context.CancellationToken).Select(_ => delay))
            .Do(delay => context.AddProgress(delay))
            .Select(_ => Unit.Default)
            .RunAsync(context.CancellationToken);
    
        remaining.Connect();
    
        return result;
    }

    However, I have this question:

    Is there an easier way for me to get an observable of delays from my observable of remaining time spans? Currently I'm using Zip to calculate the difference between each remaining time span and the next one. Consequently, I have to Concat TimeSpan.Zero onto my remaining sequence so that the final delay is produced. I just feel like it should be possible to do this entire step more cleanly.
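
    One way this step might be simplified is to let Generate carry the remaining time as its state and emit each delay directly through the result selector, so that neither Zip nor the trailing TimeSpan.Zero is needed. This is a sketch under that assumption; DelayChunks and its Min helper are hypothetical stand-ins (Min plays the role of MathExt.Min), and the System.Reactive package is assumed:

```csharp
using System;
using System.Reactive.Linq;

public static class DelayChunks
{
    // Splits `total` into consecutive delays of at most `maximum` each.
    public static IObservable<TimeSpan> Create(TimeSpan total, TimeSpan maximum)
    {
        if (maximum <= TimeSpan.Zero)
        {
            // A non-positive maximum would never reduce the remaining time.
            throw new ArgumentOutOfRangeException(nameof(maximum));
        }

        return Observable.Generate(
            total,                                            // state: time still to wait
            remaining => remaining > TimeSpan.Zero,           // continue while time remains
            remaining => remaining - Min(remaining, maximum), // subtract the chunk just emitted
            remaining => Min(remaining, maximum));            // emit the next chunk
    }

    // Hypothetical stand-in for MathExt.Min.
    private static TimeSpan Min(TimeSpan a, TimeSpan b)
    {
        return a < b ? a : b;
    }
}
```

    With a total of 25 seconds and a maximum of 10 seconds, this yields delays of 10, 10 and 5 seconds. The sequence is cold, so Publish and Connect should not be required for this step either.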

    Can anyone offer any advice here?

    Thanks




    Saturday, December 19, 2015 2:21 AM