Capture chunked http stream data from EventHub in Azure Data Lake Store

    Question

  • Hi,

    I am consuming a streaming API in Python and trying to send the data to an Event Hub that has Capture set up with Azure Data Lake Store as the destination.

    I was able to set up the demo suggested in MSDN here:

    https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-python-get-started-send

    With Capture enabled, I see it keeps creating a 508-byte avro file (screenshot not included here). But the capture output did also contain another file which had the actual data (the dummy 100 messages).
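
    (For reference, the sample's send loop is roughly the following; this is a sketch assuming the 1.x azure-eventhub SDK that I am using, with ADDRESS, USER and KEY as placeholders for my own namespace values.)

    from azure.eventhub import EventHubClient, EventData
    
    # Placeholder connection values for this sketch.
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="0")
    client.run()
    
    # The demo just pushes 100 dummy messages.
    for i in range(100):
        sender.send(EventData("Message {}".format(i)))
    
    client.stop()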

    And I was able to query the data in that avro file (after downloading it for a test) using the code:

    import avro.schema
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    # Open the downloaded capture file and print every record in it.
    reader = DataFileReader(open("C:\\Users\\abc\\Downloads\\17.avro", "rb"), DatumReader())
    for user in reader:
        print(user)
    reader.close()

    So, all fine there.

    However, when I try to send chunked data to it (from an aiohttp.StreamReader object), the program does not fail, but when I check the Data Lake Store I see a whole bunch of folders, and each of those folders contains only a single avro file: the same 508-byte file as in my first screenshot.

    I understand that an avro file contains both the schema and the data, but why does it keep creating that 508-byte file in separate folders even when I am not sending any data to the event hub (this happens with the MSDN demo sample too)? When I turned Capture off on the Event Hub, it stopped.

    And how do I send chunked data so that it is correctly captured in ADLS? Mind you, the program does not complain about any error.

    Here is the sample code snippet that I am using

    import aiohttp
    import async_timeout
    from azure.eventhub import EventData
    
    async def fetch(session, url, headers, sender):
        with async_timeout.timeout(None):
            # url is the redirect location returned by the API's initial response.
            async with session.get(url, headers=headers, proxy="http://127.0.0.1:8888",
                                   allow_redirects=False, timeout=None) as r:
                while True:
                    # Read the response in 3 KB chunks and push each chunk as one event.
                    chunk = await r.content.read(1024 * 3)
                    if not chunk:
                        break
                    sender.send(EventData(chunk))
    
    async def main(url, headers, sender):
        async with aiohttp.ClientSession() as session:
            await fetch(session, url, headers, sender)

    In the caller:

    import asyncio
    from azure.eventhub import EventHubClient
    
    client = EventHubClient(EventHubAddress, debug=True, username=USER, password=KEY)
    
    sender = client.add_sender(partition=None)
    client.run()
    # ...
    # ...
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loc, headers=headers, sender=sender))

    How do I make ADLS get this chunked data?


    Update: I have also tried receiving data directly from the Event Hub in Python, and it does show me the data that I pushed. So why is the Capture to ADLS not working?
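
    (The check was done roughly like this; a minimal sketch assuming the same 1.x azure-eventhub SDK, with the consumer group, partition and starting offset as placeholder values.)

    from azure.eventhub import EventHubClient, Offset
    
    client = EventHubClient(EventHubAddress, debug=True, username=USER, password=KEY)
    
    # "$Default" consumer group, partition "0" and Offset("-1") (start of stream)
    # are placeholder values for this sketch.
    receiver = client.add_receiver("$Default", "0", offset=Offset("-1"))
    client.run()
    
    for event_data in receiver.receive(timeout=5):
        print(event_data.body_as_str())
    
    client.stop()
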
    Tuesday, December 11, 2018 3:22 PM

All replies

  • Ok, I figured it out. 

    The incoming stream I had was JSON. It seems I had to send only fully formed JSON messages to the Event Hub, and then the Capture to Azure Data Lake Store worked. This of course slows down sending the stream to the event hub. So this is what I had to do:

    async def fetch(session, url, headers, sender):
        with async_timeout.timeout(None):
            async with session.get(url, headers=headers, proxy="http://127.0.0.1:8888",
                                   allow_redirects=False, timeout=None) as r:
                buff = ""
                while True:
                    chunk = await r.content.read(1024 * 3)
                    if not chunk:
                        break
                    buff += chunk.decode("utf-8")
                    # One API message ends with CRLF; a chunk may carry several
                    # complete messages plus the start of the next one, so split
                    # off every complete message and keep the remainder buffered.
                    *messages, buff = buff.split("\r\n")
                    for msg in messages:
                        if msg:
                            print(msg)
                            sender.send(EventData(msg))
    
    async def main(url, headers, sender):
        async with aiohttp.ClientSession() as session:
            await fetch(session, url, headers, sender)

    So here I am checking for CRLF (that's how one message from the API is delimited) and only then sending it to the Event Hub. I am wondering why Data Lake Store can't just pick up whatever is in the Event Hub. Does it really have to be well formed JSON? Shouldn't it act like a plain dump in this case?

    Anyway, it works now with the code above (though slower), but something is better than nothing.

    If someone knows a better way to do this, I am all ears and eyes.
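
    (One simplification I might try later: aiohttp's StreamReader can split the response into lines on its own, so a rough, untested sketch of the same idea, assuming each API message really is terminated by CRLF, would be:)

    async def fetch(session, url, headers, sender):
        with async_timeout.timeout(None):
            async with session.get(url, headers=headers, proxy="http://127.0.0.1:8888",
                                   allow_redirects=False, timeout=None) as r:
                # Iterating the StreamReader yields one line at a time; each line
                # still carries its trailing CRLF, so strip it before sending.
                async for line in r.content:
                    msg = line.decode("utf-8").strip()
                    if msg:
                        sender.send(EventData(msg))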


    Wednesday, December 12, 2018 1:55 PM
    Glad to hear that you have found a workaround. Thanks for sharing the information; this will certainly benefit other community members.
    Wednesday, December 12, 2018 2:10 PM
    Moderator
  • Hi,

    After working on this a little more, I found out something and thought I should share it. My previous inference that the JSON had to be well formed was incorrect. In fact, the Capture functionality captures everything (even ill-formed JSON), which is how it should be, since Capture's whole purpose is to record everything sent to the event hub.

    The reason I was not able to see the files in ADLS, it seems, was the file size window (and/or time interval) that you specify when defining Capture on the Event Hub.

    However, it is of course better (I would say imperative) to write well formed JSON to ADLS, otherwise it will be a pain to read the files downstream, which I found out while reading them through Databricks.

    Finally, the reason I see so many 508-byte files is that Capture keeps writing to ADLS at the end of every time interval specified when defining the capture, even empty files containing just the schema when no data arrived (files with data are written when the size limit is reached). Microsoft's documentation notes that this is by design, so that you can tell the capture is still working.
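
    (For anyone reading the captured files downstream, a sketch along the lines of my earlier avro snippet; it assumes the standard Capture record layout, where the event payload lands as raw bytes in the "Body" field, and uses a hypothetical local copy of a captured file.)

    import json
    from avro.datafile import DataFileReader
    from avro.io import DatumReader
    
    # Each captured record carries metadata (Offset, EnqueuedTimeUtc, ...) plus the
    # event payload as raw bytes in "Body". The empty 508-byte window files simply
    # contain no records, so the loop never executes for them.
    with open("17.avro", "rb") as f:  # hypothetical local copy of one captured file
        reader = DataFileReader(f, DatumReader())
        for record in reader:
            body = record["Body"]
            if body:
                # This decode/parse only works if well formed JSON was sent to the event hub.
                print(json.loads(body.decode("utf-8")))
        reader.close()
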


    Thursday, January 31, 2019 8:30 AM