Sunday, December 20, 2009

Countdown Latch, take #1

OK, enough ranting. It's time to post something useful. Here is a Countdown Latch I wrote some time ago. A countdown latch is a thread-safe object that has a counter associated with it. The counter can only be decremented, and the latch allows threads to block until that counter reaches zero. I have always missed one in the .NET Framework, and one is finally going to be included in .NET Framework 4!

(UPDATE: of course, only after writing this did I discover an easier solution -- one using Monitor.Wait() and Monitor.PulseAll().)

Here is the listing:
using System;
using System.Globalization;
using System.Threading;

namespace Primitives.Threading
{
    /// <summary>
    /// A thread-safe counter (goes only down) that is capable
    /// of blocking threads until the counter's value reaches zero.
    /// </summary>
    /// <remarks>
    /// All state changes happen while holding a single private lock.
    /// <see cref="Dispose"/> releases every blocked thread and does not
    /// close the underlying wait handle until all of them have left.
    /// </remarks>
    public class CountdownLatch : IDisposable
    {
        private const int MaximumNumberOfWaiters = int.MaxValue - 1;

        // Signaled when the count reaches zero, and also by Dispose() in
        // order to release the waiting threads.
        private readonly ManualResetEvent countHasReachedZero = new ManualResetEvent(false);

        // Guards currentCount, numberOfWaitingThreads and disposed.
        private readonly object disposalCriticalSection = new object();

        // Created by Dispose() only when there are waiters to wait for;
        // signaled by the last waiter that leaves after disposal.
        private ManualResetEvent allWaitingThreadsAreGone;

        private int currentCount;

        /// <summary>
        /// Number of threads currently inside one of the wait methods.
        /// Read and written only while holding
        /// <c>disposalCriticalSection</c>.
        /// </summary>
        private int numberOfWaitingThreads;

        private bool disposed;

        /// <summary>
        /// Creates a latch with the given initial counter value.
        /// </summary>
        /// <param name="initialCount">Initial counter value; must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="initialCount"/> is zero or negative.
        /// </exception>
        public CountdownLatch(int initialCount)
        {
            if (initialCount <= 0)
            {
                throw new ArgumentOutOfRangeException(
                    "initialCount",
                    "Must be greater than zero.");
            }

            this.currentCount = initialCount;
        }

        /// <summary>
        /// Decreases the latch's counter value by one and releases the
        /// waiting threads when the value becomes zero.
        /// Does not throw if the count has already reached zero.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The latch has been disposed.</exception>
        public void Decrement()
        {
            lock (this.disposalCriticalSection)
            {
                this.MakeSureTheLatchHasNotBeenDisposed();

                if (this.currentCount > 0)
                {
                    this.currentCount--;

                    if (this.currentCount == 0)
                    {
                        this.countHasReachedZero.Set();
                    }
                }
            }
        }

        /// <summary>
        /// Current latch counter value.
        /// This is for informational purposes only: the value is read
        /// without taking the lock, so it may be stale by the time the
        /// caller looks at it.
        /// </summary>
        public int CurrentCount
        {
            get
            {
                return Thread.VolatileRead(ref this.currentCount);
            }
        }

        /// <summary>
        /// Blocks the calling thread until the latch count becomes zero.
        /// </summary>
        /// <exception cref="ObjectDisposedException">
        /// The latch was already disposed, or it was disposed while the
        /// thread was waiting and the count had not reached zero.
        /// </exception>
        /// <exception cref="InvalidOperationException">
        /// The supported number of waiting threads has been exhausted.
        /// </exception>
        public void WaitUntilCountIsZero()
        {
            lock (this.disposalCriticalSection)
            {
                this.MakeSureTheLatchHasNotBeenDisposed();

                this.IncrementNumberOfWaiters();
            }

            try
            {
                this.countHasReachedZero.WaitOne();

                this.ThrowIfHandleHasBeenSetByDispose();
            }
            finally
            {
                // Must run on every exit path: Dispose() blocks until
                // the waiter count drops back to zero.
                this.DecrementNumberOfWaitersSignalingTheHandleIfDisposing();
            }
        }

        /// <summary>
        /// Registers the calling thread as a waiter. The caller must hold
        /// <c>disposalCriticalSection</c>.
        /// This is an insane precaution: even having that many threads is simply impossible.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The incremented waiter count equals <c>MaximumNumberOfWaiters</c>.
        /// </exception>
        private void IncrementNumberOfWaiters()
        {
            this.numberOfWaitingThreads++;

            if (this.numberOfWaitingThreads == MaximumNumberOfWaiters)
            {
                // Roll the registration back so the counter stays accurate.
                this.numberOfWaitingThreads--;

                throw new InvalidOperationException(
                    String.Format(
                        CultureInfo.InvariantCulture,
                        "Cannot block - only as many as {0} waiting threads are supported.",
                        MaximumNumberOfWaiters));
            }
        }

        /// <summary>
        /// Unregisters the calling thread as a waiter and, if the latch is
        /// being disposed and this was the last waiter, signals the handle
        /// that <see cref="Dispose"/> is blocked on.
        /// </summary>
        private void DecrementNumberOfWaitersSignalingTheHandleIfDisposing()
        {
            lock (this.disposalCriticalSection)
            {
                this.numberOfWaitingThreads--;

                if (this.disposed && (this.numberOfWaitingThreads == 0))
                {
                    this.allWaitingThreadsAreGone.Set();
                }
            }
        }

        /// <summary>
        /// Blocks the calling thread until the latch's counter value
        /// is zero or the specified amount of time passes.
        /// </summary>
        /// <param name="waitTimeSpan">
        /// Amount of time to wait for the latch's counter value to reach
        /// zero. <see cref="TimeSpan.Zero"/> is allowed and performs a
        /// non-blocking check.
        /// </param>
        /// <returns>
        /// True, if the latch has signaled (counter reached zero)
        /// before the specified time span elapsed; false, otherwise.
        /// </returns>
        /// <exception cref="ObjectDisposedException">
        /// The latch was already disposed, or it was disposed while the
        /// thread was waiting and the count had not reached zero.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="waitTimeSpan"/> is negative.
        /// </exception>
        /// <exception cref="InvalidOperationException">
        /// The supported number of waiting threads has been exhausted.
        /// </exception>
        public bool WaitUntilCountIsZero(TimeSpan waitTimeSpan)
        {
            lock (this.disposalCriticalSection)
            {
                this.MakeSureTheLatchHasNotBeenDisposed();

                if (waitTimeSpan < TimeSpan.Zero)
                {
                    // Zero is accepted, so the message says "or equal".
                    throw new ArgumentOutOfRangeException(
                        "waitTimeSpan",
                        "Must be greater than or equal to TimeSpan.Zero.");
                }

                this.IncrementNumberOfWaiters();
            }

            try
            {
                bool gotZero = this.countHasReachedZero.WaitOne(waitTimeSpan);

                if (gotZero)
                {
                    // The handle is also set by Dispose(); tell the two
                    // cases apart before reporting success.
                    this.ThrowIfHandleHasBeenSetByDispose();
                }

                return gotZero;
            }
            finally
            {
                // Must run on every exit path: Dispose() blocks until
                // the waiter count drops back to zero.
                this.DecrementNumberOfWaitersSignalingTheHandleIfDisposing();
            }
        }

        /// <summary>
        /// Throws if the wait handle was signaled by <see cref="Dispose"/>
        /// rather than by the count reaching zero.
        /// </summary>
        /// <exception cref="ObjectDisposedException">
        /// The latch is disposed and the count is not zero.
        /// </exception>
        private void ThrowIfHandleHasBeenSetByDispose()
        {
            lock (this.disposalCriticalSection)
            {
                if (this.disposed && (this.currentCount != 0))
                {
                    throw new ObjectDisposedException(this.GetType().FullName);
                }
            }
        }

        /// <summary>
        /// Disposes of all unmanaged resources allocated by the latch.
        /// (The kernel objects used are released.)
        /// Also: releases all waiting threads and blocks until every one
        /// of them has left the wait methods.
        /// Calling this method more than once is harmless.
        /// </summary>
        public void Dispose()
        {
            bool needToWaitForWaitersToFallOff = false;

            lock (this.disposalCriticalSection)
            {
                if (this.disposed)
                {
                    return;
                }

                // Wake the waiters up; they find out about the disposal
                // through the disposed flag.
                this.countHasReachedZero.Set();
                this.disposed = true;

                if (this.numberOfWaitingThreads > 0)
                {
                    this.allWaitingThreadsAreGone = new ManualResetEvent(false);
                    needToWaitForWaitersToFallOff = true;
                }
            }

            // Wait outside the lock: the departing waiters need the lock
            // in order to unregister themselves.
            if (needToWaitForWaitersToFallOff)
            {
                this.allWaitingThreadsAreGone.WaitOne();
                this.allWaitingThreadsAreGone.Close();
            }

            this.countHasReachedZero.Close();
        }

        /// <summary>
        /// Throws if the latch has been disposed. The caller must hold
        /// <c>disposalCriticalSection</c>.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The latch has been disposed.</exception>
        private void MakeSureTheLatchHasNotBeenDisposed()
        {
            if (this.disposed)
            {
                throw new ObjectDisposedException(this.GetType().FullName);
            }
        }
    }
}

No comments:

Post a Comment