(UPDATE: of course, only after writing this did I discover an easier solution -- one using Monitor.Wait() and Monitor.PulseAll().)
Here is the listing:
using System;
using System.Globalization;
using System.Threading;
namespace Primitives.Threading
{
    /// <summary>
    /// A thread-safe counter (goes only down) that is capable
    /// of blocking threads until the counter's value reaches zero.
    /// </summary>
    public class CountdownLatch : IDisposable
    {
        /// <summary>
        /// Upper bound on the number of threads that may block on the latch at once.
        /// </summary>
        private const int MaximumNumberOfWaiters = int.MaxValue - 1;

        /// <summary>
        /// Guards <see cref="currentCount"/> updates, <see cref="numberOfWaitingThreads"/>,
        /// <see cref="disposed"/> and the creation of <see cref="allWaitingThreadsAreGone"/>.
        /// </summary>
        private readonly object disposalCriticalSection = new object();

        /// <summary>
        /// Signaled when the count reaches zero, and also by <see cref="Dispose"/>
        /// in order to release any threads that are still waiting.
        /// </summary>
        private readonly ManualResetEvent countHasReachedZero = new ManualResetEvent(false);

        /// <summary>
        /// Created by <see cref="Dispose"/> only when waiters are present; the last
        /// waiter to leave signals it so that disposal can finish.
        /// </summary>
        private ManualResetEvent allWaitingThreadsAreGone;

        private int currentCount;

        /// <summary>
        /// Number of threads currently inside one of the wait methods.
        /// Note: only access or modify this field while holding the lock
        /// on <see cref="disposalCriticalSection"/>.
        /// </summary>
        private int numberOfWaitingThreads;

        private bool disposed;

        /// <summary>
        /// Creates a latch whose counter starts at <paramref name="initialCount"/>.
        /// </summary>
        /// <param name="initialCount">Starting counter value; must be greater than zero.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="initialCount"/> is zero or negative.
        /// </exception>
        public CountdownLatch(int initialCount)
        {
            if (initialCount <= 0)
            {
                throw new ArgumentOutOfRangeException(
                    "initialCount",
                    "Must be greater than zero.");
            }

            this.currentCount = initialCount;
        }

        /// <summary>
        /// Current latch counter value.
        /// This is for informational purposes only.
        /// </summary>
        public int CurrentCount
        {
            get
            {
                return Thread.VolatileRead(ref this.currentCount);
            }
        }

        /// <summary>
        /// Decreases the latch's counter value by one.
        /// Does not throw if the count has already reached zero.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The latch has been disposed.</exception>
        public void Decrement()
        {
            lock (this.disposalCriticalSection)
            {
                this.MakeSureTheLatchHasNotBeenDisposed();

                if (this.currentCount > 0)
                {
                    this.currentCount--;

                    if (this.currentCount == 0)
                    {
                        // Releases every thread blocked in a wait method.
                        this.countHasReachedZero.Set();
                    }
                }
            }
        }

        /// <summary>
        /// Blocks the calling thread until the latch count becomes zero.
        /// </summary>
        /// <exception cref="ObjectDisposedException">
        /// The latch was already disposed, or it was disposed while this thread
        /// was waiting and the count had not reached zero.
        /// </exception>
        /// <exception cref="InvalidOperationException">
        /// The maximum number of waiting threads is already blocked on the latch.
        /// </exception>
        public void WaitUntilCountIsZero()
        {
            lock (this.disposalCriticalSection)
            {
                this.MakeSureTheLatchHasNotBeenDisposed();
                this.IncrementNumberOfWaiters();
            }

            try
            {
                this.countHasReachedZero.WaitOne();
                this.ThrowIfHandleHasBeenSetByDispose();
            }
            finally
            {
                this.DecrementNumberOfWaitersSignalingTheHandleIfDisposing();
            }
        }

        /// <summary>
        /// Blocks the calling thread until the latch's counter value
        /// is zero or the specified amount of time passes.
        /// </summary>
        /// <param name="waitTimeSpan">
        /// Amount of time to wait for the latch's counter value to reach zero.
        /// Must not be negative; <see cref="TimeSpan.Zero"/> polls without blocking.
        /// </param>
        /// <returns>
        /// True, if the latch has signaled (counter reached zero)
        /// before the specified time span elapsed; false, otherwise.
        /// </returns>
        /// <exception cref="ObjectDisposedException">
        /// The latch was already disposed, or it was disposed while this thread
        /// was waiting and the count had not reached zero.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="waitTimeSpan"/> is negative.
        /// </exception>
        /// <exception cref="InvalidOperationException">
        /// The maximum number of waiting threads is already blocked on the latch.
        /// </exception>
        public bool WaitUntilCountIsZero(TimeSpan waitTimeSpan)
        {
            lock (this.disposalCriticalSection)
            {
                this.MakeSureTheLatchHasNotBeenDisposed();

                if (waitTimeSpan < TimeSpan.Zero)
                {
                    // A zero time span is accepted, so the message must not claim otherwise.
                    throw new ArgumentOutOfRangeException(
                        "waitTimeSpan",
                        "Must not be negative.");
                }

                this.IncrementNumberOfWaiters();
            }

            try
            {
                bool gotZero = this.countHasReachedZero.WaitOne(waitTimeSpan);
                if (gotZero)
                {
                    // The event is also set by Dispose; tell the two cases apart.
                    this.ThrowIfHandleHasBeenSetByDispose();
                }

                return gotZero;
            }
            finally
            {
                this.DecrementNumberOfWaitersSignalingTheHandleIfDisposing();
            }
        }

        /// <summary>
        /// Disposes of all unmanaged resources allocated by the latch.
        /// (The kernel objects used are released.)
        /// Also releases all waiting threads, and blocks until each of them
        /// has left its wait method before closing the handles.
        /// Calling this method more than once has no further effect.
        /// </summary>
        public void Dispose()
        {
            bool needToWaitForWaitersToFallOff = false;

            lock (this.disposalCriticalSection)
            {
                if (this.disposed)
                {
                    return;
                }

                // Wake every waiter; they detect disposal via the 'disposed' flag.
                this.countHasReachedZero.Set();
                this.disposed = true;

                if (this.numberOfWaitingThreads > 0)
                {
                    this.allWaitingThreadsAreGone = new ManualResetEvent(false);
                    needToWaitForWaitersToFallOff = true;
                }
            }

            if (needToWaitForWaitersToFallOff)
            {
                // The handles must stay open until the last waiter has left.
                this.allWaitingThreadsAreGone.WaitOne();
                this.allWaitingThreadsAreGone.Close();
            }

            this.countHasReachedZero.Close();
        }

        /// <summary>
        /// Registers the calling thread as a waiter.
        /// Must be called while holding the lock on <see cref="disposalCriticalSection"/>.
        /// The limit is an insane precaution: even having that many threads is simply impossible.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// <see cref="MaximumNumberOfWaiters"/> threads are already waiting.
        /// </exception>
        private void IncrementNumberOfWaiters()
        {
            // Check before incrementing so that exactly MaximumNumberOfWaiters
            // waiters are admitted, as the error message promises.
            if (this.numberOfWaitingThreads >= MaximumNumberOfWaiters)
            {
                throw new InvalidOperationException(
                    String.Format(
                        CultureInfo.InvariantCulture,
                        "Cannot block - only as many as {0} waiting threads are supported.",
                        MaximumNumberOfWaiters));
            }

            this.numberOfWaitingThreads++;
        }

        /// <summary>
        /// Unregisters the calling thread as a waiter and, when the latch is being
        /// disposed and this was the last waiter, lets <see cref="Dispose"/> proceed.
        /// </summary>
        private void DecrementNumberOfWaitersSignalingTheHandleIfDisposing()
        {
            lock (this.disposalCriticalSection)
            {
                this.numberOfWaitingThreads--;

                if (this.disposed && (this.numberOfWaitingThreads == 0))
                {
                    this.allWaitingThreadsAreGone.Set();
                }
            }
        }

        /// <summary>
        /// Throws when the latch has been disposed while the count is still non-zero,
        /// i.e. when the wait was ended by <see cref="Dispose"/> rather than by the
        /// counter reaching zero.
        /// </summary>
        /// <exception cref="ObjectDisposedException">
        /// The latch is disposed and the count is not zero.
        /// </exception>
        private void ThrowIfHandleHasBeenSetByDispose()
        {
            lock (this.disposalCriticalSection)
            {
                if (this.disposed && (this.currentCount != 0))
                {
                    throw new ObjectDisposedException(this.GetType().FullName);
                }
            }
        }

        /// <summary>
        /// Throws if the latch has been disposed. Callers hold the lock on
        /// <see cref="disposalCriticalSection"/> when invoking this method.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The latch has been disposed.</exception>
        private void MakeSureTheLatchHasNotBeenDisposed()
        {
            if (this.disposed)
            {
                throw new ObjectDisposedException(this.GetType().FullName);
            }
        }
    }
}
No comments:
Post a Comment