none
Rx Challenge #3 RRS feed

  • General discussion

  • This is a quest.

    I'm posting it on this forum in order to share different approaches to solving it.

    So, the quest is:

    For the following marble diagram (each '-' represents one second and each number
    represents a value):

    -1---------2---------3-4-5-6-7--------8---------9------

    You should suspend the output stream (for 15 seconds)
    whenever values arrive so intensively that they reach the limit of 3 values within 5 seconds.

    For the above input, the values arrive as follows:
    # 1: after 2 seconds
    # 2: after 12 seconds
    # 3: after 23 seconds
    # 4: after 25 seconds
    # 5: after 27 seconds (the third value within 5 seconds, so the limit is reached and the suspension starts)
    # 6: after 29 seconds
    # 7: after 31 seconds
    # 8: after 40 seconds (still within the 15 seconds suspension period)
    # 9: after 50 seconds (beyond the suspension period)

    the expected output for
    -1---------2---------3-4-5-6-7--------8---------9------
    is
    -1---------2---------3-4------------------------9------

    You can get a test method for validating your result here.

    Good luck

     

    Bnaya Eshet


    Friday, March 6, 2015 5:37 PM

All replies

  • In a later reply I will present a slightly better solution,
    which encapsulates it in an extension method.

    /// <summary>
    /// Feeds nine values through a Scan-based suspension filter and verifies that
    /// values 5-8 are dropped while 1-4 and 9 are forwarded.
    /// </summary>
    [TestMethod]
    public void SuspendResume_Test()
    {
        int suspendOnCount = 3;
        TimeSpan suspendCountDuration = TimeSpan.FromSeconds(5);
        TimeSpan suspendDuration = TimeSpan.FromSeconds(15);

        /**********************************************************************
         *
         * source -1---------2---------3-4-5-6-7--------8---------9------
         * Result -1---------2---------3-4------------------------9------
         *
         **********************************************************************/

        // arrange
        // NOTE(review): enableToggle is never used below and SuspensionToken is not
        // defined in this snippet - confirm whether it can be removed.
        var enableToggle = new SuspensionToken();
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<int>();
        var source = new Subject<int>();

        source
            .Scan(
                new SuspendCheck<int>(scheduler, suspendOnCount, suspendCountDuration, suspendDuration),
                SuspendCheck<int>.OnNext)
            .Where(m => !m.Suspend)
            .Select(m => m.Value)
            .Subscribe(observer);

        // act (the trailing comment is the virtual time, in seconds, of each value)
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 2
        source.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 12
        source.OnNext(2);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(11).Ticks); // 23
        source.OnNext(3);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 25
        source.OnNext(4);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 27 - third value within 5 seconds, suspended until 42
        source.OnNext(5);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 29 - suspended
        source.OnNext(6);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 31 - suspended
        source.OnNext(7);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(9).Ticks);  // 40 - suspended
        source.OnNext(8);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 50 - suspension is over
        source.OnNext(9);

        // verify
        var results = observer.
            Messages
            .Where(m => m.Value.Kind == NotificationKind.OnNext)
            .Select(m => m.Value.Value).ToArray();

        Assert.AreEqual(1, results[0]);
        Assert.AreEqual(2, results[1]);
        Assert.AreEqual(3, results[2]);
        Assert.AreEqual(4, results[3]);
        Assert.AreEqual(9, results[4]);
    }

    /// <summary>
    /// Mutable accumulator for Scan: remembers the time stamps of the most recent
    /// items and exposes whether the current item falls into a suspension period.
    /// </summary>
    /// <typeparam name="T">Type of the items flowing through the stream.</typeparam>
    public class SuspendCheck<T>
    {
        private readonly int suspendOnCount;
        private readonly TimeSpan suspendCountDuration;
        private readonly TimeSpan suspendDuration;
        private readonly IScheduler scd;
        private DateTimeOffset? suspend; // end of the active suspension, null when not suspended

        /// <summary>
        /// Initializes a new instance of the <see cref="SuspendCheck{T}"/> class.
        /// </summary>
        /// <param name="scd">Scheduler used as the clock.</param>
        /// <param name="suspendOnCount">Number of items within <paramref name="suspendCountDuration"/> that starts a suspension.</param>
        /// <param name="suspendCountDuration">Time window in which the items are counted.</param>
        /// <param name="suspendDuration">Length of the suspension once it starts.</param>
        public SuspendCheck(
            IScheduler scd,
            int suspendOnCount,
            TimeSpan suspendCountDuration,
            TimeSpan suspendDuration)
        {
            PrevStamps = new Queue<DateTimeOffset>();

            // "this." is required: the parameters shadow the fields, so an
            // unqualified "x = x" would assign each parameter to itself and
            // leave every field at its default value.
            this.suspendOnCount = suspendOnCount;
            this.suspendCountDuration = suspendCountDuration;
            this.suspendDuration = suspendDuration;
            this.scd = scd;
        }

        /// <summary>Gets the time stamps of the most recent, non-stale items.</summary>
        public Queue<DateTimeOffset> PrevStamps { get; private set; }

        /// <summary>Gets a value indicating whether a suspension is currently active.</summary>
        public bool Suspend { get { return suspend.HasValue; } }

        /// <summary>Gets the item that was processed last.</summary>
        public T Value { get; private set; }

        /// <summary>
        /// Accumulator function for Scan: stamps the item, updates the suspension
        /// state and stores the item in <see cref="Value"/>.
        /// </summary>
        /// <param name="instance">The accumulator (it is mutated and returned).</param>
        /// <param name="item">The current item.</param>
        /// <returns>The same <paramref name="instance"/>.</returns>
        // should be clone
        // in real-life it is better to keep SuspendCheck immutable
        // and create new instance as return value
        public static SuspendCheck<T> OnNext(SuspendCheck<T> instance, T item)
        {
            // read the clock once so that all checks for this item agree
            DateTimeOffset now = instance.scd.Now;

            instance.PrevStamps.Enqueue(now);

            // never keep more stamps than the count limit
            while (instance.PrevStamps.Count > instance.suspendOnCount)
            {
                instance.PrevStamps.Dequeue();
            }

            // drop the stamps which are older than the counting window
            while (instance.PrevStamps.Count != 0)
            {
                DateTimeOffset stamp = instance.PrevStamps.Peek();
                stamp = stamp.Add(instance.suspendCountDuration);
                if (stamp > now)
                    break;
                instance.PrevStamps.Dequeue();
            }

            if (!instance.suspend.HasValue && instance.PrevStamps.Count == instance.suspendOnCount)
            {
                // the limit was reached within the window: start a suspension
                instance.suspend = now.Add(instance.suspendDuration);
            }
            else if (instance.suspend.HasValue && instance.suspend < now)
            {
                // the suspension period has elapsed
                instance.suspend = null;
            }

            instance.Value = item;
            return instance; // TODO: clone
        }
    }




    Bnaya Eshet

    Saturday, March 14, 2015 8:42 PM
  • This solution is quite similar to the previous one, but more structured.

        /// <summary>
        /// Encapsulates a time-stamp threshold:
        /// stamps older than the staleness duration are dropped and the number of
        /// remaining (non-stale) stamps is evaluated against the count limit.
        /// Once the limit is reached the instance reports a suspended state until
        /// the suspension period has elapsed.
        /// </summary>
        /// <remarks>
        /// The type is not thread-safe (the stamps are kept in a plain queue).
        /// The staleness cut-off is computed as "now - staleness", so the scheduler
        /// clock must be at least one staleness duration past
        /// <see cref="DateTimeOffset.MinValue"/>.
        /// </remarks>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1702:CompoundWordsShouldBeCasedCorrectly", MessageId = "TimeStamp")]
        public class TimeStampThreshold
        {
            // TODO: IoC for Concurrent Queue
            private readonly Queue<DateTimeOffset> _stamps = new Queue<DateTimeOffset>();
            private readonly IScheduler _scheduler; // for testability and threading
            private readonly int _countLimit;            // number of non-stale stamps which triggers a suspension
            private readonly TimeSpan _staleness;        // stamps older than this are ignored
            private readonly TimeSpan _suspensionPeriod; // length of a suspension
            private DateTimeOffset? _suspendUntil;       // end of the active suspension (null = none)
            private int _count = 0;                      // stamp count left by the previous operation (misuse detection)
            private bool _sealed;                        // true once the instance was cloned
    
            #region Ctor
    
            /// <summary>
            /// Initializes a new instance of the <see cref="TimeStampThreshold"/> class
            /// whose suspension period equals the staleness period.
            /// </summary>
            /// <param name="scheduler">The scheduler which is used as the clock.</param>
            /// <param name="countLimit">The count limit.</param>
            /// <param name="staleness">Duration of the limit (also used as the suspension period).</param>
            public TimeStampThreshold(
                IScheduler scheduler,
                int countLimit,
                TimeSpan staleness)
                : this(scheduler, countLimit, staleness, staleness)
            {
            }
    
            /// <summary>
            /// Initializes a new instance of the <see cref="TimeStampThreshold"/> class.
            /// </summary>
            /// <param name="scheduler">The scheduler which is used as the clock.</param>
            /// <param name="countLimit">The count limit.</param>
            /// <param name="staleness">The staleness period.</param>
            /// <param name="suspensionPeriod">The period of suspension (after crossing the threshold).</param>
            /// <exception cref="System.ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
            public TimeStampThreshold(
                IScheduler scheduler,
                int countLimit,
                TimeSpan staleness,
                TimeSpan suspensionPeriod)
            {
                // fail here instead of with a NullReferenceException on the first stamp
                if (scheduler == null)
                    throw new ArgumentNullException("scheduler");

                _countLimit = countLimit;
                _staleness = staleness;
                _suspensionPeriod = suspensionPeriod;
                _scheduler = scheduler;
                IsSuspended = false;
            }
    
            #endregion Ctor
    
            #region SealedAndStampClone
    
            /// <summary>
            /// Seals the origin instance and adds a stamp to a cloned instance.
            /// The relevant stamps are moved (not copied) into the clone, which leaves
            /// the origin instance without stamps.
            /// </summary>
            /// <returns>
            /// A new instance which holds the still relevant stamps plus a stamp for the
            /// current time, with its suspension state already evaluated.
            /// </returns>
            public TimeStampThreshold SealedAndStampClone()
            {
                _sealed = true;
                var result = new TimeStampThreshold(_scheduler, _countLimit, _staleness, _suspensionPeriod);
                result._suspendUntil = _suspendUntil;

                // read the clock once, so the new stamp, the staleness cut-off
                // and the suspension check all refer to the same instant
                DateTimeOffset now = _scheduler.Now;
                DateTimeOffset threshold = now.Subtract(_staleness);

                while (_stamps.Count != 0)
                {
                    DateTimeOffset stamp = _stamps.Dequeue();
                    if (_stamps.Count >= _countLimit)// anything above the limit is irrelevant
                        continue;
                    if (stamp >= threshold) // check the duration window (anything older is irrelevant)
                        result._stamps.Enqueue(stamp);
                }
                result._stamps.Enqueue(now);
                int count = result._stamps.Count;
                result._count = count;
                result.ShouldSuspend(count, now);
    
                return result;
            }
    
            #endregion SealedAndStampClone
    
            #region TryStamp
    
            /// <summary>
            /// Sets a new stamping point and checks whether it doesn't cross the threshold.
            /// </summary>
            /// <returns>
            /// true = below the threshold
            /// false = above the threshold
            /// </returns>
            /// <exception cref="System.InvalidOperationException">
            /// The instance is sealed, or misuse was detected.
            /// </exception>
            public bool TryStamp()
            {
                return RemoveStaleStamps(true);
            }
    
            #endregion TryStamp
    
            #region RecheckIsBelowThreshold
    
            /// <summary>
            /// Removes stale stamps (without adding a new one) and checks whether
            /// the instance is below the threshold.
            /// </summary>
            /// <returns>
            /// true = below the threshold
            /// false = above the threshold
            /// </returns>
            /// <exception cref="System.InvalidOperationException">
            /// The instance is sealed, or misuse was detected.
            /// </exception>
            public bool RecheckIsBelowThreshold()
            {
                return RemoveStaleStamps(false);
            }
    
            #endregion RecheckIsBelowThreshold
    
            #region IsSuspended
    
            /// <summary>
            /// Gets a value indicating whether the instance was in a suspended state
            /// (above or equal to the threshold, or within a suspension period)
            /// the last time it was evaluated.
            /// </summary>
            public bool IsSuspended { get; private set; }
    
            #endregion IsSuspended
    
            #region RemoveStaleStamps
    
            /// <summary>
            /// Removes the stale stamps and evaluates the suspension state.
            /// </summary>
            /// <param name="createNewStamp">if set to <c>true</c> a stamp for the current time is added first.</param>
            /// <returns>
            /// true = below the threshold (not suspended)
            /// false = above the threshold (suspended)
            /// </returns>
            /// <exception cref="System.InvalidOperationException">
            /// access to sealed TimeStampThreshold instance is not allowed (sealed is internal state)
            /// or
            /// Concurrent access to non thread-safe method (TimeStampThreshold.Reduce)
            /// </exception>
            [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA2204:Literals should be spelled correctly", MessageId = "TimeStampThreshold")]
            private bool RemoveStaleStamps(bool createNewStamp)
            {
                #region Validation
    
                if (_sealed)
                    throw new InvalidOperationException("access to sealed TimeStampThreshold instance is not allowed (sealed is internal state)");
    
                #endregion Validation
    
                int count = _stamps.Count;

                // read the clock once, so the new stamp, the staleness cut-off
                // and the suspension check all refer to the same instant
                DateTimeOffset now = _scheduler.Now;

                if (createNewStamp)
                    _stamps.Enqueue(now);
                while (_stamps.Count > _countLimit) // anything above the limit is irrelevant
                    _stamps.Dequeue();
    
                DateTimeOffset threshold = now.Subtract(_staleness);
    
                // the queue is ordered by time, so stop at the first non-stale stamp
                while (_stamps.Count > 0)
                {
                    DateTimeOffset stamp = _stamps.Peek();
                    if (stamp < threshold)
                        _stamps.Dequeue();
                    else
                        break;
                }
    
                int newCount = _stamps.Count;

                // _count is expected to still hold the stamp count which the previous
                // operation left behind; a mismatch is reported as concurrent misuse
                int previousCount = Interlocked.CompareExchange(ref _count, newCount, count);
                #region Validation
    
                if (previousCount != count)
                    throw new InvalidOperationException("Concurrent access to non thread-safe method (TimeStampThreshold.Reduce)");
    
                #endregion Validation
    
                return !ShouldSuspend(newCount, now);
            }
    
            #endregion RemoveStaleStamps
    
            #region ShouldSuspend
    
            /// <summary>
            /// Checks whether to suspend, updates the end of the suspension period
            /// and stores the outcome in <see cref="IsSuspended"/>.
            /// </summary>
            /// <param name="count">The number of non-stale stamps.</param>
            /// <param name="now">The instant against which the suspension period is checked.</param>
            /// <returns><c>true</c> when suspended; otherwise <c>false</c>.</returns>
            private bool ShouldSuspend(int count, DateTimeOffset now)
            {
                bool shouldSuspend = count >= _countLimit;
                if (_suspendUntil.HasValue)
                {
                    if (_suspendUntil < now)
                    {
                        // the suspension period has elapsed
                        if (shouldSuspend)
                            _suspendUntil = now.Add(_suspensionPeriod); // reset suspension period (as long as the pressure remains)
                        else
                            _suspendUntil = null;
                    }
                    else
                        shouldSuspend = true; // still within the suspension period, regardless of the count
                }
                else
                {
                    if (shouldSuspend)
                        _suspendUntil = now.Add(_suspensionPeriod); // set suspension period
                }
    
                IsSuspended = shouldSuspend;
                return shouldSuspend;
            }
    
            #endregion ShouldSuspend
    
            #region Now
    
            /// <summary>
            /// Gets the now (from the scheduler).
            /// </summary>
            /// <value>
            /// The now.
            /// </value>
            public DateTimeOffset Now { get { return _scheduler.Now; } }
    
            #endregion Now
        }
        /// <summary>
        /// Non-generic companion of <see cref="SuspendOnAccumulator{T}"/>
        /// which hosts the accumulator function.
        /// </summary>
        internal static class SuspendOnAccumulator
        {
            #region OnNext
    
            /// <summary>
            /// Accumulator step which is called once for each value:
            /// builds the next accumulator out of the previous one and the current item.
            /// </summary>
            /// <typeparam name="T">Type of the item.</typeparam>
            /// <param name="instance">The previous accumulator.</param>
            /// <param name="item">The current item.</param>
            /// <returns>A new accumulator which carries <paramref name="item"/>.</returns>
            public static SuspendOnAccumulator<T> OnNext<T>(SuspendOnAccumulator<T> instance, T item)
            {
                // the constructor clones the threshold of the previous accumulator and adds a stamp
                return new SuspendOnAccumulator<T>(instance, item);
            }
    
            #endregion OnNext
        }
    
        /// <summary>
        /// Accumulator state for the suspend / resume logic:
        /// pairs a value with the <see cref="TimeStampThreshold"/> which decides
        /// whether that value falls into a suspension period.
        /// </summary>
        /// <typeparam name="T">Type of the carried value.</typeparam>
        internal class SuspendOnAccumulator<T>
        {
            private readonly TimeStampThreshold _threshold;
    
            #region Ctor
    
            /// <summary>
            /// Common initialization for both entry points.
            /// </summary>
            /// <param name="threshold">The threshold to wrap.</param>
            /// <param name="value">The value to carry.</param>
            private SuspendOnAccumulator(TimeStampThreshold threshold, T value)
            {
                _threshold = threshold;
                Value = value;
            }
    
            /// <summary>
            /// Creates the initial accumulator (it carries the default value of <typeparamref name="T"/>).
            /// </summary>
            /// <param name="scd">The scheduler which is used as the clock.</param>
            /// <param name="countLimit">Count limit for triggering a suspension period (within the staleness duration)</param>
            /// <param name="staleness">The duration which the count limit refers to</param>
            /// <param name="suspensionPeriod">The period of suspension (when the suspension trigger occurs)</param>
            public SuspendOnAccumulator(
                IScheduler scd,
                int countLimit,
                TimeSpan staleness,
                TimeSpan suspensionPeriod)
                : this(new TimeStampThreshold(scd, countLimit, staleness, suspensionPeriod), default(T))
            {
            }
    
            /// <summary>
            /// Creates the follow-up accumulator: seals the threshold of
            /// <paramref name="instance"/> and continues with a stamped clone of it.
            /// </summary>
            /// <param name="instance">The previous accumulator.</param>
            /// <param name="value">The value to carry.</param>
            internal SuspendOnAccumulator(SuspendOnAccumulator<T> instance, T value)
                : this(instance._threshold.SealedAndStampClone(), value)
            {
            }
    
            #endregion Ctor
    
            #region Value
    
            /// <summary>
            /// Gets the carried value.
            /// </summary>
            public T Value { get; private set; }
    
            #endregion Value
    
            #region IsSuspended
    
            /// <summary>
            /// Gets a value indicating whether the carried value should be ignored
            /// (as reported by the underlying threshold).
            /// </summary>
            /// <value>
            ///   <c>true</c> if suspended; otherwise, <c>false</c>.
            /// </value>
            public bool IsSuspended
            {
                get { return _threshold.IsSuspended; }
            }
    
            #endregion IsSuspended
    
            #region Now
    
            /// <summary>
            /// Gets the current time as reported by the underlying threshold.
            /// </summary>
            public DateTimeOffset Now
            {
                get { return _threshold.Now; }
            }
    
            #endregion Now
        }
     
        /// <summary>
        /// Tests of <see cref="TimeStampThreshold"/>, driven by virtual time.
        /// The trailing comment of each step is the virtual time (in seconds) of that step.
        /// </summary>
        [TestClass]
        public class TimeStampThreshold_Tests
        {
            #region Helpers
    
            /// <summary>
            /// Creates a scheduler whose clock is already one day ahead,
            /// so subtracting the staleness never yields a negative time.
            /// </summary>
            private static TestScheduler CreateScheduler()
            {
                var scheduler = new TestScheduler();
                scheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks); // avoid negative time
                return scheduler;
            }
    
            /// <summary>
            /// Advances the clock, replaces the threshold by its stamped clone
            /// and reports whether the clone is suspended.
            /// </summary>
            private static bool CloneAfter(TestScheduler scheduler, ref TimeStampThreshold threshold, int seconds)
            {
                scheduler.AdvanceBy(TimeSpan.FromSeconds(seconds).Ticks);
                threshold = threshold.SealedAndStampClone();
                return threshold.IsSuspended;
            }
    
            /// <summary>
            /// Advances the clock, stamps the threshold
            /// and reports whether it is below the threshold.
            /// </summary>
            private static bool StampAfter(TestScheduler scheduler, TimeStampThreshold threshold, int seconds)
            {
                scheduler.AdvanceBy(TimeSpan.FromSeconds(seconds).Ticks);
                return threshold.TryStamp();
            }
    
            #endregion // Helpers
    
            #region SealedAndStampClone_Test
    
            /// <summary>
            /// Limit of 3 stamps within 5 seconds; the suspension period equals the staleness (5 seconds).
            ///
            /// source           -1---------2---------3-4-5-6-7--------8---------9------
            /// Below            -1---------2---------3-4--------------8---------9------
            /// Above or Equal   -------------------------5-6-7-------------------------
            /// </summary>
            [TestMethod]
            public void SealedAndStampClone_Test()
            {
                // arrange
                TestScheduler scheduler = CreateScheduler();
                var threshold = new TimeStampThreshold(scheduler, 3, TimeSpan.FromSeconds(5));
    
                // act & verify
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 2));  // 2
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 10)); // 12
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 11)); // 23
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 2));  // 25
                Assert.IsTrue(CloneAfter(scheduler, ref threshold, 2));   // 27 - Suspend
                Assert.IsTrue(CloneAfter(scheduler, ref threshold, 2));   // 29 - Suspend
                Assert.IsTrue(CloneAfter(scheduler, ref threshold, 2));   // 31 - Suspend
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 9));  // 40
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 10)); // 50
            }
    
            #endregion // SealedAndStampClone_Test
    
            #region TryStamp_Test
    
            /// <summary>
            /// Limit of 3 stamps within 5 seconds; the suspension period equals the staleness (5 seconds).
            ///
            /// source           -1---------2---------3-4-5-6-7--------8------
            /// Below            -1---------2---------3-4--------------8------
            /// Above or Equal   -------------------------5-6-7---------------
            /// </summary>
            [TestMethod]
            public void TryStamp_Test()
            {
                // arrange
                TestScheduler scheduler = CreateScheduler();
                var threshold = new TimeStampThreshold(scheduler, 3, TimeSpan.FromSeconds(5));
    
                // act & verify
                Assert.IsTrue(StampAfter(scheduler, threshold, 2));   // 2
                Assert.IsTrue(StampAfter(scheduler, threshold, 10));  // 12
                Assert.IsTrue(StampAfter(scheduler, threshold, 11));  // 23
                Assert.IsTrue(StampAfter(scheduler, threshold, 2));   // 25
                Assert.IsFalse(StampAfter(scheduler, threshold, 2));  // 27 - Above
                Assert.IsFalse(StampAfter(scheduler, threshold, 2));  // 29 - Above
                Assert.IsFalse(StampAfter(scheduler, threshold, 2));  // 31 - Above
                Assert.IsTrue(StampAfter(scheduler, threshold, 9));   // 40
            }
    
            #endregion // TryStamp_Test
    
            #region SealedAndStampClone_Resuspend_Test
    
            /// <summary>
            /// Limit of 3 stamps within 5 seconds with a suspension period of 15 seconds;
            /// the pressure at the end of the first suspension starts a second one.
            ///
            /// source -1---------2---------3-4-5-----------6-7-8---------9---------10------
            /// Result -1---------2---------3-4-------------------------------------10------
            /// </summary>
            [TestMethod]
            public void SealedAndStampClone_Resuspend_Test()
            {
                // arrange
                TestScheduler scheduler = CreateScheduler();
                var threshold = new TimeStampThreshold(scheduler, 3, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
    
                // act & verify
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 2));  // 2
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 10)); // 12
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 11)); // 23
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 2));  // 25
                Assert.IsTrue(CloneAfter(scheduler, ref threshold, 2));   // 27 - Suspend
                Assert.IsTrue(CloneAfter(scheduler, ref threshold, 12));  // 39 - Suspend
                Assert.IsTrue(CloneAfter(scheduler, ref threshold, 2));   // 41 - Suspend
                Assert.IsTrue(CloneAfter(scheduler, ref threshold, 2));   // 43 - Suspend -> above threshold (re-suspend)
                Assert.IsTrue(CloneAfter(scheduler, ref threshold, 10));  // 53 - Suspend (within the new re-suspend period)
                Assert.IsFalse(CloneAfter(scheduler, ref threshold, 10)); // 63
            }
    
            #endregion // SealedAndStampClone_Resuspend_Test
    
            #region TryStamp_Resuspend_Test
    
            /// <summary>
            /// Limit of 3 stamps within 5 seconds with a suspension period of 15 seconds;
            /// the pressure at the end of the first suspension starts a second one.
            ///
            /// source -1---------2---------3-4-5-----------6-7-8---------9---------10------
            /// Below  -1---------2---------3-4-------------------------------------10------
            /// </summary>
            [TestMethod]
            public void TryStamp_Resuspend_Test()
            {
                // arrange
                TestScheduler scheduler = CreateScheduler();
                var threshold = new TimeStampThreshold(scheduler, 3, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
    
                // act & verify
                Assert.IsTrue(StampAfter(scheduler, threshold, 2));   // 2
                Assert.IsTrue(StampAfter(scheduler, threshold, 10));  // 12
                Assert.IsTrue(StampAfter(scheduler, threshold, 11));  // 23
                Assert.IsTrue(StampAfter(scheduler, threshold, 2));   // 25
                Assert.IsFalse(StampAfter(scheduler, threshold, 2));  // 27 - Above
                Assert.IsFalse(StampAfter(scheduler, threshold, 12)); // 39 - Above
                Assert.IsFalse(StampAfter(scheduler, threshold, 2));  // 41 - Above
                Assert.IsFalse(StampAfter(scheduler, threshold, 2));  // 43 - Suspend -> above threshold (re-suspend)
                Assert.IsFalse(StampAfter(scheduler, threshold, 10)); // 53 - Suspend (within the new re-suspend period)
                Assert.IsTrue(StampAfter(scheduler, threshold, 10));  // 63
            }
    
            #endregion // TryStamp_Resuspend_Test
        }
        [TestClass]
        public class RxSuspendResume_Tests
        {
            #region SuspendOn_ResumeAfterSuspensionPeriod_Test
    
            [TestMethod]
            public void SuspendOn_ResumeAfterSuspensionPeriod_Test()
            {
                /**********************************************************************
                 *
                 * Values 1..9 are pushed at the virtual times (in seconds)
                 * 2, 12, 23, 25, 27, 29, 31, 40 and 50.
                 *
                 * source -1---------2---------3-4-5-6-7--------8---------9------
                 * Result -1---------2---------3-4------------------------9------
                 *
                 * Value 5 (at 27) is the third value within 5 seconds: the stream
                 * is suspended until 42, therefore 5, 6, 7 and 8 are dropped.
                 *
                 **********************************************************************/
    
                // arrange
                var scheduler = new TestScheduler();
                scheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks); // avoid negative time
    
                var observer = scheduler.CreateObserver<int>();
                var source = new Subject<int>();
    
                var suspendable = source.SuspendWhen(
                    3,
                    TimeSpan.FromSeconds(5),
                    TimeSpan.FromSeconds(15),
                    scheduler);
                suspendable.Subscribe(observer);
    
                // act: gap (in seconds) before each value, the values are 1..9
                int[] gapsInSeconds = { 2, 10, 11, 2, 2, 2, 2, 9, 10 };
                for (int i = 0; i < gapsInSeconds.Length; i++)
                {
                    scheduler.AdvanceBy(TimeSpan.FromSeconds(gapsInSeconds[i]).Ticks);
                    source.OnNext(i + 1);
                }
    
                // verify: only the OnNext notifications are of interest
                int[] results = observer.Messages
                    .Where(m => m.Value.Kind == NotificationKind.OnNext)
                    .Select(m => m.Value.Value)
                    .ToArray();
    
                int[] expected = { 1, 2, 3, 4, 9 };
                for (int i = 0; i < expected.Length; i++)
                {
                    Assert.AreEqual(expected[i], results[i]);
                }
            }
    
            #endregion // SuspendOn_ResumeAfterSuspensionPeriod_Test
    
            #region SuspendOn_ResumeAfterSuspensionPeriodAndBelowThreshold_Test
    
            /// <summary>
            /// Verifies that a burst arriving right after a suspension period has elapsed
            /// starts a new suspension, and that values flow again only once both the
            /// suspension period is over and the rate is back below the threshold.
            /// </summary>
            /// <remarks>
            /// Each '-' is one second:
            /// <code>
            /// source -1---------2---------3-4-5-----------6-7-8---------9---------10------
            /// result -1---------2---------3-4-------------------------------------10------
            /// </code>
            /// </remarks>
            [TestMethod]
            public void SuspendOn_ResumeAfterSuspensionPeriodAndBelowThreshold_Test()
            {
                // arrange
                var scheduler = new TestScheduler();
                scheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks); // avoid negative time

                var observer = scheduler.CreateObserver<int>();
                var source = new Subject<int>();

                // 3 values within 5 seconds are expected to trigger a 15 second suspension
                var throttled = source.SuspendWhen(
                    3,
                    TimeSpan.FromSeconds(5),
                    TimeSpan.FromSeconds(15),
                    scheduler);
                throttled.Subscribe(observer);

                // act
                // Gap (in seconds) preceding each value; the i-th gap precedes value i + 1.
                // Resulting virtual times: 2, 12, 23, 25, 27, 39, 41, 43, 53, 63
                //   27 - third value within 5 seconds, suspension expected through 42
                //   39, 41 - inside the first suspension period
                //   43 - first period is over, but 39/41/43 cross the threshold again (re-suspend)
                //   53 - inside the renewed suspension period
                //   63 - suspension over and rate below the threshold
                int[] gapsInSeconds = { 2, 10, 11, 2, 2, 12, 2, 2, 10, 10 };
                for (int i = 0; i < gapsInSeconds.Length; i++)
                {
                    long ticks = TimeSpan.FromSeconds(gapsInSeconds[i]).Ticks;
                    scheduler.AdvanceBy(ticks);
                    source.OnNext(i + 1);
                }

                // verify
                int[] results =
                    (from message in observer.Messages
                     where message.Value.Kind == NotificationKind.OnNext
                     select message.Value.Value).ToArray();

                int[] expected = { 1, 2, 3, 4, 10 };
                for (int i = 0; i < expected.Length; i++)
                    Assert.AreEqual(expected[i], results[i]);
            }
    
            #endregion // SuspendOn_ResumeAfterSuspensionPeriodAndBelowThreshold_Test
        }
    

    Bnaya Eshet


    Monday, March 23, 2015 5:28 AM
  • This solution is quite similar to the previous one, but more structured.

        /// <summary>
        /// Encapsulate time stamping threshold
        /// where stamps are checked against staleness duration (limit duration) 
        /// and non-staled stamp count evaluate against the count limit
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1702:CompoundWordsShouldBeCasedCorrectly", MessageId = "TimeStamp")]
        public class TimeStampThreshold
        {
            // TODO: IoC for Concurrent Queue
            private readonly Queue<DateTimeOffset> _stamps = new Queue<DateTimeOffset>();
            private readonly IScheduler _scheduler; // for testability and threading
            private readonly int _countLimit;
            private readonly TimeSpan _staleness;
            private readonly TimeSpan _suspensionPeriod;
            private DateTimeOffset? _suspendUntil;
            private int _count = 0;
            private bool _sealed;

            #region Ctor

            /// <summary>
            /// Initializes a new instance of the <see cref="TimeStampThreshold"/> class.
            /// </summary>
            /// <param name="scheduler"></param>
            /// <param name="countLimit">The count limit.</param>
            /// <param name="staleness">Duration of the limit.</param>
            public TimeStampThreshold(
                IScheduler scheduler,
                int countLimit,
                TimeSpan staleness)
                : this(scheduler, countLimit, staleness, staleness)
            {
            }

            /// <summary>
            /// Initializes a new instance of the <see cref="TimeStampThreshold"/> class.
            /// </summary>
            /// <param name="scheduler"></param>
            /// <param name="countLimit">The count limit.</param>
            /// <param name="staleness">The staleness period.</param>
            /// <param name="suspensionPeriod">The period of suspension (after crossing the threshold).</param>
            public TimeStampThreshold(
                IScheduler scheduler,
                int countLimit,
                TimeSpan staleness,
                TimeSpan suspensionPeriod)
            {
                _countLimit = countLimit;
                _staleness = staleness;
                _suspensionPeriod = suspensionPeriod;
                _scheduler = scheduler;
                IsSuspended = false;
            }

            #endregion Ctor

            #region SealedAndStampClone

            /// <summary>
            /// Sealed sealed the origin instance and add stamp to cloned instance.
            /// </summary>
            /// <returns></returns>
            public TimeStampThreshold SealedAndStampClone()
            {
                _sealed = true;
                var result = new TimeStampThreshold(_scheduler, _countLimit, _staleness, _suspensionPeriod);
                result._suspendUntil = _suspendUntil;
                DateTimeOffset now = _scheduler.Now;

                var threshold = _scheduler.Now.Subtract(_staleness);
                while (_stamps.Count != 0)
                {
                    var stamp = _stamps.Dequeue();
                    if (_stamps.Count >= _countLimit)// anything above the limit is irrelevant
                        continue;
                    if (stamp >= threshold) // check the duration window (anything older is irrelevant)
                        result._stamps.Enqueue(stamp);
                }
                result._stamps.Enqueue(now);
                int count = result._stamps.Count;
                result._count = count;
                result.ShouldSuspend(count);

                return result;
            }

            #endregion SealedAndStampClone

            #region TryStamp

            /// <summary>
            /// Set new stamping point and check whether it don't cross the threshold
            /// </summary>
            /// <returns>
            /// true = below the threshold
            /// false = above the threshold
            /// </returns>
            public bool TryStamp()
            {
                return RemoveStaleStamps(true);
            }

            #endregion TryStamp

            #region RecheckIsBelowThreshold

            /// <summary>
            /// Remove stale stamp and check whether below threshold.
            /// </summary>
            /// <returns></returns>
            public bool RecheckIsBelowThreshold()
            {
                return RemoveStaleStamps(false);
            }

            #endregion RecheckIsBelowThreshold

            #region IsSuspended

            /// <summary>
            /// Determines whether in suspended state (above or equals to threshold).
            /// </summary>
            /// <returns></returns>
            public bool IsSuspended { get; private set; }

            #endregion IsSuspended

            #region RemoveStaleStamps

            /// <summary>
            /// Removes the stale stamps.
            /// </summary>
            /// <param name="createNewStamp">if set to <c>true</c> [create new stamp].</param>
            /// <returns></returns>
            /// <exception cref="System.InvalidOperationException">
            /// access to sealed TimeStampThreshold instance is not allowed (sealed is internal state)
            /// or
            /// Concurrent access to non thread-safe method (TimeStampThreshold.Reduce)
            /// </exception>
            [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA2204:Literals should be spelled correctly", MessageId = "TimeStampThreshold")]
            private bool RemoveStaleStamps(bool createNewStamp)
            {
                #region Validation

                if (_sealed)
                    throw new InvalidOperationException("access to sealed TimeStampThreshold instance is not allowed (sealed is internal state)");

                #endregion Validation

                int count = _stamps.Count;
                if (createNewStamp)
                    _stamps.Enqueue(_scheduler.Now);
                while (_stamps.Count > _countLimit)
                    _stamps.Dequeue();

                DateTimeOffset threshold = _scheduler.Now.Subtract(_staleness);

                while (_stamps.Count > 0)
                {
                    DateTimeOffset stamp = _stamps.Peek();
                    if (stamp < threshold)
                        _stamps.Dequeue();
                    else
                        break;
                }

                int newCount = _stamps.Count;
                int previousCount = Interlocked.CompareExchange(ref _count, newCount, count);
                #region Validation

                if (previousCount != count)
                    throw new InvalidOperationException("Concurrent access to non thread-safe method (TimeStampThreshold.Reduce)");

                #endregion Validation

                return !ShouldSuspend(newCount);
            }

            #endregion RemoveStaleStamps

            #region ShouldSuspend

            /// <summary>
            /// Check whether to suspend.
            /// </summary>
            /// <param name="count">The count.</param>
            /// <returns></returns>
            private bool ShouldSuspend(int count)
            {
                bool shouldSuspend = count >= _countLimit;
                DateTimeOffset now = _scheduler.Now;
                if (_suspendUntil.HasValue)
                {
                    if (_suspendUntil < now)
                    {
                        if (shouldSuspend)
                            _suspendUntil = now.Add(_suspensionPeriod); // reset suspension period (ass long as the pressure remains)
                        else
                            _suspendUntil = null;
                    }
                    else
                        shouldSuspend = true;
                }
                else
                {
                    if (shouldSuspend)
                        _suspendUntil = now.Add(_suspensionPeriod); // set suspension period
                }

                IsSuspended = shouldSuspend;
                return shouldSuspend;
            }

            #endregion ShouldSuspend

            #region Now

            /// <summary>
            /// Gets the now (from the scheduler).
            /// </summary>
            /// <value>
            /// The now.
            /// </value>
            public DateTimeOffset Now { get { return _scheduler.Now; } }

            #endregion Now
        }

        /// <summary>
        /// Suspend Resume Accumulator
        /// </summary>
        internal static class SuspendOnAccumulator
        {
            #region OnNext

            /// <summary>
            /// Ongoing accumulation, called for each value.
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <param name="instance">The instance.</param>
            /// <param name="item">The item.</param>
            /// <returns></returns>
            public static SuspendOnAccumulator<T> OnNext<T>(SuspendOnAccumulator<T> instance, T item)
            {
                var result = new SuspendOnAccumulator<T>(instance, item); // clone the threshold and add stamp
                return result;
            }

            #endregion OnNext
        }

        /// <summary>
        /// Suspend Resume Accumulator
        /// </summary>
        /// <typeparam name="T"></typeparam>
        internal class SuspendOnAccumulator<T>
        {
            private readonly TimeStampThreshold _threshold;

            #region Ctor

            /// <summary>
            /// Initializes a new instance of the <see cref="SuspendOnAccumulator{T}"/> class.
            /// </summary>
            /// <param name="scd">The SCD.</param>
            /// <param name="countLimit">Count limit for triggering suspension period (within the limit duration)</param>
            /// <param name="staleness">The duration which the count limit refer to</param>
            /// <param name="suspensionPeriod">The period of suspension (when suspension trigger occurs)</param>
            public SuspendOnAccumulator(
                IScheduler scd,
                int countLimit,
                TimeSpan staleness,
                TimeSpan suspensionPeriod)
            {
                _threshold = new TimeStampThreshold(scd, countLimit, staleness, suspensionPeriod);
            }

            /// <summary>
            /// Initializes a new instance of the <see cref="SuspendOnAccumulator{T}" /> class.
            /// </summary>
            /// <param name="instance">The instance.</param>
            /// <param name="value">The value.</param>
            internal SuspendOnAccumulator(SuspendOnAccumulator<T> instance, T value)
            {
                _threshold = instance._threshold.SealedAndStampClone();
                Value = value;
            }

            #endregion Ctor

            #region IsSuspended

            /// <summary>
            /// Gets a value indicating whether the current notification should be ignored 
            /// (indicate active suspension period).
            /// </summary>
            /// <value>
            ///   <c>true</c> if [on hold]; otherwise, <c>false</c>.
            /// </value>
            public bool IsSuspended { get { return _threshold.IsSuspended; } }

            #endregion IsSuspended

            #region Value

            /// <summary>
            /// Gets the value.
            /// </summary>
            /// <value>
            /// The value.
            /// </value>
            public T Value { get; private set; }

            #endregion Value

            #region Now

            /// <summary>
            /// Gets the now (from the scheduler).
            /// </summary>
            /// <value>
            /// The now.
            /// </value>
            public DateTimeOffset Now { get { return _threshold.Now; } }

            #endregion Now
        }

     

        [TestClass]
        public class TimeStampThreshold_Tests
        {
            #region SealedAndStampClone_Test

            [TestMethod]
            public void SealedAndStampClone_Test()
            {
                /**********************************************************************
                 *
                 * source           -1---------2---------3-4-5-6-7--------8---------9------
                 * Below            -1---------2---------3-4--------------8---------9------
                 * Above or Equal   -------------------------5-6-7-------------------------
                 *                    
                 **********************************************************************/

                // arrange
                var scheduler = new TestScheduler();
                scheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks); // avoid negative time
                var threshold = new TimeStampThreshold(scheduler, 3, TimeSpan.FromSeconds(5));

                // act & verify

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 2
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 12
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(11).Ticks); // 23
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 25
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 27 - Suspend
                threshold = threshold.SealedAndStampClone();
                Assert.IsTrue(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 29 - Suspend
                threshold = threshold.SealedAndStampClone();
                Assert.IsTrue(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 31 - Suspend
                threshold = threshold.SealedAndStampClone();
                Assert.IsTrue(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(9).Ticks);  // 40 
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 50
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);
            }

            #endregion // SealedAndStampClone_Test

            #region TryStamp_Test

            [TestMethod]
            public void TryStamp_Test()
            {
                /**********************************************************************
                 *
                 * source           -1---------2---------3-4-5-6-7--------8---------9------
                 * Below            -1---------2---------3-4--------------8---------9------
                 * Above or Equal   -------------------------5-6-7-------------------------
                 *                    
                 **********************************************************************/

                // arrange
                var scheduler = new TestScheduler();
                scheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks); // avoid negative time
                var threshold = new TimeStampThreshold(scheduler, 3, TimeSpan.FromSeconds(5));

                // act & verify

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 2
                bool belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 12
                belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(11).Ticks); // 23
                belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 25
                belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 27 - Above
                belowThreshold = threshold.TryStamp();
                Assert.IsFalse(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 29 - Above
                belowThreshold = threshold.TryStamp();
                Assert.IsFalse(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 31 - Above
                belowThreshold = threshold.TryStamp();
                Assert.IsFalse(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(9).Ticks);  // 40 
                belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);
            }

            #endregion // TryStamp_Test

            #region SealedAndStampClone_Resuspend_Test

            [TestMethod]
            public void SealedAndStampClone_Resuspend_Test()
            {
                /******************************************************************************
                 *
                 * source -1---------2---------3-4-5-----------6-7-8---------9---------10------
                 * Result -1---------2---------3-4-------------------------------------10------
                 *                    
                 ******************************************************************************/

                // arrange
                var scheduler = new TestScheduler();
                scheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks); // avoid negative time
                var threshold = new TimeStampThreshold(scheduler, 3, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));

                // act & verify

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 2
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 12
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(11).Ticks); // 23
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 25
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 27 - Suspend
                threshold = threshold.SealedAndStampClone();
                Assert.IsTrue(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(12).Ticks);  // 39 - Suspend
                threshold = threshold.SealedAndStampClone();
                Assert.IsTrue(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 41 - Suspend
                threshold = threshold.SealedAndStampClone();
                Assert.IsTrue(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 43 - Suspend -> above threshold (re-suspend)
                threshold = threshold.SealedAndStampClone();
                Assert.IsTrue(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 53 - Suspend (within the new re-suspend period)
                threshold = threshold.SealedAndStampClone();
                Assert.IsTrue(threshold.IsSuspended);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 63
                threshold = threshold.SealedAndStampClone();
                Assert.IsFalse(threshold.IsSuspended);
            }

            #endregion // SealedAndStampClone_Resuspend_Test

            #region TryStamp_Resuspend_Test

            [TestMethod]
            public void TryStamp_Resuspend_Test()
            {
                /**********************************************************************
                 *
                 * source           -1---------2---------3-4-5-6-7--------8---------9------
                 * Below            -1---------2---------3-4--------------8---------9------
                 * Above or Equal   -------------------------5-6-7-------------------------
                 *                    
                 **********************************************************************/

                // arrange
                var scheduler = new TestScheduler();
                scheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks); // avoid negative time
                var threshold = new TimeStampThreshold(scheduler, 3, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));

                // act & verify

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 2
                bool belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 12
                belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(11).Ticks); // 23
                belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 25
                belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 27 - Above
                belowThreshold = threshold.TryStamp();
                Assert.IsFalse(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(12).Ticks);  // 39 - Above
                belowThreshold = threshold.TryStamp();
                Assert.IsFalse(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 41 - Above
                belowThreshold = threshold.TryStamp();
                Assert.IsFalse(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 43 - Suspend -> above threshold (re-suspend) 
                belowThreshold = threshold.TryStamp();
                Assert.IsFalse(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 53 - Suspend (within the new re-suspend period) 
                belowThreshold = threshold.TryStamp();
                Assert.IsFalse(belowThreshold);

                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 63 
                belowThreshold = threshold.TryStamp();
                Assert.IsTrue(belowThreshold);
           }

            #endregion // TryStamp_Resuspend_Test
        }

        [TestClass]
        public class RxSuspendResume_Tests
        {
            #region SuspendOn_ResumeAfterSuspensionPeriod_Test

            [TestMethod]
            public void SuspendOn_ResumeAfterSuspensionPeriod_Test()
            {
                /**********************************************************************
                 *
                 * source -1---------2---------3-4-5-6-7--------8---------9------
                 * Result -1---------2---------3-4------------------------9------
                 *                    
                 **********************************************************************/

                // arrange
                var scheduler = new TestScheduler();
                scheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks); // avoid negative time

                var observer = scheduler.CreateObserver<int>();

                var source = new Subject<int>();

                source
                    .SuspendWhen(3, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15), scheduler)
                    .Subscribe(observer);

                // act
                /**********************************************************************
                 *
                 * source -1---------2----------3-4-5-6-7--------8---------9------
                 *                    
                 **********************************************************************/

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 2
                source.OnNext(1);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 12
                source.OnNext(2);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(11).Ticks); // 23
                source.OnNext(3);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 25
                source.OnNext(4);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 27 - Start Until 42
                source.OnNext(5);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 29 - Suspend
                source.OnNext(6);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 31 - Suspend
                source.OnNext(7);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(9).Ticks);  // 40 - Suspend
                source.OnNext(8);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 50
                source.OnNext(9);

                // verify
                var results = observer.
                    Messages
                    .Where(m => m.Value.Kind == NotificationKind.OnNext)
                    .Select(m => m.Value.Value).ToArray();

                Assert.AreEqual(1, results[0]);
                Assert.AreEqual(2, results[1]);
                Assert.AreEqual(3, results[2]);
                Assert.AreEqual(4, results[3]);
                Assert.AreEqual(9, results[4]);
            }

            #endregion // SuspendOn_ResumeAfterSuspensionPeriod_Test

            #region SuspendOn_ResumeAfterSuspensionPeriodAndBelowThreshold_Test

            [TestMethod]
            public void SuspendOn_ResumeAfterSuspensionPeriodAndBelowThreshold_Test()
            {
                /******************************************************************************
                 *
                 * source -1---------2---------3-4-5-----------6-7-8---------9---------10------
                 * Result -1---------2---------3-4-------------------------------------10------
                 *                    
                 ******************************************************************************/

                // arrange
                var scheduler = new TestScheduler();
                scheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks); // avoid negative time

                var observer = scheduler.CreateObserver<int>();

                var source = new Subject<int>();

                source
                    .SuspendWhen(3, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15), scheduler)
                    .Subscribe(observer);

                // act
                /******************************************************************************
                 *
                 * source -1---------2---------3-4-5-----------6-7-8---------9---------10------
                 *                    
                 ******************************************************************************/

                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 2
                source.OnNext(1);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 12
                source.OnNext(2);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(11).Ticks); // 23
                source.OnNext(3);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 25
                source.OnNext(4);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 27 - Start Until 41
                source.OnNext(5);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(12).Ticks); // 39 - Suspend
                source.OnNext(6);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 41 - Suspend
                source.OnNext(7);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 43 - Suspend -> above threshold (re-suspend)
                source.OnNext(8);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 53 - Suspend (within the new re-suspend period)
                source.OnNext(9);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks); // 63  
                source.OnNext(10);

                // verify
                var results = observer.
                    Messages
                    .Where(m => m.Value.Kind == NotificationKind.OnNext)
                    .Select(m => m.Value.Value).ToArray();

                Assert.AreEqual(1, results[0]);
                Assert.AreEqual(2, results[1]);
                Assert.AreEqual(3, results[2]);
                Assert.AreEqual(4, results[3]);
                Assert.AreEqual(10, results[4]);
            }

            #endregion // SuspendOn_ResumeAfterSuspensionPeriodAndBelowThreshold_Test
        }


    Bnaya Eshet

    Monday, March 23, 2015 5:30 AM