Categories
C# Tasks

Processing Tasks in Order of Completion

Disclaimer: I acknowledge this method was obtained from another blog; I’m pretty sure it was Stephen Toub’s, but I cannot seem to find his post again.

But here are my modified versions of his Interleaved() method.

#nullable enable

	using System;
	using System.Collections.Concurrent;
	using System.Collections.Generic;
	using System.Diagnostics;
	using System.Linq;
	using System.Threading;
	using System.Threading.Tasks;
	using JetBrains.Annotations;

	public static class TaskExtensions {

		/// <summary>
		/// Return tasks in order of completion.
		/// </summary>
		/// <param name="tasks"></param>
		/// <returns></returns>
		public static IEnumerable<Task<Task>> InOrderOfCompletion( this IEnumerable<Task> tasks ) {
			var inputTasks = tasks.ToList();

			var buckets = new TaskCompletionSource<Task>[ inputTasks.Count ];
			var results = new Task<Task>[ buckets.Length ];

			for ( var i = 0; i < buckets.Length; i++ ) {
				buckets[ i ] = new TaskCompletionSource<Task>();
				results[ i ] = buckets[ i ].Task;
			}

			var nextTaskIndex = -1;

			void Continuation( Task completed ) {
				var bucket = buckets[ Interlocked.Increment( ref nextTaskIndex ) ];
				bucket.TrySetResult( completed );
			}

			foreach ( var inputTask in inputTasks ) {
				inputTask?.ContinueWith( Continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default );
			}

			return results;
		}

		/// <summary>
		/// Return tasks in order of completion.
		/// </summary>
		/// <typeparam name="T"></typeparam>
		/// <param name="tasks"></param>
		/// <returns></returns>
		[ItemNotNull]
		[NotNull]
		public static IEnumerable<Task<Task<T>>> InOrderOfCompletion<T>( this IEnumerable<Task<T>> tasks ) {
			var inputTasks = tasks.ToList();

			var buckets = new TaskCompletionSource<Task<T>>[ inputTasks.Count ];
			var results = new Task<Task<T>>[ buckets.Length ];

			for ( var i = 0; i < buckets.Length; i++ ) {
				buckets[ i ] = new TaskCompletionSource<Task<T>>();
				results[ i ] = buckets[ i ].Task;
			}

			var nextTaskIndex = -1;

			void Continuation( Task<T> completed ) {
				var bucket = buckets[ Interlocked.Increment( ref nextTaskIndex ) ];
				bucket.TrySetResult( completed );
			}

			foreach ( var inputTask in inputTasks ) {
				inputTask?.ContinueWith( Continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default );
			}

			return results;
		}

		/// <summary>
		/// Return tasks in order of completion.
		/// </summary>
		/// <typeparam name="T"></typeparam>
		/// <param name="tasks"></param>
		/// <returns></returns>
		[ItemNotNull]
		[NotNull]
		public static IEnumerable<Task<Task<T>>> InOrderOfCompletion<T>( this IDictionary<TimeSpan, Task<T>> tasks ) {
			var inputTasks = tasks.ToList();

			var buckets = new TaskCompletionSource<Task<T>>[ inputTasks.Count ];
			var results = new Task<Task<T>>[ buckets.Length ];

			for ( var i = 0; i < buckets.Length; i++ ) {
				buckets[ i ] = new TaskCompletionSource<Task<T>>();
				results[ i ] = buckets[ i ].Task;
			}

			var nextTaskIndex = -1;

			void Continuation( Task<T> completed ) {
				var bucket = buckets[ Interlocked.Increment( ref nextTaskIndex ) ];
				bucket.TrySetResult( completed );
			}

			foreach ( var inputTask in inputTasks ) {
				inputTask.Value?.ContinueWith( Continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default );
			}

			return results;
		}

	}

And here is my test code.

	public static class Examples {

		public static async Task TestsInOrderOfCompletion( CancellationToken token ) {
			var rng = new Random();

			//Add to the list, out of "order"..
			var tasks = new ConcurrentDictionary<TimeSpan, Task<String>> {
				[ TimeSpan.FromSeconds( 3 ) ] = Task.Delay( TimeSpan.FromSeconds( 3 ), token ).ContinueWith( _ => "3 seconds", token ),
				[ TimeSpan.FromSeconds( 1 ) ] = Task.Delay( TimeSpan.FromSeconds( 1 ), token ).ContinueWith( _ => "1 second", token ),
				[ TimeSpan.FromSeconds( 2 ) ] = Task.Delay( TimeSpan.FromSeconds( 2 ), token ).ContinueWith( _ => "2 seconds", token ),
				[ TimeSpan.FromSeconds( 5 ) ] = Task.Delay( TimeSpan.FromSeconds( 5 ), token ).ContinueWith( _ => "5 seconds", token ),
				[ TimeSpan.FromSeconds( 8 ) ] = Task.Delay( TimeSpan.FromSeconds( 8 ), token ).ContinueWith( _ => "8 seconds", token ),
				[ TimeSpan.FromSeconds( 7 ) ] = Task.Delay( TimeSpan.FromSeconds( 7 ), token ).ContinueWith( _ => "7 seconds", token ),
				[ TimeSpan.FromSeconds( 9 ) ] = Task.Delay( 1000, token ).ContinueWith( _ => "9 seconds", token ),
				[ TimeSpan.FromSeconds( 10 ) ] = Task.Delay( 1000, token ).ContinueWith( _ => "10 seconds", token ),
				[ TimeSpan.FromSeconds( 4 ) ] = Task.Delay( 1000, token ).ContinueWith( _ => "4 seconds", token )
			};

			//Add a few more to the list, also in "random" order..
			for ( var i = 0; i < 25; i++ ) {
				var millisecondsDelay = rng.Next( 10000 );
				var task = Task.Delay( millisecondsDelay, token ).ContinueWith( _ => $"{millisecondsDelay / 1000.0:F4}", token );   //return how many milliseconds we just delayed
				tasks[ TimeSpan.FromMilliseconds( millisecondsDelay ) ] = task;
			}

			foreach ( var bucket in tasks.InOrderOfCompletion() ) {
				try {
					var task = await bucket.ConfigureAwait( false );
					var result = await task.ConfigureAwait( false );
					Console.WriteLine( $"{DateTime.Now:hh:mm:ss}, TaskId #{task.Id:N0}, {result} ms" );

				}
				catch ( OperationCanceledException ) { }

				catch ( Exception exception ) {
					Debug.WriteLineIf( Debugger.IsAttached, exception.ToString() );
				}
			}

		}

	}

Leave a Reply

Please log in using one of these methods to post your comment:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s