none
发布订阅模式并行事件发布问题 RRS feed

  • 常规讨论

  • 这是之前发的一个帖子:http://social.msdn.microsoft.com/Forums/zh-CN/wcfzhchs/thread/5360f6ce-ec3b-4b8d-ba95-ec4c027fc119

    我的项目采用的是基于事件的订阅-发布模式,主要是实现客户端与服务端的即时数据交互,客户端通过订阅事件以待接收数据,服务端通过发布服务将即时数据发布至服务,服务通知订阅了事件的客户端。问题是这样的:软件在网络情况较差的情况下,客户端与服务端的数据交换在一段时间后便停止,这个时候自动如何重新与服务器建立通信。

      当服务端与客户端通讯中止的时候,客户端已经故障的情况下,服务端还是对客户端之前保存在服务端的引用进行数据发布,我的问题是如何在这种情况下服务端主动删除无效的客户端引用。(我已尝试删除,但有时候可以,有时候很困难而且几乎不能删除。)

      另一个问题是,采用这种模式,不同订阅者(客户端)在接受数据的时候会互相影响,也就是说一个客户端如果接受数据受阻,其他客户端接受也收到影响。不知道该怎么解决。

    这些问题困扰我一周了,请各位达人帮帮忙,下面我贴出我的代码。

    发布-订阅类

    public abstract class SubscriptionManager<T> where T : class
     {
      static Dictionary<string, List<T>> m_TransientStore;
    
      static SubscriptionManager()
      {
       m_TransientStore = new Dictionary<string, List<T>>();
       string[] methods = GetOperations();
       Action<string> insert = delegate(string methodName)
       {
        m_TransientStore.Add(methodName, new List<T>());
       };
       Array.ForEach(methods, insert);
      }
      static string[] GetOperations()
      {
       MethodInfo[] methods = typeof(T).GetMethods(BindingFlags.Public | BindingFlags.FlattenHierarchy | BindingFlags.Instance);
       List<string> operations = new List<string>(methods.Length);
    
       Action<MethodInfo> add = delegate(MethodInfo method)
       {
        Debug.Assert(!operations.Contains(method.Name));
        operations.Add(method.Name);
       };
       Array.ForEach(methods, add);
       return operations.ToArray();
      }
    
      //Transient subscriptions management 
      internal static T[] GetTransientList(string eventOperation)
      {
       lock (typeof(SubscriptionManager<T>))
       {
        List<T> list = m_TransientStore[eventOperation];
        return list.ToArray();
       }
      }
      static void AddTransient(T subscriber, string eventOperation)
      {
       lock (typeof(SubscriptionManager<T>))
       {
        List<T> list = m_TransientStore[eventOperation];
        if (list.Contains(subscriber))
        {
         return;
        }
        list.Add(subscriber);
       }
      }
      static void RemoveTransient(T subscriber, string eventOperation)
      {
       lock (typeof(SubscriptionManager<T>))
       {
        List<T> list = m_TransientStore[eventOperation];
        list.Remove(subscriber);
       }
      }
    
      public void Subscribe(string eventOperation)
      {
       lock (typeof(SubscriptionManager<T>))
       {
        T subscriber = OperationContext.Current.GetCallbackChannel<T>();
        if (String.IsNullOrEmpty(eventOperation) == false)
        {
         AddTransient(subscriber, eventOperation);
        }
        else
        {
         string[] methods = GetOperations();
         Action<string> addTransient = delegate(string methodName)
         {
          AddTransient(subscriber, methodName);
         };
         Array.ForEach(methods, addTransient);
        }
       }
      }
    
      public void Unsubscribe(string eventOperation)
      {
       lock (typeof(SubscriptionManager<T>))
       {
        T subscriber = OperationContext.Current.GetCallbackChannel<T>();
        if (String.IsNullOrEmpty(eventOperation) == false)
        {
         RemoveTransient(subscriber, eventOperation);
        }
        else
        {
         string[] methods = GetOperations();
         Action<string> removeTransient = delegate(string methodName)
         {
          RemoveTransient(subscriber, methodName);
         };
         Array.ForEach(methods, removeTransient);
        }
       }
      } 
     }
    

     发布服务类

    public abstract class PublishService<T> where T : class
     {
      protected static void FireEvent(params object[] args)
      {
       StackFrame stackFrame = new StackFrame(1);
       string methodName = stackFrame.GetMethod().Name;
    
       PublishTransient(methodName, args);
      }
    
      static void FireEvent(string methodName, params object[] args)
      {
       PublishTransient(methodName, args);
      }
    
      static void PublishTransient(string methodName, params object[] args)
      {
       T[] subscribers = SubscriptionManager<T>.GetTransientList(methodName);
       Publish(subscribers, false, methodName, args);
      }
    
      static void Publish(T[] subscribers, bool closeSubscribers, string methodName, params object[] args)
      {
       WaitCallback fire = delegate(object subscriber)
       {
        Invoke(subscriber as T, methodName, args);
        if (closeSubscribers)
        {
         using (subscriber as IDisposable)
         { }
        }
       };
       Action<T> queueUp = delegate(T subscriber)
       {
        ThreadPool.QueueUserWorkItem(fire, subscriber);
       };
       Array.ForEach(subscribers, queueUp);
      }
      static void Invoke(T subscriber, string methodName, object[] args)
      {
       Debug.Assert(subscriber != null);
       Type type = typeof(T);
       MethodInfo methodInfo = type.GetMethod(methodName);
       try
       {
        methodInfo.Invoke(subscriber, args);
       }
       catch (Exception e)
       {
        Trace.WriteLine(e.Message);
       }
      }
     }
    

    服务端配置文件:

    <pre lang="x-c#"><?xml version="1.0"?>
    <configuration>
    <system.serviceModel>
     <bindings>
      <netTcpBinding>
      <binding name="BindingBehaviorConfiguration" receiveTimeout="00:10:00" maxBufferSize="65536"
         maxReceivedMessageSize="65536" transferMode="Buffered" maxBufferPoolSize="65536"
         closeTimeout="00:10:00" sendTimeout="00:10:00" openTimeout="00:10:00">
       <readerQuotas maxArrayLength="65536" maxStringContentLength="65536" maxBytesPerRead="65536"/>
    <reliableSession enabled="true"/>
       <security mode="None">
       <transport clientCredentialType="Windows"/>
       <message clientCredentialType="Windows"/>
       </security>
      </binding>
      </netTcpBinding>
     </bindings>
     <services>
      <service behaviorConfiguration="serviceBehavior" name="HK.Globex.Services.MySubscriptionService">
      <endpoint address="MySubscriptionService" binding="netTcpBinding" bindingConfiguration="BindingBehaviorConfiguration" contract="HK.Globex.Contracts.IMySubscriptionService">
       <identity>
       <dns value="localhost"/>
       </identity>
      </endpoint>
      <endpoint address="mex" binding="mexTcpBinding" bindingConfiguration="" contract="IMetadataExchange">
       <identity>
       <dns value="localhost"/>
       </identity>
      </endpoint>
      <host>
       <baseAddresses>
       <add baseAddress="net.tcp://192.168.1.2:9000"/>
       </baseAddresses>
      </host>
      </service>
      <service behaviorConfiguration="serviceBehavior" name="HK.Globex.Services.MyPublishService">
      <endpoint address="MyPublishService" binding="netTcpBinding" bindingConfiguration="BindingBehaviorConfiguration" contract="HK.Globex.Contracts.IMyEvents">
       <identity>
       <dns value="localhost"/>
       </identity>
      </endpoint>
      <endpoint address="mex" binding="mexTcpBinding" bindingConfiguration="" contract="IMetadataExchange">
       <identity>
       <dns value="localhost"/>
       </identity>
      </endpoint>
      <host>
       <baseAddresses>
       <add baseAddress="net.tcp://192.168.1.2:9001"/>
       </baseAddresses>
      </host>
      </service>
     </services>
     <behaviors>
      <serviceBehaviors>
      <behavior name="serviceBehavior">
       <serviceMetadata/>
       <serviceDebug includeExceptionDetailInFaults="true"/>
       <serviceThrottling maxConcurrentCalls="1000" maxConcurrentInstances="1000" maxConcurrentSessions="1000"/>
      </behavior>
      </serviceBehaviors>
     </behaviors>
     </system.serviceModel>
    </configuration>
    
    

    发布者配置文件:

    <?xml version="1.0"?>
    <configuration>
     <system.serviceModel>
      <bindings>
      <netTcpBinding>
      <binding name="NetTcpBinding_MyEventsContract" closeTimeout="00:10:00"
         openTimeout="00:10:00" receiveTimeout="00:10:00" sendTimeout="00:10:00"
         transactionFlow="false" transferMode="Buffered" transactionProtocol="OleTransactions"
         hostNameComparisonMode="StrongWildcard" listenBacklog="10"
         maxBufferPoolSize="65536" maxBufferSize="65536" maxConnections="10"
         maxReceivedMessageSize="65536">
       <reliableSession ordered="false" inactivityTimeout="00:10:00"
          enabled="true" />
       <readerQuotas maxDepth="32" maxStringContentLength="65536" maxArrayLength="65536"
        maxBytesPerRead="65536" maxNameTableCharCount="65536" />
       <security mode="None">
       <transport clientCredentialType="Windows" protectionLevel="EncryptAndSign" />
       <message clientCredentialType="Windows" />
       </security>
      </binding>
      </netTcpBinding>
     </bindings>
     <client>
      <endpoint address="net.tcp://192.168.1.2:9001/MyPublishService"
        binding="netTcpBinding" bindingConfiguration="NetTcpBinding_MyEventsContract"
        contract="MyEventsContract" name="NetTcpBinding_MyEventsContract">
      <identity>
       <dns value="localhost" />
      </identity>
      </endpoint>
     </client>
     </system.serviceModel>
    </configuration>
    
    

    客户端配置文件:

    <?xml version="1.0"?>
    <configuration>
     <system.serviceModel>
      <bindings>
      <netTcpBinding>
       <binding name="NetTcpBinding_MySubscriptionContract" closeTimeout="00:10:00"
        openTimeout="00:10:00" receiveTimeout="00:10:00" sendTimeout="00:10:00"
        transactionFlow="false" transferMode="Buffered" transactionProtocol="OleTransactions"
        hostNameComparisonMode="StrongWildcard" listenBacklog="10"
        maxBufferPoolSize="65536" maxBufferSize="65536" maxConnections="10"
        maxReceivedMessageSize="2147483647">
       <readerQuotas maxDepth="32" maxStringContentLength="2147483647" maxArrayLength="2147483647"
        maxBytesPerRead="4096" maxNameTableCharCount="2147483647" />
       <reliableSession ordered="false" inactivityTimeout="00:10:00"
          enabled="true" />
       <security mode="None">
        <transport clientCredentialType="Windows" protectionLevel="EncryptAndSign" />
        <message clientCredentialType="Windows" />
       </security>
       </binding>
       </netTcpBinding>
      </bindings>
      <client>
      <endpoint address="net.tcp://192.168.1.2:9000/MySubscriptionService"
       binding="netTcpBinding" bindingConfiguration="NetTcpBinding_MySubscriptionContract"
       contract="MySubscriptionContract" name="NetTcpBinding_MySubscriptionContract">
       <identity>
       <dns value="localhost" />
       </identity>
      </endpoint>
      </client>
     </system.serviceModel>
    </configuration>
    

     

     

    2010年5月10日 12:53

全部回复

  •   参考了一些英文资料,有老外建议在服务回调客户端的时候采取异步的方法,但是我试了好像不起作用,不知道是我用错了还是怎么的。我主要就是通过修改回调契约,增加了BeginXXx与EndXXx方法:

    [OperationContract(IsOneWay=true,AsyncPattern=true)]
    IAsyncResult BeginDisplayPrice(string pName,string buyPrice,string sellPrice,AsyncCallback callback,object state);
    void EndDisplayPrice(IAsyncResult result);
    

    这样做好像程序不能正常运行,对于我的程序,服务回调客户端该如何实现,谢谢你们!

      static void Invoke(T subscriber, string methodName, object[] args)
      {
       Debug.Assert(subscriber != null);
       Type type = typeof(T);

       //获取回调方法名字
       MethodInfo methodInfo = type.GetMethod(methodName);
       try
       {

        //通过反射调用回调方法
        methodInfo.Invoke(subscriber, args);
       }
       catch (Exception e)
       {
        Trace.WriteLine(e.Message);
       }
      }

     

    2010年5月12日 2:46
  • UseSynchronizationContext=false,

    这个在服务端和客户端都尝试一下,http://msdn.microsoft.com/en-us/library/system.servicemodel.callbackbehaviorattribute.usesynchronizationcontext.aspx

    之前看的帖子也是说这个有作用。


    Frank Xu Lei--谦卑若愚,好学若饥
    专注于.NET平台下分布式应用系统开发和企业应用系统集成
    Focus on Distributed Applications Development and EAI based on .NET
    欢迎访问老徐的博客:Welcome to My Technical Blog
    欢迎访问老徐的网站:Welcome to My Website
    欢迎访问微软WCF中文技术论坛:Welcome to Microsoft Chinese WCF Forum
    欢迎访问微软WCF英文技术论坛:Welcome to Microsoft English WCF Forum
    2010年5月13日 15:31
    版主