The ProcessQueue

Introduction

A user on StackOverflow recently asked a question about processing items from a Queue using multiple threads. The general solution to this problem is to enqueue these operations using the .NET ThreadPool mechanism, but this sort of task can easily outgrow the ThreadPool: too many operations enqueued at once can starve it, and operations that are particularly long-running can monopolize a ThreadPool thread. For this reason, I set out to develop a thread-safe (through synchronization) class that operates in the same manner as the built-in Queue, but allows the user to specify an operation on a per-item basis and schedules these operations using its own user-adjustable pool of threads. Enter: the ProcessQueue.

As with the FactoryDictionary, a healthy portion of the code contained in the ProcessQueue is basically a synchronized pass-thru to the underlying Queue class, so I won’t cover that here. I’ll be focusing on the management of the threads that the ProcessQueue creates and how they relate to the user’s action.

Pooling the Threads

By default, the ProcessQueue starts out with one worker thread. This is suitable for asynchronous processing of the items in the ProcessQueue when the time to complete is not as important as conserving resources, or when it’s essential that items in the ProcessQueue be processed in the same order as they were entered.

Note: I want to point out an important caveat that may have slipped in with that last sentence: while the ProcessQueue itself is a queue (and using it as such with the Enqueue and Dequeue functions will behave as expected), there is no guarantee that, when item X is added before item Y, item X will complete processing before item Y when multiple worker threads are in use. If the order of completion must reflect the order of the queue, then only a single worker thread should be used.
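
For example (a minimal sketch, assuming the constructor and SetThreadCount members shown later in this post; HandleItem is a hypothetical Action<string>):

// Ordered: the default single worker thread completes items in queue order.
ProcessQueue<string> ordered = new ProcessQueue<string>(HandleItem);

// Concurrent: more throughput, but items may complete in any order.
ProcessQueue<string> concurrent = new ProcessQueue<string>(HandleItem);
concurrent.SetThreadCount(4);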

Because the ProcessQueue needs to be able to suspend, resume, and terminate its worker threads, the WorkerThread class wraps the management of the WaitHandle objects that coordinate this functionality. The WorkerThread maintains a ManualResetEvent that signals whether or not it should actively check the ProcessQueue for an item, and another ManualResetEvent that signals the thread to abort. The relevant code appears below:

private ProcessQueue<T> queue;
private ManualResetEvent abortEvent;
private ManualResetEvent signalEvent;
private Thread thread;

public WorkerThread(ProcessQueue<T> queue)
{
    this.queue = queue;

    // Both events start unsignaled: the thread neither processes items
    // nor aborts until it is told to.
    abortEvent = new ManualResetEvent(false);
    signalEvent = new ManualResetEvent(false);

    thread = new Thread(ThreadProc);
}

// True while the thread has been told to process items and has not yet
// put itself back to sleep.
public bool IsSignaled
{
    get { return signalEvent.WaitOne(0); }
}

public void Start()
{
    thread.Start();
}

public void Abort()
{
    // Wake the thread via the abort event, then wait for it to finish
    // its current item and exit.
    abortEvent.Set();

    thread.Join();
}

public void Pause()
{
    signalEvent.Reset();
}

public void Signal()
{
    signalEvent.Set();
}

The ProcessQueue class uses the functions listed here to manage the threads:

  • Start is called when the WorkerThread is created
  • Signal is called when the thread needs to “wake up”, such as when items are added to the queue or the number of threads is increased and the thread needs to start processing immediately
  • Pause is called only if the user manually signals the ProcessQueue to pause all processing (the function is called on all WorkerThread objects at that time)
  • Abort is called when the ProcessQueue is disposed or when the number of worker threads is reduced by calling SetThreadCount

The body of the thread’s procedure appears below:

private void ThreadProc()
{
    WaitHandle[] handles = new WaitHandle[] { signalEvent, abortEvent };
 
    while (true)
    {
        switch (WaitHandle.WaitAny(handles))
        {
            case 0: // signal
                {
                    ProcessItems();
                } break;
            case 1: // abort
                {
                    return;
                }
        }
    }
}

This is a fairly typical infinite-loop thread body that waits on either of the two WaitHandles to be set before it proceeds. If the signal handle was set, it processes the current items in the queue, then resets the signal handle and waits for it to be set again. If the abort handle was set, it returns from the thread body and the thread terminates.

private void ProcessItems()
{
    T item;
 
    while (queue.TryDequeue(out item))
    {
        queue.ProcessItem(item);
 
        // Between items, bail out if processing has been paused (the signal
        // event was reset) or the thread has been told to abort.
        if (!signalEvent.WaitOne(0) || abortEvent.WaitOne(0)) return;
    }
 
    signalEvent.Reset();
}

This introduces something from the ProcessQueue class: the TryDequeue function. If you’re familiar with the various TryX functions in the .NET framework (TryParse on various value types, TryGetValue, etc.) then its purpose should be fairly clear, but I will cover it in the upcoming section.

One advantage of using the WorkerThread class is that the thread can put itself to sleep once processing is finished, rather than requiring the ProcessQueue to monitor calls to Dequeue to determine when to suspend a thread. In fact, the ProcessQueue class simply maintains a List<WorkerThread> internally and activates a thread when needed.
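
To make that concrete, the internal state boils down to a handful of fields (a sketch assembled from the members referenced in the listings below; the names match those listings):

private object syncRoot = new object();  // guards all shared state
private Queue<T> queue = new Queue<T>(); // the items awaiting processing
private List<WorkerThread> threads = new List<WorkerThread>(); // the worker pool
private bool isRunning;                  // has Start been called?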

Process Management

From the end-user’s perspective, Start, Stop, and SetThreadCount are the only calls necessary to control the processing of items in the ProcessQueue.

  • Start triggers the processing of all items currently in the queue and any that are added afterwards
  • Stop pauses the processing of any unprocessed items. Calling Stop will not suspend any operations that are already in progress, but it will prevent any completely unprocessed items (or items added after the call) from being processed.
  • SetThreadCount does exactly what it sounds like; it sets the number of threads that the ProcessQueue maintains for performing concurrent processes. Note that at least one thread must be present in order for any work to happen, so values that are less than 1 will throw an ArgumentOutOfRangeException.

From the ProcessQueue‘s perspective, all it ever does is activate individual threads or pause all threads. It never has a need to pause an individual thread. It will activate threads when:

  • A new item is added to the queue (and there are inactive threads)
  • Processing is resumed after being paused (or started for the first time) and there are unprocessed items
  • SetThreadCount was called to increase the size of the thread pool and there are unprocessed items

The relevant portions of Enqueue, Start, and SetThreadCount appear below:
public void Enqueue(T item)
{
    lock (syncRoot)
    {
        queue.Enqueue(item);
 
        if (isRunning)
        {
            RegenerateIfDisposed();
 
            // Wake one idle (unsignaled) thread, if any.
            WorkerThread firstThread = threads.Where(t => !t.IsSignaled).FirstOrDefault();
 
            if (firstThread != null) firstThread.Signal();
        }
    }
}
 
public void Start()
{
    lock (syncRoot)
    {
        RegenerateIfDisposed();
 
        // Signal no more threads than there are items waiting.
        for (int i = 0; i < Math.Min(threads.Count, queue.Count); i++)
        {
            threads[i].Signal();
        }
 
        isRunning = true;
    }
}
 
public void SetThreadCount(int threadCount)
{
    if (threadCount < 1) 
        throw new ArgumentOutOfRangeException("threadCount", 
            "The ProcessQueue class requires at least one worker thread.");

    lock (syncRoot)
    {
        int pending = queue.Count;

        for (int i = threads.Count; i < threadCount; i++) // add additional threads
        {
            WorkerThread thread = new WorkerThread(this);
            threads.Add(thread);
            thread.Start();

            // Only wake the new thread if there is work waiting for it.
            if (pending > 0)
            {
                thread.Signal();
            }

            pending--;
        }

        int toRemove = threads.Count - threadCount;

        if (toRemove > 0)
        {
            // Remove idle (unsignaled) threads first...
            foreach (WorkerThread thread in threads.Where(t => !t.IsSignaled).ToList())
            {
                if (toRemove == 0) break;

                thread.Abort();
                threads.Remove(thread);

                toRemove--;
            }

            // ...then remove active threads from the end of the list; Abort
            // blocks until each thread finishes its current item.
            while (toRemove > 0)
            {
                WorkerThread thread = threads[threads.Count - 1];

                thread.Abort();

                threads.Remove(thread);

                toRemove--;
            }
        }
    }
}

Note here that the SetThreadCount function will add or remove threads as necessary; if any unprocessed items exist while threads are being added, then the appropriate number of new threads will be signaled. If the number of threads is being decreased, then unsignaled threads are removed first, followed by signaled threads, until only the desired number remains.

As shown before, the WorkerThread class uses a TryDequeue method that has been added to the ProcessQueue class. This function attempts to retrieve the next item from the ProcessQueue and assign its value to the out variable that is supplied by the caller. The function returns true if there was an item to obtain, and false if there was not.

public bool TryDequeue(out T value)
{
    lock (syncRoot)
    {
        if (queue.Count > 0)
        {
            value = queue.Dequeue();
 
            return true;
        }
        else
        {
            value = default(T);
 
            return false;
        }
    }
}

Taking this approach keeps all of the locking within the ProcessQueue and eliminates a race condition: without it, Thread A could examine the Count property and see an item, Thread B could dequeue that item first, and Thread A would then attempt to dequeue from an empty queue (leading to an exception). With this approach, the first thread to call the function gets the item (with a return value of true) and the second simply gets a return value of false.
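
For contrast, the racy pattern that TryDequeue exists to replace would look something like this (a sketch of the problem, not code from the class):

// BROKEN: the check and the dequeue are two separate operations.
if (queue.Count > 0)              // Thread A sees an item...
{
    // ...but Thread B may dequeue it between the check and this call,
    // so Thread A throws an InvalidOperationException on an empty queue.
    T item = queue.Dequeue();
}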

Using the Class

At a minimum, using the class requires that you provide the ProcessQueue with an action delegate that takes a single parameter: namely, an instance of the type that the queue is designed to hold onto. For example, if I needed a queue that would maintain a list of file names and a procedure that would open the file, read the contents, and then do something with that data, I would have something like this:

private void FileHandler(string fileName)
{
    // open the file and do whatever processing is necessary
}
 
ProcessQueue<string> queue = new ProcessQueue<string>(FileHandler);

This would create a ProcessQueue to hold string objects, with one worker thread that would process any enqueued items through my FileHandler function. This, however, won’t do anything until I call Start:

queue.Start();

This unleashes the worker threads to go to work on any items I have in my queue, as well as any that might get added later.
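
Adding work at any point is just a matter of calling Enqueue; the file name here is, of course, only a placeholder:

queue.Enqueue(@"C:\data\example.txt");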

Once processing has been started, it can be stopped (or, more accurately, suspended) by calling Stop on the queue.

queue.Stop();

This will reset the signal event on all worker threads, causing them to pause execution once the current item completes processing.

The number of threads in the pool can be adjusted at any time by calling SetThreadCount, specifying the desired number of worker threads in the threadCount parameter. While SetThreadCount will remove inactive (unsignaled) threads first, if active threads must be removed, then SetThreadCount will block until each such thread completes processing its current item.
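
For example, to widen the pool to four workers and later shrink it back down:

queue.SetThreadCount(4); // add threads (and signal them if work is pending)
queue.SetThreadCount(1); // remove threads, idle ones first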

Once the lifetime of the queue is over (or, at the very least, when the application or service terminates), Dispose must be called on the queue in order to terminate the worker threads. Dispose will block until each active thread finishes processing its current item.

Note: It is vitally important that Dispose be called on the ProcessQueue before the application terminates, otherwise the worker threads will continue to run indefinitely, even those that are not currently processing an item.
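
Assuming the class follows the standard IDisposable pattern, a try/finally block is the simplest way to guarantee the call:

ProcessQueue<string> queue = new ProcessQueue<string>(FileHandler);

try
{
    queue.Start();
    // ... enqueue items and let the workers process them ...
}
finally
{
    // Blocks until each active thread finishes its current item,
    // then terminates the worker threads.
    queue.Dispose();
}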

Summary

The .NET ThreadPool provides a convenient way to schedule small “chunks” of code to operate asynchronously, but overuse of the ThreadPool can lead to undesirable results. The ProcessQueue provides a fairly simple way to handle a large number of items, even if an individual item takes a substantial amount of time to process.

ProcessQueue.zip (27.30 kb)

The FactoryDictionary

(Naming has never been my biggest strength)

Answering this StackOverflow question got me thinking a little more about managing locks and multithreaded interaction with something like a Dictionary<TKey, TValue>. While I’m happy with the answer that I produced, I thought I would turn it into something a little more formal and discuss it here for a bit. This afternoon I put together a small class that I’m calling FactoryDictionary. While it’s not intended to be a pure implementation of a factory pattern, I felt the name was appropriate since the class itself is deciding whether or not to create a new instance.

The purpose of the FactoryDictionary is to provide a thread-safe (in that operations are synchronized) implementation of a Dictionary<TKey, TValue> that also incorporates the ability to block the calling thread if a new instance needs to be created for a particular key (or if an instance is in the process of being created for a particular key) without blocking threads that are retrieving values for other keys. This is primarily targeted at heavyweight objects that are created infrequently and retrieved frequently. The approach of the FactoryDictionary involves offloading the heavy instantiation process into an internal wrapper class that uses a user-supplied delegate to create the object. In doing so, the lock on the internal dictionary (used for retrieving instances of the wrapper class) can be released relatively quickly, but the calling thread can still be blocked by establishing a lock on an instance-specific synchronization object.

The majority of the code in the FactoryDictionary is a boilerplate implementation of the IDictionary<TKey, TValue> interface (and the other associated interfaces), which I won’t go into here since they’re fairly straightforward. Using these functions, the FactoryDictionary operates in the same manner as the Dictionary<TKey, TValue> class. The segment we’re actually interested in is contained in two parts: the ValueWrapper class and the GetOrCreate function. Relevant code appears below:

private class ValueWrapper
{
    public TValue Value { get; private set; }
    private volatile object valueLock;
    private bool isCreated;
    private Func<TValue> constructor;

    public void WaitForInitialization()
    {
        object sync = valueLock;

        // A null synchronization object means the value has already
        // been created, so the call returns immediately.
        if (sync != null)
        {
            lock (sync)
            {
                // Only the first thread into the lock performs the
                // creation; threads that were blocked behind it find
                // isCreated set and fall straight through.
                if (!isCreated)
                {
                    isCreated = true;

                    Value = constructor();

                    valueLock = null;
                }
            }
        }
    }

    public ValueWrapper(TValue value)
    {
        Value = value;
        isCreated = true;
    }

    public ValueWrapper(Func<TValue> constructor)
    {
        valueLock = new object();
        isCreated = false;
        this.constructor = constructor;
    }
}

public TValue GetOrCreate(TKey key, Func<TValue> constructor)
{
    ValueWrapper wrapper;
 
    lock (backingLock)
    {
        if (!backing.TryGetValue(key, out wrapper))
        {
            wrapper = new FactoryDictionary<TKey, TValue>.ValueWrapper(constructor);
 
            backing.Add(key, wrapper);
        }
    }
 
    wrapper.WaitForInitialization();
 
    return wrapper.Value;
}

The implementation is pretty simple: the dictionary inspects its internal dictionary to see if a wrapper exists for the specified key. If one does not, it creates a wrapper and supplies it with the user-specified delegate that will actually do the heavy lifting of creating the object. Once the wrapper is in hand, it calls WaitForInitialization to pause, if necessary, for object instantiation. The delegate approach was taken to support objects with parameterized constructors; while I could have specified the new() generic constraint on the TValue argument, that would only allow objects with a parameterless constructor to be stored.
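
For example, a value whose constructor takes arguments works just as well, since the lambda captures them (HeavyweightObject and connectionString here are hypothetical):

string connectionString = "..."; // some constructor argument
HeavyweightObject value = dictionary.GetOrCreate("key",
    () => new HeavyweightObject(connectionString));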

Upon the first call to WaitForInitialization, an exclusive lock is acquired on the synchronization object, the creation flag is set, the object is created, the synchronization object is cleared, and the exclusive lock is released. If a subsequent call occurs before the object is created (or, more specifically, before the synchronization object is cleared), the thread blocks until prior locks are released. Once the locked code block is entered, the creation flag has already been set so no action is performed and the lock is immediately released. Calls that occur after the object has been created will return immediately, since the synchronization object has been cleared.

Taking this approach yields fairly transparent code, and consumption of the FactoryDictionary is simple. Consider a dictionary with a string key whose values are instances of a heavyweight class named HeavyweightObject.

FactoryDictionary<string, HeavyweightObject> dictionary = new FactoryDictionary<string, HeavyweightObject>();

All that is required to use the pseudo-double-checking dictionary functionality is the following:

dictionary.GetOrCreate("key", () => new HeavyweightObject());

This will either retrieve or create the object associated with the “key” string, using the lambda-declared delegate to perform the actual instantiation. In this way, subsequent calls to GetOrCreate with “key” as the key will only block (rather than instantiate a throw-away instance) and calls for other keys will not be affected.
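
A quick sketch of that behavior (assuming HeavyweightObject is a reference type; the comments describe the intended outcome, not guaranteed scheduling):

HeavyweightObject a = null, b = null;

Thread t1 = new Thread(() => a = dictionary.GetOrCreate("key", () => new HeavyweightObject()));
Thread t2 = new Thread(() => b = dictionary.GetOrCreate("key", () => new HeavyweightObject()));

t1.Start(); t2.Start();
t1.Join(); t2.Join();

// Only one of the two delegates actually ran; the other call blocked
// until the instance existed, and both threads received the same object.
bool same = ReferenceEquals(a, b); // true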

A full source code download is available in my corresponding CodeProject article, or you can download it directly from me below.

FactoryDictionary.zip (23.97 kb)