I need to repeat all values of a hot observable based on a duration, so I wrote the next extension. However it has a memory leak, can someone help me write it correctly?
public static IObservable<T> RepeatAllDuringSilence<T>(this IObservable<T> source, TimeSpan maxQuietPeriod,
IScheduler scheduler=null, IObservable<T> inner=null) {
if (scheduler==null)
scheduler=Scheduler.Default;
if (inner == null)
inner = source;
return Observable.Create<T>(observer => {
var replay = inner.Replay(scheduler);
var sequence = source.Select(x => Observable.Interval(maxQuietPeriod, scheduler).SelectMany(_ => replay).StartWith(scheduler,x)).Switch();
return new CompositeDisposable( replay.Connect(), sequence.Subscribe(observer));
});
}