Asked by:
Release Notes for Reactive Extensions 1.1.11011.11 (Experimental Release)

General discussion
-
Release Notes for Reactive Extensions 1.1.11011.11 (Experimental Release)
This new release of Reactive Extensions (Rx) is an experimental release mainly focusing on support for the new async support in .NET Framework 4.5, and the Windows Runtime (WinRT) capabilities.
Improved Exception Management
.NET Framework 4.5 introduces a new type called ExceptionDispatchInfo, discussed in length here. This type helps to improve on exception propagation in framework libraries like the TPL.
One of the issues libraries like those (including Rx) encounter is the need to pass exceptions “out of band” with regards to regular control flow (using the throw keyword), for example by storing an exception object on a Task<T> or by propagating an exception through the OnError channel on IObserver<T>. All is fine up to the point where the exception needs to be rethrown, for example when accessing the Result of a task, or when performing a blocking call on an observable sequence, e.g. First. When throwing the exception object again, the CLR blows away existing info that’s kept in the object, including the call stack and Watson (Windows Error Reporting) data. This degrades diagnostics and ease of debugging.
With ExceptionDispatchInfo, diagnostic information associated with an exception object can be kept around and reapplied at the point the exception needs to be rethrown. This functionality was introduced on .NET Framework 4.5 to improve the programming experience when using the new await support in the C# 5.0 and Visual Basic 11 languages. In Rx, we leverage this facility now to cure all exception objects that have been flowing through the OnError channel on IObserver<T> in order to restore diagnostic information prior to rethrowing the exception.
Example
The following code sample causes the exception object flowing through the observable sequence obtained from Start (originating from the underlying Bar call) to be rethrown by the synchronous call to First.
static void Main()
{
try
{
Observable.Start<int>(Bar).First();
}
catch (Exception ex)
{
Console.WriteLine(ex.StackTrace);
}
}
static int Bar(){
throw new InvalidOperationException("Foo!");
}When running this code on frameworks prior to .NET Framework 4.5, the following is printed:
at System.Reactive.ExceptionHelpers.Throw[T](Exception exception)
at System.Reactive.Linq.Observable.FirstOrDefaultInternal[TSource](IObservable`1 source, Boolean throwOnEmpty)
at System.Reactive.Linq.Observable.First[TSource](IObservable`1 source)
at Program.Main()Using Rx on .NET Framework 4.5, the behavior is different:
at ConsoleApplication6.Program.Bar()
at System.Reactive.Linq.Observable.<>c__DisplayClass6b`1.<>c__DisplayClass6d.<ToAsync>b__6a()
--- End of stack trace from previous location where exception was thrown ---
at System.Reactive.Linq.Observable.FirstOrDefaultInternal[TSource](IObservable`1 source, Boolean throwOnEmpty)
at Program.Main()Abstract Base Class ObservableBase<T>
From time to time, the question on how to implement the IObservable<T> interface correctly pops up. In 90% of the cases, the simple answer is to use Observable.Create in order to create an anonymous instance of the interface, as shown below:
static IObservable<int> BarObservable(IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.Schedule(() =>
{
var x = default(int);
try
{
x = Bar();
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
observer.OnNext(x);
observer.OnCompleted();
});
});
}
static int Bar()
{
return 42;
}However, as more 3<sup>rd</sup> parties are developing APIs that expose types implementing IObservable<T>, the functional style approach of Observable.Create isn’t always a perfect fit. In this release, we introduce an abstract base class called ObservableBase<T> that helps with those scenarios:
namespace System.Reactive
{
public abstract class ObservableBase<T> : IObservable<T>
{
public IDisposable Subscribe(IObserver<T> observer) { … }
public abstract IDisposable SubscribeCore(IObserver<T> observer);
}
}Internally, the non-virtual Subscribe method wraps the call to the abstract SubscribeCore method where the implementer can write the core of the subscribe operation. The non-trivial wrapper provided by the Subscribe method ensures the notification contract of observable sequences (e.g. OnError is a terminal message) is honored, guarantees proper clean-up upon receiving terminal messages, and interacts with Rx’s scheduling facilities to ensure the current thread scheduler can perform non-blocking operations (by inserting a so-called trampoline if needed).
Example
The following example shows the equivalent of using Observable.Create but using a named type.
class BarObservable : ObservableBase<int>
{
private readonly IScheduler _scheduler;
public BarObservable(IScheduler scheduler)
{
_scheduler = scheduler;
}
public override IDisposable SubscribeCore(IObserver<int> observer)
{
return _scheduler.Schedule(() =>
{
var x = default(int);
try
{
x = Bar();
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
observer.OnNext(x);
observer.OnCompleted();
});
}
static int Bar()
{
return 42;
}
}Note: In case implementers of a type cannot use ObservableBase<T> as a base type (because of single inheritance restrictions), we recommend using Observable.Create (which in fact implement the abstract class internally) and exposing the created observable sequence using containment.
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }Thursday, October 13, 2011 7:52 AM
All replies
-
Prefer Async Policy
Starting with .NET Framework 4.5, single-value asynchrony is standardized on the Task<T> type, in favor of old approaches such as the Asynchronous Programming Model (APM). Support for asynchronous methods and the await expression keyword – added to the C# and Visual Basic languages – makes dealing with asynchrony much easier than ever before. In this release of Rx, we encourage users of .NET Framework 4.5 and beyond to avoid blocking as much as possible, and hence we’re reducing the amount of blocking calls in the library.
Users of Rx on .NET Framework 4.5 will encounter obsolete warnings for the following query operators:
· First, FirstOrDefault
· Last, LastOrDefault
· Single, SingleOrDefault
Asynchronous variants suffixed with Async have been added, returning IObservable<T> sequences with one element (or producing an error). Thanks to await support for observable sequence (see later), one can await the result of those operators.
The same obsolete warning applies to using the blocking ForEach method, in favor of ForEachAsync which returns a Task object that can be awaited to observe the end of the sequence.
To block on an observable sequence, Rx provides a single Wait method that has the semantics of the blocking Last operator, mirrored after the same method that appears on Task. Its behavior is to block till the end of the sequence, rethrow the OnError exception (if any) or return the last element of the sequence. If no element appears in the sequence, an InvalidOperationException is thrown. To wait for the completion of an empty sequence, one can compose the original sequence with DefaultIfEmpty to ensure the presence of an element.
Example
The code shown below uses the synchronous, blocking Last method, permanently blocking the caller.
static int Foo()
{
return Observable.Never<int>().Last();
}This code compiles with an obsolete warning when using Rx for .NET Framework 4.5:
test.cs(15,17): warning CS0618:
'System.Reactive.Linq.Observable.First<TSource>(System.IObservable<TSource>)' is obsolete:
'This blocking operation is no longer supported. Instead, use the async version in combination with C#
and Visual Basic async/await support. In case you need a blocking operation, use Wait or convert the
resulting observable sequence to a Task object and block.'Using the new await keyword, one can write the following code now, avoiding blocking the caller infinitely:
static async Task<int> Foo()
{
return await Observable.Never<int>().LastAsync();
}Await Support for Observable Sequences
Rx for .NET Framework 4.5 includes support to await an observable sequence using the new C# 5.0 and Visual Basic 11 language features. Awaiting an observable sequence asynchronously retrieves the last element of the sequence or throws an InvalidOperationException if the sequence is empty. In order to collect more than just the last element of the sequence, use combinators like ToList or ToArray to collect – asynchronously – the elements of the sequence in an in-memory collection.
Example
The following code illustrates a simple use of await on an observable time-delayed sequence, within an event handler for a button click.
static async void button_Click()
{
int x = await Observable.Return(42).Delay(TimeSpan.FromSeconds(5));
// x with value 42 is returned after 5 seconds
label.Text = x.ToString();
}Starting with this release of Rx, the continuation of the await expression (i.e. the assignment to x and code following this statement) runs on the same SynchronizationContext as where the await operation was initiated. This behavior is expected for implementers of await supporting types. For the sample code above, this means we can safely update the UI after the asynchronous operation completes.
Notice use of await makes an observable sequence hot by causing a subscription to take place. Included in this release is await support for IConnectableObservable<T>, which causes connecting the sequence to its source as well as subscribing to it. Without the Connect call, the await operation would never complete.
Example
The next sample shows await support for connectable observable sequences that result from multicast operators like Publish.
static async void Foo()
{
var xs = Observable.Defer(() =>
{
Console.WriteLine("Operation started!");
return Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
});
var ys = xs.Publish();
// This doesn't trigger a connection with the source yet.
ys.Subscribe(x => Console.WriteLine("Value = " + x));
// During the asynchronous sleep, nothing will be printed.
await Task.Delay(5000);
// Awaiting causes the connection to be made. Values will be printed now,
// and the code below will return 9 after 10 seconds.
var y = await ys;
Console.WriteLine("Await result = " + y);
}Due to the implicit subscription caused by awaiting an observable sequence, there’s no direct way to put you hand on the IDisposable object returned from the Subscribe call. In order to make cancellation work, we provide a new bridging operator called StartAsync that accepts a CancellationToken that can be used to signal cancellation. This operator makes the sequence hot (backed by an AsyncSubject<T> underneath in order to ensure the result isn’t lost due to race conditions) and wires the internal subscription to the token that’s passed in. When cancellation occurs, the underlying subscription is disposed and an OperationCanceledException is propagated to the subscriber (typically the caller of the await operation).
Example
static async void Bar()
{
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(30));
try
{
Console.WriteLine("The answer is " + await Foo(cts.Token));
}
catch (OperationCanceledException)
{
Console.WriteLine("The operation was canceled.");
}
}
static async Task<int> Foo(CancellationToken ct)
{
var xs = Observable.Return(21).Delay(TimeSpan.MaxValue);
return 2 * await xs.StartAsync(ct);
}
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }Thursday, October 13, 2011 7:56 AM -
Interoperability with Task<T>
Due to the standardization of single-value asynchronous operations based on Task<T>, the need for bridge operators like FromAsyncPattern (based on the Asynchronous Programming Model) has been reduced. Starting with this release, users of Rx for .NET Framework 4.5 will see obsolete warnings when using those bridge operators, in favor of using Task<T> and to ToObservable conversion operator. Given the BCL contains Task-based asynchronous methods starting with .NET Framework 4.5, changes should be straightforward.
Example
The following example demonstrates the classic Rx sample of composing an event stream of user input (terms) with a web service based dictionary lookup service call to find matching entries in a dictionary.
static IObservable<DictionaryWord[]> LookupAsync(IObservable<string> terms)
{
var svc = new DictServiceSoapClient();
var matchInDictAsync = Observable.FromAsyncPattern<string, string, string, DictionaryWord[]>(
svc.BeginMatchInDict, svc.EndMatchInDict);
var res = from term in terms
from words in matchInDictAsync("wn", term, "prefix")
select words;
return res;
}To convert this code to the new Task-based asynchronous methods that are generated on service proxies in .NET Framework 4.5 and beyond, we replace the obsolete FromAsyncPattern call:
static IObservable<DictionaryWord[]> LookupAsync(IObservable<string> terms)
{
var svc = new DictServiceSoapClient();
var res = from term in terms
from words in svc.MatchInDictAsync("wn", term, "prefix").ToObservable()
select words;
return res;
}In fact, because composing a multi-value event stream with a single-value asynchronous computation is very common, we provide overloads to SelectMany (used in the above by the multiple from clauses) that accept a Task<T> as the return type of the result selector. This allows us to simplify the code above even further to:
static IObservable<DictionaryWord[]> LookupAsync(IObservable<string> terms)
{
var svc = new DictServiceSoapClient();
var res = from term in terms
from words in svc.MatchInDictAsync("wn", term, "prefix")
select words;
return res;
}Note: In case no Task -based asynchronous method is available and you still need to bridge with a Begin/End method pair, replace FromAsyncPattern by Task.Factory.FromAsync, standardizing on Task<T> for single-value asynchronous computations. Using ToObservable, the resulting task object can be lifted into a stream.
Conversions for IProgress<T>
.NET Framework 4.5 introduces the IProgress<T> type used to report progress, e.g. from an asynchronous operation that’s in flight. The interface looks as follows:
interface IProgress<in T>
{
void Report(T progress);
}Given the resemblance with the IObserver<T> interface, it’s not surprising conversions can be provided. This allows using Rx for producing and consuming progress streams. Using ToObserver, a progress object can be wrapped as an observer which can subsequently be passed to an observable sequence that generates progress messages. The other way around, the ToProgress conversion can be used to turn an observer into a progress object, for example to use in combination with Observable.Create to expose progress as an observable sequence.
Example
In order to expose progress on an observable sequence, the code below uses a simple sampling technique to report integer values that reveal the number of elements observed in the sequence.
static IObservable<T> WithProgress<T>(this IObservable<T> source, TimeSpan progressFrequency, IProgress<int> progress)
{
return source.Publish(src =>
{
src.Select((_, i) => i).Sample(progressFrequency).Subscribe(progress.ToObserver());
return src;
});
}
NotifyOn on IObserver<T>
An extension method called NotifyOn has been added for the IObserver<T> interface, allowing notification messages on an observer to be run on a given scheduler. This method can come in handy when converting progress objects, to ensure messages are observed in the right scheduling context.
- Edited by Bart De Smet [MSFT] Thursday, October 13, 2011 8:05 AM Progress
Thursday, October 13, 2011 8:02 AM -
FromEventPattern Support for WinRT Events
The C# 5.0 and Visual Basic 11 compilers add support to consume WinRT events which are exposed using the same metadata as .NET events, but with different add and remove method signatures. When attaching event handlers using familiar language syntax, the required plumbing is totally invisible to the end-user. However, in order to make the Rx conversion from events to observable sequences work, we had to tweak the reflection-based FromEventPattern overloads. To the user of Rx, this feature can be best described as “it just works”.
Example
In the sample below, we import a KeyDown event on a UIElement in a Windows 8 Metro style application.
var vkeys = from key in Observable.FromEventPattern<KeyEventArgs>(txt, "KeyDown")
select key.EventArgs.Key;
var subscription = vkeys.Subscribe(vkey => {
// Handle key event
});Background
Classic .NET events consist of add and remove methods that accept a delegate. The event itself basically acts as a metadata wrapper around this pair of methods. For example, consider the following declaration:
event EventHandler<MyEventArgs> MyEvent;
Internally, two methods are defined that hook up or unhook the event handler from the underlying source, typically maintained as a multicast delegate:
void add_MyEvent(EventHandler<MyEventArgs> handler) { … }
void remove_MyEvent(EventHandler<MyEventArgs> handler) { … }An implication of this design is the need to remember the handler delegate (or the method and object it refers to) that was passed to the add method (exposed as += in C#), in order to be able to unhook it using the remove method (exposed as -= in C#). This differs from the Rx approach where the Subscribe call returns an IDisposable object that can be used to unsubscribe the observer that was passed in, without having to keep track of it.
WinRT uses an event registration mechanism that’s closer to Rx’s. When attaching an event handler, a handler delegate is passed, but an EventRegistrationToken is returned. Given that token, the handler can be removed at a later point in time. Under the covers, the add and remove methods look as follows:
EventRegistrationToken add_MyEvent(EventHandler<MyEventArgs> handler) { … }
void remove_MyEvent(EventRegistrationToken registrationToken) { … }In this release of Rx, this pattern is recognized when using the reflection-based overloads to FromEventPattern, wiring the observer to the event upon subscription and keeping track of the EventRegistrationToken to be used during the Dispose call on the IDisposable returned from the Subscribe method.
- Edited by Bart De Smet [MSFT] Thursday, October 13, 2011 8:17 AM More background
Thursday, October 13, 2011 8:02 AM -
FromEventPattern Relaxation of Constraints
With WinRT interoperability being a core theme for .NET Framework 4.5, the EventHandler<T> delegate type has been relaxed, no longer requiring its generic parameter to derive from the BCLs’ System.EventArgs base type. This allows for using event argument objects exposed by the Windows 8 APIs, which don’t inherit from this BCL type.
This release of Rx removes the generic constraint on the event arguments generic parameter in all overloads of the FromEventPattern conversion operator. For the reflection-based overloads, the subtyping relationship for discovered events is no longer enforced. This relaxation only applies to the Rx build for .NET Framework 4.5 and higher, mirroring the constraints (or absence thereof) in the framework.
Example
The sample below illustrates importing a .NET event that doesn’t have the EventArgs subtyping constraint.
static void Main()
{
var bar = new Bar();
var foo = Observable.FromEventPattern<DateTime>(h => bar.Foo += h, h => bar.Foo -= h);
// Use the event through an observable sequence
}
class Bar
{
public event EventHandler<DateTime> Foo;
}FromEventPattern Overloads with Strongly Typed TSender Parameter
WinRT introduces a TypedEventHandler delegate type that allows for strong typing of the sender parameter, in addition to the event arguments parameter. While this delegate type is not available in the .NET Framework 4.5 BCL directly (see further for information on System.Reactive.WindowsRuntime), we added support for strong typing of sender parameters in Rx as well.
The main change facilitating this enhancement is found in the new EventPattern<TSender, TEventArgs> type, and a series of FromEventPattern overloads that have an additional TSender generic parameter. The event pattern type is shown below:
public class EventPattern<TSender, TEventArgs> : IEquatable<EventPattern<TSender, TEventArgs>>
{
public EventPattern(TSender sender, TEventArgs e);
public TSender Sender { get; }
public TEventArgs EventArgs { get; }
...
}New overloads to FromEventPattern include the following:
public static IObservable<EventPattern<TSender, TEventArgs>>
FromEventPattern<TDelegate, TSender, TEventArgs>(Action<TDelegate> addHandler,
Action<TDelegate> removeHandler);
public static IObservable<EventPattern<TSender, TEventArgs>>
FromEventPattern<TSender, TEventArgs>(object target, string eventName);
public static IObservable<EventPattern<TSender, TEventArgs>>
FromEventPattern<TSender, TEventArgs>(Type type, string eventName);
Users of Rx should be immediately familiar with those overloads. The System.Reactive.WindowsRuntime assembly (see further) includes additional overloads that leverage the TypedEventHandler delegate type that’s available in WinRT, as well as a ToEventPattern conversion method:
public static IEventPatternSource<TSender, TEventArgs> ToEventPattern<TSender, TEventArgs>(
this IObservable<EventPattern<TSender, TEventArgs>> source);
This method returns an object with a strongly typed event, based on the following interface:
public interface IEventPatternSource<TSender, TEventArgs>
{
event TypedEventHandler<TSender, TEventArgs> OnNext;
}
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }Thursday, October 13, 2011 8:15 AM -
Various Changes to System.Reactive
Minor changes to the System.Reactive assembly include:
· Addition of a ForEachAsync method with a loop body that includes an integer-valued index.
· New Create overloads, allowing the use of await inside the subscribe delegate.
· The IsEmpty operator is no longer marked as experimental.
· Rename of the (experimental) Start method to ToListObservable, to avoid confusion with the new StartAsync method discussed earlier in this document.
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }Thursday, October 13, 2011 8:19 AM -
The New System.Reactive.WindowsRuntime Assembly
This release of Rx includes support for WinRT interop and to build Metro style applications leveraging Rx for event processing. WinRT-specific facilities are factored out in the new System.Reactive.WindowsRuntime assembly, which exists alongside a build of System.Reactive for the Metro profile of .NET Framework 4.5. Rx users that dealt with Windows Forms, WPF, or Silverlight will be familiar with a similar factoring of the Rx API surface.
WinRT Schedulers
In the System.Reactive.WindowsRuntime.Concurrency namespace, two IScheduler implementations are included that wrap WinRT scheduler functionality. The first one is called CoreDispatcherScheduler and wraps the CoreDispatcher used in Metro style UI applications (analogous to Dispatcher in WPF and Silverlight). The second one is called ThreadPoolScheduler and wraps the Windows.System.Threading.ThreadPool available in WinRT (analogous to ThreadPool in CLR).
Alongside those two scheduler types, extension methods are provided to synchronize on the CoreDispatcher UI message loop, called ObserveOnDispatcher and SubscribeOnDispatcher.
Example
The example below shows the use of the IScheduler abstraction to produce an observable sequence on the WinRT threadpool and observe its results using the CoreDispatcher associated with the current CoreWindow.
var time = from _ in Observable.Interval(TimeSpan.FromSeconds, ThreadPoolScheduler.Instance)
select DateTime.Now;
time.ObserveOnDispatcher().Subscribe(now => clock.Text = now.ToString());WinRT Events
Part of the support for WinRT-style events is built into System.Reactive, as we discussed earlier. The parts that depend on WinRT-specific types are factored out in the System.Reactive.WindowsRuntime assembly which can take dependencies on WinMD files. Those facilities include a set of FromEventPattern overloads defined on the WindowsObservable static class:
public static IObservable<EventPattern<TSender, TResult>>
FromEventPattern<TSender, TResult>(
Action<TypedEventHandler<TSender, TResult>> addHandler,
Action<TypedEventHandler<TSender, TResult>> removeHandler);
public static IObservable<EventPattern<TSender, TResult>>
FromEventPattern<TDelegate, TSender, TResult>(
Func<TypedEventHandler<TSender, TResult>, TDelegate> conversion,
Action<TDelegate> addHandler, Action<TDelegate> removeHandler);
A conversion to an object with a WinRT-style typed event is provided through the ToEventPattern conversion, discussed earlier.
Example
The following piece of code shows how to import the PointerMoved typed event defined on CoreWindow.
var pointerMoved = Observable.FromEventPattern<CoreWindow, PointerEventArgs>(
h => window.PointerMoved += h, h => window.PointerMoved -= h);- Edited by Bart De Smet [MSFT] Thursday, October 13, 2011 8:34 AM Web
Thursday, October 13, 2011 8:20 AM -
IAsyncInfo Conversions
WinRT’s asynchronous operations are based on the IAsyncInfo interface and its derived interface types. While the .NET Framework 4.5 BCL provides conversions to Task<T>, there’s value in providing IObservable<T> conversions as well for two reasons.
First of all, the IAsyncInfo interfaces provide a cold model for asynchrony, just like the IObservable<T> interface. Mapping an IAsyncInfo object onto an observable sequence can wrap the Start call inside the observable’s Subscribe call.
Secondly, WinRT provides for progress reporting operations, which can be modeled nicely as an event stream using Rx. In addition to this, some asynchronous operations allow for incremental retrieval of partial results, which maps nicely onto an observable sequence as well.
Conversions to IObservable<T>
The following conversions for asynchronous actions exist:
public static IObservable<Unit> ToObservable(this IAsyncAction source);
public static IObservable<Unit> ToObservable<TProgress>(
this IAsyncActionWithProgress<TProgress> source);
public static IObservable<Unit> ToObservable<TProgress>(
this IAsyncActionWithProgress<TProgress> source, IProgress<TProgress> progress);
public static IObservable<Unit> ToObservable<TProgress>(
this IAsyncActionWithProgress<TProgress> source, out IObservable<TProgress> progress);
public static IObservable<TProgress> ToObservableProgress<TProgress>(
this IAsyncActionWithProgress<TProgress> source);
Subscribing to the resulting sequences triggers execution of the underlying asynchronous action, either surfacing a single result (of type Unit) or the progress that has been reported.
For asynchronous operations, the following conversions are included:
public static IObservable<TResult> ToObservable<TResult>(this IAsyncOperation<TResult> source);
public static IObservable<TResult> ToObservable<TResult, TProgress>(
this IAsyncOperationWithProgress<TResult, TProgress> source);
public static IObservable<TResult> ToObservable<TResult, TProgress>(
this IAsyncOperationWithProgress<TResult, TProgress> source, IProgress<TProgress> progress);
public static IObservable<TResult> ToObservable<TResult, TProgress>(
this IAsyncOperationWithProgress<TResult, TProgress> source, out IObservable<TProgress> progress);
public static IObservable<TProgress> ToObservableProgress<TResult, TProgress>(
this IAsyncOperationWithProgress<TResult, TProgress> source);
public static IObservable<TResult> ToObservableMultiple<TResult, TProgress>(
this IAsyncOperationWithProgress<TResult, TProgress> source);
public static IObservable<TResult> ToObservableMultiple<TResult, TProgress>(
this IAsyncOperationWithProgress<TResult, TProgress> source, IProgress<TProgress> progress);
public static IObservable<TResult> ToObservableMultiple<TResult, TProgress>(
this IAsyncOperationWithProgress<TResult, TProgress> source, out IObservable<TProgress> progress);The first five methods have similar behavior to the conversions for asynchronous actions. The last three methods are used to wrap asynchronous operations that allow retrieval of partial results. During each progress notification on the underlying asynchronous operation, the GetResults method is called to retrieve partial results that are exposed in the returned observable sequence.
Conversions from IObservable<T>
Exposing an observable sequence for WinRT consumers (e.g. JavaScript) is supported as well, by mapping the sequence on an asynchronous operation object. The first conversion is fairly straightforward:
public static IAsyncOperation<TSource> ToAsyncOperation<TSource>(this IObservable<TSource> source);
Upon calling Start to the resulting asynchronous operation, a Subscribe call is made to the underlying source. The last result in the sequence (or an exception) is exposed on the operation.
A second conversion allows for progress reporting for each element produced in the source sequence, using integer-valued progress messages based on OnNext sequence numbering.
public static IAsyncOperationWithProgress<TSource, int> ToAsyncOperationWithProgress<TSource>(
this IObservable<TSource> source);To provide custom progress messages, the following overload exists:
public static IAsyncOperationWithProgress<TSource, TProgress> ToAsyncOperationWithProgress<TSource, TProgress>(
this IObservable<TSource> source,
Func<IObservable<TSource>, IObservable<TProgress>> getProgress);In the getProgress parameter, the source sequence is made available (using an internal Publish call to share the sequence) to create a progress sequence. An example use is to infer progress from source sequence sampling:
var operation = source.ToAsyncOperationWithProgress(src => src.Select((_, i) => i)
.Sample(TimeSpan.FromSeconds(1)));A more general-purpose overload is provided to allow for multiple result retrieval:
public static IAsyncOperationWithProgress<TResult, TProgress> ToAsyncOperationWithProgress<TSource, TProgress, TResult>(
this IObservable<TSource> source,
Func<IObservable<TSource>, IObservable<TProgress>> getProgress,
Func<IObservable<TSource>, IEnumerable<TResult>> getResults);The getResults parameter is wired up to the resulting operation’s GetResults method. By supplying a push-to-pull conversion operator, different behaviors can be implemented. For example, operators like MostRecent or Latest can be used.
One specialized overload is available to allow for incremental retrieval of chunks of the source sequence:
public static IAsyncOperationWithProgress<IList<TSource>, int> ToAsyncOperationWithProgressChunks<TSource>(this IObservable<TSource> source);
Consumers of the resulting asynchronous operation can call GetResults at various points in time (including during progress events) to retrieve the elements of the sequence that were received between the current and previous calls to GetResults. In other words, the concatenation of all the resulting list objects (some of which can be empty) equals the original sequence.
Chunkify and Collect Push-to-Pull Conversions
To enable retrieval of incremental results through the WinRT IAsyncOperationWithProgress type, two experimental operators have been added to System.Reactive, generalizing push-to-pull conversions. The first of those is Chunkify:
public static IEnumerable<IList<TSource>> Chunkify<TSource>(this IObservable<TSource> source);
The resulting enumerable sequence surfaces chunks of the underlying observable sequence as lists. Every MoveNext call retrieves the elements from the source sequence that were produced after the previous MoveNext call (or the beginning of the sequence). An example is shown below:
var xss = Observable.Interval(TimeSpan.FromSeconds(1))
.Chunkify();
foreach (var xs in xss)
{
Console.WriteLine(string.Join(", ", xs));
Console.ReadLine();
}The Collect operator generalizes all push-to-pull conversions
public static IEnumerable<TResult> Collect<TSource, TResult>(this IObservable<TSource> source,
Func<TResult> newCollector,
Func<TResult, TSource, TResult> merge);
public static IEnumerable<TResult> Collect<TSource, TResult>(this IObservable<TSource> source,
Func<TResult> getInitialCollector,
Func<TResult, TSource, TResult> merge,
Func<TResult, TResult> getNewCollector);For each element produced in the source sequence, a merge call is made to insert the element in the current collector object (e.g. a List<TResult> object). When the consumer calls MoveNext, the current collector object is returned and a new collector is allocated by invoking the newCollector function. An example to compute a running sum (with a long value for the collector) is shown below:
var res = Observable.Interval(TimeSpan.FromSeconds(1))
.Collect(() => 0L, (sum, x) => sum + x, sum => sum);
foreach (var x in res)
{
Console.WriteLine(x);
Console.ReadLine();
}
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }Thursday, October 13, 2011 8:28 AM -
Awesome job. That was really fast for rolling out support for WinRT. You guys rockFriday, October 14, 2011 12:26 AM
-
Great!
And thank you for a detailed commentary.
There is an opinion about XxxAsync.
I see XxxAsync oddly.
For example, All,Max,ToList,etc returns IObservable<T>.
Therefore First,Last,Single should returns IObservable<T>.
Of course I understand that a breaking change is difficult.
By the way, Windows Phone 7.1 have IQueryable.
But Experimental(v1.1.11011.11) Windows Phone 7.1 doesn't contains System.Reactive.Providers.dll.
Is there the plan to include in the future?
Yoshifumi Kawai / neueccFriday, October 14, 2011 9:11 AM -
Thanks for the feedback. At this point - given we've had a stable release of the Rx API surface - we won't be able to remove the Async suffix on First, Last and Single. I agree that's unfortunate, but we'll have to deal with this for the time being.
Concerning packaging for Windows Phone 7.1, we'll likely have a stable release for new supported platforms in the near future, and will look for maximum API coverage at that point. This may include IQbservable support.
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }Friday, October 14, 2011 10:04 AM -
Where does the installer put it's assemblies? I can't find them in C:\Program Files (x86)\Microsoft Reactive Extensions SDK\, nor in C:\Program Files (x86)\Microsoft Cloud Programmability\Reactive Extensions. Thanks.Monday, October 31, 2011 6:46 PM
-
Hi,
On my computer it's here:
C:\Program Files (x86)\Microsoft Reactive Extensions SDK\v1.1.11011\
- Dave
http://davesexton.com/blogMonday, October 31, 2011 7:11 PM -
Thank you.
I think I was running an old installer.
I've found them now that I've downloaded and run the latest.
Monday, October 31, 2011 7:17 PM