概要
當(dāng)多個任務(wù)或線程并行運(yùn)行時,難以避免的對某些有限的資源進(jìn)行并發(fā)的訪問,
C# 并行編程 之 限制資源的并發(fā)訪問 使用SemaphoreS
?梢钥紤]使用信號量來進(jìn)行這方面的控制(System.Threading.Semaphore)是表示一個Windows內(nèi)核的信號量對象。如果預(yù)計(jì)等待的時間較短,可以考慮使用SemaphoreSlim,它則帶來的開銷更小。.NetFrameWork中的信號量通過跟蹤進(jìn)入和離開的任務(wù)或線程來協(xié)調(diào)對資源的訪問。信號量需要知道資源的最大數(shù)量,當(dāng)一個任務(wù)進(jìn)入時,資源計(jì)數(shù)器會被減1,當(dāng)計(jì)數(shù)器為0時,如果有任務(wù)訪問資源,它會被阻塞,直到有任務(wù)離開為止。
示例程序: 10個任務(wù)并行訪問3個資源
<code class="hljs cs">using System;using System.Text;using System.Threading;using System.Threading.Tasks;using System.Diagnostics;namespace Sample5_8_semaphoreslim{ class Program { private static int _TaskNum = 10; private static Task[] _Tasks; private const int MAX_RESOURCE = 3; private const int RUN_LOOP = 10; private static SemaphoreSlim m_Semaphore; private static void Work1(int TaskID) { int i = 0; var sw = Stopwatch.StartNew(); var rnd = new Random(); while (i < RUN_LOOP) { Thread.Sleep(rnd.Next(200, 500)); Console.WriteLine("TASK " + TaskID + " REQUESTing {"); m_Semaphore.Wait(); try { Console.WriteLine("TASK " + TaskID + " WOrking ... ..." + i); sw.Restart(); Thread.Sleep(rnd.Next(200, 500)); } finally { Console.WriteLine("TASK " + TaskID + " REQUESTing }"); m_Semaphore.Release(); i++; } } } static void Main(string[] args) { _Tasks = new Task[_TaskNum]; m_Semaphore = new SemaphoreSlim(MAX_RESOURCE); int i = 0; for (i = 0; i < _TaskNum; i++) { _Tasks[i] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work1(taskid); }, i); } var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) => { Task.WaitAll(_Tasks); Console.WriteLine("=========================================================="); Console.WriteLine("All Phase is completed"); Console.WriteLine("=========================================================="); }); try { finalTask.Wait(); } catch (AggregateException aex) { Console.WriteLine("Task failed And Canceled" + aex.ToString()); } finally { m_Semaphore.Dispose(); } Console.ReadLine(); } }}</code>
使用超時和取消
信號量當(dāng)然不可能永久的阻塞在那里,
電腦資料
《C# 并行編程 之 限制資源的并發(fā)訪問 使用SemaphoreS》(http://www.ishadingyu.com)。信號量也提供了超時處理機(jī)制。方法是在Wait函數(shù)中傳入一個超時等待時間 - Wait(int TIMEOUT)。當(dāng)Wait返回值為false時表明它超時了。如果傳入了 -1,則表示無限期的等待。程序示例:注意其中的m_Semaphore.Release();已經(jīng)被注釋掉了,任務(wù)會等待1秒鐘然后超時。
<code class="hljs cs">using System;using System.Text;using System.Threading;using System.Threading.Tasks;using System.Diagnostics;namespace Sample5_8_semaphoreslim{ class Program { private static int _TaskNum = 10; private static Task[] _Tasks; private const int MAX_RESOURCE = 3; private const int RUN_LOOP = 10; private static SemaphoreSlim m_Semaphore; private static void Work1(int TaskID) { int i = 0; var sw = Stopwatch.StartNew(); var rnd = new Random(); while (i < RUN_LOOP) { Thread.Sleep(rnd.Next(200, 500)); Console.WriteLine("TASK " + TaskID + " REQUESTing {"); if (!m_Semaphore.Wait(1000)) { Console.WriteLine("TASK " + TaskID + " TIMEOUT!!!"); return; } try { Console.WriteLine("TASK " + TaskID + " WOrking ... ..." + i); sw.Restart(); Thread.Sleep(rnd.Next(2000, 5000)); } finally { Console.WriteLine("TASK " + TaskID + " REQUESTing }"); //m_Semaphore.Release(); i++; } } } static void Main(string[] args) { _Tasks = new Task[_TaskNum]; m_Semaphore = new SemaphoreSlim(MAX_RESOURCE); int i = 0; for (i = 0; i < _TaskNum; i++) { _Tasks[i] = Task.Factory.StartNew((num) => { var taskid = (int)num; Work1(taskid); }, i); } var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) => { Task.WaitAll(_Tasks); Console.WriteLine("=========================================================="); Console.WriteLine("All Phase is completed"); Console.WriteLine("=========================================================="); }); try { finalTask.Wait(); } catch (AggregateException aex) { Console.WriteLine("Task failed And Canceled" + aex.ToString()); } finally { m_Semaphore.Dispose(); } Console.ReadLine(); } }}</code>
跨進(jìn)程或AppDomain的同步
如果需要有跨進(jìn)程或AppDomain的同步時,可以考慮使用Semaphore。Semaphore是取得的Windows 內(nèi)核的信號量,所以在整個系統(tǒng)中是有效的。
它主要的接口時 Release和WaitOne,使用的方式和SemaphoreSlim是一致的。