Combining two partitioned streams into one stream

  • Question

  • Hi.

    I am new to Azure Event Hubs. With Event Hubs, we receive data from an IoT device, and the data is partitioned into two streams by assigning partition numbers "0" and "1".

    The reason we need two streams is that one is used for training a deep learning model and the other is used for testing the trained model against new data coming in on the other side.

    This is called "online learning". 

    However, while we do not yet have a trained model, there is nothing to test against. So in that case, instead of having two streams, I would rather combine the two partitioned streams into one so that no data is wasted. Later, once the model has been created, we can go back to two streams and test and train at the same time.
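    To make the idea concrete, here is an illustrative, library-free sketch (not Event Hubs code; the alternating test/train split is just an example) of how each event could be categorized depending on whether a trained model exists yet:

```python
# Illustrative only: pick a category per event. With no model yet, tag
# everything "train" so no data is wasted; once a model exists, split
# the stream (here: alternate test/train by sequence number).
def choose_category(seq_num, model_exists):
    if not model_exists:
        return "train"
    return "test" if seq_num % 2 == 0 else "train"

print([choose_category(i, model_exists=False) for i in range(4)])
# ['train', 'train', 'train', 'train']
print([choose_category(i, model_exists=True) for i in range(4)])
# ['test', 'train', 'test', 'train']
```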

    I could not find any module that enables combining them in Event Hubs scripts. Any suggestions?



    • Edited by Brian2004 Monday, November 25, 2019 12:34 AM
    Monday, November 25, 2019 12:29 AM

Answers

  • Posting the solution here for reference from Stack Overflow.

    If you can add properties to the data when sending it to the event hub, then you can try the steps below.

    1. Set two properties on each event.

    For test data, add the following two properties:

    property_name: "category", value: "test". This is used to determine which kind of data you are receiving, i.e. for test or for training.

    property_name: "seqNum", value: a number such as 0, 1, 2, 3. This is used to determine the sequence of the data.

    For training data, follow the same steps but set the "category" value to "train".

    I set these properties in C# code, which looks like the snippet below; you can set them in your own way without C#:

    for (var i = 0; i < numMessagesToSend; i++)
    {
        var message = "555 Message";
        EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));

        // Add the properties used to route the event on the consumer side.
        mydata.Properties.Add("seqNum", i);
        mydata.Properties.Add("category", "test");
        await eventHubClient.SendAsync(mydata);
    }

    Then use the following Python code to receive the data. Here I define two dicts: one to store the test data, the other to store the training data.

    import logging
    import asyncio
    import os
    import sys
    import signal
    import functools
    
    from azure.eventprocessorhost import (
        AbstractEventProcessor,
        AzureStorageCheckpointLeaseManager,
        EventHubConfig,
        EventProcessorHost,
        EPHOptions
    )
    
    # define 2 dictionaries, to store test data and train data respectively.
    dict_test={}
    dict_train={}
    
    class EventProcessor(AbstractEventProcessor):
    
        def __init__(self, params=None):       
            super().__init__(params)
            self._msg_counter = 0
    
        async def open_async(self, context):        
            print("Connection established {}".format(context.partition_id))
    
        async def close_async(self, context, reason):
    
            print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
                reason,
                context.partition_id,
                context.offset,
                context.sequence_number))
    
        async def process_events_async(self, context, messages):
    
            for m in messages:
                data = m.body_as_str()
                if m.application_properties is not None:
                    mycategory = m.application_properties.get(b'category').decode('utf-8')
                    mysequence = str(m.application_properties.get(b'seqNum'))                
    
                    if mycategory == 'test':
                        dict_test[mysequence]=data
    
                    if mycategory == 'train':
                        dict_train[mysequence]=data
    
                    print("Received data: {}".format(data))
            await context.checkpoint_async()
    
        async def process_error_async(self, context, error):
    
            print("Event Processor Error {!r}".format(error))
    
    
    async def wait_and_close(host):
    
        await asyncio.sleep(60)
        await host.close_async()
    
    try:
        loop = asyncio.get_event_loop()
    
        # Storage Account Credentials
        STORAGE_ACCOUNT_NAME = "xxx"
        STORAGE_KEY = "xxxx"
        LEASE_CONTAINER_NAME = "xxx"
        NAMESPACE = "xxx"
        EVENTHUB = "xxx"
        USER = "RootManageSharedAccessKey"
        KEY = "xxxx"
    
        # Eventhub config and storage manager 
        eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
        eh_options = EPHOptions()
        eh_options.release_pump_on_timeout = True
        eh_options.debug_trace = False
        storage_manager = AzureStorageCheckpointLeaseManager(
            STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)
    
        # Event loop and host
        host = EventProcessorHost(
            EventProcessor,
            eh_config,
            storage_manager,
            ep_params=["param1","param2"],
            eph_options=eh_options,
            loop=loop)
    
    
    
        tasks = asyncio.gather(
            host.open_async(),
            wait_and_close(host))
        loop.run_until_complete(tasks)
    
        print("***this is the data for test***")
        print(dict_test)
        print("***-----------------------***")
        print("***this is the data for train***")
        print(dict_train)
    
    except KeyboardInterrupt:
        # Canceling pending tasks and stopping the loop
        for task in asyncio.Task.all_tasks():
            task.cancel()
        loop.run_forever()
        tasks.exception()
    
    finally:
        loop.stop()

    The test result (the original answer included a screenshot here): the received events are split into the two dictionaries as expected.

    For the last step: since the test data and training data are stored in separate dictionaries whose keys are the sequence numbers, you can write code that operates on each dictionary to rebuild the test data and training data in sequence.
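    For example, a minimal rebuild step (assuming the dictionaries look like the ones built by the receiver code above, with string sequence numbers as keys) could be:

```python
# Rebuild payloads in send order from a {seqNum: payload} dict.
# Keys were stored as strings ("0", "1", ...), so sort them numerically.
def rebuild_in_sequence(d):
    return [payload for _, payload in sorted(d.items(), key=lambda kv: int(kv[0]))]

dict_test = {"2": "c", "0": "a", "10": "k", "1": "b"}
print(rebuild_in_sequence(dict_test))  # ['a', 'b', 'c', 'k']
```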

    Friday, November 29, 2019 11:10 AM

All replies

  • This can be implemented in different ways by consuming messages from Event Hubs directly, plus there are options such as Stream Analytics, which are probably built on top of the two direct approaches. The first way is the Event Hub Receiver; the second, higher-level way is the Event Processor Host. Also, please continue your discussion on the Stack Overflow thread for further help; it will have visibility across the community, which is a better-suited audience for Event Hubs related issues.
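    As a library-free sketch (these are not Event Hubs SDK calls) of what "combining two partitioned streams into one" amounts to on the consumer side: read from both partitions and merge the events into a single ordered stream, e.g. by a sequence number carried in each event:

```python
import heapq

# Each partition yields (sequence_number, payload) pairs that are already
# ordered within the partition; heapq.merge interleaves the two sorted
# streams into one combined, ordered stream.
partition_0 = [(0, "a"), (2, "c"), (4, "e")]
partition_1 = [(1, "b"), (3, "d"), (5, "f")]

combined = [payload for _, payload in heapq.merge(partition_0, partition_1)]
print(combined)  # ['a', 'b', 'c', 'd', 'e', 'f']
```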


    Monday, November 25, 2019 10:17 AM
  • Could you elaborate on the first way, using the Event Hub Receiver? Did you mean adding one receiver per partition?
    Wednesday, December 4, 2019 3:05 AM