LINQ to Cloud: TCP Qbservable Provider

Allgemeine Diskussion LINQ to Cloud: TCP Qbservable Provider

  • Dienstag, 24. April 2012 19:07
     
      Enthält Code

    Hi everyone,

    I've just deployed a new TCP Qbservable Provider library to the Rxx project, including complete source code and example applications.  I'd appreciate any feedback you may have.

    The library enables you to easily expose IQbservable<T> services over TCP.  When a client subscribes, its query is serialized to the server as an expression tree and then executed on the server.

    Warning: This is only a pre-Alpha release, so security hasn't been fully considered.  Do not expose a Qbservable TCP service on a public server or client without taking the necessary precautions to secure them first.  Unless of course you want anonymous clients to execute arbitrary code on your server, including basic stuff like being able to format your C drive :)

    Features

    • Simple server factory methods for hosting a Qbservable TCP service from an existing IObservable<T> or IQbservable<T>.
    • Simple client factory methods for acquiring a Qbservable TCP service.
    • Automatically serialized Expression trees.
    • Automatically serialized anonymous types.
    • Immediate evaluation of local members and closures (optional; default behavior)
    • Full duplex local members and iterator blocks (optional; must opt-in on server)
    • Full duplex observables.
    • Designed with extensibility in mind; e.g., supports custom Qbservable service providers, protocols and sinks.

    Example

    The following example shows how to host an observable sequence over TCP and then consume that sequence with a LINQ query on a client computer.  Subscribing to the query on the client causes the LINQ query to execute entirely on the server.

    See the documentation for a longer explanation.

    Server:

    IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));
    
    var service = source.ServeQbservableTcp(new IPEndPoint(IPAddress.Loopback, 3205));
    
    using (service.SubscribeEither(
    	client => Console.WriteLine("Client shutdown."),
    	ex => Console.WriteLine("Error: {0}", ex.Message),
    	ex => Console.WriteLine("Fatal error: {0}", ex.Message),
    	() => Console.WriteLine("This will never be printed because a service host never completes.")))
    {
    	Console.ReadKey();
    }

    Client:

    var client = new QbservableTcpClient<long>(new IPEndPoint(IPAddress.Loopback, 3205));
    
    IQbservable<long> query =
    	from value in client.Query()
    	where value <= 5 || value >= 8
    	select value;
    
    using (query.Subscribe(
    	value => Console.WriteLine("Client observed: " + value),
    	ex => Console.WriteLine("Error: {0}", ex.Message),
    	() => Console.WriteLine("Completed")))
    {
    	Console.ReadKey();
    }

    - Dave


    http://davesexton.com/blog

Alle Antworten

  • Mittwoch, 25. April 2012 12:16
     
     
    Awesome. :)
  • Mittwoch, 25. April 2012 16:51
     
     

    Hi everyone,

    I've just written a fairly in-depth blog post that you may find useful:

    http://davesexton.com/blog/post/LINQ-to-Cloud-IQbservable-Over-the-Wire.aspx

    Edit: I've started a series of posts listed here:

    http://davesexton.com/blog/page/TCP-Qbservable-Provider-Series.aspx

    And thanks for your support Richard!

    - Dave


    http://davesexton.com/blog

    • Bearbeitet Dave Sexton Mittwoch, 9. Mai 2012 17:44 Added link to series
    •  
  • Mittwoch, 25. April 2012 18:44
     
      Enthält Code

    Hi everyone, 

    Here's another example - the canonical chat application.

    But this time there's a twist.  The server exposes "hooks" for clients to query.  This allows the client to do things like blacklisting on the server.  Theoretically, a client could also ignore all messages and only send messages (like a Twitter feed), or only receive messages and never send any (eavesdropping).  The semantics of the query is up to the client.  The server merely respects the clients' wishes and ensures that messages are dispatched appropriately.

    Overall, it's quite a bit of functionality and potential for very little code.

    To test the apps, run a single server and then run multiple clients.  Please let me know if you run into any trouble.

    Shared Library:

    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    
    namespace SharedLibrary
    {
    	public sealed class ChatServiceHooks
    	{
    		public IObservable<string> OutgoingMessages
    		{
    			get
    			{
    				return messageDispatch.AsObservable();
    			}
    		}
    
    		public IObserver<string> IncomingMessages
    		{
    			get
    			{
    				return Observer.Create<string>(
    					message =>
    					{
    						messageDispatch.OnNext(userName + " said: " + message);
    					});
    			}
    		}
    
    		private readonly string userName;
    		private readonly ISubject<string> messageDispatch;
    
    		public ChatServiceHooks(string userName, ISubject<string> messageDispatch)
    		{
    			this.userName = userName;
    			this.messageDispatch = messageDispatch;
    		}
    	}
    }

    Server App:

    using System;
    using System.Net;
    using System.Reactive;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using QbservableProvider;
    using SharedLibrary;
    
    namespace QbservableServer
    {
    	class ChatService
    	{
    		private static readonly IPEndPoint endPoint = new IPEndPoint(IPAddress.Loopback, port: 19841);
    
    		static void Main()
    		{
    			var messageDispatch = new Subject<string>();
    
    			var service = QbservableTcpServer.CreateService<string, ChatServiceHooks>(
    				endPoint,
    				QbservableServiceOptions.EnableDuplex,
    				request =>
    					from userName in request
    					from hooks in Observable.Create<ChatServiceHooks>(
    						observer =>
    						{
    							messageDispatch.OnNext(userName + " is online.");
    
    							var hooks = new ChatServiceHooks(userName, messageDispatch);
    
    							Scheduler.CurrentThread.Schedule(() => observer.OnNext(hooks));
    
    							return () => messageDispatch.OnNext(userName + " is offline.");
    						})
    					select hooks);
    
    			using (service.SubscribeEither(
    				client => Console.WriteLine("Chat service acknowledged client shutdown."),
    				ex => Console.WriteLine("Chat service error: " + ex.Message),
    				ex => Console.WriteLine("Chat service fatal error: " + ex.Message),
    				() => Console.WriteLine("This will never be printed because a service host never completes.")))
    			{
    				Console.ReadKey();
    			}
    	}
    }

    Client App:

    using System;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using QbservableProvider;
    using SharedLibrary;
    
    namespace QbservableClient
    {
    	class ChatClient
    	{
    		static void Main()
    		{
    			var client = new QbservableTcpClient<ChatServiceHooks>(new IPEndPoint(IPAddress.Loopback, port: 19841), typeof(ChatServiceHooks));
    
    			Console.WriteLine();
    			Console.Write("Enter your user name> ");
    			string userName = Console.ReadLine();
    
    			Console.Write("Enter blacklist names separated by commas> ");
    			string[] userNamesToIgnore = Console.ReadLine().Split(',');
    
    			var myMessages = new Subject<string>();
    
    			IObservable<string> outgoingMessages = myMessages;
    
    			IQbservable<string> query =
    				(from hooks in client.Query(userName)
    					 .Do(hooks => outgoingMessages.Subscribe(hooks.IncomingMessages))
    				 from message in hooks.OutgoingMessages
    				 where !userNamesToIgnore.Any(message.StartsWith)
    				 select message);
    
    			using (query.Subscribe(
    				message => Console.WriteLine(message),
    				ex => Console.WriteLine("Chat client error: " + ex.Message),
    				() => Console.WriteLine("Chat client completed")))
    			{
    				string message;
    				while ((message = Console.ReadLine()).Length > 0)
    				{
    					myMessages.OnNext(message);
    				}
    			}
    		}
    	}
    }

    - Dave


    http://davesexton.com/blog

    • Bearbeitet Dave Sexton Donnerstag, 26. April 2012 06:39 Refactored one small line to improve readability
    •  
  • Donnerstag, 26. April 2012 00:30
     
     
    That is beautiful code ... so busy ... must find time ... but I will test it out.
  • Donnerstag, 26. April 2012 14:03
     
     

    Hi everyone,

    I've just refreshed the release to fix a few small usability bugs and to include the latest timer and chat examples in the example apps.

    Details of the update can be found on the Rxx Source Code tab in the notes of my last several check-ins.

    - Dave


    http://davesexton.com/blog

  • Dienstag, 15. Mai 2012 15:09
     
     

    Nice one Dave. I was helping a friend do a Uni C++ TPC Chat project and I was thinking that it was gagging to be done in LINQ/Rx. Great work on all of the Rxx project.


    Lee Campbell http://LeeCampbell.blogspot.com