不喜欢看废话的同学直接跳到 九 看代码
一、
有时候我们会有这样的需求:
一个或多个线程(Senders)向一个队列(FIFO)中写入数据,
另外一个或多个线程(Receivers)从这个队列中取数据,并对数据进行处理或加工
这就是异步队列
PS:发送者(sender)/接收者(receiver)有时也被叫做生产者(producer)/消费者(consumer )
二、
最近在项目中有使用(本地)异步队列的需求,在网上搜了一圈,都不是很满意:(所以说.NET生态还有待加强)
一种是通过事件机制来触发,这种方式写出的代码比较“分散”,不易阅读,理解和维护,这种当然不能接受啦,
另一种是通过“阻塞”模式来优化代码的可读性,代价就是浪费性能,
拜托,现在都什么年代了,怎么可能阻塞线程呢?当然是使用 C# 5.0 引入的 async/await啦。
因为搜不到,所以只能自己动手了
三、
我们的目标当然是写出这样的代码:
var x = await queue.Dequeue(cancellationToken);
并且内部的实现必须是非阻塞式的,
基于这个目标我们需要知道一个知识点信号量
四、
信号量简单来说就是对一个资源打上一个数字的标记,
这个数字(正数)表示了这个资源可以同时被多少个对象访问,(负数)还有多少个对象需要访问他
打个比方:一支笔,他同时只能被一个人使用,所以我可以初始给他打上一个信号量1
当第一个小朋友来借笔时,首先观察信号量1(大于0),则表示可以将笔(资源)借(分配)给小朋友(对象),并将信号量-1,此时信号量为0
第二个小朋友来借笔时,信号量为0,表示需要等待,并将信号量-1,此时信号量为-1(表示有1个对象正在等待资源释放)
如果这时,第一个小朋友,将笔(资源)归还(释放),则将信号量+1,并将笔借给第二个小朋友,此时信号量为0(表示无等待)
如果在第一个小朋友还没有将笔归还之前,第二个小朋友表示不愿意再等了,则信号量也-1
例子2:
一个小游泳池,可以同时允许10个人一起下水,则初始信号量为10
第一个人来,信号量-1,得到9,大于等于0,表示可以进去玩
第二人人来,信号量-1,得到8,大于等于0,表示可以进去玩
......
第十个人来,信号量-1,得到0,大于等于0,表示可以进去玩
第十一个人来,信号量-1,得到-1,小于0,表示需要等待
第十二个人来,信号量-1,得到-2,小于0,表示需要等待
第十三个人来,信号量-1,得到-3,小于0,表示需要等待
第一个人走了,信号量+1,将第十个人放进去,信号量等于-2,有2个人在等待
第十二个人走了,信号量+1,信号量等于-1,有1个人在等待
与信号量的处理相关的还有一个PV操作,了解一下
五、
在C#中有专门用于解决信号量问题的类:Semaphore
和SemaphoreSlim
Semaphore:限制可同时访问某一资源或资源池的线程数。
SemaphoreSlim:对可同时访问资源或资源池的线程数加以限制的 System.Threading.Semaphore 的轻量替代。
这里我选择更轻量的SemaphoreSlim
来实现,他的用法也非常简单
var s = new SemaphoreSlim(1); // 计数器初始值1
await s.WaitAsync(cancellationToken); // 计数器-1,如果计数不足则等待(这个类的设计是计数到0就不会再减少了)
s.Release(); // 计数器+1
下面就开始实现一个异步队列
六、
先定义一个异步队列的接口
// 异步队列接口
public interface IAsyncQueue<T>: IDisposable
{
// 清空队列。
Task Clear(CancellationToken token);
// 移除并返回位于队列开始处的对象。
Task<T> Dequeue(CancellationToken token);
// 将对象添加到队列的结尾处。
Task Enqueue(T item, CancellationToken token);
}
定义接口的好处是为了方便写扩展方法和以后对实现的修改
七、
定义信号量
从接口中可以看出,入和出2个操作都是异步的,所以需要定义2个信号量
private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
private readonly SemaphoreSlim _out = new SemaphoreSlim(0);
入操作的信号量初始值是1,表示允许1个并发执行
出操作的信号量初始值是0,因为出操作的信号量是根据队列中的元素个数来决定的,初始队列元素个数为0
定义一个内部队列,用于实现队列的基本操作
private readonly Queue<T> _queue = new Queue<T>();
实现类定义:
// 异步消息队列实现
sealed class AsyncQueue<T> : IAsyncQueue<T>
{
// 内部队列实例
private readonly Queue<T> _queue = new Queue<T>();
// 入操作信号量
private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
// 出操作信号量
private readonly SemaphoreSlim _out = new SemaphoreSlim(0);
public Task Clear(CancellationToken token) => throw new NotImplementedException();
public Task<T> Dequeue(CancellationToken token) => throw new NotImplementedException();
public Task Enqueue(T item, CancellationToken token) => throw new NotImplementedException();
public void Dispose() => throw new NotImplementedException();
}
八、
入(Enqueue)操作
public async Task Enqueue(T item, CancellationToken token)
{
await _in.WaitAsync(token); // 入操作信号量-1,并发时等待,只允许一个线程操作
try
{
_queue.Enqueue(item); // 将对象放入队列
_out.Release(); // “出”操作信号量+1
}
finally
{
_in.Release(); // 如果Wait操作完成,则必须将信号量施放
}
}
出(Dequeue)操作
public async Task<T> Dequeue(CancellationToken token)
{
await _out.WaitAsync(token); // 同上,出操作比较简单就不赘述了
return _queue.Dequeue();
}
清空(Clear)操作
public async Task Clear(CancellationToken token)
{
await _in.WaitAsync(token); // 先占中入操作的资源,防止操作中插入新的对象
try
{
// 循环调用出操作的Wait,将信号量减为0
// WaitAsync(100)表示每次操作等待100毫秒,为了防止另一个线程将`_out`的最后一个资源抢先领取后,清空操作无限期等待
while (await _out.WaitAsync(100) || _out.CurrentCount > 0)
{
}
_queue.Clear();
}
finally
{
_in.Release();
}
}
九、
完整代码:
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace blqw
{
sealed class AsyncQueue<T> : IAsyncQueue<T>
{
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
private readonly SemaphoreSlim _out = new SemaphoreSlim(0);
public async Task Clear(CancellationToken token)
{
await _in.WaitAsync(token);
try
{
while (await _out.WaitAsync(100) || _out.CurrentCount > 0)
{
_queue.TryDequeue(out _);
}
}
finally
{
_in.Release();
}
}
public async Task<T> Dequeue(CancellationToken token)
{
await _out.WaitAsync(token);
return _queue.TryDequeue(out var val) ? val : throw new System.InvalidOperationException();
}
public async Task Enqueue(T item, CancellationToken token)
{
await _in.WaitAsync(token);
try
{
_queue.Enqueue(item);
_out.Release();
}
finally
{
_in.Release();
}
}
void DisposeSemaphoreSlim(SemaphoreSlim ss)
{
try
{
ss.Dispose();
}
catch { }
}
public void Dispose()
{
DisposeSemaphoreSlim(_in);
DisposeSemaphoreSlim(_out);
}
}
}
64行
十、
工厂类
/// <summary>
/// 异步队列
/// </summary>
public static class AsyncQueue
{
public static IAsyncQueue<T> Create<T>() => new AsyncQueue<T>();
}
不直接公开 AsyncQueue<T> 是考虑到以后方便替换实现类
拓展类
public static class AsyncQueueExtensions
{
public static Task Clear<T>(this IAsyncQueue<T> aq) => aq.Clear(CancellationToken.None);
public static Task Clear<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
{
var source = new CancellationTokenSource(millisecondsTimeout);
return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
}
public static Task Clear<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
{
var source = new CancellationTokenSource(timeout);
return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
}
public static Task<T> Dequeue<T>(this IAsyncQueue<T> aq) => aq.Dequeue(CancellationToken.None);
public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
{
using (var source = new CancellationTokenSource(millisecondsTimeout))
{
return await aq.Dequeue(source.Token);
}
}
public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
{
using (var source = new CancellationTokenSource(timeout))
{
return await aq.Dequeue(source.Token);
}
}
public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item) => aq.Enqueue(item, CancellationToken.None);
public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, int millisecondsTimeout)
{
var source = new CancellationTokenSource(millisecondsTimeout);
return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
}
public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, TimeSpan timeout)
{
var source = new CancellationTokenSource(timeout);
return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
}
public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items)
{
if (items != null)
{
foreach (var item in items)
{
await aq.Enqueue(item, CancellationToken.None);
}
}
}
public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, int millisecondsTimeout)
{
if (items != null)
{
using (var source = new CancellationTokenSource(millisecondsTimeout))
{
foreach (var item in items)
{
await aq.Enqueue(item, CancellationToken.None);
}
}
}
}
public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, TimeSpan timeout)
{
if (items != null)
{
using (var source = new CancellationTokenSource(timeout))
{
foreach (var item in items)
{
await aq.Enqueue(item, CancellationToken.None);
}
}
}
}
}
十一、
现在来测试一下
为了方便观察测试结果,先写一个将结果改为彩色的类,并且是异步的,不影响测试代码
static class ColorConsole
{
public static void WriteLine(string value, ConsoleColor? backgroundColor = null, ConsoleColor? foregroundColor = null)
{
Task.Run(() =>
{
lock (typeof(Console))
{
Console.ResetColor();
if (backgroundColor != null)
{
Console.BackgroundColor = backgroundColor.Value;
}
if (foregroundColor != null)
{
Console.ForegroundColor = foregroundColor.Value;
}
Console.WriteLine(value);
}
});
}
}
发送者:
class Sender
{
private readonly int _index;
private readonly IAsyncQueue<string> _queue;
private readonly ConsoleColor _background;
public Sender(int index, IAsyncQueue<string> queue, ConsoleColor background)
{
_index = index;
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
_background = background;
}
public async Task Send(string message)
{
ColorConsole.WriteLine($"{_index}号发送者写入{message}", backgroundColor: _background);
await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延迟模拟实际场景
await _queue.Enqueue(message); // 关键代码
}
}
接收者
class Receiver
{
private readonly int _index;
private readonly IAsyncQueue<string> _queue;
private readonly ConsoleColor _foreground;
public Receiver(int index, IAsyncQueue<string> queue, ConsoleColor foreground)
{
_index = index;
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
_foreground = foreground;
}
public async Task Receive(CancellationToken token)
{
try
{
while (true)
{
var str = await _queue.Dequeue(token); // 关键代码
ColorConsole.WriteLine($"{_index}号接收者获取到:{str}", foregroundColor: _foreground);
await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延迟,模拟实际场景
}
}
catch (OperationCanceledException)
{
ColorConsole.WriteLine($"{_index}号接收者关闭", foregroundColor: _foreground);
}
}
}
测试类
static void Main(string[] args)
{
var queue = AsyncQueue.Create<string>(); // 初始化异步队列
var source = new CancellationTokenSource(); // 初始化取消标志
var token = source.Token;
var senders = Enumerable.Range(0, 3).Select(index => new Sender(index, queue, (ConsoleColor)(index+13))).ToArray(); // 初始化3个发送者
var receivers = Enumerable.Range(0, 10).Select(index => new Receiver(index, queue, (ConsoleColor)(index + 5))).ToArray(); // 初始化10个接收者
Parallel.ForEach(receivers, async x => await x.Receive(token)); // 并行启动10个接收者
Thread.Sleep(1000); // 延迟1秒 等待接收者全部启动完成
var message = 0;
// 并行启动3个发送者,每个发送者发送10次,发送内容为从1开始自增的整型数字,也就是1~30
Parallel.ForEach(senders, async x =>
{
for (var i = 0; i < 10; i++)
{
await x.Send(Interlocked.Increment(ref message).ToString());
}
});
Console.ReadLine();
source.Cancel(); // 停止所有接收者
Console.ReadLine();
}
十二、
由于整个过程都是异步的,所以打印结果并不会是顺序的
十三、
十四、
如果可以帮到你,别忘了帮我点一下喜欢,让更多的人看到