Consuming Event Hub messages with the IEventProcessor interface

  • Question

  • Hi - 

    It seems that I am doing something wrong when I try to consume messages off of an Event Hub using the EventProcessorHost with an implemented IEventProcessor Interface.

    If I manually create a receiver, I can pull messages off my hub. Example (just creating a receiver on partition "1"):

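    // Create a receiver on partition "1" of the default consumer group
    let consumer = ehClientrec.GetDefaultConsumerGroup().CreateReceiverAsync("1").Result

    // Block until one message arrives, then print its body as UTF-8 text
    let msg = consumer.ReceiveAsync()
    msg.Wait()
    Console.WriteLine(Encoding.UTF8.GetString(msg.Result.GetBytes()))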

    This successfully prints a message to my console. 

    But if I try to use the event processor host, first I (very simply) implement the IEventProcessor interface:

    type msgProc() = 
        interface IEventProcessor with
            member this.CloseAsync(context,reason) =
                let r = reason.ToString()
                Console.WriteLine("Closed Partition {0} for reason: {1}",context.Lease.PartitionId,r)
                Unchecked.defaultof<Task>
            member this.OpenAsync(context) =
                Console.WriteLine("Opened Partition {0}",context.Lease.PartitionId)
                Unchecked.defaultof<Task>
            member this.ProcessEventsAsync(context,msgs) = 
                Console.WriteLine("Received message.")
                Unchecked.defaultof<Task>

    Then I create the host and register it, and run the program. 

    I never hit the ProcessEventsAsync method.

    Instead, I see console output that shows the Partitions cyclically opening and closing:

    "Opened Partition 1"
    "Opened Partition 2"
    "Opened Partition 4"
    "Opened Partition 3"
    "Opened Partition 0"
    "Opened Partition 5"
    "Opened Partition 7"
    "Opened Partition 6"
    
    "Closed Partition 3 for reason: Shutdown"
    "Closed Partition 6 for reason: Shutdown"
    "Closed Partition 7 for reason: Shutdown"
    "Closed Partition 2 for reason: Shutdown"
    "Closed Partition 0 for reason: Shutdown"
    "Closed Partition 1 for reason: Shutdown"
    "Closed Partition 4 for reason: Shutdown"
    "Closed Partition 5 for reason: Shutdown"
    

    (This repeats forever, with the opening and closing partitions appearing in a slightly different order each time).

    I can't find any documentation on what conditions typically cause a shutdown or what would cause this behavior. Any ideas?

    I'm basically positive my EventProcessorHost is configured properly.

    Thanks.

    Thursday, March 19, 2015 4:04 PM

Answers

  • Figured it out. A bit of a silly problem. 

    member this.OpenAsync(context) =
                Console.WriteLine("Opened Partition {0}",context.Lease.PartitionId)
                Unchecked.defaultof<Task>

    The task coming out of here was never running, which was causing OpenAsync to fail (possibly time out). For some reason this was not causing anything to surface with the error handler. Switched to 

    Task.Run(fun () -> ())
    
    and I'm off and running...
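
    A note for later readers on why this works: `Unchecked.defaultof<Task>` evaluates to null, because Task is a reference type, so each member was handing the host a null task rather than a task that had not started. How the host reacts to a null task is not confirmed anywhere in this thread, so treat the "fail or time out" explanation as a guess. `Task.Run` fixes it by returning a real task. An already-completed task should do the same without scheduling work on the thread pool. A minimal sketch of the difference, independent of the Service Bus SDK (the names `nullTask` and `completedTask` are illustrative only):

    ```fsharp
    open System
    open System.Threading.Tasks

    // Unchecked.defaultof<'T> yields null for any reference type, including Task.
    let nullTask : Task = Unchecked.defaultof<Task>

    // An already-completed task; nothing is queued to the thread pool.
    // (Task.CompletedTask is an equivalent option on .NET Framework 4.6 and later.)
    let completedTask : Task = Task.FromResult(0) :> Task

    printfn "nullTask is null: %b" (obj.ReferenceEquals(nullTask, null))
    printfn "completedTask.IsCompleted: %b" completedTask.IsCompleted
    ```

    In the processor from the question, each member would then end with a value like `completedTask` in place of `Unchecked.defaultof<Task>`.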

    • Marked as answer by M_Bradford Tuesday, April 14, 2015 7:36 PM
    Tuesday, April 14, 2015 7:36 PM

All replies

  • Can you register an error handler and see if you get anything that might shed some light? Can you also share the code where you register the event processor? I wonder whether you are using the processor host or registering on the consumer group.
    Thursday, March 19, 2015 5:10 PM
  • Sure. I'll set up an error handler right now. In the meantime, here is the code where I register the event processor:

    let connectionstring = "Endpoint=sb://EXAMPLE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ACCESSKEY;TransportType=Amqp"
    let EventHubReceiverClient = EventHubClient.CreateFromConnectionString(connectionstring,"testEventHub")
    let storageKey = "DefaultEndpointsProtocol=https;AccountName=STORAGETEST;AccountKey=STORAGEKEY"
    let consumerGroup = EventHubReceiverClient.GetDefaultConsumerGroup()
    let EPHost = EventProcessorHost("singleworker",EventHubReceiverClient.Path,consumerGroup.GroupName,connectionstring,storageKey)

    EPHost.RegisterEventProcessorAsync<msgProc>()

    How would I register the event processor on the consumer group anyway?

    Also I just found "PartitionManagerOptions" on an EventProcessorHost which seems like it could bear on this issue. However, in all the sample code I read and run, there is no reference to this.


    • Edited by M_Bradford Thursday, March 19, 2015 5:52 PM
    Thursday, March 19, 2015 5:47 PM
  • The register code you pasted seems OK to me.

    For EventProcessor on consumer group you can check APIs from https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.eventhubconsumergroup.aspx?f=255&MSPPError=-2147217396

    All of the APIs there with "Register" in their names are relevant.

    Thursday, March 19, 2015 6:02 PM
  • What exactly do you mean by registering an error handler (besides putting exception handlers at each critical step in my code, and making sure none of the Tasks I run end up in a faulted state)? Having done all of those things, I'm not catching any exceptions or seeing anything mysterious.

    I can get the C# sample code (https://code.msdn.microsoft.com/windowsazure/Service-Bus-Event-Hub-286fd097) to read off of the same Event Hub using an EventProcessor, but it's not a very satisfying answer to have to use that code any time I want to consume from an Event Hub.

    I haven't finished trying to register an EventProcessor on a consumer group, mostly because this option is taking me a lot longer to set up than the first strategy (using a processor host).

    If I were just using event hubs, I could route everything through one partition and use a manual receiver (as I set up in my first post), but I'm actually publishing to this Event Hub from a Stream Analytics job, which I can't seem to coerce to just one partition.



    • Edited by M_Bradford Thursday, March 19, 2015 7:12 PM
    Thursday, March 19, 2015 7:11 PM
  • EventProcessorOptions has an event handler named ExceptionReceived. https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.eventprocessoroptions.exceptionreceived.aspx

    You can register your handler on the event as below.

    var epo = new EventProcessorOptions();
    epo.ExceptionReceived += HandleException;

    void HandleException(object obj, ExceptionReceivedEventArgs args)
    {
        Console.WriteLine("Exception handled at action {0}: {1}", args.Action, args.Exception.ToString());
    }
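
    Since the rest of the thread is in F#, here is a rough F# equivalent. It is a sketch only: it assumes the WindowsAzure.ServiceBus package is referenced, that `EPHost` and `msgProc` are the values from the earlier post, and that the `RegisterEventProcessorAsync` overload taking an `EventProcessorOptions` is available in the SDK version in use. The helper name `optionsWithLogging` is made up for illustration. The handler only takes effect if the options object is actually passed to the registration call.

    ```fsharp
    open System
    open Microsoft.ServiceBus.Messaging

    // Hypothetical helper: builds options whose ExceptionReceived event logs to the console.
    let optionsWithLogging () =
        let options = EventProcessorOptions()
        // Add subscribes a handler that receives only the event args.
        options.ExceptionReceived.Add(fun args ->
            Console.WriteLine("Exception at action {0}: {1}", args.Action, args.Exception))
        options

    // Usage, assuming EPHost and msgProc from the earlier post:
    // EPHost.RegisterEventProcessorAsync<msgProc>(optionsWithLogging ()).Wait()
    ```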


    Thursday, March 19, 2015 7:21 PM
  • Didn't catch anything with that handler. This is very mysterious. 

    I will keep trying different things and will post if I have any other questions or fixes. 

    Thanks for the help.

    Thursday, March 19, 2015 7:49 PM