none
如何实现这样一种排队机制呢? RRS feed

  • 常规讨论

  • 问题描述:

    类似于银行的取款的服务,

    银行提供服务,但在其内部,按照业务或是用户类型的不同,银行会提供不同的窗口,每个窗口的处理能力不一样,如:

    A窗口:处理一般用户的小额的存取业务,能同时处理10人;

    B窗口:处理大宗的存取业务,能同时处理4人;

    C窗口:处理VIP客户的存取业务,能同时处理5人;

    ……

    具体的窗口个数不定,有可能会根据情况新开或减少窗口。

    窗口能处理能力也可能有所变化。 

    同时,有这样一种要求,VIP的客户如果做小额的存取,如果C窗口已满,可以选择到A窗口操作;

    而一些特定的操作,只能在特定的窗口完成,如大宗的存取,只能在B窗口完成,或者VIP用户的某些特定操作,也只能在C窗口完成。

    现在最头痛的问题是:

    银行没有提供排队的操作,如果用户一拥而入,那么后来的用户,只要是超出了处理能力,一律踢出去,从而得不到服务。

    所以,需要要提供一个服务,来实现排队的操作。

    再对应到具体业务,说得更仔细一点:

    银行服务:指的是服务提供商,通过类似于

    http://192.168.1.254:8080/?cmd=test&user=abc&pwd=123456的形式,获取信息。其内部会通过user参数,确定用户类型,而通过cmd参数,确定其操作,从而选择不同的线路进行服务;

    窗口:指的是内部的不同线路;

    业务规则:选择不同线路的规则,前面的比喻应该是比较清楚了;

    处理能力:指的是并发数,并发数之所以受限,是因为服务商对每种用户类型或是操作提供的连接数不一样;

    问题:指的是服务商没有提供排队功能,超出连接数时,一律返回错误;

    排队服务:需要自己用wcf实现的排队服务;

    嗯,不知道有没有把问题说清楚。

    个人的一些想法,

    1、解决方案选择:

    考虑到安全性和其他方面的一些因素,所以选用了wcf提供一个中间服务,每个客户端请求进来之后,根据其用户类型和操作,给他一个对应的访问用户名和密码,向服务商发送请求,得到信息后,返回客户端。

    2、排队机制:

    总的来说有两种方法:

    一种是一旦用户请求,立刻把他排在相应的队伍后面,但这样有和上面规则不符合的地方,如VIP客户的小额存款,由于他既可以排在A窗口,也可以排在C窗口,在他跨进银行的这一刻,其实是无法断定他到底排在哪个窗口的,当然,考虑到处理还是比较快,可以让他多等一会;

    更好的另一种选择是,类似于银行的叫号机,每处理完一个业务后,再叫下一个,就是在有空位的时候,再去决定请求由哪个线路处理;

    3、现在个人能想到的解决方法:

    利用wcf的限流(Thtottle,它可以自动实现排队机制,每个线路开启一个服务,但这样的缺点是显而易见的。

         a、无法动态增加线路,一旦线路有增减,需要手工操作,甚至牵涉到客户端的改动;

         b、排队方式还是第一种;

         当然,对于客户端变动这一块,似乎可以用wcf中路由的功能得到解决,但手工添加服务,似乎有些无法忍受。

    4、如果我上面的分析没有问题的话,那么我的问题是,如何在wcf中实现自己的限流功能呢?

    因为wcf虽然提供了限流,但已近固定死了,无法做任何手脚。也就需要自己来解决了。

    按照wcf限流的理论来讲,过程是这样:

         a、宿主接受请求;

         b、消息被传到信道栈顶部;

         c、消息传送到ChannelDispatcher对象;

         d、传送至EndpointDispatcher对象;

         在传送请求至EndpointDispatcher对象之前,ChannelDispatcher对象可以检查服务当前的负载。如果该请求会导致服务超出许可的负载那么选择延迟该请求。在这种情况下,请求将被阻塞并且保存在内部的队列中直到服务的负载得以释放。

        

         那么,我该在哪个环节动手?(按照结构图,限制行为是包含在服务运行时中)

         如何检查负载?(用性能计数器?)

         如何延迟请求并阻塞?

    当负载释放时,又该如何操作呢?

    以上,接触wcf时间不长,一些特性也不熟悉,包括对业务的理解方面,大家多多讨论和提出宝贵意见。各位达人们,不吝赐教。

    2012年6月27日 2:24

全部回复

  • 补充一些资料:

    关于限流的一些内容,可以查看这个帖子WCF中并发(Concurrency)与限流(Throttling)体系深入解析系列的第7篇。

    另外,C#中FlowThrottle的实现参考代码:

    namespace System.ServiceModel.Dispatcher
    {
        using System;
        using System.Diagnostics;
        using System.ServiceModel.Channels;
        using System.ServiceModel.Diagnostics;
        using System.Collections.Generic;
        using System.Threading;
     
        sealed class FlowThrottle
        {
            int capacity;
            int count;
            object mutex;
            WaitCallback release;
            Queue<object> waiters;
            String propertyName;
            String configName;
      
            internal FlowThrottle(WaitCallback release, int capacity, String propertyName, String configName)
            {
                if (capacity <= 0)
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SFxThrottleLimitMustBeGreaterThanZero0)));
      
                this.count = 0;
                this.capacity = capacity;
                this.mutex = new object();
                this.release = release;
                this.waiters = new Queue<object>();
                this.propertyName = propertyName;
                this.configName = configName;
            }
      
            internal int Capacity
            {
                get { return this.capacity; }
                set
                {
                    if (value <= 0)
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SFxThrottleLimitMustBeGreaterThanZero0)));
                    this.capacity = value;
                }
            }
     
            internal bool Acquire(object o)
            {
                lock (this.mutex)
                {
                    if (this.count < this.capacity)
                    {
                        this.count++;
                        return true;
                    }
                    else
                    {
                        if (this.waiters.Count == 0)
                        {
                            if (DiagnosticUtility.ShouldTraceInformation)
                            {
                                DiagnosticUtility.DiagnosticTrace.TraceEvent(
                                    TraceEventType.Information,
                                    TraceCode.ServiceThrottleLimitReached,
                                    SR.GetString(SR.TraceCodeServiceThrottleLimitReached,
                                                 this.propertyName, this.capacity, this.configName));
                            }
                        }
      
                        this.waiters.Enqueue(o);
                        return false;
                    }
                }
            }
      
            internal void Release()
            {
                object next = null;
     
                lock (this.mutex)
                {
                    if (this.waiters.Count > 0)
                    {
                        next = this.waiters.Dequeue();
                        if (this.waiters.Count == 0)
                            this.waiters.TrimExcess();
                    }
                    else
                    {
                        this.count--;
                    }
                }
      
                if (next != null)
                    this.release(next);
            }
        }
    
    以及,ServiceThrottle的参考实现:
    namespace System.ServiceModel.Dispatcher
    {
        using System;
        using System.ServiceModel;
        using System.Collections.Generic;
        using System.Globalization;
        using System.Threading;
        using System.Runtime.Serialization;
     
        interface ISessionThrottleNotification
        {
            void ThrottleAcquired();
        }
     
        public sealed class ServiceThrottle
        {
            internal const int DefaultMaxConcurrentCalls = 16;
            internal const int DefaultMaxConcurrentSessions = 10;
      
            FlowThrottle calls;
            FlowThrottle sessions;
            QuotaThrottle dynamic;
            FlowThrottle instanceContexts;
      
            ServiceHostBase host;
            bool isActive;
            object thisLock = new object();
      
            internal ServiceThrottle(ServiceHostBase host)
            {
                if (!((host != null)))
                {
                    DiagnosticUtility.DebugAssert("ServiceThrottle.ServiceThrottle: (host != null)");
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("host");
                }
                this.host = host;
                this.MaxConcurrentCalls = ServiceThrottle.DefaultMaxConcurrentCalls;
                this.MaxConcurrentSessions = ServiceThrottle.DefaultMaxConcurrentSessions;
      
                this.isActive = true;
            }
      
            FlowThrottle Calls
            {
                get
                {
                    lock (this.ThisLock)
                    {
                        if (this.calls == null)
                        {
                            this.calls = new FlowThrottle(this.GotCall, ServiceThrottle.DefaultMaxConcurrentCalls,
                                                          ServiceThrottle.MaxConcurrentCallsPropertyName, ServiceThrottle.MaxConcurrentCallsConfigName);
                        }
                        return this.calls;
                    }
                }
            }
      
            FlowThrottle Sessions
            {
                get
                {
                    lock (this.ThisLock)
                    {
                        if (this.sessions == null)
                        {
                            this.sessions = new FlowThrottle(this.GotSession, ServiceThrottle.DefaultMaxConcurrentSessions,
                                                             ServiceThrottle.MaxConcurrentSessionsPropertyName, ServiceThrottle.MaxConcurrentSessionsConfigName);
                        }
                        return this.sessions;
                    }
                }
            }
     
            QuotaThrottle Dynamic
            {
                get
                {
                    lock (this.ThisLock)
                    {
                        if (this.dynamic == null)
                        {
                            this.dynamic = new QuotaThrottle(this.GotDynamic, new object());
                            this.dynamic.Owner = "ServiceHost";
                        }
                        this.UpdateIsActive();
                        return this.dynamic;
                    }
                }
            }
      
            internal int ManualFlowControlLimit
            {
                get { return this.Dynamic.Limit; }
                set { this.Dynamic.SetLimit(value); }
            }
     
            const string MaxConcurrentCallsPropertyName = "MaxConcurrentCalls";
            const string MaxConcurrentCallsConfigName = "maxConcurrentCalls";
            public int MaxConcurrentCalls
            {
                get { return this.Calls.Capacity; }
                set
                {
                    this.ThrowIfClosedOrOpened(MaxConcurrentCallsPropertyName);
                    this.Calls.Capacity = value;
                    this.UpdateIsActive();
                }
            }
     
            const string MaxConcurrentSessionsPropertyName = "MaxConcurrentSessions";
            const string MaxConcurrentSessionsConfigName = "maxConcurrentSessions";
            public int MaxConcurrentSessions
            {
                get { return this.Sessions.Capacity; }
                set
                {
                    this.ThrowIfClosedOrOpened(MaxConcurrentSessionsPropertyName);
                    this.Sessions.Capacity = value;
                    this.UpdateIsActive();
                }
            }
      
            const string MaxConcurrentInstancesPropertyName = "MaxConcurrentInstances";
            const string MaxConcurrentInstancesConfigName = "maxConcurrentInstances";
            public int MaxConcurrentInstances
            {
                get { return this.InstanceContexts.Capacity; }
                set
                {
                    this.ThrowIfClosedOrOpened(MaxConcurrentInstancesPropertyName);
                    this.InstanceContexts.Capacity = value;
                    this.UpdateIsActive();
                }
            }
      
            FlowThrottle InstanceContexts
            {
                get
                {
                    lock (this.ThisLock)
                    {
                        if (this.instanceContexts == null)
                        {
                            this.instanceContexts = new FlowThrottle(this.GotInstanceContext, Int32.MaxValue,
                                                                     ServiceThrottle.MaxConcurrentInstancesPropertyName, ServiceThrottle.MaxConcurrentInstancesConfigName);
                        }
                        return this.instanceContexts;
                    }
                }
            }
     
            internal bool IsActive
            {
                get { return this.isActive; }
            }
      
            internal object ThisLock
            {
                get { return this.thisLock; }
            }
     
            bool PrivateAcquireCall(ChannelHandler channel)
            {
                return (this.calls == null) || this.calls.Acquire(channel);
            }
     
            bool PrivateAcquireSessionListenerHandler(ListenerHandler listener)
            {
                if ((this.sessions != null) && (listener.Channel != null) && (listener.Channel.Throttle == null))
                {
                    listener.Channel.Throttle = this;
                    return this.sessions.Acquire(listener);
                }
                else
                {
                    return true;
                }
            }
     
            bool PrivateAcquireSession(ISessionThrottleNotification source)
            {
                return (this.sessions == null || this.sessions.Acquire(source));
            }
     
            bool PrivateAcquireDynamic(ChannelHandler channel)
            {
                return (this.dynamic == null) || this.dynamic.Acquire(channel);
            }
      
            bool PrivateAcquireInstanceContext(ChannelHandler channel)
            {
                if ((this.instanceContexts != null) && (channel.InstanceContext == null))
                {
                    channel.InstanceContextServiceThrottle = this;
                    return this.instanceContexts.Acquire(channel);
                }
                else
                {
                    return true;
                }
            }
     
            internal bool AcquireCall(ChannelHandler channel)
            {
                lock (this.ThisLock)
                {
                    return (this.PrivateAcquireCall(channel));
                }
            }
      
            internal bool AcquireInstanceContextAndDynamic(ChannelHandler channel, boolacquireInstanceContextThrottle)
            {
                lock(this.ThisLock)
                {
                    if (!acquireInstanceContextThrottle)
                    {
                        return this.PrivateAcquireDynamic(channel);
                    }
                    else
                    {
                        return (this.PrivateAcquireInstanceContext(channel) &&
                                this.PrivateAcquireDynamic(channel));
                    }
                }
            }
     
            internal bool AcquireSession(ISessionThrottleNotification source)
            {
                lock (this.ThisLock)
                {
                    return this.PrivateAcquireSession(source);
                }
            }
      
            internal bool AcquireSession(ListenerHandler listener)
            {
                lock (this.ThisLock)
                {
                    return this.PrivateAcquireSessionListenerHandler(listener);
                }
            }
     
            void GotCall(object state)
            {
                ChannelHandler channel = (ChannelHandler)state;
      
                lock (this.ThisLock)
                {
                    channel.ThrottleAcquiredForCall();
                }
            }
      
            void GotDynamic(object state)
            {
                ((ChannelHandler)state).ThrottleAcquired();
            }
      
            void GotInstanceContext(object state)
            {
                ChannelHandler channel = (ChannelHandler)state;
      
                lock (this.ThisLock)
                {
                    if (this.PrivateAcquireDynamic(channel))
                        channel.ThrottleAcquired();
                }
            }
     
            void GotSession(object state)
            {
                ((ISessionThrottleNotification)state).ThrottleAcquired();
            }
      
            internal void DeactivateChannel()
            {
                if (this.isActive)
                {
                    if (this.sessions != null)
                        this.sessions.Release();
                }
            }
      
            internal void DeactivateCall()
            {
                if (this.isActive)
                {
                    if (this.calls != null)
                        this.calls.Release();
                }
            }
      
            internal void DeactivateInstanceContext()
            {
                if (this.isActive)
                {
                    if (this.instanceContexts != null)
                    {
                        this.instanceContexts.Release();
                    }
                }
            }
      
            internal int IncrementManualFlowControlLimit(int incrementBy)
            {
                return this.Dynamic.IncrementLimit(incrementBy);
            }
     
            void ThrowIfClosedOrOpened(string memberName)
            {
                if (this.host.State == CommunicationState.Opened)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(newInvalidOperationException(SR.GetString(SR.SFxImmutableThrottle1, memberName)));
                }
                else
                {
                    this.host.ThrowIfClosedOrOpened();
                }
            }
     
            void UpdateIsActive()
            {
                this.isActive = ((this.dynamic != null) ||
                                 ((this.calls != null) && (this.calls.Capacity != Int32.MaxValue)) ||
                                 ((this.sessions != null) && (this.sessions.Capacity != Int32.MaxValue)) ||
                                 ((this.instanceContexts != null) && (this.instanceContexts.Capacity != Int32.MaxValue)));
            }
        }
    }
    

    欢迎大家讨论。

    2012年6月27日 2:48