none
Multi-Threaded access to rows RRS feed

  • Question

  • Hi, we have a bunch of items "queued" in a table. Various backend-q-processors (QP) select items from this table depending on item-state. This works fine as long as these QP are not running on multiple machines or in multiple threads.

     

    Why? Because each QP must only process items not currently being processed by another instance of the same QP.

     

    That's why we implement a sort of temporary lock within a transaction, which is rolled back as soon as the QP has finished processing the items. This eliminates the need for a cleanup job and enables dynamic scaling of QPs as needed.

     

    In detail, each QP calls an SP which returns the next n items from the queue with the specified status that are NOT already found in a locking table. For this SP to work, it must be able to read uncommitted rows in the locking table.

     

    While the transaction is being held, the QP would process the item and UPDATE the STATUS of it (in another connection) using TransactionScope(Suppress) (just to make sure), so the status-update is not rolled-back with the release of the lock.

     

    This works fine in unit tests until we switch from the ThreadPool to creating threads manually (in our production environment we host ONE QP per machine, so the ThreadPool is not an option). Once we do, the items are sometimes processed multiple times by the various QPs, and we have - after extensive tests - not been able to find the reason.

     

    We have been able to extract the problem to a unit-test (see below) with which I hope someone is able to tell us a solution. Is there no common practice for this kind of work?

     

    Code Block

    // Backing field for ErpGateDBConnectionString; stays null until the property is read or assigned.
    private String erpGateDBConnectionString = null;

    /// <summary>
    /// Gets or sets the SQL Server connection string.
    /// If the backing field is still null when read, the getter stores and returns
    /// a default that targets the local SQLExpress instance with pooling disabled.
    /// </summary>
    public String ErpGateDBConnectionString {
        get {
            // Lazily fall back to the default the first time the value is requested.
            return erpGateDBConnectionString
                ?? (erpGateDBConnectionString = "Data Source=.\\SQLExpress;Initial Catalog=ErpGate_DB;Integrated Security=True;Pooling=false;Connect Timeout=30;");
        }
        set { erpGateDBConnectionString = value; }
    }

     

    /// <summary>
    /// Creates the global temp tables ##QLock and ##UnitTestTrans, fills the latter with
    /// 54 rows (status = 1, touched = 0), runs 20 manually created threads through
    /// <see cref="ProcessItems"/>, and fails if any row ends up with touched &gt; 1.
    /// </summary>
    [TestMethod()]
    public void InnerMethodOutsideTransScope() {
        this.TestContext.WriteLine("Using Connection: {0}", this.ErpGateDBConnectionString);

        bool failed = false;

        // The using block guarantees the connection is closed even when setup, the
        // worker threads, or the verification query throw.
        using (SqlConnection conn = new SqlConnection(this.ErpGateDBConnectionString)) {
            conn.Open();

            new SqlCommand("create table ##QLock (id int primary key, lockedBy int NULL);", conn).ExecuteNonQuery();
            new SqlCommand("create table ##UnitTestTrans (id int identity(1,1) primary key, status int, touched int);", conn).ExecuteNonQuery();
            new SqlCommand("declare @i int; set @i = 0; while (@i < 54) begin insert into ##UnitTestTrans values (1, 0); set @i = @i + 1; end", conn).ExecuteNonQuery();

            try {
                int maxThreads = 20;
                Thread[] t = new Thread[maxThreads];

                // Create every thread before starting any of them so they begin as
                // close together as possible.
                for (Int32 i = 0; i < t.Length; i++) {
                    t[i] = new Thread(ProcessItems);
                }

                //start threads
                for (Int32 i = 0; i < t.Length; i++) {
                    t[i].Start();
                }

                //wait for threads
                for (Int32 i = 0; i < t.Length; i++) {
                    t[i].Join();
                }

                using (SqlCommand cmd = new SqlCommand("select id, status, touched from ##UnitTestTrans;", conn)) {
                    cmd.CommandTimeout = 2;

                    // Disposing the reader closes it even if reading a row throws.
                    using (SqlDataReader rdr = cmd.ExecuteReader()) {
                        while (rdr.Read()) {
                            this.TestContext.WriteLine("Row {0} Status={1} Touched={2}", rdr[0], rdr[1], rdr[2]);

                            // touched is incremented once per status update, so a value
                            // above 1 means the row was updated more than once.
                            if ((int)rdr[2] > 1) {
                                failed = true;
                            }
                        }
                    }
                }
            }
            finally {
                this.TestContext.WriteLine("Removing temp-tables");
                new SqlCommand("drop table ##QLock; drop table ##UnitTestTrans;", conn).ExecuteNonQuery();
            }
        }

        Assert.IsFalse(failed, "At least one row got touched more than once!");
    }

    /// <summary>
    /// Thread entry point. Inside a transaction that is never completed, inserts up to
    /// three status = 1 ids that are not yet in ##QLock, reads back the ids locked by
    /// this thread, and passes each one to <see cref="ProcessItem"/>.
    /// </summary>
    /// <param name="state">Not read by this method.</param>
    void ProcessItems(object state) {
        // NOTE(review): the NOT IN (... WITH (NOLOCK)) check and the INSERT are not atomic
        // across sessions. Presumably a concurrent INSERT of the same id blocks on the
        // ##QLock primary key until the other transaction rolls back and then succeeds,
        // which would let two threads process the same item - confirm before relying on it.
        const string claimSql = "SET TRANSACTION ISOLATION LEVEL READ COMMITTED; INSERT INTO ##QLock SELECT TOP 3 id, @lockedBy FROM ##UnitTestTrans where status = 1 and id not in (select id from ##QLock WITH (NOLOCK)); select id from ##QLock WITH (NOLOCK) where lockedBy = @lockedBy";

        int itemsProcessed = 0;

        try {
            // scope.Complete() is deliberately never called: disposing the scope rolls
            // the transaction back, which removes this thread's rows from ##QLock.
            // The stacked usings dispose in reverse order (command, connection, scope),
            // so the connection is still closed before the rollback, on every exit path.
            using (TransactionScope scope = new TransactionScope())
            using (SqlConnection conn = new SqlConnection(this.ErpGateDBConnectionString))
            using (SqlCommand cmd = new SqlCommand(claimSql, conn)) {
                cmd.Parameters.AddWithValue("@lockedBy", Thread.CurrentThread.ManagedThreadId);
                cmd.CommandTimeout = 2;

                conn.Open();

                this.TestContext.WriteLine(" :: Thread {0} is retrieving items...", Thread.CurrentThread.ManagedThreadId);

                try {
                    using (SqlDataReader rdr = cmd.ExecuteReader()) {
                        while (rdr.Read()) {
                            ProcessItem((int)rdr[0]);
                            itemsProcessed++;
                        }
                    }
                }
                catch (Exception err) {
                    // Log and stop this worker; the enclosing usings still release the
                    // connection and roll the transaction back.
                    this.TestContext.WriteLine(" :: ERROR retrieving items: {0}", err.ToString());
                    return;
                }
            } //implicit rollback!
        }
        finally {
            this.TestContext.WriteLine("Thread {0} processed {1} items. Terminating gracefully.", Thread.CurrentThread.ManagedThreadId, itemsProcessed);
        }
    }

    /// <summary>
    /// Logs the item being handled, sleeps for 100 ms in place of real work,
    /// and then sets the item's status to 2 via <see cref="UpdateStatus"/>.
    /// </summary>
    /// <param name="itemID">Id of the item to process.</param>
    private void ProcessItem(int itemID) {
        int threadId = Thread.CurrentThread.ManagedThreadId;
        this.TestContext.WriteLine(" :: Processing item {0} in thread {1}...", itemID, threadId);

        // Stand-in for the actual processing work.
        Thread.Sleep(100);

        UpdateStatus(itemID, 2);
    }

    /// <summary>
    /// Sets the status of one ##UnitTestTrans row and increments its touched counter,
    /// using a separate connection opened inside a suppressing TransactionScope.
    /// </summary>
    /// <param name="itemID">Id of the row to update.</param>
    /// <param name="newStatus">Status value to store on the row.</param>
    private void UpdateStatus(int itemID, int newStatus) {
        this.TestContext.WriteLine(" :: Setting status of item {0} to {2} in thread {1}...", itemID, Thread.CurrentThread.ManagedThreadId, newStatus);

        // Suppress any ambient transaction so this update is not rolled back with it.
        // The usings release the connection and command even if Open or ExecuteNonQuery
        // throws (for example on the 2-second command timeout).
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Suppress))
        using (SqlConnection conn = new SqlConnection(this.ErpGateDBConnectionString))
        using (SqlCommand cmd = new SqlCommand("update ##UnitTestTrans set status = @newStatus, touched=touched+1 where id = @itemID", conn)) {
            cmd.CommandTimeout = 2;
            cmd.Parameters.AddWithValue("@newStatus", newStatus);
            cmd.Parameters.AddWithValue("@itemID", itemID);

            conn.Open();
            cmd.ExecuteNonQuery();
            conn.Close();

            scope.Complete();
        }
    }

     

     

    Monday, November 19, 2007 9:18 AM

All replies

  • Hi,

    I'm not sure I understand your problem. Why isn't it sufficient for the threads to call a stored proc that, within a transaction, selects the work to be done (excluding any records currently being processed) and sets a processing flag on those rows? You then commit the transaction and you are sure to obtain only rows that were never processed.

    regards,
    Charles
    Sunday, December 2, 2007 7:56 PM
  •  

    2 reasons:

     

    1. who cleans up the flag if the owning process dies "ungracefully"?

    2. I should not have this problem, whether keeping the transaction open or committing it. Even if I were to commit it, the same situation could arise.

     

    We did find a "version" of the code which seems to work, but it's rather unsatisfying why this code fails in the first place!

    Monday, December 3, 2007 8:37 AM