Not logged in - Login

Executing X tasks, in Y threads at a time

This example illustrates how to run a total of X tasks while keeping no more than Y of them running at the same time.

/// <summary>
/// Runs a batch of simulated tasks, keeping at most a configured number of them
/// in flight at once, and fires a callback exactly once when the whole batch is done.
/// Each task just sleeps for a random duration and writes progress to the debug output.
/// </summary>
public class ThreadTest
{
   // total number of tasks in the current batch.
   private int m_lTotalItems = 0;
   // maximum number of tasks allowed to run at the same time.
   private int m_lTotalThreads = 0;
   // number of tasks handed out so far; doubles as the index of the next task to start.
   private int m_lThreadsProcessing = 0;
   // number of tasks that have finished.
   private int m_lThreadCompleted = 0;
   // set when the completion callback has been claimed, so it fires only once per batch.
   private bool m_bCompletionRaised = false;
   // dedicated reference object used only for locking (never lock on a boxed value type).
   private readonly object m_oThreadLock = new object();
   // the batch: Item1 = task number, Item2 = simulated duration in milliseconds.
   private Tuple<int, int>[] m_threadItems = null;

   // callback to fire when every task in the batch has completed (may be null).
   private Action m_action = null;

   /// <summary>
   /// Builds a batch of <paramref name="totalItems"/> simulated tasks and starts processing
   /// them, at most <paramref name="totalThreads"/> at a time. Returns as soon as the first
   /// tasks have been queued; it does not wait for the batch to finish.
   /// </summary>
   /// <param name="action">
   /// Invoked exactly once after the last task completes; may be null. It is called outside
   /// the internal lock, from whichever thread notices the batch is complete (normally a
   /// task continuation; for an empty batch, the calling thread before this method returns).
   /// </param>
   /// <param name="totalItems">Number of tasks to run. Zero is allowed.</param>
   /// <param name="totalThreads">Maximum number of tasks running at once. Must be at least 1.</param>
   /// <param name="minDelayMs">Inclusive lower bound of each task's simulated duration, in ms.</param>
   /// <param name="maxDelayMs">Exclusive upper bound of each task's simulated duration, in ms
   /// (a task takes exactly <paramref name="minDelayMs"/> when the two bounds are equal).</param>
   /// <exception cref="ArgumentOutOfRangeException">A numeric argument is out of range.</exception>
   /// <remarks>Starting a new batch while a previous one is still running is not guarded against.</remarks>
   public void StartThreadingSample(Action action, int totalItems = 15, int totalThreads = 5,
                                    int minDelayMs = 5000, int maxDelayMs = 15000)
   {
      if (totalItems < 0)
         throw new ArgumentOutOfRangeException("totalItems", "The number of tasks cannot be negative.");
      if (totalThreads < 1)
         throw new ArgumentOutOfRangeException("totalThreads", "At least one concurrent task is required.");
      if (minDelayMs < 0)
         throw new ArgumentOutOfRangeException("minDelayMs", "The minimum delay cannot be negative.");
      if (maxDelayMs < minDelayMs)
         throw new ArgumentOutOfRangeException("maxDelayMs", "The maximum delay cannot be less than the minimum delay.");

      // build the whole batch up front, before any shared state is touched.
      Tuple<int, int>[] items = new Tuple<int, int>[totalItems];
      Random rnd = new Random();
      for (int x = 0; x < items.Length; x++)
      {
         // random simulated duration in [minDelayMs, maxDelayMs); 5 to 15 seconds by default.
         items[x] = new Tuple<int, int>(x, rnd.Next(minDelayMs, maxDelayMs));
      }

      lock (this.m_oThreadLock)
      {
         // remember the action method to fire when all the tasks are complete.
         this.m_action = action;
         this.m_lTotalItems = totalItems;
         this.m_lTotalThreads = totalThreads;
         this.m_threadItems = items;

         // reset the progress counters and the "callback already fired" flag.
         this.m_lThreadsProcessing = 0;
         this.m_lThreadCompleted = 0;
         this.m_bCompletionRaised = false;
      }

      this.AddToOutput(string.Format("Total tasks: {0}, total threds: {1}", totalItems, totalThreads));
      this.AddToOutput(string.Format("Queue up the 1st {0} items", totalThreads));

      // prime the pump: each call starts one task; each finished task starts the next one.
      for (int x = 0; x < totalThreads; x++)
      {
         this.StartProcessingNextItem();
      }
   }

   /// <summary>
   /// Starts the next unstarted task, if any. When every task has completed, fires the
   /// completion callback instead (first caller only). Called once per worker slot at
   /// start-up and then again each time a task finishes.
   /// </summary>
   private void StartProcessingNextItem()
   {
      Tuple<int, int> tsk = null;
      Action completed = null;

      lock (this.m_oThreadLock)
      {
         if (this.m_lThreadCompleted >= this.m_lTotalItems)
         {
            // everything has finished. Several continuations can reach this point when the
            // last tasks end close together, so only the first one claims the callback.
            if (!this.m_bCompletionRaised)
            {
               this.m_bCompletionRaised = true;
               completed = this.m_action;
            }
         }
         else if (this.m_lThreadsProcessing < this.m_threadItems.Length)
         {
            // get the next task to run and advance the hand-out index.
            tsk = this.m_threadItems[this.m_lThreadsProcessing];
            this.m_lThreadsProcessing++;
         }
         // otherwise every task has been handed out and some are still running: nothing to do.
      }

      if (completed != null)
      {
         // invoked outside the lock so a slow or blocking callback (e.g. one that marshals
         // to a UI thread) cannot stall other continuations waiting on the lock.
         completed();
         return;
      }

      if (tsk == null)
      {
         return;
      }

      this.AddToOutput(string.Format("Queuing task {0}, ", tsk.Item1));

      // start the task, and have this method called again when it finishes so the freed
      // slot picks up the next item (or detects that the batch is complete).
      System.Threading.Tasks.Task.Run(() => this.TaskX(tsk))
         .ContinueWith((antecedent) => this.StartProcessingNextItem(),
                       System.Threading.Tasks.TaskContinuationOptions.None);
   }

   /// <summary>Writes a time-stamped line to the debug output.</summary>
   /// <param name="sInfo">Text to append after the timestamp.</param>
   private void AddToOutput(string sInfo)
   {
      System.Diagnostics.Debug.WriteLine(string.Format("{0:h:mm:ss tt} {1}", DateTime.Now, sInfo));
   }

   /// <summary>
   /// Simulated unit of work: sleeps for the task's duration, then records its completion.
   /// </summary>
   /// <param name="tsk">Item1 = task number, Item2 = time to sleep in milliseconds.</param>
   private void TaskX(Tuple<int, int> tsk)
   {
      this.AddToOutput(string.Format("Starting task {0}, {1} ms", tsk.Item1, tsk.Item2));

      System.Threading.Thread.Sleep(tsk.Item2);

      this.AddToOutput(string.Format("Completing task {0}", tsk.Item1));

      lock (this.m_oThreadLock)
      {
         // increment our Threads Completed counter
         this.m_lThreadCompleted++;
      }
   }
}

The following example shows how the above sample would be called from a Windows Forms button handler.

public partial class Form1 : Form
{
   // the sample runner started by the most recent button click.
   private ThreadTest m_test = null;

   // Click handler: logs the start time, creates a new ThreadTest and starts it,
   // registering ThreadsComplete as the completion callback. Any exception raised
   // while starting is shown through the project's error dialog.
   private void btnTest_Click(object sender, EventArgs e)
   {
      try
      {
         string sStartMessage = string.Format("Starting sample at {0:h:mm:ss tt}", DateTime.Now);
         System.Diagnostics.Debug.WriteLine(sStartMessage);

         ThreadTest sample = new ThreadTest();
         this.m_test = sample;
         sample.StartThreadingSample(this.ThreadsComplete);
      }
      catch (Exception ex)
      {
         Socius.Common.UI.dlg.DisplayError.Show(ex, this);
      }
   }

   // delegate type used to re-dispatch ThreadsComplete through Invoke.
   delegate void threadComplete();

   // Completion callback handed to ThreadTest. When called from a thread where
   // InvokeRequired is true, it re-invokes itself through this.Invoke; otherwise it
   // logs and shows the "all done" message box.
   public void ThreadsComplete()
   {
      if (!this.InvokeRequired)
      {
         System.Diagnostics.Debug.WriteLine("All threads completed.");

         MessageBox.Show("All threads completed");
         return;
      }

      // hand the call over via Invoke; Invoke does not return until that call has finished.
      threadComplete reentry = new threadComplete(this.ThreadsComplete);
      this.Invoke(reentry);
   }
}