locked
UnSubscribe to a service RRS feed

  • Question

  • If a service drops, if I send a drop message to all of the services that I subscribed to, how can I then send a notice to the subscribe manager to let it know that the service has dropped and is no longer partnered?

     

     

    Sunday, May 27, 2007 6:48 PM

Answers

  • Thanks to Don for sending the repro case we have isolated the issue and indeed the subscription manager was not removing notifications on a specific type of subscription

    The fix will be available in the upcoming runtime update package for 1.5 Until then, just ignore the debug/trace warning message Smile It has no other side-effects

     

    g

    Thursday, November 29, 2007 2:49 AM

All replies

  • Before sending the subscribe request, specify a ShutDown port for the subscribe message. For example the Simple Dashboard does it like this:

    Code Snippet

     

    Port<Shutdown> _motorShutdown = null;

     

    IEnumerator<ITask> OnConnectMotorHandler(OnConnectMotor onConnectMotor)

    {

        _motorShutdown = new Port<Shutdown>();

     

        drive.Subscribe subscribe = new drive.Subscribe(

            new SubscribeRequestType()

        );

        subscribe.NotificationPort = _driveNotify;

        subscribe.NotificationShutdownPort = _motorShutdown;

     

        _drivePort.Post(subscribe);

        yield return Arbiter.Choice(

            subscribe.ResponsePort,

            delegate(SubscribeResponseType response)

            {

                LogInfo("Subscribed to " + onConnectMotor.Service);

            },

            delegate(Fault fault)

            {

                _motorShutdown = null;

                LogError(fault);

            }

        );

    }

     

    And whenever you wanted to unsubscribe, simply post a message to the ShutDown port.

    Code Snippet

    void DropHandler(DsspDefaultDrop drop)

    {

        PerformShutdown(ref _motorShutdown);

        base.DefaultDropHandler(drop);

    }

     

    void PerformShutdown(ref Port<Shutdown> port)

    {

        if (port == null)

            return;

        Shutdown shutdown = new Shutdown();

        port.Post(shutdown);

        port = null;

    }

     

    Monday, May 28, 2007 11:21 PM
  • Thank you!
    Tuesday, May 29, 2007 2:59 AM
  • After looking at it some more, I still have a question.

     

    Before, I just thought that the port.Post(shutdown) did something to automatically remove the subscription, but now I'm wondering how this happens.

     

    In your example, how does the Motor service get the message? The Motor service doesn't have a receiver for a Shutdown message or anything. How could the Motor service's subscription manager know about this shutdown post?

     

    Thanks,

    Don

    Tuesday, July 24, 2007 4:59 AM
  • Hi Don.  You are correct. 

     

    During the post of the subscription, the service forwarder infrastructure understands the subscription action and adds an Activation on the port you provided (_motorShutdown).  When you post a Shutdown message to that port, the infrastructure takes care of removing your subscription entry from the subscription manager.  All of this happens in the context of your service, as if you had done the activation yourself.

     

    Code Snippet

        subscribe.NotificationPort = _driveNotify;

        subscribe.NotificationShutdownPort = _motorShutdown;

        _drivePort.Post(subscribe);

     

    David 

    Tuesday, July 24, 2007 7:08 PM
  • Don, as David said the runtime is taking care of the activation. You can see the activation in reflector by looking at CreateSubscribeTarget method in DssRuntime.dll::Microsoft.Dss.Services.Forwarders.Dssp.DsspForwarder.

     

    Code Snippet
    private void CreateSubscriptionTarget(DsspOperation Sub, ForwarderEnvelope env)
    {
        ...
        if (subOp.NotificationShutdownPort != null)
        {
            Receiver<Shutdown>[] CS$0$0004 = new Receiver<Shutdown>[] { Arbiter.Receive<Shutdown>(false, subOp.NotificationShutdownPort, delegate (Shutdown S) {
                this.DeleteSubscription(body.Subscriber);
            }) };
            base.Activate<Receiver<Shutdown>>(CS$0$0004);
        }
        ...
    }

     

     Omid

    Tuesday, July 24, 2007 9:04 PM
  • Ah, thank you both very much, that's what I originally thought happened.

    Friday, July 27, 2007 1:10 PM
  • Hi, I'm now trying to do this with a SelectiveSubscribe, which builds up an Insert rather than using Subscribe Helper. The code is something like this:

     

    Code Block

    [ServiceHandler(ServiceHandlerBehavior.Exclusive)]

    public IEnumerator<ITask> SelectiveSubscribeHandler(testA.SelectiveSubscribe subscribeRequest)

    {

    submgr.InsertSubscription ins = new submgr.InsertSubscription(

    new submgr.InsertSubscriptionMessage(

    subscribeRequest.Body.Subscriber,

    subscribeRequest.Body.Expiration,

    0));

     

    ins.Body.NotificationCount = subscribeRequest.Body.NotificationCount;

     

    List<submgr.QueryType> subscribeFilter = new List<submgr.QueryType>();

     

    foreach (string message in subscribeRequest.Body.Messages)

    {

    subscribeFilter.Add(new submgr.QueryType(message));

    }

     

    ins.Body.QueryList = subscribeFilter.ToArray();

    _subMgr.Post(ins);

     

    yield return Arbiter.Choice(ins.ResponsePort,

    delegate(SubscribeResponseType rsp)

    {

    subscribeRequest.ResponsePort.Post(rsp);

    },

    delegate(soap.Fault fault)

    {

    subscribeRequest.ResponsePort.Post(fault);

    });

    yield break;

    }

     

     

    The subscriberequest object contains the required NotificationShutdownPort, yet I don't see how to pass that to the ins object, which is ultimately what gets sent to the subscription manager. Does anyone know how to do this?

     

    Thanks,

    Don

    Wednesday, November 21, 2007 5:40 AM
  • Hi Don, underneath everything is about manipulating the state of some service. In this case, the subscription manager is nothing more than a document that keeps a list of subscriptions.

     

    So if you manually insert, then you can also manually send a DeleteSubscription!

     

    In your case, when you want to drop the subscription, and since the infra is not going to be helping you (we only do this when you use our Subscribe operation) you will need to send directly a DeleteSubscription msg to the subscription manager

     

    thanx

    g

     

    Wednesday, November 21, 2007 11:39 PM
  • Ah OK, that shouldn't be too hard I guess.

     

    Thanks as always,

    Don

    Thursday, November 22, 2007 5:57 AM
  • Hi,

     

    I'm having more trouble with this than I thought.

     

    I'm trying to do something like this:

     

    Code Block

    [ServiceHandler(ServiceHandlerBehavior.Exclusive)]

    public IEnumerator<ITask> SelectiveSubscribeHandler(testA.SelectiveSubscribe subscribeRequest)

    {

    submgr.InsertSubscription ins = new submgr.InsertSubscription(

    new submgr.InsertSubscriptionMessage(

    subscribeRequest.Body.Subscriber,

    subscribeRequest.Body.Expiration,

    0));

     

    Activate(Arbiter.Receive<ccr.Shutdown>(false, subscribeRequest.NotificationShutdownPort,

    delegate(ccr.Shutdown shutdown)

    {

    LogInfo("Subscription Removed");

    submgr.DeleteSubscription del = new submgr.DeleteSubscription(

    new submgr.DeleteSubscriptionMessage(subscribeRequest.Body.Subscriber));

    _subMgr.Post(del);

     

    }));

     

    ins.Body.NotificationCount = subscribeRequest.Body.NotificationCount;

     

    List<submgr.QueryType> subscribeFilter = new List<submgr.QueryType>();

     

    foreach (string message in subscribeRequest.Body.Messages)

    {

    subscribeFilter.Add(new submgr.QueryType(message));

    }

     

    ins.Body.QueryList = subscribeFilter.ToArray();

    _subMgr.Post(ins);

     

    yield return Arbiter.Choice(ins.ResponsePort,

    delegate(SubscribeResponseType rsp)

    {

    subscribeRequest.ResponsePort.Post(rsp);

    },

    delegate(soap.Fault fault)

    {

    subscribeRequest.ResponsePort.Post(fault);

    });

    yield break;

    }

     

     

    But this doesn't work, now I can't even subscribe, I get a returned Fault.

     

    Any suggestions?

     

    -Don

    Friday, November 23, 2007 4:35 AM
  • i would make sure the insert subscribe message you send has the right fields set, from the incoming subscribe.

     

    Also make sure this is not a duplicate subscription. The fault message should have the details on what happened and the console/output service should have some spew.

     

     May I ask why you are not just using our SubscribeHelper routine, like our service tutorials do? While what you are doing is fine, it will save you a bunch of code.

     

    To unsubscribe, you can still manually send the DeleteSubscription.

    Friday, November 23, 2007 4:58 AM
  • George,

     

    I realize I was not being very specific in my last post.

     

    The subscribe routine runs fine in the absence of this addition:

     

    Code Block

    Activate(Arbiter.Receive<CCR.< FONT>Shutdown>(false, subscribeRequest.NotificationShutdownPort,

    delegate(ccr.Shutdown shutdown)

    {

    LogInfo("Subscription Removed");

    submgr.DeleteSubscription del = new submgr.DeleteSubscription(

    new submgr.DeleteSubscriptionMessage(subscribeRequest.Body.Subscriber));

    _subMgr.Post(del);

     

    }));

     

     

    I'm only sending a single subscribe request. I have basically setup two test services that do nothing but test this functionality. So the fault is occuring due to the code above. My purpose for adding this is to create a handler for a Shutdown() message that has been posted on the subscribeRequest.NotificationShutdownPort, which was set in the other service. The subscribing service posts a shutdown, which I have gotten help on previously:

     

    Code Block

    [ServiceHandler(ServiceHandlerBehavior.Teardown)]

    public virtual void DropHandler(DsspDefaultDrop drop)

    {

    LogError("Service A Shutdown");

    PerformShutdown(ref _CCShutdown);

    base.DefaultDropHandler(drop);

    }

    void PerformShutdown(ref Port<Shutdown> port)

    {

    if (port == null)

    {

    LogInfo("port was null!");

    return;

    }

     

    Shutdown shutdown = new Shutdown();

    port.Post(shutdown);

    port = null;

    }

     

     

     

    Here, _CCShutdown port was assigned to the subscribeRequest.NotificationShutdownPort.

     

    The reason I'm not using the SubscribeHelper, is because I was not aware that I could use the subscribe helper when I wanted to apply filters. I believe the code I have above (absent the Receive on the shutdown port) was inspired by the NXT example.

     

    My whole problem comes down to this: How do I setup a handler dynamically upon a subscribe that will handle a Shutdown post from the subscribing service. Keep in mind that the service being subscribed to does not have any knowledge about the subscring service, other than the information contained in the subscribeRequest message. I am curious as to how the subscription manager handles the receiving of the Shutdown port post.

    Friday, November 23, 2007 3:56 PM
  •  

    Ahha, the reason the subscription was failing is because I wasn't properly catching an exception.

     

    Turns out the subscribeRequest.NotificationShutdownPort is null on the receiving end. However, when sending the subscribe request, I do:

     

    Code Block

    ...

    if (_CCShutdown == null) LogError("Shutdown Port is Null");

    subReq.NotificationShutdownPort = _CCShutdown;

    ...

     

     

    And I do not see this LogError post on the debug page. The port isn't null when I attach it, but it's not making it through to the receiver! Now I'm even more confused.

     

    The entire subscribeRequest setup looks like this:

     

    Code Block

    testA.CustomSubscribeRequestType request = new testA.CustomSubscribeRequestType();

    request.Messages = entry.selectiveMessages;

     

    testA.SelectiveSubscribe subReq = new testA.SelectiveSubscribe();

    subReq.Body = request;

    subReq.NotificationPort = entry.serviceNotify;

     

    if (_CCShutdown == null) LogError("Shutdown Port is Null");

    subReq.NotificationShutdownPort = _CCShutdown;

     

    subReq.Body.NotificationCount = 0;

    subReq.NotificationPortConfiguration = new DsspOperationPortConfiguration(

    DsspOperationQueuingPolicy.DiscardWithFault,

    10,

    0);

     

    iPort.PostUnknownType(subReq);

     

    yield return Arbiter.Choice(subReq.ResponsePort,

    delegate(SubscribeResponseType response)

    {

    LogInfo(ServiceName + " subscribed to " + entry.friendlyName);

    entry.subscribed = true;

    entry.servicePort = iPort;

    },

    delegate(soap.Fault fault)

    {

    LogError(ServiceName + " Unable to subscribe to " + entry.friendlyName);

    entry.subscribed = false;

    }

    );

     

     

    It turns out that all of the subscribeRequest members are null!

     

    subscribeRequest.NotificationPort is null

    subscribeRequest.NotificationPortConfiguration is null

    subscirbeRequest.NotificationShutdownPort is null

     

    yet I set all of these to non null objects!

     

    Could the problem be related to the fact that I'm using PostUnknownType()?

     

    Now I'm even more confused. If .NotificationPort is null when the subscribe target receives the message, then how does the subscription manager know what port to use for sending messages back to the subscribing service.

     

    Sigh,

    Don

    Friday, November 23, 2007 5:04 PM
  • Let me explain a bit from the start.

     

    Subscriptions are handled at two levels:

    1) the first level, is the DSS forwarders that notice an outbound Subscribe *operation*. The SUbscribe Operation contains two fields that only make sense local to a node and a subscription manager NEVER sees: NotificationPort and NotificationShutdownPort. These two ports are *local* and are just helpers essential so the service subscribing can have a convenient way to receive notifications and also cancel the subscription.

     

    2) The second level is the subscription manager. THe subscription manager *never* sees a NotficationShutDownPort or a NotificatioNport instance since these fields have no meaning across node boundaries. Instead, all it sees is a SubscribeRequestType instance, with a URI that represents the subscriber (its a child URI of the subscriber service) where the subMgr will send notifications.

     

    Now, level 1, the DSS infra, will send a DeleteSubscription message when you post a Shutdown on the NotificationShutdownPort, in the context of the service that subscribed. So its just a local listener doing what your code was doing above (waiting for a shutdown before it send a DeleteSubscription)

     

    I have to emphasize that the SUbscribe operation and its fields are only meaningful for the service that send the original subscribe. If you send a subscribe to some other service, these fields in the Operation type (Subscribe) will ofcourse be null and are meaningless in some remote service. Only the SUbscribe.Body fields are useful.

     

    Also, you *can* use the SubscribeHelper and still support filters since the filters are part of your subscribeRequest body and we dont look at them, only the subscription manager does.

     

    Attached are same examples of subscribing with filters and also a handler that supports them:

     

    (Subscriber code, defining a custom Subscribe request type)

    Code Block

                Subscribe subscribe = new Subscribe();
                subscribe.Body.Tags = new string[] { Tag_Minute };
                subscribe.NotificationPort = notify;
                subscribe.NotificationShutdownPort = shutdown;

           publisher.Post(subscribe);

     

     

    Publisher service, implementing subscribe, using our subMgr as a partner:

               

    Code Block

    void SubscribeHandler(Subscribe subscribe)

    {

                submgr.InsertSubscriptionMessage request =

                    new submgr.InsertSubscriptionMessage(
                    subscribe.Body.Subscriber,
                    subscribe.Body.Expiration,
                    subscribe.Body.UseRegex ? submgr.FilterType.Regex :

                    submgr.FilterType.Default,
                    new submgr.QueryType(subscribe.Body.Tags)
                );
                submgr.InsertSubscription insert =

                    new submgr.InsertSubscription(request, subscribe.ResponsePort);

    // forward to our subMgr

                _submgrPort.Post(insert);

     

     

    And here is the type definition for subscribe

     

    Code Block

        [DataContract]
        public class SubscribeRequest : SubscribeRequestType
        {
            [DataMember]
            public int SubscriptionManagerInstance;

            [DataMember]
            public string[] Tags;

            [DataMember]
            public bool UseRegex;

            public SubscribeRequest()
            {
            }

     

     


     

    Saturday, November 24, 2007 4:57 AM
  •  

    Hi George, thanks for the explanation.

     

    I had almost the same thing you posted above, but I changed a few lines to make the code more succinct. I had one problem though.

     

    When I use:

    Code Block

     

    submgr.InsertSubscription insert =

                    new submgr.InsertSubscription(request, subscribe.ResponsePort);

     

     

    The yeild return which listens on the insert.Responseport hangs, and the service comes to a halt.

     

    When I instead use:

     

    Code Block

    submgr.InsertSubscription insert =

                    new submgr.InsertSubscription(request);

     

     

    the yeild return which listens on the insert.Responseport is successful, and everything continues correctly.

     

    I'm not sure why this could be, or what providing the subscribe.ResponsePort adds to the Insert message. My ResponsePort is a PortSet<SubscribeReponseType, Fault>, which is the correct type. I don't know why it fails.

     

    In any event... This pattern causes the subscription manager to correctly remove the subscription only when the two services are on the same DSS Nodes Perhaps I didn't make this clear before, but just incase it's relavent, my services are on different DSS Nodes. When the subscribing service drops, the sub Manager on the other node doesn't get the memo, and I get a barrage of notifications on my debug page saying:

     

    9 16:24:50 * ### TransportBig Smilessp.tcp://dgfslqc1:40001/:TcpTransportService:Error: Could not find port associated with URI (dssp.tcp://dgfslqc1:40001/subscribetesta/NotificationTarget/1a051c52-69ad-444f-871a-df290787b623) is not configured for inbound messages
    10 16:24:55 * ### TransportBig Smilessp.tcp://dgfslqc1:40001/:TcpTransportService:Error: Could not find port associated with URI (dssp.tcp://dgfslqc1:40001/subscribetesta/NotificationTarget/1a051c52-69ad-444f-871a-df290787b623) is not configured for inbound messages
    11 16:25:00 * ### TransportBig Smilessp.tcp://dgfslqc1:40001/:TcpTransportService:Error: Could not find port associated with URI (dssp.tcp://dgfslqc1:40001/subscribetesta/NotificationTarget/1a051c52-69ad-444f-871a-df290787b623) is not configured for inbound messages

    ...

     

     

     

    Sunday, November 25, 2007 9:26 PM
  • Hi don, you must yield to the response on an *outbound* operation, one that you just sent. Not the one you just received. That response port is for you to post a response, not yield. 

     

    This is why your second code snippet works. That operation is just sent out, and DSS will deliver the msg on its associated response port.

     

    Now on the delete subscription issue:

     

    After you send the delete Subscription message, can you please verify (do an HTTP get in your browser for example on the submgr URI) that the subscription is indeed removed?? It should not matter if the two nodes are local or remote but just in case, i would like to narrow this down.

     

    thanx

    g

     

    Monday, November 26, 2007 4:45 AM
  • George,

     

    I'm confused, in both cases I'm yeilding to a post to the subscription manager. I would expect to receive a response from the subscription manager in both cases, no?

     

    Here is the code, with the difference:

     

    case 1:

     

    Code Block

    public IEnumerator<ITask> SelectiveSubscribeHandler(testA.SelectiveSubscribe subscribeRequest)

    {

    submgr.InsertSubscriptionMessage request =

    new submgr.InsertSubscriptionMessage(

    subscribeRequest.Body.Subscriber,

    subscribeRequest.Body.Expiration,

    submgr.FilterType.Default,

    new submgr.QueryType(subscribeRequest.Body.Messages.ToArray())

    );

     

    submgr.InsertSubscription ins = new submgr.InsertSubscription(request);

    ins.Body.NotificationCount = subscribeRequest.Body.NotificationCount;

    _subMgr.Post(ins);

     

    yield return Arbiter.Choice(ins.ResponsePort,

    delegate(SubscribeResponseType rsp)

    {

    subscribeRequest.ResponsePort.Post(rsp);

    },

    delegate(soap.Fault fault)

    {

    subscribeRequest.ResponsePort.Post(fault);

    });

    yield break;

    }

     

     

    case 2:

     

    Code Block

    public IEnumerator<ITask> SelectiveSubscribeHandler(testA.SelectiveSubscribe subscribeRequest)

    {

    submgr.InsertSubscriptionMessage request =

    new submgr.InsertSubscriptionMessage(

    subscribeRequest.Body.Subscriber,

    subscribeRequest.Body.Expiration,

    submgr.FilterType.Default,

    new submgr.QueryType(subscribeRequest.Body.Messages.ToArray())

    );

     

    submgr.InsertSubscription ins = new submgr.InsertSubscription(request, subscribeRequest.ResponsePort);

    ins.Body.NotificationCount = subscribeRequest.Body.NotificationCount;

    _subMgr.Post(ins);

     

    yield return Arbiter.Choice(ins.ResponsePort,

    delegate(SubscribeResponseType rsp)

    {

    subscribeRequest.ResponsePort.Post(rsp);

    },

    delegate(soap.Fault fault)

    {

    subscribeRequest.ResponsePort.Post(fault);

    });

    yield break;

    }

     

     

    the only difference here is the insert declaration. case 1 succeeds, case 2 fails. I don't understand the difference.

     

    I have verified by using an HTTP get that the subscription is indeed removed (even when using different nodes). However, from the error message, it appears that some sort of message is still being forwarded.

    Monday, November 26, 2007 7:03 AM
  • The difference is that in case 2, you are reusing the responsePort from the subscribeOperation, and that is not valid. That response Port is associated with the inbound subscribe.

     

    submgr.InsertSubscription ins = new submgr.InsertSubscription(

        request,

        subscribeRequest.ResponsePort); // Dont do this!

     

    While we enable you to specify explicitly a responsePort, you can use one that is already part of another, *different* operation.

     

    So the right way, and most common pattern is this:

     

     

    submgr.InsertSubscription ins = new submgr.InsertSubscription(

        request);

     

    We will attempt a repro of what you are seeing and see if this is a DSS issue.

    Tuesday, November 27, 2007 12:11 AM
  • I george, thanks for the clarification on the insert. The reason I tried case 2, or whichever case it was that failed, is because on your sample code block from a previous post, you included subscribe.ResponsePort as the second parameter in the Insert declaration.

     

    As to the other problem. I created a few services to test this and other functionality. I wanted to get everything right before I started my new project. It basically consists of three services TestA, TestB, TestC. If you want, I can send you the files to make it easier on you to track down...

     

    Thanks,

    Don 

    Tuesday, November 27, 2007 1:05 AM
  • please send me the files (if they are just simple samples) and i can try to repro.

     

    Our BVT and stress test show that after a subscription is deleted, any more notifications are NOT sent, as expected. So we dont see this issue. One thing to do is is a GET on the subscription manager before the DeleteSubscription is sent, then a GET again after, and then verify the right subscriber entry is removed from the list of active subscriptions. Otherwise i cant see how this could happen

     

    Tuesday, November 27, 2007 10:39 PM
  • Thanks to Don for sending the repro case we have isolated the issue and indeed the subscription manager was not removing notifications on a specific type of subscription

    The fix will be available in the upcoming runtime update package for 1.5 Until then, just ignore the debug/trace warning message Smile It has no other side-effects

     

    g

    Thursday, November 29, 2007 2:49 AM
  • I was wondering if the issue of subscription manager not removing notifications sometimes has been fixed. I use the MRDS July CTP and in certain cases the subscription manager doesn't remove things even though a shutdown message has been posted.
    Monday, December 1, 2008 10:14 PM