LINQ to Cloud: TCP Qbservable Provider
-
Tuesday, April 24, 2012 7:07 PM
Hi everyone,
I've just deployed a new TCP Qbservable Provider library to the Rxx project, including complete source code and example applications. I'd appreciate any feedback you may have.
The library enables you to easily expose IQbservable<T> services over TCP. When a client subscribes, its query is serialized to the server as an expression tree and then executed on the server.
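For a rough idea of what "serialized as an expression tree" means, here is a small local sketch. It is not part of the TCP provider: it assumes only the System.Reactive package and uses Rx's built-in `AsQbservable` to show that an `IQbservable<T>` query is held as data (an expression tree) rather than as compiled delegates. The class and method names are made up for illustration.

```csharp
using System;
using System.Linq.Expressions;
using System.Reactive.Linq;

// Hypothetical demo class; not part of Rxx or the TCP Qbservable Provider.
static class ExpressionTreeDemo
{
    // Builds a query against a local IQbservable and returns the name of the
    // operator at the root of its expression tree.
    public static string RootOperator()
    {
        // AsQbservable wraps an ordinary observable in Rx's local query provider.
        // Nothing subscribes here, so the timer never actually starts.
        IQbservable<long> source =
            Observable.Interval(TimeSpan.FromSeconds(1)).AsQbservable();

        IQbservable<long> query =
            from value in source
            where value <= 5 || value >= 8
            select value;

        // The query is described by an expression tree; a remote provider could
        // serialize this tree instead of running it locally.
        var call = (MethodCallExpression)query.Expression;
        return call.Method.Name;
    }

    static void Main()
    {
        Console.WriteLine(RootOperator());
        Console.WriteLine(ExpressionTreeDemo.FullTree());
    }

    // Returns the textual form of the whole tree, for inspection only.
    public static string FullTree()
    {
        var query = Observable.Return(1L).AsQbservable().Where(value => value > 0);
        return query.Expression.ToString();
    }
}
```

The TCP provider presumably does something similar on subscription, except that the tree is sent to the server instead of being inspected locally.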
Warning: This is only a pre-Alpha release, so security hasn't been fully considered. Do not expose a Qbservable TCP service on a public server or client without taking the necessary precautions to secure them first. Unless of course you want anonymous clients to execute arbitrary code on your server, including basic stuff like being able to format your C drive :)
Features
- Simple server factory methods for hosting a Qbservable TCP service from an existing IObservable<T> or IQbservable<T>.
- Simple client factory methods for acquiring a Qbservable TCP service.
- Automatically serialized Expression trees.
- Automatically serialized anonymous types.
- Immediate evaluation of local members and closures (optional; default behavior).
- Full duplex local members and iterator blocks (optional; must opt in on the server).
- Full duplex observables.
- Designed with extensibility in mind; e.g., supports custom Qbservable service providers, protocols and sinks.
Example
The following example shows how to host an observable sequence over TCP and then consume that sequence with a LINQ query on a client computer. Subscribing to the query on the client causes the LINQ query to execute entirely on the server.
See the documentation for a longer explanation.
Server:
IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));

var service = source.ServeQbservableTcp(new IPEndPoint(IPAddress.Loopback, 3205));

using (service.SubscribeEither(
    client => Console.WriteLine("Client shutdown."),
    ex => Console.WriteLine("Error: {0}", ex.Message),
    ex => Console.WriteLine("Fatal error: {0}", ex.Message),
    () => Console.WriteLine("This will never be printed because a service host never completes.")))
{
    Console.ReadKey();
}

Client:
var client = new QbservableTcpClient<long>(new IPEndPoint(IPAddress.Loopback, 3205));

IQbservable<long> query =
    from value in client.Query()
    where value <= 5 || value >= 8
    select value;

using (query.Subscribe(
    value => Console.WriteLine("Client observed: " + value),
    ex => Console.WriteLine("Error: {0}", ex.Message),
    () => Console.WriteLine("Completed")))
{
    Console.ReadKey();
}

- Dave
All Replies
-
Wednesday, April 25, 2012 12:16 PM
Awesome. :)
-
Wednesday, April 25, 2012 4:51 PM
Hi everyone,
I've just written a fairly in-depth blog post that you may find useful:
http://davesexton.com/blog/post/LINQ-to-Cloud-IQbservable-Over-the-Wire.aspx
Edit: I've started a series of posts listed here:
http://davesexton.com/blog/page/TCP-Qbservable-Provider-Series.aspx
And thanks for your support Richard!
- Dave
- Edited by Dave Sexton Wednesday, May 09, 2012 5:44 PM Added link to series
-
Wednesday, April 25, 2012 6:44 PM
Hi everyone,
Here's another example - the canonical chat application.
But this time there's a twist. The server exposes "hooks" for clients to query. This allows a client to do things like blacklisting on the server. Theoretically, a client could also ignore all messages and only send messages (like a Twitter feed), or only receive messages and never send any (eavesdropping). The semantics of the query are up to the client. The server merely respects each client's wishes and ensures that messages are dispatched appropriately.
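To get a feel for the hook semantics without any networking, here is a minimal local sketch, assuming only the System.Reactive package. `LocalHooks` is a hypothetical stand-in with the same shape as the `ChatServiceHooks` class used by the chat apps, and the user names are invented. It shows a receive-only client that also blacklists one user. In the real apps the equivalent filter would be part of the query that runs on the server.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in that mirrors the shape of ChatServiceHooks.
sealed class LocalHooks
{
    private readonly string userName;
    private readonly ISubject<string> messageDispatch;

    public LocalHooks(string userName, ISubject<string> messageDispatch)
    {
        this.userName = userName;
        this.messageDispatch = messageDispatch;
    }

    // Everything that any user says flows out through the shared dispatch subject.
    public IObservable<string> OutgoingMessages
    {
        get { return messageDispatch.AsObservable(); }
    }

    // Messages pushed in here are tagged with the sender's name and dispatched.
    public IObserver<string> IncomingMessages
    {
        get
        {
            return Observer.Create<string>(
                message => messageDispatch.OnNext(userName + " said: " + message));
        }
    }
}

static class LocalHooksDemo
{
    // Returns the messages observed by a receive-only client that blacklists "bob".
    public static List<string> Run()
    {
        var dispatch = new Subject<string>();
        var alice = new LocalHooks("alice", dispatch);
        var bob = new LocalHooks("bob", dispatch);
        var carol = new LocalHooks("carol", dispatch);

        string[] blacklist = { "bob" };
        var seen = new List<string>();

        // carol never sends anything; she only listens and filters.
        using (carol.OutgoingMessages
            .Where(message => !blacklist.Any(name => message.StartsWith(name)))
            .Subscribe(message => seen.Add(message)))
        {
            alice.IncomingMessages.OnNext("hello");
            bob.IncomingMessages.OnNext("hi");
        }

        return seen;
    }

    static void Main()
    {
        foreach (var message in Run())
        {
            Console.WriteLine(message);
        }
    }
}
```

Because `Subject<string>` dispatches synchronously, the listener should end up with alice's message only; bob's is dropped by the blacklist filter.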
Overall, it's quite a bit of functionality and potential for very little code.
To test the apps, run a single server and then run multiple clients. Please let me know if you run into any trouble.
Shared Library:
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SharedLibrary
{
    public sealed class ChatServiceHooks
    {
        public IObservable<string> OutgoingMessages
        {
            get
            {
                return messageDispatch.AsObservable();
            }
        }

        public IObserver<string> IncomingMessages
        {
            get
            {
                return Observer.Create<string>(
                    message =>
                    {
                        messageDispatch.OnNext(userName + " said: " + message);
                    });
            }
        }

        private readonly string userName;
        private readonly ISubject<string> messageDispatch;

        public ChatServiceHooks(string userName, ISubject<string> messageDispatch)
        {
            this.userName = userName;
            this.messageDispatch = messageDispatch;
        }
    }
}

Server App:
using System;
using System.Net;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using QbservableProvider;
using SharedLibrary;

namespace QbservableServer
{
    class ChatService
    {
        private static readonly IPEndPoint endPoint = new IPEndPoint(IPAddress.Loopback, port: 19841);

        static void Main()
        {
            var messageDispatch = new Subject<string>();

            var service = QbservableTcpServer.CreateService<string, ChatServiceHooks>(
                endPoint,
                QbservableServiceOptions.EnableDuplex,
                request =>
                    from userName in request
                    from hooks in Observable.Create<ChatServiceHooks>(
                        observer =>
                        {
                            messageDispatch.OnNext(userName + " is online.");

                            var hooks = new ChatServiceHooks(userName, messageDispatch);

                            Scheduler.CurrentThread.Schedule(() => observer.OnNext(hooks));

                            return () => messageDispatch.OnNext(userName + " is offline.");
                        })
                    select hooks);

            using (service.SubscribeEither(
                client => Console.WriteLine("Chat service acknowledged client shutdown."),
                ex => Console.WriteLine("Chat service error: " + ex.Message),
                ex => Console.WriteLine("Chat service fatal error: " + ex.Message),
                () => Console.WriteLine("This will never be printed because a service host never completes.")))
            {
                Console.ReadKey();
            }
        }
    }
}

Client App:
using System;
using System.Linq;
using System.Net;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using QbservableProvider;
using SharedLibrary;

namespace QbservableClient
{
    class ChatClient
    {
        static void Main()
        {
            var client = new QbservableTcpClient<ChatServiceHooks>(new IPEndPoint(IPAddress.Loopback, port: 19841), typeof(ChatServiceHooks));

            Console.WriteLine();
            Console.Write("Enter your user name> ");

            string userName = Console.ReadLine();

            Console.Write("Enter blacklist names separated by commas> ");

            string[] userNamesToIgnore = Console.ReadLine().Split(',');

            var myMessages = new Subject<string>();

            IObservable<string> outgoingMessages = myMessages;

            IQbservable<string> query =
                (from hooks in client.Query(userName)
                    .Do(hooks => outgoingMessages.Subscribe(hooks.IncomingMessages))
                 from message in hooks.OutgoingMessages
                 where !userNamesToIgnore.Any(message.StartsWith)
                 select message);

            using (query.Subscribe(
                message => Console.WriteLine(message),
                ex => Console.WriteLine("Chat client error: " + ex.Message),
                () => Console.WriteLine("Chat client completed")))
            {
                string message;
                while ((message = Console.ReadLine()).Length > 0)
                {
                    myMessages.OnNext(message);
                }
            }
        }
    }
}

- Dave
- Edited by Dave Sexton Thursday, April 26, 2012 6:39 AM Refactored one small line to improve readability
-
Thursday, April 26, 2012 12:30 AM
That is beautiful code ... so busy ... must find time ... but I will test it out.
-
Thursday, April 26, 2012 2:03 PM
Hi everyone,
I've just refreshed the release to fix a few small usability bugs and to include the latest timer and chat examples in the example apps.
Details of the update can be found on the Rxx Source Code tab in the notes of my last several check-ins.
- Dave
-
Tuesday, May 15, 2012 3:09 PM
Nice one, Dave. I was helping a friend do a Uni C++ TCP chat project and I was thinking that it was gagging to be done in LINQ/Rx. Great work on all of the Rxx project.
Lee Campbell http://LeeCampbell.blogspot.com

