Example: calculating time-frame candles (OHLC) via Rx

• Question

• Hello!

I'm new to RX.

Please help me calculate one-minute candles (grouped by minute; OHLC = Open, High, Low, Close values of the price) from the ticker via RX:

    class Tick
    {
        public DateTime Time;
        public double Price;
    }

    class Candle
    {
        public DateTime Start;
        public double Open;
        public double High;
        public double Low;
        public double Close;
    }

    IObservable<Tick> ticker = ...;
    IObservable<Candle> candles = ticker.????

Friday, August 27, 2010 9:17 PM

All replies

• Hi Sergunok,

```
from window in prices
    .Timestamp()
    .BufferWithTime(TimeSpan.FromSeconds(1))
select new
{
    Start = window.First().Timestamp.DateTime,
    Open = window.First().Value,
    High = window.Max(ts => ts.Value),
    Low = window.Min(ts => ts.Value),
    Close = window.Last().Value
}
```

Saturday, August 28, 2010 8:51 AM
• Here is a full working sample.

```
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication21
{
    class Program
    {
        static void Main(string[] args)
        {
            // Simulated price feed: a random walk starting at 100, one tick every 0.1 s.
            var interval = TimeSpan.FromSeconds(0.1);
            var rand = new Random();
            var prices = Observable.GenerateWithTime(100, _ => true, i => i, _ => interval, i => rand.Next(i - i / 10, i + i / 10));

            // Collect the timestamped ticks into one-second buffers and reduce each buffer to OHLC.
            var query =
                from window in prices
                    .Timestamp()
                    .BufferWithTime(TimeSpan.FromSeconds(1))
                select new
                {
                    Start = window.First().Timestamp.DateTime,
                    Open = window.First().Value,
                    High = window.Max(ts => ts.Value),
                    Low = window.Min(ts => ts.Value),
                    Close = window.Last().Value
                };

            query.Subscribe(Console.WriteLine);

            // The feed runs on a timer, so keep the process alive until Enter is pressed.
            Console.ReadLine();
        }
    }
}
```
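As far as I can tell, the sample above buckets ticks by elapsed time since subscription, stamps them with their arrival time rather than `Tick.Time`, and projects to an anonymous type. The following is a complementary sketch, not taken from the thread: it folds the question's `Tick` values into `Candle` objects aligned to calendar minutes, using only the BCL's `IObserver<T>`. `MinuteCandleBuilder` and `CandleDemo` are hypothetical names, and the code assumes ticks arrive in time order.

```csharp
using System;
using System.Collections.Generic;

public class Tick
{
    public DateTime Time;
    public double Price;
}

public class Candle
{
    public DateTime Start;
    public double Open, High, Low, Close;
}

// Hypothetical helper (not part of Rx): builds one candle per calendar minute.
// Assumption: ticks are delivered in non-decreasing time order.
public sealed class MinuteCandleBuilder : IObserver<Tick>
{
    private readonly Action<Candle> _onCandle;
    private Candle _current;

    public MinuteCandleBuilder(Action<Candle> onCandle)
    {
        if (onCandle == null) throw new ArgumentNullException("onCandle");
        _onCandle = onCandle;
    }

    public void OnNext(Tick tick)
    {
        // Truncate the tick time to the start of its minute.
        var t = tick.Time;
        var start = new DateTime(t.Year, t.Month, t.Day, t.Hour, t.Minute, 0, t.Kind);

        // A tick from a different minute closes the candle in progress.
        if (_current != null && _current.Start != start) Flush();

        if (_current == null)
        {
            _current = new Candle
            {
                Start = start,
                Open = tick.Price,
                High = tick.Price,
                Low = tick.Price,
                Close = tick.Price
            };
            return;
        }

        _current.High = Math.Max(_current.High, tick.Price);
        _current.Low = Math.Min(_current.Low, tick.Price);
        _current.Close = tick.Price;
    }

    // Emit the last, possibly partial, candle when the feed ends.
    public void OnCompleted() { Flush(); }

    // On failure, discard the candle in progress instead of emitting it.
    public void OnError(Exception error) { _current = null; }

    private void Flush()
    {
        if (_current == null) return;
        var done = _current;
        _current = null;
        _onCandle(done);
    }
}

public static class CandleDemo
{
    // Feeds a finite tick sequence through the builder and collects the candles.
    public static List<Candle> Build(IEnumerable<Tick> ticks)
    {
        var candles = new List<Candle>();
        var builder = new MinuteCandleBuilder(candles.Add);
        foreach (var tick in ticks) builder.OnNext(tick);
        builder.OnCompleted();
        return candles;
    }
}
```

With a live feed the builder could be attached with `ticker.Subscribe(new MinuteCandleBuilder(candle => ...))`. A candle is emitted only when a tick from a later minute arrives or the source completes, and minutes without ticks produce no candle, which also sidesteps calling `First()` on an empty buffer.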

• Proposed as answer by Saturday, August 28, 2010 9:40 AM
• Marked as answer by Saturday, August 28, 2010 2:11 PM
Saturday, August 28, 2010 9:04 AM
• Hi James,

Thanks for the OHLC calculation example!

BufferWithTime and BufferWithCount are really cool.

Do you know some analogue of BufferWithCount but for "windowing" (by window of some size)?

Saturday, August 28, 2010 2:17 PM
• Hi Sergey,

> Do you know some analogue of BufferWithCount but for "windowing" (by window of some size)?

Could you be more specific about what you mean by "windowing"?

Without a formal definition, what comes to mind is Sample (time-based) and Where (data-based); however, Sample will only return a single element so you may have to use it in conjunction with BufferWithCount or GroupBy in order to achieve your goal.

- Dave

http://davesexton.com/blog
Saturday, August 28, 2010 7:20 PM
• Hi Dave,

By "windowing" I mean something similar to BufferWithCount, except that each window is the previous one shifted to the right by one element.

Example:

Collection: 1 2 3 4 5 6 7 8 9

BufferWithCount(3) ->

1 2 3

4 5 6

7 8 9

WindowWithCount(3) ->

1 2 3

2 3 4

3 4 5

4 5 6

5 6 7

6 7 8

7 8 9

Like buffering, "windowing" is useful for calculating aggregations.

Saturday, August 28, 2010 7:58 PM
• Hi Sergey,

Ok thanks, that explanation of "windowing" makes sense.

Try the following:

```
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;

namespace ReactiveProgrammingConsole
{
    class WindowLab
    {
        static void Main()
        {
            IObservable<int> xs = Observable.Range(1, 9);

            IObservable<IList<int>> windowed = xs.WindowWithCount(3);

            // Print each window on its own line.
            using (windowed.Subscribe(window => window
                .Do(x => Console.Write(x + " "))
                .Run(_ => { }, () => Console.WriteLine())))
            {
            }
        }
    }

    public static partial class ObservableEx2
    {
        public static IObservable<IList<TSource>> WindowWithCount<TSource>(this IObservable<TSource> source, int count)
        {
            Contract.Requires(source != null);
            // A window must hold at least one element.
            Contract.Requires(count > 0);

            // Publish shares one subscription to the source between both uses of "published".
            return source.Publish(published =>
                from x in published
                // For each element x, collect x plus the elements that follow it.
                from buffer in published.StartWith(x).BufferWithCount(count).Take(1)
                // Drop the short buffers that are flushed when the source completes.
                where buffer.Count == count
                select buffer
            );
        }
    }
}
```

- Dave

http://davesexton.com/blog
• Marked as answer by Sunday, August 29, 2010 7:44 AM
Saturday, August 28, 2010 10:27 PM
• Hi Dave!

Thank you very much!

Almost everything in the code is clear, except why the Publish call is required.

I have read that it provides a single shared subscription, but I don't understand why the example without it prints:

1 1 2

2 1 2

3 1 2

4 1 2

...

By the way, I suppose "windowing" is a very popular concept (for example, in data mining and time series analysis), and many people would be glad to see it in Rx or in the Rx Parsers.

Sunday, August 29, 2010 7:55 AM
• Hi Sergey,

Observable.Range creates a cold observable.  A cold observable allows subscriptions to cause side-effects.  In the case of Range, it means that each new subscription will see its own full range of values (from 1 to 9 in my example).

Publish makes a cold observable hot.  A hot observable does not cause subscription side-effects; i.e., the observable is already warmed up, so a subscription may occur within the middle of the sequence without restarting the entire sequence.  It's kind of like a .NET event.

The example query uses the source observable twice.  Publish is used to ensure that both share the same subscription side-effects; i.e., both uses of published will observe the same range, from 1 to 9.  Without Publish, each would see its own range of elements because Observable.Range creates a cold observable that starts upon every new subscription.
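The difference can be illustrated without Rx at all. The following is a minimal sketch using only the BCL interfaces `IObservable<T>` and `IObserver<T>`; `ColdRange`, `HotSource`, `Recorder`, `Unsubscriber` and `HotColdDemo` are hypothetical names made up for this illustration, not Rx types. It also suggests why the unpublished query prints `x 1 2`: each inner subscription restarts the range at 1, so `StartWith(x)` is followed by 1 and 2 instead of the elements after x.

```csharp
using System;
using System.Collections.Generic;

// Records every value it is given.
sealed class Recorder : IObserver<int>
{
    public readonly List<int> Seen = new List<int>();
    public void OnNext(int value) { Seen.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

sealed class Unsubscriber : IDisposable
{
    private readonly Action _dispose;
    public Unsubscriber(Action dispose) { _dispose = dispose; }
    public void Dispose() { _dispose(); }
}

// Cold: every Subscribe call replays the whole range to the new observer.
sealed class ColdRange : IObservable<int>
{
    private readonly int _start, _count;
    public ColdRange(int start, int count) { _start = start; _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < _count; i++) observer.OnNext(_start + i);
        observer.OnCompleted();
        return new Unsubscriber(() => { });
    }
}

// Hot: each value is pushed once, to whoever is subscribed at that moment.
sealed class HotSource : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void Push(int value)
    {
        // Iterate over a copy so observers may subscribe or unsubscribe while notified.
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }
}

static class HotColdDemo
{
    public static string Run()
    {
        var cold = new ColdRange(1, 3);
        var a = new Recorder();
        var b = new Recorder();
        cold.Subscribe(a);
        cold.Subscribe(b);          // b gets its own full replay: 1 2 3

        var hot = new HotSource();
        var c = new Recorder();
        var d = new Recorder();
        hot.Subscribe(c);
        hot.Push(1);
        hot.Subscribe(d);           // d joins mid-sequence and misses 1
        hot.Push(2);
        hot.Push(3);

        return string.Format("cold: [{0}] [{1}] hot: [{2}] [{3}]",
            string.Join(" ", a.Seen), string.Join(" ", b.Seen),
            string.Join(" ", c.Seen), string.Join(" ", d.Seen));
    }
}
```

`HotColdDemo.Run()` returns `cold: [1 2 3] [1 2 3] hot: [1 2 3] [2 3]`: both cold subscribers see the full range, while the late hot subscriber only sees what is pushed after it joined.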

- Dave

http://davesexton.com/blog
Sunday, August 29, 2010 9:49 AM
• Updated example (so that it works with the new buffer operators):

```
using System;
using System.Linq;

namespace ConsoleApplication87
{
    class Program
    {
        static void Main(string[] args)
        {
            // Simulated price feed: one tick every 0.1 s.
            var interval = TimeSpan.FromSeconds(0.1);
            var rand = new Random();
            var prices = Observable.GenerateWithTime(100, _ => true, i => i, i => rand.Next(i - i / 10, i + i / 10), _ => interval);

            // Each one-second window is itself a sequence here, so it is
            // collected into a single list before the OHLC values are computed.
            var query =
                from window in prices.Timestamp().BufferWithTime(TimeSpan.FromSeconds(1))
                from result in window.BufferWithCount(int.MaxValue)
                select new
                {
                    Start = result.First().Timestamp.DateTime,
                    Open = result.First().Value,
                    High = result.Max(ts => ts.Value),
                    Low = result.Min(ts => ts.Value),
                    Close = result.Last().Value
                };

            query.Subscribe(Console.WriteLine);

            // The feed runs on a timer, so keep the process alive until Enter is pressed.
            Console.ReadLine();
        }
    }
}
```

James Miles http://enumeratethis.com
Wednesday, January 19, 2011 5:53 PM
• Hello,

Does anybody know why this implementation of WindowWithCount does not work with IEnumerable<TSource>?

```
public static IEnumerable<IList<TSource>> WindowWithCount<TSource>(this IEnumerable<TSource> source, int count)
{
    return source.Publish(published =>
        from x in published
        from buffer in published.StartWith(x).BufferWithCount(count).Take(1)
        where buffer.Count == count
        select buffer
    );
}
```

It behaves as if "published" had not been published.

Sergey.
Monday, February 21, 2011 8:41 AM
• Hi Sergey,

Publish isn't quite the same thing in Ix.  I looked in Reflector and it's basically just memoizing the entire sequence.  What's probably happening, although I didn't test it myself, is that the inner enumerator (from buffer in published...) restarts from the beginning of the sequence for each x in the outer enumerator (from x in published).  Essentially, the difference in behavior boils down to the difference between reactive vs. interactive execution models.  The inner enumerator pulls from the source, so for each x it will generate a buffer by moving ahead of the outer enumerator as much as it needs.

Sometimes Share is an appropriate alternative when Publish doesn't work as expected, but in this case Share probably isn't going to work either.  I believe it would cause the outer enumerator's position to be synchronized with the inner enumerator, thus skipping count values in the outer enumerator for each buffer that is generated.

You probably need to use Publish while managing the inner enumerator's position manually, because there's no asynchrony involved. Perhaps something like the following. (Untested)

```
public static IEnumerable<IList<TSource>> WindowWithCount<TSource>(this IEnumerable<TSource> source, int count)
{
    return source.Publish(published =>
    {
        // Number of elements the inner enumerator must skip to catch up with the outer one.
        int skip = 0;

        return from x in published
               from buffer in published.Skip(skip++).BufferWithCount(count).Take(1)
               where buffer.Count == count
               select buffer;
    });
}
```
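An alternative that avoids Publish altogether, offered as a sketch rather than as part of the suggestion above: in the pull model a sliding window can be kept in a `Queue<T>` during a single pass over the source. `SlidingWindow` and `EnumerableWindowSketch` are hypothetical names (not Rx or Ix operators), and only BCL types are used.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EnumerableWindowSketch
{
    // Hypothetical helper: yields every run of "count" consecutive elements.
    public static IEnumerable<IList<T>> SlidingWindow<T>(this IEnumerable<T> source, int count)
    {
        // Validate eagerly; the iterator below only runs when enumerated.
        if (source == null) throw new ArgumentNullException("source");
        if (count <= 0) throw new ArgumentOutOfRangeException("count");
        return SlidingWindowIterator(source, count);
    }

    private static IEnumerable<IList<T>> SlidingWindowIterator<T>(IEnumerable<T> source, int count)
    {
        var window = new Queue<T>(count);
        foreach (var item in source)
        {
            window.Enqueue(item);

            // Keep at most "count" elements by dropping the oldest one.
            if (window.Count > count) window.Dequeue();

            // Yield a snapshot copy so later iterations cannot mutate it.
            if (window.Count == count) yield return window.ToArray();
        }
    }
}

public static class SlidingWindowDemo
{
    // Formats each window as a space-separated line.
    public static string[] Lines(int start, int length, int count)
    {
        return Enumerable.Range(start, length)
            .SlidingWindow(count)
            .Select(w => string.Join(" ", w))
            .ToArray();
    }
}
```

For example, `SlidingWindowDemo.Lines(1, 9, 3)` yields the seven lines `1 2 3` through `7 8 9` from the windowing example earlier in the thread. The source is enumerated once, so it behaves the same for sequences that cannot be restarted.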

- Dave

http://davesexton.com/blog
Monday, February 21, 2011 3:40 PM