How to unsubscribe observer on a server side
-
Thursday, January 10, 2013 1:40 PM
Hi,
I have an application which uses RX via remoting with one server (observable) and many clients (observers). Server and clients are different computers.
My problem is when a client(observer) disconnects not properly without performing ubsubscribe(dispose), the OnNext() function on the server is starting to throw remoting exception.
Is there any mechanism to unsubsribe the problematic observer on the server side?
The part of client code:
internal void SetRemoting(bool refreshInstance)
{
string channelName = "RemotingClientUI";
IDictionary dict = new Hashtable();
dict["port"] = 9988;
dict["name"] = channelName;
var bcp = new BinaryClientFormatterSinkProvider();
var channel = new TcpClientChannel(dict, bcp);
ChannelServices.RegisterChannel(channel, false);
_remoteServer = (IRemoteServerService) Activator.
GetObject(typeof (IRemoteServerService),
tcp://...");
}
private void SubscribeToRemoteEvents(bool unSubscrubeFirst)
{
_jobRowUpdate = _remoteServer.JobRowUpdate.Subscribe(UpdateJobQueueRow);
_packageRowUpdate = _remoteServer.PackageRowUpdate.
Subscribe(UpdatePackageQueueRow);
_miscUpdate = _remoteServer.MiscAction.Subscribe(MiscRemoteActions);
}
The part of server code:
public class RemoteServiceService
{
public RemoteServiceService()
{
JobRowUpdate = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).JobRowUpdate.Remotable();
PackageRowUpdate = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).PackageRowUpdate.Remotable();
MiscAction = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).MiscActions.Remotable();
}
}
public class RemoteLoggerForService
{
private RemoteLoggerForService(IService service)
{
_jobRowUpdate = new Subject<IJobQueueRow>();
_packageRowUpdate = new Subject<IPackageQueueRow>();
_miscActions = new Subject<MiscRemoteObjects>();
_service = service;
}
#region Overrides of LoggerBase
public override void WriteToLog<T>(T stringFormatOrObject,
params object[] args)
{
lock (this)
try
{
lock (LockLogger)
{
if (stringFormatOrObject is IJobQueueRow &&
_jobRowUpdate != null)
{
_jobRowUpdate.OnNext(
stringFormatOrObject as IJobQueueRow);
}
if (stringFormatOrObject is IPackageQueueRow &&
_packageRowUpdate != null)
{
_packageRowUpdate.OnNext(
stringFormatOrObject as IPackageQueueRow);
}
if (stringFormatOrObject is MiscRemoteObjects &&
_miscActions != null)
{
_miscActions.OnNext(
stringFormatOrObject as MiscRemoteObjects);
}
}
}
catch(Exception ex)
{
LoggerFactory.GetLogger(LoggerType.File, null).
WriteToLog(
Utils.GetFullException("RemoteLoggerForService", ex));
}
}
#endregion
}
All Replies
-
Thursday, January 10, 2013 5:25 PM
Hi,
In the future, it would be helpful if you posted a short but complete program that reproduces the problem.
To solve your problem, consider injecting a proxy that catches SocketException before calling Remotable.
For example, I was able to reproduce the problem first with the following program:
using System; using System.Collections.Generic; using System.Net.Sockets; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Runtime.Remoting; using System.Runtime.Remoting.Channels; using System.Runtime.Remoting.Channels.Tcp; using System.Runtime.Serialization.Formatters; namespace ConsoleApplication1 { class Program : MarshalByRefObject { public IObservable<long> Xs { get { return Observable.Create<long>(observer => { Console.WriteLine("Client subscribed."); var subscription = xs.Subscribe(observer); return new CompositeDisposable( Disposable.Create(() => Console.WriteLine("Client unsubscribed.")), subscription); }) .Remotable(null); } } private readonly Subject<long> xs = new Subject<long>(); private const int serverPort = 42424; private const int clientPort = serverPort + 1; private const string objectUri = "Program.rem"; private static readonly string url = "tcp://localhost:" + serverPort + "/" + objectUri; private IDisposable Start() { return Observable .Interval(TimeSpan.FromSeconds(1)) .Subscribe(xs); } static void Main() { Console.Write("Run as client? (Y/N)> "); string response = Console.ReadLine(); if (response.Length == 1) { switch (char.ToUpperInvariant(response[0])) { case 'Y': RunClient(); break; case 'N': RunServer(); break; } } } private static void RunClient() { Program program = null; using (CreateChannel(clientPort)) { program = (Program) Activator.GetObject(typeof(Program), url); var subscription = program.Xs.Subscribe(Console.WriteLine); var dispose = false; try { Console.WriteLine("Subscribed to remote observable."); Console.WriteLine("Press Q to stop gracefully."); Console.WriteLine("Press any other key to stop without notifying the server."); dispose = char.ToUpperInvariant(Console.ReadKey(intercept: true).KeyChar) == 'Q'; } finally { if (dispose) { Console.WriteLine("Shutting down gracefully..."); subscription.Dispose(); } } } } private static void RunServer() { ObjRef objRef = null; using (CreateChannel(serverPort)) { try { var program = new Program(); objRef = RemotingServices.Marshal(program, objectUri, typeof(Program)); using (program.Start()) { Console.WriteLine("Service started."); Console.WriteLine("Press any key to stop."); Console.ReadKey(intercept: true); } } finally { RemotingServices.Unmarshal(objRef); } } } private static IDisposable CreateChannel(int port) { var properties = new Dictionary<string, object>() { { "typeFilterLevel", TypeFilterLevel.Full.ToString() } }; var serverProvider = new BinaryServerFormatterSinkProvider(properties, null); var clientProvider = new BinaryClientFormatterSinkProvider(properties, null); var channelProperties = new Dictionary<string, object>() { { "port", port }, { "name", "program" } }; var channel = new TcpChannel(channelProperties, clientProvider, serverProvider); ChannelServices.RegisterChannel(channel, ensureSecurity: false); return Disposable.Create(() => ChannelServices.UnregisterChannel(channel)); } } }I started the program once as a server and again multiple times as clients. The first client I closed gracefully and the server had no problem. The second client I closed abruptly and the server crashed with a SocketException.
To solve the problem, I expanded the wrapper to catch SocketExceptions thrown by OnNext and relied on Rx's auto-detach behavior to remove the faulty client subscription, as follows.
var subscription = xs.Subscribe(observer);
becomes
var subscription = xs.Subscribe(value => { try { observer.OnNext(value); } catch (SocketException) { // Use the auto-detach behavior provided by Rx. observer.OnCompleted(); } }, observer.OnError, observer.OnCompleted);Does that solve your problem?
- Dave
- Marked As Answer by Yevgpr Sunday, January 13, 2013 10:46 AM
-
Sunday, January 13, 2013 10:16 AM
Hi Dave,
It's working.
Thank you very much!!
The only problem was that observer.OnCompleted() trying to send message to the client and crushes also.
So additional try..cach needed around it.Not very elegant, but I can't figure out about something else
Thank you again.

