none
WCF Client Pool Caching Channel RRS feed

  • Question

  • This is about a WCF client pooling issue. We have a windows service that spawns several worker threads. Each thread executes on a task and makes WCF calls to a service that runs on the same machine. We have a client pool logic (code attached) where we are associating a thread making the wcf call with a client channel which is created by ChannelFactory.CreateChannel( ).

    /// <summary>
    /// Class used to keep track internally of which proxy is being used and which is available for the pool
    /// </summary>
    private class ProxyScribe
    {
        /// <summary>
        /// Gets or sets the owner thread
        /// </summary>
        public Thread OwnerThread;
    
        /// <summary>
        /// Gets or sets the proxy instance
        /// </summary>
        public T ProxyInstance { get; set; }
    }
    

    In the Acquire( ) method, we check to see if any channel in the pool is available and reuse the same channel(proxy instance) if no thread is owning it. Since we are using Interlocked.CompareExchange( ) only one thread can use a channel at a time. If no channel is available in the pool, we create a new channel.

    In the Release( ) method, we check if the channel being released is actually owned by the thread that was recorded to own it. In certain scenarios of high load, this check is failing and we hit the below exception.

    Line 180 : throw new InvalidOperationException("The current thread is not the owner of the proxy, owning thread id is " + scribe.OwnerThread.ManagedThreadId);

    Do you see anything wrong with this implementation?  Please let me know if you have any suggestions or workarounds.

    ___________________________________________________

    ClientPool.cs

    ___________________________________________________

    /// <summary>
    /// The client pool.
    /// </summary>
    /// <typeparam name="T">The type param.</typeparam>
    /// <typeparam name="TChannel">The channel type param.</typeparam>
    public abstract class ClientPool<T, TChannel> : IPool<T> where T : class where TChannel : IClientChannel, T
    {
        /// <summary>
        /// The pool size configuration key
        /// </summary>
        private const string PoolSizeKey = "PoolSize";
    
        /// <summary>
        /// The upper bound for for the pool size
        /// </summary>
        private const int PoolSizeLimit = 1024;
    
        /// <summary>
        /// The channel factory
        /// </summary>
        private readonly IOcpChannelFactory<TChannel> channelFactory;
    
        /// <summary>
        /// The proxy log
        /// </summary>
        private readonly List<ProxyScribe> log;
    
        /// <summary>
        /// Initializes a new instance of the <see cref="ClientPool&lt;T, TChannel&gt;"/> class
        /// </summary>
        /// <param name="channelFactory">The channel factory</param>
        protected ClientPool(IOcpChannelFactory<TChannel> channelFactory)
            : this(
                channelFactory, 
                int.Parse(Factory<AcmClientFactory>.Resolve<IAcmConfiguration>().GetKeyValueConfig(AcmConstants.ProxyExtensionConfigurationTypeId)[PoolSizeKey], CultureInfo.InvariantCulture))
        {
        }
    
        /// <summary>
        /// Initializes a new instance of the <see cref="ClientPool&lt;T, TChannel&gt;"/> class
        /// </summary>
        /// <param name="channelFactory">The channel factory</param>
        /// <param name="size">The size of the pool.</param>
        protected ClientPool(IOcpChannelFactory<TChannel> channelFactory, int size)
        {
            if (channelFactory == null)
            {
                throw new ArgumentNullException("channelFactory");
            }
    
            if (size < 0)
            {
                throw new ArgumentException("Pool size must be greater or equal to 0", "size");
            }
    
            if (size > PoolSizeLimit)
            {
                throw new ArgumentException("Pool size cannot exceed limit of " + PoolSizeLimit, "size");
            }
    
            this.channelFactory = channelFactory;
            this.log = new List<ProxyScribe>(size);
    
            for (int i = 0; i < size; ++i)
            {
                ProxyScribe scribe = new ProxyScribe();
                this.log.Add(scribe);
            }
        }
    
        /// <summary>
        /// Gets the number of clients currently in use by the pool
        /// </summary>
        public int Used
        {
            get { return this.log.Count(x => x.OwnerThread != null); }
        }
    
        /// <summary>
        /// Gets the type of the pool
        /// </summary>
        public string PoolType
        { 
            get { return typeof(T).Name; }
        }
    
        /// <summary>
        /// Acquires an instance
        /// </summary>
        /// <returns>The instance</returns>
        public virtual T Acquire()
        {
            Thread currentThread = Thread.CurrentThread;
    
            // go through each proxy we have in the pool and find the first available one
            foreach (ProxyScribe scribe in this.log)
            {
                // if we can grab one from the pool, then mark it as used
                if (null == Interlocked.CompareExchange(ref scribe.OwnerThread, currentThread, null))
                {
                    try
                    {
                        scribe.ProxyInstance = this.EnsureNotFaulted(scribe.ProxyInstance);
                    }
                    catch (Exception e)
                    {
                        // we want to make sure the proxy won't get taken forever because we had an exception
                        scribe.OwnerThread = null;
                        ULS.SendTraceTag(0x6832346c /* tag_h24l */, ULSCat.ProxyExtension, ULSTraceLevel.Unexpected, "Unexpected exception occured in Acquire: {0}", e);
                        continue;
                    }
    
                    ULS.SendTraceTag(0x6832346d /* tag_h24m */, ULSCat.ProxyExtension, ULSTraceLevel.Verbose, "Acquired proxy ({0}:{1}) from the pool", this.PoolType, scribe.ProxyInstance.GetHashCode());
                    return scribe.ProxyInstance;
                }
            }
    
            if (this.log.Count > 0)
            {
                // if no proxy is available in the pool, just give the caller a new instance to work with
                ULS.SendTraceTag(0x6832346e /* tag_h24n */, ULSCat.ProxyExtension, ULSTraceLevel.High, "There were no available proxies on the {0} pool, creating a new instance. Current size is {1}", this.PoolType, this.log.Count);
            }
    
            return this.channelFactory.CreateChannel();
        }
    
        /// <summary>
        /// Releases the specified instance
        /// </summary>
        /// <param name="instance">The instance</param>
        public virtual void Release(T instance)
        {
            if (instance == null)
            {
                return;
            }
    
            ULS.SendTraceTag(0x6832346f /* tag_h24o */, ULSCat.ProxyExtension, ULSTraceLevel.Verbose, "Releasing proxy ({0}:{1}) to the pool", this.PoolType, instance.GetHashCode());
    
            Thread currentThread = Thread.CurrentThread;
    
            // go through each proxy we have in the pool and find the one used
            foreach (ProxyScribe scribe in this.log)
            {
                // if we find it, then make sure it's clean
                if (instance == scribe.ProxyInstance)
                {
                    ULS.SendTraceTag(0x68323470 /* tag_h24p */, ULSCat.ProxyExtension, ULSTraceLevel.Verbose, "Proxy instance is part of the pool, putting it back");
    
                    // ensure the thread who requested the proxy is the one releasing it
                    if (scribe.OwnerThread == currentThread)
                    {
                        try
                        {
                            scribe.ProxyInstance = this.EnsureNotFaulted(scribe.ProxyInstance);
                        }
                        finally
                        {
                            // regardless if we cleaned up, release the proxy at this point
                            scribe.OwnerThread = null;
                        }
    
                        return;
                    }
    
                    ULS.SendTraceTag(0x68323471 /* tag_h24q */, ULSCat.ProxyExtension, ULSTraceLevel.Unexpected, "The proxy instance in the {0} pool is not owned by thread {1}, but rather {2}", this.PoolType, currentThread.ManagedThreadId, scribe.OwnerThread.ManagedThreadId);
                    throw new InvalidOperationException("The current thread is not the owner of the proxy, owning thread id is " + scribe.OwnerThread.ManagedThreadId);
                }
            }
    
            // if the instance wasn't found on the pool it means it was a loaner 
            // so just dispose of it quitely and with no witnesses
            ULS.SendTraceTag(0x68323472 /* tag_h24r */, ULSCat.ProxyExtension, ULSTraceLevel.Verbose, "Proxy instance was not part of the pool, disposing of it");
            EnsureClosed(instance);
        }
    
        /// <summary>
        /// Makes sure the given channel gets disposed of properly
        /// </summary>
        /// <param name="instance">The channel instance</param>
        private static void EnsureClosed(T instance)
        {
            IClientChannel clientChannel = instance as IClientChannel;
    
            if (clientChannel == null)
            {
                return;
            }
    
            if (clientChannel.State == CommunicationState.Faulted)
            {
                clientChannel.Abort();
            }
            else
            {
                clientChannel.Close();
            }
        }
    
        /// <summary>
        /// Makes sure the given channel is not faulted
        /// </summary>
        /// <param name="instance">The channel instance</param>
        /// <returns>The proxy instance if create is true</returns>
        private T EnsureNotFaulted(T instance)
        {
            IClientChannel clientChannel = instance as IClientChannel;
    
            if (clientChannel != null && clientChannel.State == CommunicationState.Faulted)
            {
                ULS.SendTraceTag(0x68323473 /* tag_h24s */, ULSCat.ProxyExtension, ULSTraceLevel.Medium, "Proxy instance was faulted, aborting it");
                clientChannel.Abort();
                instance = null;
            }
    
            if (instance == null)
            {
                ULS.SendTraceTag(0x68323474 /* tag_h24t */, ULSCat.ProxyExtension, ULSTraceLevel.Verbose, "Proxy instance was null, creating new channel");
                instance = this.channelFactory.CreateChannel();
            }
    
            return instance;
        }
    
        /// <summary>
        /// Class used to keep track internally of which proxy is being used and which is available for the pool
        /// </summary>
        private class ProxyScribe
        {
            /// <summary>
            /// Gets or sets the owner thread
            /// </summary>
            [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "Needed for interlocked exchange to pass by reference")] 
            public Thread OwnerThread;
    
            /// <summary>
            /// Gets or sets the proxy instance
            /// </summary>
            public T ProxyInstance { get; set; }
        }
    }

    Tuesday, February 11, 2014 4:49 AM

Answers

  • Hi,

    The ChannelFactory does not pool / reuse channels.  However, it will maintain a list of the channels created so that closing  / aboorting the factory will do the same to any channels create through it.

    To reuse channels, I reccomened implementing caching them yourself.  In your implementation, when placing or removing from your cache, you may want to check the state of the channels so that user's of the channels don't get faulted / closed channels.

    Thanks.

    Wednesday, February 12, 2014 7:30 AM