Rx across processes

All replies

  • Hi,

    Yes, you can use .NET remoting to communicate across process boundaries.  Use the Remotable extension from Rx Experimental to convert your observable into an instance of MarshalByRefObject.  Process A can return the observable as part of a property on another MarshalByRefObject, which you'll create manually and register as a well-known service.  Process B can then acquire a reference to that observable via the transparent proxy that .NET generates automatically when you connect to the well-known service.

    Note, however, that the current implementation of Remotable does not provide a way for you to specify a lifetime service, so the default .NET lease policy applies.  Consequently, unless your application communicates continuously within the default timeout period, you may find that your observable is sometimes prematurely disposed.  To avoid this, you can create your own Remotable extension and use an appropriate lifetime service (e.g., return null in the InitializeLifetimeService method), or you can add some kind of keep-alive service that pushes ping notifications during times of application inactivity.
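
    As a rough sketch of the lifetime override mentioned above (not the Rx implementation), the two base classes below show the usual ways to control the lease of a MarshalByRefObject.  The class names ImmortalMarshalByRefObject and LongLivedMarshalByRefObject and the one-hour and ten-minute values are hypothetical; a custom Remotable extension would presumably derive its remoted wrapper object from something like one of them.

    ```csharp
    using System;
    using System.Runtime.Remoting.Lifetime;

    // Hypothetical base class (not part of Rx): objects deriving from it
    // are never disconnected by the remoting lease manager.
    public abstract class ImmortalMarshalByRefObject : MarshalByRefObject
    {
    	public override object InitializeLifetimeService()
    	{
    		// Returning null gives the object an infinite lease.
    		return null;
    	}
    }

    // Hypothetical alternative: keep a lease, but make it longer than the default.
    public abstract class LongLivedMarshalByRefObject : MarshalByRefObject
    {
    	public override object InitializeLifetimeService()
    	{
    		var lease = (ILease) base.InitializeLifetimeService();

    		// Lease properties can only be changed while the lease is in its initial state.
    		if (lease.CurrentState == LeaseState.Initial)
    		{
    			lease.InitialLeaseTime = TimeSpan.FromHours(1);      // assumed value
    			lease.RenewOnCallTime = TimeSpan.FromMinutes(10);    // assumed value
    		}

    		return lease;
    	}
    }
    ```

    With an infinite lease, the object stays reachable until it is explicitly disconnected (for example with RemotingServices.Disconnect), so the server is responsible for cleaning it up.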

    - Dave


    http://davesexton.com/blog

    February 22, 2012 18:52
  • Hi,

    Here's a working config-free example consisting of 3 separate projects.  Two of the projects build executable console applications (client and server) and another project builds a class library that is shared by the others.

    1. You must add a reference to Rx Experimental in all 3 projects to get the Remotable extension.  The NuGet package is named Rx_Experimental-Main, if you decide to use NuGet.
    2. You must add a reference to System.Runtime.Remoting.dll in all 3 projects below to get TcpChannel and related classes.  You'll find this .NET assembly in the standard Add Reference dialog.
    3. You may be prompted by Windows Firewall when running the applications.  Click Allow to let the applications communicate with each other.  If you're using a different software firewall, then you may have to manually open up the required TCP ports (they are embedded in the code below).  Note that you can use HttpChannel instead of TcpChannel, in case that helps.

    Project: ClassLibrary1
    File: MyService.cs

    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Remoting.Channels.Tcp;
    
    namespace ClassLibrary1
    {
    	public sealed class MyService : MarshalByRefObject
    	{
    		public static readonly Uri Url = new Uri("tcp://localhost:5012/MyService");
    
    		public TimeSpan DueTime
    		{
    			get;
    			set;
    		}
    
    		public IObservable<long> CountByTens
    		{
    			get
    			{
    				Console.WriteLine("Observable requested.");
    				Console.WriteLine();
    
    				return Observable
    					.Interval(DueTime)
    					.Select(value => value * 10)
    					.Take(5)
    					.Do(
    						value => Console.WriteLine("Generated: " + value),
    						ex => Console.WriteLine("Error: " + ex),
    						() => Console.WriteLine("Completed"))
    					.Finally(() => Console.WriteLine("Finally"))
    					.Remotable();
    			}
    		}
    
    		public static IChannel RegisterServerChannel()
    		{
    			return RegisterChannel(Url.Port);
    		}
    
    		public static IChannel RegisterChannel(int port)
    		{
    			var server = new BinaryServerFormatterSinkProvider()
    			{
    				TypeFilterLevel = System.Runtime.Serialization.Formatters.TypeFilterLevel.Full
    			};
    
    			var channel = new TcpChannel(
    				new Dictionary<string, object>()
    				{
    					{ "port", port }
    				},
    				new BinaryClientFormatterSinkProvider(),
    				server);
    
    			Console.WriteLine("Registering " + channel);
    
    			ChannelServices.RegisterChannel(channel, ensureSecurity: false);
    
    			Console.WriteLine("Registered.");
    			Console.WriteLine();
    
    			return channel;
    		}
    	}
    }

    Project: ProcessA
    Project Reference: ClassLibrary1
    File: Program.cs

    using System;
    using System.Runtime.Remoting;
    using System.Runtime.Remoting.Channels;
    using ClassLibrary1;
    
    namespace ProcessA
    {
    	public class Program
    	{
    		static void Main()
    		{
    			Console.WriteLine("Process A (Server)");
    			Console.WriteLine();
    
    			var channel = MyService.RegisterServerChannel();
    
    			var service = new MyService();
    
    			try
    			{
    				Console.WriteLine("Starting service...");
    				Console.WriteLine();
    
    				var uri = MyService.Url.GetComponents(UriComponents.Path, UriFormat.Unescaped);
    
    				var objRef = RemotingServices.Marshal(service, uri);
    
    				Console.WriteLine("Service started at " + objRef.URI);
    				Console.WriteLine();
    				Console.WriteLine("Press any key to stop.");
    
    				Console.ReadKey();
    			}
    			finally
    			{
    				RemotingServices.Disconnect(service);
    				ChannelServices.UnregisterChannel(channel);
    			}
    		}
    	}
    }

    Project: ProcessB
    Project Reference: ClassLibrary1
    File: Program.cs

    using System;
    using System.Runtime.Remoting.Channels;
    using ClassLibrary1;
    
    namespace ProcessB
    {
    	public class Program
    	{
    		static void Main()
    		{
    			Console.WriteLine("Process B (Client)");
    			Console.WriteLine();
    
    			var channel = MyService.RegisterChannel(port: 8020);
    
    			var uri = MyService.Url.AbsoluteUri;
    
    			try
    			{
    				Console.WriteLine("Press any key to connect to the server...");
    				Console.WriteLine();
    				Console.ReadKey();
    
    				Console.WriteLine("Connecting to " + uri);
    
    				var programA = (MyService) Activator.GetObject(typeof(MyService), uri);
    
    				Console.WriteLine("Connected.");
    				Console.WriteLine();
    				Console.WriteLine("Subscribing to observable...");
    				Console.WriteLine();
    
    				programA.DueTime = TimeSpan.FromSeconds(1);
    
    				var subscription = programA.CountByTens.Subscribe(
    					value => Console.WriteLine("Observed: " + value),
    					ex => Console.WriteLine("Error: " + ex),
    					() => Console.WriteLine("Completed"));
    
    				using (subscription)
    				{
    					Console.WriteLine("Press any key to stop...");
    					Console.ReadKey();
    				}
    			}
    			finally
    			{
    				ChannelServices.UnregisterChannel(channel);
    			}
    		}
    	}
    }
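
    The note in step 3 about using HttpChannel instead of TcpChannel could look roughly like the sketch below.  HttpChannelHelper is a hypothetical helper that mirrors MyService.RegisterChannel, and the http URL is an assumption (same host, port and path as MyService.Url, with a different scheme).  The binary formatter sink providers are kept from the original example rather than relying on the channel's defaults.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Remoting.Channels.Http;
    using System.Runtime.Serialization.Formatters;

    namespace ClassLibrary1
    {
    	// Hypothetical helper, not part of the original example.
    	public static class HttpChannelHelper
    	{
    		// Assumed URL: MyService.Url with the http scheme instead of tcp.
    		public static readonly Uri Url = new Uri("http://localhost:5012/MyService");

    		public static IChannel RegisterChannel(int port)
    		{
    			// Same server-side settings as MyService.RegisterChannel.
    			var server = new BinaryServerFormatterSinkProvider()
    			{
    				TypeFilterLevel = TypeFilterLevel.Full
    			};

    			var channel = new HttpChannel(
    				new Dictionary<string, object>()
    				{
    					{ "port", port }
    				},
    				new BinaryClientFormatterSinkProvider(),
    				server);

    			ChannelServices.RegisterChannel(channel, ensureSecurity: false);

    			return channel;
    		}
    	}
    }
    ```

    Both processes would then need to use the http URL: the server when it calls RemotingServices.Marshal with the path, and the client when it calls Activator.GetObject with the absolute URI.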

    - Dave


    http://davesexton.com/blog

    February 22, 2012 20:41
  • Note that Rx v2.0 Beta improves remoting support: the Remotable operator now accepts an optional ILease object to control the sequence's lifetime. The lifetimes of other artifacts (i.e., subscriptions and observers) are managed automatically and rely on aspects such as auto-detach behavior to reclaim resources.

    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }

    March 19, 2012 20:43
  • This is fantastic; however, I need it to be a bit more performant for it to be completely useful.

    Do you happen to have an example that uses shared memory instead? I can live with observables being confined to a single machine.

    March 26, 2012 13:05
  • How about extending Remotable in such a way that the programmer gets to choose the remoting context?

    Similar to the way ObserveOn/SubscribeOn take a scheduler, Remotable could be extended to take a remoting context.

    Something like:

    RemotableOn(RemotingContext.SharedMemory, [extra args..])

    RemotableOn(RemotingContext.TcpChannel, [extra args..])

    March 26, 2012 13:19
  • Thanks for the feedback. We'll be looking into more remoting scenarios going forward and keep you posted on where we go.

    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }

    March 28, 2012 21:20