Combining two partitioned streams into one stream

Question
-
Hi.
I am new to Azure Event Hubs. With Event Hubs, we receive data from an IoT device, and the data is partitioned into two streams by assigning partition numbers "0" and "1".
The reason we need two streams is that one is needed for training a deep learning model, and the other is needed for testing the trained model with new data coming in from the other side.
This is called "online learning".
However, when we do not have a trained model yet, there is nothing to test, so instead of having two streams in this case, I would rather combine the two partitioned streams into one so that no data is wasted. Later, once the model is created, we can go back to two streams to test and train at the same time.
I could not find any module that can combine them in Event Hubs scripts. Any suggestions?
- Edited by Brian2004 Monday, November 25, 2019 12:34 AM
Monday, November 25, 2019 12:29 AM
Answers
-
Posting the solution from the Stack Overflow thread here for reference.
If you can add properties to the data when sending it to the event hub, then you can try the steps below.
1. Set 2 properties on each event.
For test data, add the following 2 properties:
property_name: "category", its value: "test", which identifies the kind of data you are receiving (test or train).
property_name: "seqNum", its value is a number, like 0, 1, 2, 3, which determines the sequence of the data.
For train data, use the same steps and just change the category value to "train".
I set these properties in C# code, as shown below. You can set them in your own way without C#:
    for (var i = 0; i < numMessagesToSend; i++)
    {
        var message = "555 Message";
        EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));

        // add properties
        mydata.Properties.Add("seqNum", i);
        mydata.Properties.Add("category", "test");

        await eventHubClient.SendAsync(mydata);
    }
Then use the following Python code to receive the data. Here, I define 2 dicts, one to store test data and the other to store train data.
    import asyncio

    from azure.eventprocessorhost import (
        AbstractEventProcessor,
        AzureStorageCheckpointLeaseManager,
        EventHubConfig,
        EventProcessorHost,
        EPHOptions
    )

    # define 2 dictionaries, to store test data and train data respectively.
    dict_test = {}
    dict_train = {}


    class EventProcessor(AbstractEventProcessor):

        def __init__(self, params=None):
            super().__init__(params)
            self._msg_counter = 0

        async def open_async(self, context):
            print("Connection established {}".format(context.partition_id))

        async def close_async(self, context, reason):
            print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
                reason, context.partition_id, context.offset, context.sequence_number))

        async def process_events_async(self, context, messages):
            for m in messages:
                data = m.body_as_str()
                if m.application_properties is not None:
                    # property keys arrive as bytes, so look them up with b'...'
                    mycategory = m.application_properties.get(b'category').decode('utf-8')
                    mysequence = str(m.application_properties.get(b'seqNum'))

                    # route the event into the dict for its category, keyed by sequence number
                    if mycategory == 'test':
                        dict_test[mysequence] = data

                    if mycategory == 'train':
                        dict_train[mysequence] = data

                print("Received data: {}".format(data))
            await context.checkpoint_async()

        async def process_error_async(self, context, error):
            print("Event Processor Error {!r}".format(error))


    async def wait_and_close(host):
        # let the host receive for 60 seconds, then shut it down
        await asyncio.sleep(60)
        await host.close_async()


    try:
        loop = asyncio.get_event_loop()

        # Storage Account Credentials
        STORAGE_ACCOUNT_NAME = "xxx"
        STORAGE_KEY = "xxxx"
        LEASE_CONTAINER_NAME = "xxx"

        NAMESPACE = "xxx"
        EVENTHUB = "xxx"
        USER = "RootManageSharedAccessKey"
        KEY = "xxxx"

        # Eventhub config and storage manager
        eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
        eh_options = EPHOptions()
        eh_options.release_pump_on_timeout = True
        eh_options.debug_trace = False
        storage_manager = AzureStorageCheckpointLeaseManager(
            STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)

        # Event loop and host
        host = EventProcessorHost(
            EventProcessor,
            eh_config,
            storage_manager,
            ep_params=["param1", "param2"],
            eph_options=eh_options,
            loop=loop)

        tasks = asyncio.gather(
            host.open_async(),
            wait_and_close(host))
        loop.run_until_complete(tasks)

        print("***this is the data for test***")
        print(dict_test)
        print("***-----------------------***")
        print("***this is the data for train***")
        print(dict_train)

    except KeyboardInterrupt:
        # Canceling pending tasks and stopping the loop
        # (asyncio.Task.all_tasks() was removed in Python 3.9; asyncio.all_tasks(loop) needs 3.7+)
        for task in asyncio.all_tasks(loop):
            task.cancel()
        loop.run_forever()
        tasks.exception()

    finally:
        loop.stop()
The test result is as below: [screenshot omitted]
As the last step, since the test data and train data are stored in separate dictionaries whose keys are the sequence numbers, you can write your own code to work on the dicts and rebuild the test data / train data in sequence.
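The last step could look roughly like the sketch below. It is only an illustration: the sample dictionaries are made-up data shaped like dict_test / dict_train above, the helper names rebuild_in_sequence and combine are hypothetical, and it assumes every seqNum is an integer that was stored as a string key.

```python
# Hypothetical sample data shaped like dict_test / dict_train in the receiver
# code: keys are sequence numbers stored as strings, values are message bodies.
dict_test = {"2": "test-c", "0": "test-a", "1": "test-b"}
dict_train = {"1": "train-b", "0": "train-a"}


def rebuild_in_sequence(data_by_seq):
    """Return the message bodies ordered by their numeric sequence number."""
    # Keys are strings, so sort on int(key); a plain string sort would put "10" before "2".
    return [data_by_seq[key] for key in sorted(data_by_seq, key=int)]


def combine(*dicts):
    """Concatenate several categories into one list, each in sequence order."""
    combined = []
    for d in dicts:
        combined.extend(rebuild_in_sequence(d))
    return combined


test_data = rebuild_in_sequence(dict_test)
train_data = rebuild_in_sequence(dict_train)

# While no trained model exists yet, both categories can be used as one stream.
all_data = combine(dict_train, dict_test)

print(test_data)
print(train_data)
print(all_data)
```

Once a model exists, you would go back to using test_data and train_data separately instead of all_data.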
- Edited by AshokPeddakotla-MSFT (Microsoft employee) Friday, November 29, 2019 11:12 AM (format)
- Marked as answer by AshokPeddakotla-MSFT (Microsoft employee) Friday, November 29, 2019 11:13 AM
Friday, November 29, 2019 11:10 AM
All replies
-
There are two ways to consume messages from Event Hubs directly, plus the option of using services such as Stream Analytics, which are probably built on top of the two direct ways. The first way is the Event Hub Receiver; the second, which is higher level, is the Event Processor Host. Also, please continue your discussion on the Stack Overflow thread for further help. It has visibility across the community, which is a better suited audience for Event Hubs related issues.
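To give a rough picture of the "one receiver per partition" idea, here is a toy sketch that merges two partitions into a single stream. It does not call Event Hubs at all: fake_partition_receiver is a hypothetical stand-in for a real per-partition receiver, and the partition contents are made-up data. It only shows the merging pattern with a shared asyncio queue.

```python
import asyncio


async def fake_partition_receiver(partition_id, bodies, out_queue):
    """Hypothetical stand-in for a per-partition receiver (not an Event Hubs API)."""
    for body in bodies:
        # Tag each message with its partition so the origin is not lost after merging.
        await out_queue.put((partition_id, body))
        await asyncio.sleep(0)  # yield control so the other receiver can run


async def merge_partitions(partitions):
    """Run one receiver per partition and collect everything into one list."""
    queue = asyncio.Queue()
    receivers = [
        fake_partition_receiver(pid, bodies, queue)
        for pid, bodies in partitions.items()
    ]
    await asyncio.gather(*receivers)

    # Drain the shared queue: this is the single combined stream.
    merged = []
    while not queue.empty():
        merged.append(queue.get_nowait())
    return merged


merged = asyncio.run(merge_partitions({"0": ["a0", "a1"], "1": ["b0", "b1"]}))
print(merged)
```

The order between partitions is not guaranteed in this sketch, which is why a sequence number on each event is still useful if the order matters.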
- Proposed as answer by AshokPeddakotla-MSFT (Microsoft employee) Monday, November 25, 2019 10:17 AM
Monday, November 25, 2019 10:17 AM -
Could you elaborate on the first way, using the Event Hub Receiver? Did you mean adding one receiver for each partition?
Wednesday, December 4, 2019 3:05 AM