Implementing Workflow Management Service

  • Question

  • I'm developing several workflow services using Visual Studio 11 Beta. Because there is no AppFabric for WF 4.5, I've implemented a very simplified Workflow Management Service for workflow activation. I spent several days digging into the AppFabric assemblies with .NET Reflector and then came up with an implementation. Now I've realized that I could do it in a much simpler way.

    The Workflow Management Service is a Windows Service that does a bunch of things:

    • Creates a SqlWorkflowInstanceStore
    • Creates an InstanceHandle
    • Sets the workflow owner
    • Waits for the activatable workflow event (HasActivatableWorkflowEvent)
    • Queries for activatable workflows
    • Activates workflows by calling a web service

    These steps cause the SqlWorkflowInstanceStore to query the workflow store database every 5 seconds (by default). Now I'm considering doing only one thing: calling the web service every 5 seconds. Simple. Less performant? I don't think so, since I avoid querying the database.

    What do you think?
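
    To make the simpler alternative concrete, here is a minimal sketch of a polling loop, assuming the list of service paths is known up front (for example from configuration) instead of being discovered through the instance store. `PollingActivator`, the `activate` delegate (a stand-in for the generated client's `ActivateAsync`), and the path `~/ExpenseService.xamlx` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: these names are illustrative, not part of AppFabric or WF.
class PollingActivator
{
    private readonly Func<string, Task> activate;   // stand-in for client.ActivateAsync
    private readonly IList<string> servicePaths;    // assumed to come from configuration
    private readonly TimeSpan period;

    public PollingActivator(Func<string, Task> activate, IList<string> servicePaths, TimeSpan period)
    {
        this.activate = activate;
        this.servicePaths = servicePaths;
        this.period = period;
    }

    // Calls the activation delegate for every path, then waits; repeats until cancelled.
    public async Task RunAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            foreach (var path in servicePaths)
            {
                try
                {
                    await activate(path);
                }
                catch (Exception ex)
                {
                    // One failing service should not stop the others.
                    Console.Error.WriteLine("Could not activate " + path + ": " + ex.Message);
                }
            }
            try
            {
                await Task.Delay(period, token);
            }
            catch (OperationCanceledException)
            {
                break; // shutdown was requested during the wait
            }
        }
    }
}

static class Program
{
    static void Main()
    {
        var cts = new CancellationTokenSource();
        var calls = 0;
        var activator = new PollingActivator(
            path => { calls++; Console.WriteLine("Activating " + path); return Task.FromResult(0); },
            new[] { "~/ExpenseService.xamlx" },      // hypothetical virtual path
            TimeSpan.FromSeconds(5));

        var run = activator.RunAsync(cts.Token);
        Thread.Sleep(TimeSpan.FromSeconds(12));      // let a few polls happen
        cts.Cancel();
        run.Wait();
        Console.WriteLine("Polls: " + calls);
    }
}
```

    The trade-off in this sketch is that every poll is a call to the host even when nothing is runnable, and the set of services has to be maintained by hand rather than read from the store's activation parameters.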

    Incidentally, here is the implementation:

    The web service is this:

        public class ActivationService : IActivationService
        {
            public void Activate(string virtualPath)
            {
                try
                {
                    ServiceHostingEnvironment.EnsureServiceAvailable(virtualPath);
                }
                catch (Exception ex)
                {
                    EventLog.WriteEntry("ExpensesWorkflowServices", ex.ToString(), EventLogEntryType.Error);
                    throw;
                }
            }
        }

    And the Workflow Management Service is this one:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using System.Activities.DurableInstancing;
    using System.Configuration;
    using System.Runtime.DurableInstancing;
    using System.Threading;
    using System.Xml.Linq;
    using System.Net;
    using inercya.Logging;
    
    namespace inercya.WorkflowManagement
    {
        class ManagementServiceExecutor : IDisposable
        {
            SqlWorkflowInstanceStore instanceStore;
            InstanceHandle instanceHandle;
    
            private static readonly TimeSpan defaultExecuteTimeout = TimeSpan.FromSeconds(60.0);
            private static readonly TimeSpan waitForEventsTimeout = TimeSpan.FromMinutes(5);
            private static readonly TimeSpan backoffTimerPeriod = TimeSpan.FromSeconds(12.0);
            private static readonly TimeSpan runnableInstancesDetectionPeriod = TimeSpan.FromSeconds(10.0);
            private static readonly TimeSpan maxWaitForReopen = TimeSpan.FromMinutes(6);
    
            private AutoResetEvent storeEvent= new AutoResetEvent(true);
            private CancellationTokenSource waitCancelationTokenSource = null;
    
            private string ConnectionString
            {
                get
                {
                    return ConfigurationManager.ConnectionStrings["WorkflowInstanceStore"].ConnectionString;
                }
            }
    
            private async Task WaitAsync(TimeSpan time)
            {
                waitCancelationTokenSource = new CancellationTokenSource();
                await TaskExtensions.WaitAsync(time, waitCancelationTokenSource.Token);
                waitCancelationTokenSource = null;
            }
    
            public async Task RunAsync()
            {
                DateTime lastErrorTime = DateTime.MinValue;
                TimeSpan openStoreWaitTime = TimeSpan.Zero;
                TimeSpan increment = TimeSpan.FromSeconds(10.0);
                TimeSpan totalIncrement = TimeSpan.Zero;
    
                while (!disposed)
                {
                    try
                    {
                        if (openStoreWaitTime != TimeSpan.Zero)
                        {
                            await WaitAsync(openStoreWaitTime);
                        }
                        await OpenStoreAsync();
                        await WaitForAndProcessEvents();
                    }
                    catch (Exception ex)
                    {
                        if (Fx.IsFatal(ex))
                        {
                            Logger.Log.Fatal("Fatal Error Running Workflow Management Service", ex);
                            throw;
                        }
                        if (!disposed)
                        {
                            var now = DateTime.UtcNow;
                            if (now.Subtract(lastErrorTime) < openStoreWaitTime.Add(TimeSpan.FromMinutes(1)))
                            {
                                if (openStoreWaitTime < maxWaitForReopen)
                                {
                                    totalIncrement = totalIncrement.Add(increment);
                                    openStoreWaitTime = openStoreWaitTime.Add(totalIncrement);
                                    Logger.Log.Warn("Could not open store, next retry after " + openStoreWaitTime.ToString(), ex);
                                }
                                else
                                {
                                    Logger.Log.Error("Could not open store after several attempts, next retry after " + openStoreWaitTime.ToString(), ex);
                                }
                            }
                            else
                            {
                                openStoreWaitTime = TimeSpan.Zero;
                                totalIncrement = TimeSpan.Zero;
                            }
                            lastErrorTime = now;
                        }
                    }
                    if (!disposed)
                    {
                        await FreeStoreAsync();
                    }
                }       
            }
    
            public void Open()
            {
                this.OpenStoreAsync().Wait();
            }
    
            private async Task FreeStoreAsync()
            {
                //ThrottledEventLogger.Trace.EventWriteRecycleStoreOwner(this.configuration.Name, this.configuration.StoreLocation);
                this.storeEvent.WaitOne();
                try
                {
                    if (this.instanceHandle != null || this.instanceStore != null)
                    {
                        try
                        {
                            if (this.instanceStore != null)
                            {
                                await this.instanceStore.DeleteWorkflowOwnerAsync();
                            }
                        }
                        catch (Exception exception)
                        {
                            if (Fx.IsFatal(exception))
                            {
                                Logger.Log.Fatal("Fatal Error Deleting Workflow Owner", exception);
                                throw;
                            }
                        }
                        finally
                        {
                            if (this.instanceHandle != null) this.instanceHandle.Free();
                            this.instanceHandle = null;
                            this.instanceStore = null;
                        }
                    }
                }
                finally
                {
                    storeEvent.Set();
                }
            }
    
            private async Task WaitForAndProcessEvents()
            {
                while (!disposed)
                {
                    try
                    {
                        Logger.Log.Debug("Waiting For Events");
                        var events = await instanceStore.WaitForEventsAsync(instanceHandle);
                        await ProcessEventsAsync(events);
                        await WaitAsync(TimeSpan.FromSeconds(5.0));
                    }
                    catch (TimeoutException)
                    {
                        Logger.Log.Debug("Timeout waiting for events");
                    }
                }
            }
    
            private async Task ActivateServicesAsync(IEnumerable<ServiceActivationData> activations)
            {
                try
                {
                    Logger.Log.Debug("Activating Services");
                    using (ManagementServices.ActivationServiceClient client = new ManagementServices.ActivationServiceClient())
                    {
                        client.ClientCredentials.Windows.ClientCredential = CredentialCache.DefaultNetworkCredentials;
                        foreach (var activation in activations)
                        {
                            if (disposed) return;
                            try
                            {
                                await client.ActivateAsync(activation.RelativeServicePath);
                                Logger.Log.Info("Service " + activation.RelativeServicePath + " Activated Successfully");
                            }
                            catch (Exception ex)
                            {
                                Logger.Log.Error("Could Not Activate Service " + activation.RelativeServicePath, ex);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Logger.Log.Error("Could Not Activate Services", ex);
                }
            }
    
            private List<ServiceActivationData> ExtractServiceActivationData(IEnumerable<ActivatableWorkflowsQueryResult> queryResult)
            {
                var intermediateResult = new List<ServiceActivationData>();
                foreach (var aw in queryResult)
                {
                    foreach (IDictionary<XName, object> dictionary in aw.ActivationParameters)
                    {
                        object obj;
                        if (!dictionary.TryGetValue(WorkflowServiceNamespace.SiteName, out obj))
                        {
                            break;
                        }
                        string siteName = (string) obj;
                        if (!dictionary.TryGetValue(WorkflowServiceNamespace.RelativeApplicationPath, out obj))
                        {
                            break;
                        }
                        string relativeApplicationPath = (string) obj;
                        if (!dictionary.TryGetValue(WorkflowServiceNamespace.RelativeServicePath, out obj))
                        {
                            break;
                        }
                        string relativeServicePath = (string) obj;
                        intermediateResult.Add(new ServiceActivationData(siteName, relativeApplicationPath, relativeServicePath));
                    }
                }
                // Several instances may point at the same service; activate each service only once.
                return intermediateResult.Distinct(ServiceActivationData.EqualityComparer).ToList();
            }
    
            private async Task ProcessEventsAsync(List<InstancePersistenceEvent> events)
            {
                if (disposed) return;
                if (events.Contains(InstancePersistenceEvent<HasActivatableWorkflowEvent>.Value))
                {
                    Logger.Log.Debug("Processing HasActivatableWorkflow Event");
                    //await TaskExtensions.WaitAsync(TimeSpan.FromMilliseconds(200.0));
                    var view = await instanceStore.QueryActivatableWorkflowsAsync(instanceHandle);
                    var activations = ExtractServiceActivationData(view.InstanceStoreQueryResults.OfType<ActivatableWorkflowsQueryResult>());
                    if (activations.Count > 0)
                    {
                        await ActivateServicesAsync(activations);
                    }
                    else
                    {
                        Logger.Log.Debug("QueryActivatableWorkflows result has no activations");
                    }
                }
                else
                {
                    Logger.Log.Debug("Processing Events");
                }
            }
                   
            private async Task OpenStoreAsync()
            {
                storeEvent.WaitOne();
                try
                {
                    if (disposed) return;
                    if (instanceStore == null || instanceHandle == null || !instanceHandle.IsValid)
                    {
                        Logger.Log.Debug("Opening Store");
                    }
                    if (instanceStore == null)
                    {
                        instanceStore = new SqlWorkflowInstanceStore(this.ConnectionString);
                        instanceStore.RunnableInstancesDetectionPeriod = runnableInstancesDetectionPeriod;
                    }
                    if (instanceHandle == null || !instanceHandle.IsValid)
                    {
                        instanceHandle = instanceStore.CreateInstanceHandle((InstanceOwner)null);
                        try
                        {
                            var instanceView = await instanceStore.CreateWorkflowOwnerAsync(instanceHandle);
                            instanceStore.DefaultInstanceOwner = instanceView.InstanceOwner;
                        }
                        catch (Exception)
                        {
                            if (instanceHandle != null)
                            {
                                instanceHandle.Free();
                                instanceHandle = null;
                            }
                            throw;
                        }
                    }
                }
                finally
                {
                    storeEvent.Set();
                }
            }
    
            #region IDisposable Members
    
            bool disposed;
    
            public void Dispose()
            {
                this.Dispose(true);
            }
    
            private void Dispose(bool disposing)
            {
                
                if (!this.disposed && disposing)
                {
                    Logger.Log.Debug("Disposing Management Service");
                    this.disposed = true;
                    var cs = this.waitCancelationTokenSource;
                    if (cs != null) cs.Cancel();                
                    this.CloseStore();
                }
            }
    
    
            private void CloseStore()
            {
                if (this.instanceHandle != null)
                {                
                    try
                    {
                        FreeStoreAsync().Wait();
                    }
                    catch (Exception exception)
                    {
                        if (Fx.IsFatal(exception))
                        {
                            throw;
                        }
                        //ThrottledEventLogger.Trace.EventWriteHandlingException(exception.Message, exception.ToString());
                    }
                }
            }
    
            #endregion
        }
    }
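
    The retry delays in `RunAsync` are easy to misread, so here is a small sketch that reproduces only the arithmetic of its catch block for failures that arrive in quick succession. The first failure after a quiet period retries immediately (the wait is reset to zero); each following failure grows the step by `increment` and adds the step to the wait, until the wait reaches `maxWaitForReopen`. `BackoffSchedule` is a hypothetical helper name, not part of the service.

```csharp
using System;
using System.Collections.Generic;

static class BackoffSchedule
{
    // Mirrors the catch-block arithmetic: step grows by `increment`,
    // and the wait grows by the current step, until it reaches `maxWait`.
    public static List<TimeSpan> Compute(TimeSpan increment, TimeSpan maxWait)
    {
        var waits = new List<TimeSpan>();
        var wait = TimeSpan.Zero;   // openStoreWaitTime in the original
        var step = TimeSpan.Zero;   // totalIncrement in the original
        while (wait < maxWait)
        {
            step = step.Add(increment);
            wait = wait.Add(step);
            waits.Add(wait);
        }
        return waits;
    }

    static void Main()
    {
        var waits = Compute(TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(6));
        foreach (var w in waits) Console.WriteLine(w.TotalSeconds);
        // prints 10, 30, 60, 100, 150, 210, 280, 360 (one value per line)
    }
}
```

    With the constants used above (10-second increment, 6-minute cap), the wait therefore settles at 6 minutes after eight consecutive growing retries.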
    

    Wednesday, May 9, 2012 1:58 PM

All replies

  • Hi Jesus,

    WF 4.5 is supposed to work with Microsoft AppFabric for Windows Server, as .NET 4.5 is an in-place update to .NET 4. Have you tried WF 4.5 on AppFabric? Have you faced any specific problems?

    I would appreciate it if you could let me know.

    Thank you,

    Hani

    Wednesday, May 23, 2012 1:12 AM
  • Hi,

    Honestly, I haven't tested AppFabric with WF 4.5. I have just read this thread: http://social.msdn.microsoft.com/Forums/en/dublin/thread/3c6319ae-f5f3-471e-89e6-f61aeb30c1cd

    and I guessed that workflow versioning is not supported by AppFabric.

    This is our first project using WF. WF has been around for a long time, but it lacked some critical features such as versioning. WF 4.5 finally implements versioning, along with other interesting features, so we adopted it for a new development.

    Regarding the Workflow Management Service, we finally went with the simplest approach: calling the activation service every few seconds. It seems to work great: after an app pool recycle, workflows whose delays have completed wake up and continue execution.

    Thank you.

    Friday, May 25, 2012 7:33 PM
  • Hi Jesus,

    The new WF 4.5 features are not supported by the AppFabric UI at this time, but generally speaking, WF 4.5 should work fine on AppFabric, as .NET 4.5 is an in-place update to .NET 4.

    Thank you,

    Hani

    Friday, May 25, 2012 9:22 PM