[Python][Azure Storage] Python storage SDK batch_commit not working with multiprocessing or multithreading

  • Question

  • We are testing how well the Python SDK for Azure Storage suits our needs for using Table storage, but we ran into an apparent problem with the SDK and multiprocessing/multithreading (both produce an identical error). Here is our test code, in short:
    _____________________________________________
    from azure.storage import TableService, Entity
    from multiprocessing import Process
    import logging

    logging.basicConfig(level=logging.DEBUG,
                        format='(%(threadName)-10s) %(message)s',
                        )

    def WriteGenomedata(start=0, end=1000):

      partkey = "testpartition"
      ifile = "test_genomedata.txt"

      storage_access_key = "XXXX"
      storage_name = "testtable"

      logging.debug("started a new writer from %d to %d", start, end)

      # loop all rows and write each to table
      counter = 0
      table_service = TableService(account_name=storage_name, account_key=storage_access_key)
      logging.debug("table_service is %s", type(wr))

      row = Entity()
      row.PartitionKey = partkey
      gfile = open(ifile, 'r')
      for line in gfile:
        # we insert lines in batches of 99 entities
        if counter%99 == 0:
          if counter > start:
            logging.debug("committing until line %d", counter)
            table_service.commit_batch()
          table_service.begin_batch()

        vals = line.split("\t")
        rsid = vals[0]
        chromosome = vals[1]
        positio = vals[2]
        genotype = vals[3]

        row.RowKey = rsid

        row.chromosome = str(chromosome)
        row.position = str(positio)
        row.genotype = genotype

        table_service.insert_entity("test23andme", row)

        counter += 1

        if (end > 0) and (counter >= end):
          break

      table_service.commit_batch()

    wr = Process(target=WriteGenomedata,args=(0, 500))
    logging.debug("wr is %s", type(wr))
    wr.start()

    WriteGenomedata(500, 1000)
    ----------------------------------------------------


    this produces:
    ___________________________________
    me@testserver:~/test$ python threadtesti.py
    (MainThread) wr is <class 'multiprocessing.process.Process'>
    (MainThread) started a new writer from 500 to 1000
    (MainThread) table_service is <class 'multiprocessing.process.Process'>
    (MainThread) started a new writer from 0 to 500
    (MainThread) table_service is <class 'multiprocessing.process.Process'>
    (MainThread) committing until line 99
    Traceback (most recent call last):
      File "threadtesti.py", line 82, in <module>
        WriteGenomedata(1787, 5381)
      File "threadtesti.py", line 65, in WriteGenomedata
        table_service.commit_batch()
      File "/usr/local/lib/python2.7/dist-packages/azure/storage/tableservice.py", line 156, in commit_batch
        ret = self._batchclient.commit_batch()
    AttributeError: 'NoneType' object has no attribute 'commit_batch'
    ---------------------------------------------------------
    A completely identical error occurs if we try this with multithreading. It does work when we add each row individually, without batch commits. Is there a simple fix for this? We were not able to find any.
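
    For context on the traceback: in the azure.storage 0.x SDK used here, commit_batch() dereferences an internal _batchclient that only exists while a batch is open, so calling commit_batch() without a matching begin_batch() fails with exactly this AttributeError. A minimal sketch of the pairing the SDK expects (write_in_batches is an illustrative helper, not an SDK function; table_service is a TableService instance as in the code above):
    _____________________________________________
    def write_in_batches(table_service, table_name, entities, batch_size=99):
        # keep begin_batch/commit_batch strictly paired; a commit with no
        # open batch is what raises AttributeError on the None _batchclient
        in_batch = False
        for i, entity in enumerate(entities):
            if not in_batch:
                table_service.begin_batch()
                in_batch = True
            table_service.insert_entity(table_name, entity)
            if (i + 1) % batch_size == 0:
                table_service.commit_batch()
                in_batch = False
        if in_batch:
            table_service.commit_batch()
    ----------------------------------------------------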



    Wednesday, July 8, 2015 11:26 AM

Answers

  • Hi,

    You need to run the code that starts the process under a main guard:

    if __name__ == '__main__':

    On platforms where multiprocessing spawns a fresh interpreter for the child process (for example on Windows), the child re-imports your module, so any module-level code that starts processes runs again in the child; the guard keeps that code in the parent only. I changed your code as follows; you also need to make sure the corresponding table exists in Table Storage.

    def multipleProcess():
        wr = Process(target=WriteGenomedata, args=(0, 10))
        logging.debug("wr is %s", type(wr))
        wr.start()
        #WriteGenomedata(0, 10)

    if __name__ == '__main__':
        freeze_support()
        multipleProcess()

    So I suggest you try this full code sample:

    from azure.storage import TableService, Entity
    from multiprocessing import Process
    from multiprocessing import freeze_support
    import logging

    logging.basicConfig(level=logging.DEBUG,
                        format='(%(threadName)-10s) %(message)s',
                        )

    def WriteGenomedata(start=0, end=1000):

        partkey = "partitionkeyname"
        ifile = "*.txt"

        storage_access_key = "*+*"
        storage_name = "storagename"

        logging.debug("started a new writer from %d to %d", start, end)

        counter = 0
        table_service = TableService(account_name=storage_name, account_key=storage_access_key)

        row = Entity()
        row.PartitionKey = partkey
        gfile = open(ifile, 'r')
        for line in gfile:
            # insert lines in batches of 3 entities
            if counter % 3 == 0:
                if counter > start:
                    logging.debug("committing until line %d", counter)
                    table_service.commit_batch()
                table_service.begin_batch()

            vals = line.split("\t")
            rsid = vals[0]
            chromosome = vals[1]
            positio = vals[2]
            genotype = vals[3]

            row.RowKey = rsid
            row.chromosome = str(chromosome)
            row.position = str(positio)
            row.genotype = genotype

            # make sure you have this table
            table_service.insert_entity("tablename", row)

            counter += 1

            if (end > 0) and (counter >= end):
                break

        table_service.commit_batch()

    def multipleProcess():
        wr = Process(target=WriteGenomedata, args=(0, 10))
        logging.debug("wr is %s", type(wr))
        wr.start()
        #WriteGenomedata(0, 10)

    if __name__ == '__main__':
        freeze_support()
        multipleProcess()
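
    The same guard pattern extends to the original goal of two parallel writers. A minimal sketch, assuming the WriteGenomedata defined above (run_writers is an illustrative name; each process builds its own TableService inside WriteGenomedata, so no client or batch state is shared between them):
    _____________________________________________
    from multiprocessing import Process, freeze_support

    def run_writers():
        # one process per disjoint line range
        workers = [
            Process(target=WriteGenomedata, args=(0, 500)),
            Process(target=WriteGenomedata, args=(500, 1000)),
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()  # wait for both ranges to finish before exiting

    if __name__ == '__main__':
        freeze_support()  # only needed when frozen into a Windows executable
        run_writers()
    ----------------------------------------------------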


    Please try it. If you have any questions, please feel free to let me know.

    Regards,
    Will



    Thursday, July 23, 2015 5:43 AM
    Moderator

All replies

  • Hi,

    I'm checking on this thread and will get back to you with the required information.

    Your patience is greatly appreciated.

    Thank you,
    Manu

    Thursday, July 9, 2015 10:38 AM