Executing X tasks, Y threads at a time
This example illustrates how to run a total of X tasks while keeping at most Y of them running at any one time.
/// <summary>
/// Runs a fixed number of simulated tasks while keeping at most a given number of them
/// in flight at once. Each task simply sleeps for a random amount of time; whenever one
/// finishes, its continuation starts the next queued task. When every task has finished,
/// the supplied completion action is invoked exactly once.
/// </summary>
/// <remarks>
/// An instance is not designed to have a second run started while a previous run is still
/// in progress: the counters and the item array are shared between runs.
/// </remarks>
public class ThreadTest
{
    // Total number of tasks in the current run.
    private int m_lTotalItems = 0;

    // Maximum number of tasks started up front (i.e. the concurrency level).
    private int m_lTotalThreads = 0;

    // Number of tasks that have been started so far; also the index of the next task to start.
    private int m_lThreadsProcessing = 0;

    // Number of tasks that have finished.
    private int m_lThreadCompleted = 0;

    // Set once the completion action has been claimed, so it can never be fired twice.
    private bool m_bCompletionRaised = false;

    // Dedicated lock object guarding the counters above.
    private readonly object m_oThreadLock = new object();

    // The work items: Item1 is the task number, Item2 the time (ms) the task will sleep.
    private Tuple<int, int>[] m_threadItems = null;

    // Action to fire once all tasks are complete (may be null).
    private Action m_action = null;

    /// <summary>
    /// Starts the sample with its default settings: 15 tasks, 5 at a time, each taking
    /// between 5 and 15 seconds.
    /// </summary>
    /// <param name="action">Invoked once when all tasks have completed; may be null.</param>
    public void StartThreadingSample(Action action)
    {
        this.StartThreadingSample(action, 15, 5, 5000, 15000);
    }

    /// <summary>
    /// Starts a run of <paramref name="totalItems"/> tasks, keeping at most
    /// <paramref name="totalThreads"/> of them running at a time.
    /// </summary>
    /// <param name="action">
    /// Invoked once when all tasks have completed; may be null. It is called from whichever
    /// thread observes completion (a task continuation, or the calling thread when
    /// <paramref name="totalItems"/> is zero), and never while the internal lock is held.
    /// </param>
    /// <param name="totalItems">Total number of tasks to run; must not be negative.</param>
    /// <param name="totalThreads">Number of tasks to run simultaneously; must be at least 1.</param>
    /// <param name="minDelayMs">Inclusive lower bound of each task's duration, in milliseconds.</param>
    /// <param name="maxDelayMs">
    /// Exclusive upper bound of each task's duration, in milliseconds (when equal to
    /// <paramref name="minDelayMs"/>, every task takes exactly that long).
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">Any numeric argument is out of range.</exception>
    public void StartThreadingSample(Action action, int totalItems, int totalThreads, int minDelayMs, int maxDelayMs)
    {
        if (totalItems < 0)
        {
            throw new ArgumentOutOfRangeException("totalItems", "The number of tasks cannot be negative.");
        }
        if (totalThreads < 1)
        {
            throw new ArgumentOutOfRangeException("totalThreads", "At least one task must be allowed to run at a time.");
        }
        if (minDelayMs < 0)
        {
            throw new ArgumentOutOfRangeException("minDelayMs", "The minimum delay cannot be negative.");
        }
        if (maxDelayMs < minDelayMs)
        {
            throw new ArgumentOutOfRangeException("maxDelayMs", "The maximum delay cannot be less than the minimum delay.");
        }

        this.AddToOutput(string.Format("Total tasks: {0}, total threds: {1}", totalItems, totalThreads));

        // build the items, each with a random duration in [minDelayMs, maxDelayMs).
        Random rnd = new Random();
        Tuple<int, int>[] items = new Tuple<int, int>[totalItems];
        for (int x = 0; x < items.Length; x++)
        {
            items[x] = new Tuple<int, int>(x, rnd.Next(minDelayMs, maxDelayMs));
        }

        lock (this.m_oThreadLock)
        {
            // publish the configuration for this run and reset all progress tracking.
            this.m_action = action;
            this.m_lTotalItems = totalItems;
            this.m_lTotalThreads = totalThreads;
            this.m_threadItems = items;
            this.m_lThreadsProcessing = 0;
            this.m_lThreadCompleted = 0;
            this.m_bCompletionRaised = false;
        }

        this.AddToOutput(string.Format("Queue up the 1st {0} items", totalThreads));

        // prime the pump: every finished task will start the next one via its continuation.
        for (int x = 0; x < totalThreads; x++)
        {
            this.StartProcessingNextItem();
        }
    }

    /// <summary>
    /// Starts the next unstarted task, if any. If every task has already finished, fires the
    /// completion action instead (at most once per run).
    /// </summary>
    private void StartProcessingNextItem()
    {
        Tuple<int, int> tsk = null;
        Action completedAction = null;

        lock (this.m_oThreadLock)
        {
            if (this.m_lThreadCompleted >= this.m_lTotalItems)
            {
                // Everything has finished. Several continuations can get here when the last
                // tasks finish close together, so only the first one claims the callback.
                if (!this.m_bCompletionRaised)
                {
                    this.m_bCompletionRaised = true;
                    completedAction = this.m_action;
                }
            }
            else if (this.m_lThreadsProcessing < this.m_threadItems.Length)
            {
                // get the next task to run and mark it as started.
                tsk = this.m_threadItems[this.m_lThreadsProcessing];
                this.m_lThreadsProcessing++;
            }
            // otherwise: every task has been started but some are still running - nothing to do.
        }

        // The callback runs outside the lock so that a slow or blocking callback cannot stall
        // other continuations that are waiting to enter the lock.
        if (completedAction != null)
        {
            completedAction();
            return;
        }

        if (tsk == null)
        {
            return;
        }

        this.AddToOutput(string.Format("Queuing task {0}, ", tsk.Item1));

        // start the task
        System.Threading.Tasks.Task ret1 = System.Threading.Tasks.Task.Run(() => this.TaskX(tsk));

        // when the task completes, try to start the next one (or detect overall completion).
        ret1.ContinueWith(
            (antecedent) => this.StartProcessingNextItem(),
            System.Threading.Tasks.TaskContinuationOptions.None);
    }

    /// <summary>
    /// Writes a time-stamped line to the debug output.
    /// </summary>
    /// <param name="sInfo">The text to write.</param>
    private void AddToOutput(string sInfo)
    {
        System.Diagnostics.Debug.WriteLine(string.Format("{0:h:mm:ss tt} {1}", DateTime.Now, sInfo));
    }

    /// <summary>
    /// The simulated unit of work: sleeps for the item's duration, then records its completion.
    /// </summary>
    /// <param name="tsk">Item1 is the task number, Item2 the time to sleep in milliseconds.</param>
    private void TaskX(Tuple<int, int> tsk)
    {
        this.AddToOutput(string.Format("Starting task {0}, {1} ms", tsk.Item1, tsk.Item2));
        System.Threading.Thread.Sleep(tsk.Item2);
        this.AddToOutput(string.Format("Completing task {0}", tsk.Item1));

        lock (this.m_oThreadLock)
        {
            // increment our Threads Completed counter
            this.m_lThreadCompleted++;
        }
    }
}
An example of how the above sample would be called.
/// <summary>
/// Sample form that kicks off the <c>ThreadTest</c> run from a button click and reports
/// when the run has finished.
/// </summary>
public partial class Form1 : Form
{
    // Delegate used to re-dispatch ThreadsComplete onto the thread that owns the form.
    delegate void threadComplete();

    // The currently running sample; kept in a field so it stays referenced while it runs.
    private ThreadTest m_test = null;

    /// <summary>
    /// Notification target handed to the sample. When called from a thread other than the
    /// form's own, it re-invokes itself through <c>Invoke</c>; otherwise it reports completion.
    /// </summary>
    public void ThreadsComplete()
    {
        if (!this.InvokeRequired)
        {
            // Already on the form's thread: safe to show UI.
            System.Diagnostics.Debug.WriteLine("All threads completed.");
            MessageBox.Show("All threads completed");
            return;
        }

        // Wrong thread: hand the call over to the form's thread and wait for it to finish.
        this.Invoke(new threadComplete(this.ThreadsComplete));
    }

    /// <summary>
    /// Click handler for the test button: logs the start time, creates a new sample and
    /// starts it, passing <see cref="ThreadsComplete"/> as the completion action. Any
    /// exception raised while starting is shown through the shared error dialog.
    /// </summary>
    private void btnTest_Click(object sender, EventArgs e)
    {
        try
        {
            System.Diagnostics.Debug.WriteLine(
                string.Format("Starting sample at {0:h:mm:ss tt}", DateTime.Now));

            this.m_test = new ThreadTest();
            this.m_test.StartThreadingSample(this.ThreadsComplete);
        }
        catch (Exception ex)
        {
            Socius.Common.UI.dlg.DisplayError.Show(ex, this);
        }
    }
}