none
WCF 通道故障处理 RRS feed

  • 问题

  •   我的项目采用的是基于事件的订阅-发布模式,主要是实现客户端与服务端的即时数据交互,客户端通过订阅事件以待接收数据,服务端通过发布服务将即时数据发布至服务,服务通知订阅了事件的客户端。问题是这样的:软件在网络情况较差的情况下,客户端与服务端的数据交换在一段时间后便停止,这个时候自动如何重新与服务器建立通信。

      当服务端与客户端通讯中止的时候,客户端已经故障的情况下,服务端还是对客户端之前保存在服务端的引用进行数据发布,我的问题是如何在这种情况下服务端主动删除无效的客户端引用。

      我的思路:客户端与服务端数据交换停止,可能是信道故障,但是不知道怎么自动修复,以及该怎么处理。

    2010年5月5日 5:56

答案

  • 这个应该在哪里判断,又该怎么判断呢?因为这个模式 我也是参考的别人的例子,所以有很多不懂的地方。

    static void Invoke(T subscriber, string methodName, object[] args)
        {
          Debug.Assert(subscriber != null);
          Type type = typeof(T);
          MethodInfo methodInfo = type.GetMethod(methodName);
          try
          {
            methodInfo.Invoke(subscriber, args);
          }
          catch (Exception e)
          {
            Trace.WriteLine(e.Message);
          }
        }
    这个方法主要是用于回调的,怎么在回调前加入判断呢?

     


    这样判断:

    if (((IChannel)subscriber).State != CommunicationState.Opened)

    {

       // 客户端通道已关闭或出错,从字典里删除 subscriber.

    }

    2010年5月23日 4:46

全部回复

  • 我有些Wcf的问题想问一下请给点帮助!!已订阅

    前几天,HahaWCF, Mog还有我一起讨论过这个问题。

    就是客户端断开连接以后,服务实例不会被马上回收。

    默认是10分钟,然后回收。

    或许有点帮助。

     

    另外 通道问题,重新连接,你服务实例激活类型是什么,重新连接也需要考虑

     

     


    Frank Xu Lei--谦卑若愚,好学若饥
    专注于.NET平台下分布式应用系统开发和企业应用系统集成
    Focus on Distributed Applications Development and EAI based on .NET
    欢迎访问老徐的中文技术博客:Welcome to My Chinese Technical Blog
    欢迎访问微软WCF中文技术论坛:Welcome to Microsoft Chinese WCF Forum
    欢迎访问微软WCF英文技术论坛:Welcome to Microsoft English WCF Forum
    2010年5月5日 10:11
    版主
  • 服务实例激活类型是PerSession,因为客户端和服务端要频繁的交换数据,所以选择有状态的会话通道。

    看了你给的帖子连接,那也就是说,服务端会在一定时间内销魂客户端的引用吗?

    还有2个问题:

    1.但客户端与服务端正在进行数据交互的时候,如果这个时候关闭客户端,客户端调用了取消订阅事件的方法,这个时候非常卡,整个UI界面都无响应了,另外在这种情况下,我在服务端(另一台机器)也用的VS调试,发现客户端根本就没能调用到那个方法。相反,当客户端仅仅是订阅了事件,而服务端没有发布事件的时候,这时候客户端可以正常的取消订阅。这是怎么回事呢?

    2.就是客户端与服务端数据交换在开启了可靠会话的情况下,如果客户端在关闭的时候调用取消订阅事件,UI无相应,强制关闭后,后续启动的客户端可以正常订阅事件,但根本无法接受服务端发送的数据了,很奇怪。但是这种情况在没有启动可靠会话的情况下就没事了,不管客户端是否正常关闭或正常取消订阅,后续启动的客户端都可以正常接受数据,不知道这是怎么回事?

    以下是我的部分代码:

    发布-订阅类

    public abstract class SubscriptionManager<T> where T : class
      {
        static Dictionary<string, List<T>> m_TransientStore;
    
        static SubscriptionManager()
        {
          m_TransientStore = new Dictionary<string, List<T>>();
          string[] methods = GetOperations();
          Action<string> insert = delegate(string methodName)
          {
            m_TransientStore.Add(methodName, new List<T>());
          };
          Array.ForEach(methods, insert);
        }
        static string[] GetOperations()
        {
          MethodInfo[] methods = typeof(T).GetMethods(BindingFlags.Public | BindingFlags.FlattenHierarchy | BindingFlags.Instance);
          List<string> operations = new List<string>(methods.Length);
    
          Action<MethodInfo> add = delegate(MethodInfo method)
          {
            Debug.Assert(!operations.Contains(method.Name));
            operations.Add(method.Name);
          };
          Array.ForEach(methods, add);
          return operations.ToArray();
        }
    
        //Transient subscriptions management 
        internal static T[] GetTransientList(string eventOperation)
        {
          lock (typeof(SubscriptionManager<T>))
          {
            List<T> list = m_TransientStore[eventOperation];
            return list.ToArray();
          }
        }
        static void AddTransient(T subscriber, string eventOperation)
        {
          lock (typeof(SubscriptionManager<T>))
          {
            List<T> list = m_TransientStore[eventOperation];
            if (list.Contains(subscriber))
            {
              return;
            }
            list.Add(subscriber);
          }
        }
        static void RemoveTransient(T subscriber, string eventOperation)
        {
          lock (typeof(SubscriptionManager<T>))
          {
            List<T> list = m_TransientStore[eventOperation];
            list.Remove(subscriber);
          }
        }
    
        public void Subscribe(string eventOperation)
        {
          lock (typeof(SubscriptionManager<T>))
          {
            T subscriber = OperationContext.Current.GetCallbackChannel<T>();
            if (String.IsNullOrEmpty(eventOperation) == false)
            {
              AddTransient(subscriber, eventOperation);
            }
            else
            {
              string[] methods = GetOperations();
              Action<string> addTransient = delegate(string methodName)
              {
                AddTransient(subscriber, methodName);
              };
              Array.ForEach(methods, addTransient);
            }
          }
        }
    
        public void Unsubscribe(string eventOperation)
        {
          lock (typeof(SubscriptionManager<T>))
          {
            T subscriber = OperationContext.Current.GetCallbackChannel<T>();
            if (String.IsNullOrEmpty(eventOperation) == false)
            {
              RemoveTransient(subscriber, eventOperation);
            }
            else
            {
              string[] methods = GetOperations();
              Action<string> removeTransient = delegate(string methodName)
              {
                RemoveTransient(subscriber, methodName);
              };
              Array.ForEach(methods, removeTransient);
            }
          }
        }  
      }

     发布服务类

    public abstract class PublishService<T> where T : class
      {
        protected static void FireEvent(params object[] args)
        {
          StackFrame stackFrame = new StackFrame(1);
          string methodName = stackFrame.GetMethod().Name;
    
          PublishTransient(methodName, args);
        }
    
        static void FireEvent(string methodName, params object[] args)
        {
          PublishTransient(methodName, args);
        }
    
        static void PublishTransient(string methodName, params object[] args)
        {
          T[] subscribers = SubscriptionManager<T>.GetTransientList(methodName);
          Publish(subscribers, false, methodName, args);
        }
    
        static void Publish(T[] subscribers, bool closeSubscribers, string methodName, params object[] args)
        {
          WaitCallback fire = delegate(object subscriber)
          {
            Invoke(subscriber as T, methodName, args);
            if (closeSubscribers)
            {
              using (subscriber as IDisposable)
              { }
            }
          };
          Action<T> queueUp = delegate(T subscriber)
          {
            ThreadPool.QueueUserWorkItem(fire, subscriber);
          };
          Array.ForEach(subscribers, queueUp);
        }
        static void Invoke(T subscriber, string methodName, object[] args)
        {
          Debug.Assert(subscriber != null);
          Type type = typeof(T);
          MethodInfo methodInfo = type.GetMethod(methodName);
          try
          {
            methodInfo.Invoke(subscriber, args);
          }
          catch (Exception e)
          {
            Trace.WriteLine(e.Message);
          }
        }
      }

    服务端配置文件:

    <?xml version="1.0"?>
    <configuration>
    <system.serviceModel>
      <bindings>
       <netTcpBinding>
        <binding name="BindingBehaviorConfiguration" receiveTimeout="00:10:00" maxBufferSize="65536"
             maxReceivedMessageSize="65536" transferMode="Buffered" maxBufferPoolSize="65536"
             closeTimeout="00:10:00" sendTimeout="00:10:00" openTimeout="00:10:00">
         <readerQuotas maxArrayLength="65536" maxStringContentLength="65536" maxBytesPerRead="65536"/>
    <reliableSession enabled="true"/>
         <security mode="None">
          <transport clientCredentialType="Windows"/>
          <message clientCredentialType="Windows"/>
         </security>
        </binding>
       </netTcpBinding>
      </bindings>
      <services>
       <service behaviorConfiguration="serviceBehavior" name="HK.Globex.Services.MySubscriptionService">
        <endpoint address="MySubscriptionService" binding="netTcpBinding" bindingConfiguration="BindingBehaviorConfiguration" contract="HK.Globex.Contracts.IMySubscriptionService">
         <identity>
          <dns value="localhost"/>
         </identity>
        </endpoint>
        <endpoint address="mex" binding="mexTcpBinding" bindingConfiguration="" contract="IMetadataExchange">
         <identity>
          <dns value="localhost"/>
         </identity>
        </endpoint>
        <host>
         <baseAddresses>
          <add baseAddress="net.tcp://192.168.1.2:9000"/>
         </baseAddresses>
        </host>
       </service>
       <service behaviorConfiguration="serviceBehavior" name="HK.Globex.Services.MyPublishService">
        <endpoint address="MyPublishService" binding="netTcpBinding" bindingConfiguration="BindingBehaviorConfiguration" contract="HK.Globex.Contracts.IMyEvents">
         <identity>
          <dns value="localhost"/>
         </identity>
        </endpoint>
        <endpoint address="mex" binding="mexTcpBinding" bindingConfiguration="" contract="IMetadataExchange">
         <identity>
          <dns value="localhost"/>
         </identity>
        </endpoint>
        <host>
         <baseAddresses>
          <add baseAddress="net.tcp://192.168.1.2:9001"/>
         </baseAddresses>
        </host>
       </service>
      </services>
      <behaviors>
       <serviceBehaviors>
        <behavior name="serviceBehavior">
         <serviceMetadata/>
         <serviceDebug includeExceptionDetailInFaults="true"/>
         <serviceThrottling maxConcurrentCalls="1000" maxConcurrentInstances="1000" maxConcurrentSessions="1000"/>
        </behavior>
       </serviceBehaviors>
      </behaviors>
     </system.serviceModel>
    </configuration>
    

    发布者配置文件:

    <?xml version="1.0"?>
    <configuration>
     <system.serviceModel>
       <bindings>
       <netTcpBinding>
        <binding name="NetTcpBinding_MyEventsContract" closeTimeout="00:10:00"
             openTimeout="00:10:00" receiveTimeout="00:10:00" sendTimeout="00:10:00"
             transactionFlow="false" transferMode="Buffered" transactionProtocol="OleTransactions"
             hostNameComparisonMode="StrongWildcard" listenBacklog="10"
             maxBufferPoolSize="65536" maxBufferSize="65536" maxConnections="10"
             maxReceivedMessageSize="65536">
         <reliableSession ordered="false" inactivityTimeout="00:10:00"
                enabled="true" />
         <readerQuotas maxDepth="32" maxStringContentLength="65536" maxArrayLength="65536"
           maxBytesPerRead="65536" maxNameTableCharCount="65536" />
         <security mode="None">
          <transport clientCredentialType="Windows" protectionLevel="EncryptAndSign" />
          <message clientCredentialType="Windows" />
         </security>
        </binding>
       </netTcpBinding>
      </bindings>
      <client>
       <endpoint address="net.tcp://192.168.1.2:9001/MyPublishService"
            binding="netTcpBinding" bindingConfiguration="NetTcpBinding_MyEventsContract"
            contract="MyEventsContract" name="NetTcpBinding_MyEventsContract">
        <identity>
         <dns value="localhost" />
        </identity>
       </endpoint>
      </client>
     </system.serviceModel>
    </configuration>
    

    客户端配置文件:

    <?xml version="1.0"?>
    <configuration>
     <system.serviceModel>
        <bindings>
        <netTcpBinding>
         <binding name="NetTcpBinding_MySubscriptionContract" closeTimeout="00:10:00"
           openTimeout="00:10:00" receiveTimeout="00:10:00" sendTimeout="00:10:00"
           transactionFlow="false" transferMode="Buffered" transactionProtocol="OleTransactions"
           hostNameComparisonMode="StrongWildcard" listenBacklog="10"
           maxBufferPoolSize="65536" maxBufferSize="65536" maxConnections="10"
           maxReceivedMessageSize="2147483647">
          <readerQuotas maxDepth="32" maxStringContentLength="2147483647" maxArrayLength="2147483647"
            maxBytesPerRead="4096" maxNameTableCharCount="2147483647" />
          <reliableSession ordered="false" inactivityTimeout="00:10:00"
                enabled="true" />
          <security mode="None">
           <transport clientCredentialType="Windows" protectionLevel="EncryptAndSign" />
           <message clientCredentialType="Windows" />
          </security>
         </binding>
         </netTcpBinding>
       </bindings>
       <client>
        <endpoint address="net.tcp://192.168.1.2:9000/MySubscriptionService"
          binding="netTcpBinding" bindingConfiguration="NetTcpBinding_MySubscriptionContract"
          contract="MySubscriptionContract" name="NetTcpBinding_MySubscriptionContract">
         <identity>
          <dns value="localhost" />
         </identity>
        </endpoint>
       </client>
     </system.serviceModel>
    </configuration>
    
    2010年5月5日 12:59
  • 请你们帮帮我解决问题,急死了。一旦客户端订阅了事件,服务端会用一个字典保存客户端回调实例,以及客户端订阅(Sunscribe)的操作,但是一旦客户端非正常退出,比如急速进程。没有能主动发起取消订阅(Unsubscribe)事件,这时服务端一直不会删除客户端回调引用,并一直发布数据,在服务端能捕获到CommunicationObjectDisposedException,请问这个问题该如何解决。

     

    2010年5月6日 5:02
  • 服务端在发送回调消息前,判断一下回调的通道状态
    Frank Xu Lei--谦卑若愚,好学若饥
    专注于.NET平台下分布式应用系统开发和企业应用系统集成
    Focus on Distributed Applications Development and EAI based on .NET
    欢迎访问老徐的中文技术博客:Welcome to My Chinese Technical Blog
    欢迎访问微软WCF中文技术论坛:Welcome to Microsoft Chinese WCF Forum
    欢迎访问微软WCF英文技术论坛:Welcome to Microsoft English WCF Forum
    2010年5月6日 9:57
    版主
  • 这个应该在哪里判断,又该怎么判断呢?因为这个模式 我也是参考的别人的例子,所以有很多不懂的地方。

    static void Invoke(T subscriber, string methodName, object[] args)
        {
          Debug.Assert(subscriber != null);
          Type type = typeof(T);
          MethodInfo methodInfo = type.GetMethod(methodName);
          try
          {
            methodInfo.Invoke(subscriber, args);
          }
          catch (Exception e)
          {
            Trace.WriteLine(e.Message);
          }
        }
    这个方法主要是用于回调的,怎么在回调前加入判断呢?

     

    2010年5月6日 11:59
  • 大家帮帮忙,谢谢咯。
    2010年5月9日 3:28
  • 因为你的网络情况较差,是否可以考虑使用pull的方式代替push? 即由客户端定时刷新,服务端只做被动响应。虽然实时性不及回调形式,当客户端很多时服务端也可能会有一点性能问题,但不用去考虑通信问题了。
    2010年5月22日 7:42
  • 这个应该在哪里判断,又该怎么判断呢?因为这个模式 我也是参考的别人的例子,所以有很多不懂的地方。

    static void Invoke(T subscriber, string methodName, object[] args)
        {
          Debug.Assert(subscriber != null);
          Type type = typeof(T);
          MethodInfo methodInfo = type.GetMethod(methodName);
          try
          {
            methodInfo.Invoke(subscriber, args);
          }
          catch (Exception e)
          {
            Trace.WriteLine(e.Message);
          }
        }
    这个方法主要是用于回调的,怎么在回调前加入判断呢?

     


    这样判断:

    if (((IChannel)subscriber).State != CommunicationState.Opened)

    {

       // 客户端通道已关闭或出错,从字典里删除 subscriber.

    }

    2010年5月23日 4:46