InfiniTec - Henning Krauses Blog

Don't adjust your mind - it's reality that is malfunctioning

Minor update on my multithreading library

Here is a small update to my multithreading framework. After some background reading on how the BackgroundWorker synchronizes event invocations, I decided to add similar functionality to my framework.

UI Synchronization

The BackgroundWorker allows updates to the UI without the need to call Control.Invoke. It does this by using the SynchronizationContext class (a good article on this topic can be found on CodeProject).

This version of my multithreading framework also uses this mechanism to call the Completed, ProgressChanged and Error events. And best of all, this is agnostic to the UI framework used: It works with Windows Forms as well as Windows Presentation Foundation.
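The capture-and-post pattern behind this can be sketched as follows. This is only an illustration of the mechanism, not the framework's actual code: the ContextAwareWorker class and its members are hypothetical names, and the only real API used is System.Threading.SynchronizationContext.

```csharp
using System;
using System.Threading;

// Hypothetical worker, not part of InfiniTec.Threading. It only illustrates
// the capture-and-post pattern that BackgroundWorker relies on.
public sealed class ContextAwareWorker
{
    // Captured on the thread that creates the worker (the UI thread in a
    // Windows Forms or WPF application). If no context is installed, fall
    // back to the default context, which posts to the thread pool.
    private readonly SynchronizationContext _context =
        SynchronizationContext.Current ?? new SynchronizationContext();

    public event EventHandler Completed;

    public void Run()
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            // ... the actual background work would happen here ...

            // Post hands the callback to the captured context, so the
            // event handler runs wherever that context dispatches it.
            _context.Post(state => Completed?.Invoke(this, EventArgs.Empty), null);
        });
    }
}
```

In a Windows Forms or WPF application, a worker created on the UI thread would raise Completed on that same thread, so the handler can touch controls without calling Invoke.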

Locking inside an asynchronous method

Another thing you should know when working with my framework: you must never return a NestedOperation or a ParallelOperation from within a region protected by either a Monitor (explicitly or via the lock keyword, SyncLock in Visual Basic) or a Mutex. Both primitives have thread affinity: they must be released by the thread that acquired them, and an asynchronous method may resume on a different thread. Use a Semaphore instead.

Consider the following code:

    1 if (_ConfigurationNamingContext == null)
    2 {
    3     lock (_ConfigurationNamingContextLock)
    4     {
    5         if (_ConfigurationNamingContext == null)
    6         {
    7             using (
    8                 AsyncOperation<int, Item> operation =
    9                     ActiveDirectoryEntry.GetConfigurationNamingContext(_Connection.ActiveDirectoryConnection)
   10                 )
   11             {
   12                 yield return new NestedOperation(operation, false);
   13                 _ConfigurationNamingContext = operation.Result;
   14             }
   15         }
   16     }
   17 }

Lines 3 through 16 are protected by a Monitor to ensure that the code is not executed twice (which could happen when the method is called in parallel). In line 12, a nested operation is executed, doing the hard work. But when the method resumes in line 13, the code may run on a different thread than before. This causes the Monitor.Exit call (which is made implicitly in line 16) to throw a SynchronizationLockException.

To avoid this, use a Semaphore instead.

Create a semaphore with a maximum count of 1:

    private static Semaphore _ConfigurationNamingContextLock = new Semaphore(1, 1);

Now, use the semaphore as outlined here:

    if (_ConfigurationNamingContext == null)
    {
        // Acquire before entering the try block, so that the finally block
        // only releases a semaphore that was actually acquired.
        _ConfigurationNamingContextLock.WaitOne();
        try
        {
            if (_ConfigurationNamingContext == null)
            {
                using (
                    AsyncOperation<int, Item> operation =
                        ActiveDirectoryEntry.GetConfigurationNamingContext(_Connection.ActiveDirectoryConnection)
                    )
                {
                    yield return new NestedOperation(operation, false);
                    _ConfigurationNamingContext = operation.Result;
                }
            }
        }
        finally
        {
            // Unlike Monitor.Exit, Release may be called from any thread.
            _ConfigurationNamingContextLock.Release();
        }
    }

It's not as neat as using the lock keyword, but it works :-)
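The difference between the two primitives can be reproduced outside the framework. The following is a minimal sketch using only System.Threading; the ThreadAffinityDemo class and its method names are made up for this illustration.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: shows that a Monitor must be released by the
// thread that acquired it, while a Semaphore has no such restriction.
public static class ThreadAffinityDemo
{
    // Acquires a Monitor on the calling thread and tries to release it on
    // another thread. Returns true if that release was rejected.
    public static bool MonitorExitFailsOnOtherThread()
    {
        object gate = new object();
        bool failed = false;

        Monitor.Enter(gate);
        try
        {
            Thread other = new Thread(() =>
            {
                try { Monitor.Exit(gate); }
                catch (SynchronizationLockException) { failed = true; }
            });
            other.Start();
            other.Join();
        }
        finally
        {
            Monitor.Exit(gate); // released by the owning thread
        }
        return failed;
    }

    // Acquires a Semaphore on the calling thread and releases it on another
    // thread. Returns true if the semaphore can be acquired again afterwards.
    public static bool SemaphoreReleaseWorksOnOtherThread()
    {
        using (Semaphore gate = new Semaphore(1, 1))
        {
            gate.WaitOne();
            Thread other = new Thread(() => gate.Release());
            other.Start();
            other.Join();
            return gate.WaitOne(0);
        }
    }
}
```

Both methods return true: the foreign Monitor.Exit is rejected with a SynchronizationLockException, whereas the semaphore released by the second thread is immediately available again.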

Change log for version 2.5.1

  • Events are now called via a SynchronizationContext. This makes the usage of Invoke() methods unnecessary when reporting progress to Windows Forms or WPF applications.

Downloads

InfiniTec.Threading.zip (277,385 Bytes)
Source, Binaries and Documentation for version 2.5.1

Posted by Henning Krause on Sunday, January 28, 2007 12:00 AM, last modified on Sunday, January 28, 2007 12:00 PM

Extreme threading...

This is a major upgrade for my multithreading library. If you are not familiar with what this library does, please read the other articles in this section.

New in this release are parallel operations: You can return a ParallelOperation from within an asynchronous operation. The execution of the current operation is suspended until all nested operations have been executed. Additionally, a maximum number of parallel executions can be passed along, which allows you to further control the execution of the nested operations. And of course, you can select which exceptions you want to catch on each of the nested operations.

I'm primarily using these parallel operations in my InfiniTec.Exchange library to speed up lookups against Active Directory and the Exchange store. For example, when looking up the members of a distribution list, I get a bunch of references, either to users in Active Directory or to contacts in an Exchange folder. Take a look at this method:

    1 private IEnumerable<OperationAction> RefreshInternal(AsyncOperation baseOperation)
    2 {
    3     Dictionary<NestedOperation, EntryIdReference> operations;
    4     ExchangeStoreReference entryId;
    5     List<IAddressListEntry> result;
    6
    7     _UnresolvableEntries = 0;
    8
    9     yield return new NestedOperation(baseOperation, false);
   10
   11
   12     if (!_ResolveMembers)
   13     {
   14         _ResolveMembers = true;
   15         yield break;
   16     }
   17
   18     entryId = new ExchangeStoreReference(Properties.GetProperty<byte[]>(WellknownProperties.Item.EntryId).Value);
   19
   20     operations = new Dictionary<NestedOperation, EntryIdReference>();
   21
   22     foreach (EntryIdReference reference in DecodeMembers(entryId.StoreId))
   23     {
   24         operations.Add(
   25             new NestedOperation(reference.GetResolveOperation(BaseItem.Connection), false),
   26             reference);
   27     }
   28
   29     yield return new ParallelOperations(operations.Keys, 5);
   30
   31     result = new List<IAddressListEntry>();
   32
   33     foreach (KeyValuePair<NestedOperation, EntryIdReference> entry in operations)
   34     {
   35         AsyncOperation<IAddressListEntry> operation;
   36
   37         operation = (AsyncOperation<IAddressListEntry>) entry.Key.Operation;
   38         if (operation.StatusInformation != null)
   39         {
   40             result.Add(operation.StatusInformation);
   41         }
   42         else
   43         {
   44             _UnresolvableEntries++;
   45         }
   46     }
   47
   48     _Members = result.AsReadOnly();
   49 }

In line 9, I call the original refresh method using the NestedOperation operation action. After that, I decode the member entries in lines 22 to 27 and put them in a dictionary. The interesting part happens in line 29: at this point, I return a ParallelOperation operation action with a list of those member references. Additionally, a maximum of 5 concurrent operations is specified.

When the method resumes its operation in line 31, all nested operations have been completed.
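For readers who want to see the "run everything, but at most N at a time, then continue" idea in isolation, here is a sketch built on Task and SemaphoreSlim. This is a swapped-in technique, not how the library implements parallel operations; ThrottledRunner and its members are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, unrelated to the library's own classes.
public static class ThrottledRunner
{
    // Runs all operations, never more than maxParallel at a time, and
    // completes once every operation has finished.
    public static async Task RunAllAsync(
        IEnumerable<Func<Task>> operations, int maxParallel)
    {
        using (SemaphoreSlim slots = new SemaphoreSlim(maxParallel, maxParallel))
        {
            List<Task> running = new List<Task>();
            foreach (Func<Task> operation in operations)
            {
                await slots.WaitAsync();   // wait for a free slot
                running.Add(RunOneAsync(operation, slots));
            }

            // Corresponds to the point where the parent operation resumes.
            await Task.WhenAll(running);
        }
    }

    private static async Task RunOneAsync(Func<Task> operation, SemaphoreSlim slots)
    {
        try { await operation(); }
        finally { slots.Release(); }       // free the slot for the next operation
    }
}
```

Calling ThrottledRunner.RunAllAsync(lookups, 5) plays the same role as the limit of 5 in the method above: the sixth lookup does not start before one of the first five has finished.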

Changelog

  • Introduced parallel operations: Using the ParallelOperations action, you can now return multiple nested operations which will be executed in parallel. Once all nested operations are finished, the parent operation is resumed.
  • The AsyncOperation<StatusType> can now also be used to return a result. Just return an OperationResult<StatusType> in the enumerator.
  • Fixed a bug with nested operations and exception handling

Downloads

InfiniTec.Threading.zip (268,722 Bytes)
Version 2.5. Includes source, binaries and documentation


Posted by Henning Krause on Saturday, January 27, 2007 12:00 AM, last modified on Monday, November 29, 2010 8:27 PM

InfiniTec.Threading library

Description

In his blog Michael Entin talks about using C# iterators to simplify writing of asynchronous code. Be sure to read his article before proceeding!

If you want to do a web request using the WebRequest class, you can do this synchronously, like this:

    // Requires: using System.IO; using System.Net; using System.Text;
    public string DoRequest(string uri)
    {
        WebRequest request = WebRequest.Create(uri);

        // Dispose the response as well as the stream when done.
        using (WebResponse response = request.GetResponse())
        using (Stream stream = response.GetResponseStream())
        {
            byte[] buffer = new byte[4096];
            StringBuilder result = new StringBuilder();
            int bytesRead;
            do
            {
                // Read returns the number of bytes actually read; 0 means end of stream.
                bytesRead = stream.Read(buffer, 0, buffer.Length);

                if (bytesRead > 0)
                {
                    // Note: decoding chunk by chunk can split a multi-byte
                    // UTF-8 sequence at a buffer boundary.
                    result.Append(Encoding.UTF8.GetString(buffer, 0, bytesRead));
                }
            } while (bytesRead != 0);

            return result.ToString();
        }
    }

Very nice and elegant, but a waste of resources: the operation blocks the caller (and with it, probably the UI). The solution is to perform the entire request asynchronously, using BeginGetResponse, EndGetResponse, and the other BeginXXX and EndXXX methods. You will end up with a callback method for each of them, and the result is a complicated set of methods that is hard to debug.

Michael (and others) had the ingenious idea to use C# iterators to merge these two concepts and get the best of both worlds. The AsyncOperation class in this library implements his approach, or at least its basic principles.

Creative misuse…

The basic idea is to implement the operation (e.g. downloading a website) as an iterator method:

    private IEnumerable<OperationAction> DownloadUrlInternal(string url)
    {
        WebRequest request = WebRequest.Create(url);

        // Start the asynchronous request; no callback is needed.
        IAsyncResult ar = request.BeginGetResponse(null, null);

        // Hand the wait handle to the caller; the method resumes once it is signaled.
        ProgressUpdate<Status> update = new ProgressUpdate<Status>(Status.ReceivingResponse, ar.AsyncWaitHandle);
        yield return update;

        WebResponse response = request.EndGetResponse(ar);
    }

This method implements an iterator that returns operation actions. After the call to BeginGetResponse, the method creates a ProgressUpdate<Status> that contains the status of the current request as well as the WaitHandle of the IAsyncResult returned by BeginGetResponse. Using the 'yield return' statement, this information is passed to the caller.

The method is not resumed until the WaitHandle is signaled.
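How a caller can resume an iterator only after a yielded handle is signaled is sketched below. This is a stripped-down, hypothetical driver (IteratorDriver and DriverSample are names invented for this example); the real AsyncOperation class additionally deals with progress events, timeouts, errors and nested operations. It assumes the iterator yields plain WaitHandle objects rather than operation actions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical minimal driver for an iterator that yields wait handles.
public sealed class IteratorDriver
{
    private readonly IEnumerator<WaitHandle> _steps;
    private readonly ManualResetEvent _completed = new ManualResetEvent(false);

    public IteratorDriver(IEnumerable<WaitHandle> steps)
    {
        _steps = steps.GetEnumerator();
    }

    // Signaled when the iterator has run to its end.
    public WaitHandle Completed { get { return _completed; } }

    public void Run()
    {
        MoveNext();
    }

    private void MoveNext()
    {
        // Runs the iterator body up to its next 'yield return'.
        if (!_steps.MoveNext())
        {
            _steps.Dispose();
            _completed.Set();
            return;
        }

        // Resume on a thread-pool thread once the yielded handle is
        // signaled. No thread is blocked while waiting.
        ThreadPool.RegisterWaitForSingleObject(
            _steps.Current,
            (state, timedOut) => MoveNext(),
            null,
            Timeout.Infinite,
            true); // execute the callback only once
    }
}

public static class DriverSample
{
    // Iterator in the style of the article: start something asynchronous,
    // yield its wait handle, continue after it is signaled.
    public static IEnumerable<WaitHandle> TwoSteps(List<string> log)
    {
        ManualResetEvent first = new ManualResetEvent(false);
        ThreadPool.QueueUserWorkItem(_ => { Thread.Sleep(50); first.Set(); });
        log.Add("started");
        yield return first;
        log.Add("resumed");
    }
}
```

A caller would create new IteratorDriver(DriverSample.TwoSteps(log)), call Run(), and wait on Completed. Note that the code after the yield return runs on a thread-pool thread, which is exactly why thread-affine locks are a problem in such methods.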

The DownloadUrlInternal method is not called directly by an application. Instead, a wrapper class called AsyncOperation is used. This class ensures that the DownloadUrlInternal method is called properly and translates the OperationActions into events such as ProgressChanged, Completed, Error and Timedout.

The wrapper is written this way:

    public AsyncOperation<Status, string> DownloadUrl(string url)
    {
        return new AsyncOperation<Status, string>(DownloadUrlInternal(url), 3000);
    }

This call initializes a new AsyncOperation instance with a timeout of 3 seconds. The calling application can then run the operation like this:

    helper = reader.DownloadUrl(tbUrl.Text);

    helper.ProgressChanged += helper_ProgressChanged;
    helper.Completed += helper_Completed;
    helper.Error += helper_Error;

    helper.Run();

The entire source code, as well as a sample application, can be downloaded from the bottom of this page.

The AsyncOperation classes are used extensively in my InfiniTec.Exchange project.

More Information

 

The AsyncOperation classes

The main classes are AsyncOperation and the classes derived from it. The most important members are displayed in this diagram:

[Class diagram: AsyncOperation and its derived classes]

Most of the logic is implemented in the AsyncOperation class. However, only the AsyncOperation<StatusType> and the AsyncOperation<StatusType, ResultType> classes can be used. The first one is best suited for operations that do not yield a result, for example copying an input stream to an output stream. The AsyncOperations.Copy operation does exactly this. During progress updates, it reports the current operation (reading from the input stream or writing to the output stream) as well as the current progress, if the source stream supports seeking.

The second operation type reports status updates and returns a result. A good example would be the download of a file using the HttpWebRequest class.

The operation actions

An asynchronous operation can return several different operation actions. These are displayed below:

[Class diagram: the operation actions]

  • The ProgressUpdate<StatusType> action is used to report progress changes to the caller. This is also the action that drives the operation: every BeginXXX method should be followed by a ProgressUpdate action containing the IAsyncResult.AsyncWaitHandle.
  • The abstract ProgressUpdate action is only used internally.
  • The OperationResult<ResultType> action is used to report the result of the operation.
  • The NestedOperation action can be used to run an asynchronous operation from within another asynchronous operation. The current operation is not resumed until the nested operation has completed. The calling operation can specify whether progress updates are propagated to the root operation. Additionally, the calling operation can specify whether it wants to handle a possible timeout of the nested operation and certain exceptions. Unhandled exceptions and timeouts are propagated up the call chain until a handler is found or the root operation is reached.
  • The NestedOperation<StatusType> action is a combination of the NestedOperation action and the ProgressUpdate<StatusType> action. This type first reports the progress update and then executes the nested operation.

Progress updates of nested operations

The main problem with propagating the progress of nested operations to the root operation is that the status types may be incompatible. This is solved by marking properties of the parent operation's status class as nested state, using the NestedState attribute. During a nested progress update, the status class of the parent operation is searched for a matching property that is marked with the NestedState attribute.
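The lookup described above could work roughly like this. Everything in the sketch is an assumption made for illustration: the attribute is redefined locally, the two status classes are invented, and the matching rule (first writable marked property whose type can hold the nested status) is a guess, not the library's documented behavior.

```csharp
using System;
using System.Reflection;

// All types below are hypothetical stand-ins, not the library's own classes.
[AttributeUsage(AttributeTargets.Property)]
public sealed class NestedStateAttribute : Attribute { }

// Status type of a nested copy operation.
public sealed class CopyStatus
{
    public long BytesCopied { get; set; }
}

// Status type of the parent operation.
public sealed class DownloadStatus
{
    public string Phase { get; set; }

    // Slot that receives progress reported by a nested copy operation.
    [NestedState]
    public CopyStatus Copy { get; set; }
}

public static class NestedStateMapper
{
    // Stores nestedStatus in the first [NestedState] property of parentStatus
    // whose type can hold it. Returns false if no property matches.
    public static bool TryAssign(object parentStatus, object nestedStatus)
    {
        foreach (PropertyInfo property in parentStatus.GetType().GetProperties())
        {
            bool marked = property.IsDefined(typeof(NestedStateAttribute), false);
            if (marked && property.CanWrite &&
                property.PropertyType.IsInstanceOfType(nestedStatus))
            {
                property.SetValue(parentStatus, nestedStatus, null);
                return true;
            }
        }
        return false;
    }
}
```

With these types, assigning a CopyStatus to a DownloadStatus fills the Copy property, while a status object of an unrelated type is simply not propagated.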

ChangeLog

Version 2.0 (05/15/2006)

This version is a major update… somewhere in the middle I lost track of the changes…. Some of the changes are:

  • New class AsyncOperations. Contains a basic Stream-to-Stream copy operation
  • Changed the name of the AsyncOperationBase class to AsyncOperation
  • Added an Update method to the ProgressUpdate class
  • The type of the Timeout property has been changed from int to TimeSpan
  • The StatusBag classes have been renamed to OperationActions.
  • Nested operations are now better supported
  • Progress updates can now be reported from a nested operation to the root operation.
  • New class AsyncOperation<StatusType> for operations which don't return a result
  • WaitForCompletion operation added to the AsyncOperation class. Blocks the calling thread until the operation is finished.

Version 1.1 (03/31/2006)

  • Added a WaitHandle that is signaled when the status of the operation changes. This allows better nesting of asynchronous operations.
  • Added Status, StatusInformation, Result and Exception properties. Now the current status can be read from the AsyncOperation class.
  • Added Abort method to the AsyncOperation class.
  • Renamed OnFinished to OnCompleted
  • Added NDoc documentation

Version 1.0 (03/30/2006)

  • Initial release

Downloads

InfiniTec.Threading Release.zip (28,518 Bytes)
Binaries version 2.1 with debug symbols. Signed with the InfiniTec private key.
InfiniTec.Threading source.zip (21,563 Bytes)
Source code for the InfiniTec.Threading assembly.
InfiniTec.Threading help.zip (129,542 Bytes)
Help file containing the documentation for the assembly.


Posted by Henning Krause on Thursday, March 30, 2006 12:00 AM, last modified on Wednesday, July 12, 2006 1:00 PM