64行C# 代码实现异步队列

不喜欢看废话的同学直接跳到 看代码

一、

有时候我们会有这样的需求:
一个或多个线程(Senders)向一个队列(FIFO)中写入数据,
另外一个或多个线程(Receivers)从这个队列中取数据,并对数据进行处理或加工
这就是异步队列


图片来自网络

PS:发送者(sender)/接收者(receiver)有时也被叫做生产者(producer)/消费者(consumer )

二、

最近在项目中有使用(本地)异步队列的需求,在网上搜了一圈,都不是很满意:(所以说.NET生态还有待加强)
一种是通过事件机制来触发,这种方式写出的代码比较“分散”,不易阅读,理解和维护,这种当然不能接受啦,
另一种是通过“阻塞”模式来优化代码的可读性,代价就是浪费性能,
拜托,现在都什么年代了,怎么可能阻塞线程呢?当然是使用 C# 5.0 引入的 async/await啦。
因为搜不到,所以只能自己动手了

三、

我们的目标当然是写出这样的代码:

var x = await queue.Dequeue(cancellationToken);

并且内部的实现必须是非阻塞式的,
基于这个目标我们需要知道一个知识点信号量

四、

信号量简单来说就是对一个资源打上一个数字的标记,
这个数字(正数)表示了这个资源可以同时被多少个对象访问,(负数)还有多少个对象需要访问他
打个比方:一支笔,他同时只能被一个人使用,所以我可以初始给他打上一个信号量1
当第一个小朋友来借笔时,首先观察信号量1(大于0),则表示可以将笔(资源)借(分配)给小朋友(对象),并将信号量-1,此时信号量为0
第二个小朋友来借笔时,信号量为0,表示需要等待,并将信号量-1,此时信号量为-1(表示有1个对象正在等待资源释放)
如果这时,第一个小朋友,将笔(资源)归还(释放),则将信号量+1,并将笔借给第二个小朋友,此时信号量为0(表示无等待)
如果在第一个小朋友还没有将笔归还之前,第二个小朋友表示不愿意再等了,则信号量也-1

例子2:
一个小游泳池,可以同时允许10个人一起下水,则初始信号量为10
第一个人来,信号量-1,得到9,大于等于0,表示可以进去玩
第二人人来,信号量-1,得到8,大于等于0,表示可以进去玩
......
第十个人来,信号量-1,得到0,大于等于0,表示可以进去玩
第十一个人来,信号量-1,得到-1,小于0,表示需要等待
第十二个人来,信号量-1,得到-2,小于0,表示需要等待
第十三个人来,信号量-1,得到-3,小于0,表示需要等待
第一个人走了,信号量+1,将第十个人放进去,信号量等于-2,有2个人在等待
第十二个人走了,信号量+1,信号量等于-1,有1个人在等待

与信号量的处理相关的还有一个PV操作,了解一下

五、

在C#中有专门用于解决信号量问题的类:SemaphoreSemaphoreSlim

Semaphore:限制可同时访问某一资源或资源池的线程数。
SemaphoreSlim:对可同时访问资源或资源池的线程数加以限制的 System.Threading.Semaphore 的轻量替代。

这里我选择更轻量的SemaphoreSlim来实现,他的用法也非常简单

var s = new SemaphoreSlim(1);         // 计数器初始值1
await s.WaitAsync(cancellationToken); // 计数器-1,如果计数不足则等待(这个类的设计是计数到0就不会再减少了)
s.Release();                          // 计数器+1

下面就开始实现一个异步队列

六、

先定义一个异步队列的接口

// 异步队列接口
public interface IAsyncQueue<T>: IDisposable
{
    // 清空队列。
    Task Clear(CancellationToken token);
    // 移除并返回位于队列开始处的对象。
    Task<T> Dequeue(CancellationToken token);
    // 将对象添加到队列的结尾处。
    Task Enqueue(T item, CancellationToken token);
}

定义接口的好处是为了方便写扩展方法和以后对实现的修改

七、

定义信号量
从接口中可以看出,入和出2个操作都是异步的,所以需要定义2个信号量

private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
private readonly SemaphoreSlim _out = new SemaphoreSlim(0);

入操作的信号量初始值是1,表示允许1个并发执行
出操作的信号量初始值是0,因为出操作的信号量是根据队列中的元素个数来决定的,初始队列元素个数为0

定义一个内部队列,用于实现队列的基本操作

private readonly Queue<T> _queue = new Queue<T>();

实现类定义:

// 异步消息队列实现
sealed class AsyncQueue<T> : IAsyncQueue<T>
{
    // 内部队列实例
    private readonly Queue<T> _queue = new Queue<T>();
    // 入操作信号量
    private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
    // 出操作信号量
    private readonly SemaphoreSlim _out = new SemaphoreSlim(0);

    public Task Clear(CancellationToken token) => throw new NotImplementedException();
    public Task<T> Dequeue(CancellationToken token) => throw new NotImplementedException();
    public Task Enqueue(T item, CancellationToken token) => throw new NotImplementedException();
    public void Dispose() => throw new NotImplementedException();
}

八、

入(Enqueue)操作

public async Task Enqueue(T item, CancellationToken token)
{
    await _in.WaitAsync(token); // 入操作信号量-1,并发时等待,只允许一个线程操作
    try
    {
        _queue.Enqueue(item);   // 将对象放入队列
        _out.Release();         // “出”操作信号量+1
    }
    finally
    {
        _in.Release();          // 如果Wait操作完成,则必须将信号量施放
    }
}

出(Dequeue)操作

public async Task<T> Dequeue(CancellationToken token)
{
    await _out.WaitAsync(token);  // 同上,出操作比较简单就不赘述了
    return _queue.Dequeue();
}

清空(Clear)操作

public async Task Clear(CancellationToken token)
{
    await _in.WaitAsync(token);  // 先占中入操作的资源,防止操作中插入新的对象
    try
    {
        // 循环调用出操作的Wait,将信号量减为0
        // WaitAsync(100)表示每次操作等待100毫秒,为了防止另一个线程将`_out`的最后一个资源抢先领取后,清空操作无限期等待
        while (await _out.WaitAsync(100) || _out.CurrentCount > 0) 
        {
        }
        _queue.Clear();
    }
    finally
    {
        _in.Release();
    }
}

九、
完整代码:

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace blqw
{
    sealed class AsyncQueue<T> : IAsyncQueue<T>
    {
        private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
        private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
        private readonly SemaphoreSlim _out = new SemaphoreSlim(0);

        public async Task Clear(CancellationToken token)
        {
            await _in.WaitAsync(token);
            try
            {
                while (await _out.WaitAsync(100) || _out.CurrentCount > 0)
                {
                    _queue.TryDequeue(out _);
                }
            }
            finally
            {
                _in.Release();
            }
        }

        public async Task<T> Dequeue(CancellationToken token)
        {
            await _out.WaitAsync(token);
            return _queue.TryDequeue(out var val) ? val : throw new System.InvalidOperationException();
        }

        public async Task Enqueue(T item, CancellationToken token)
        {
            await _in.WaitAsync(token);
            try
            {
                _queue.Enqueue(item);
                _out.Release();
            }
            finally
            {
                _in.Release();
            }
        }

        void DisposeSemaphoreSlim(SemaphoreSlim ss)
        {
            try
            {
                ss.Dispose();
            }
            catch { }
        }

        public void Dispose()
        {
            DisposeSemaphoreSlim(_in);
            DisposeSemaphoreSlim(_out);
        }
    }
}

64行

十、

工厂类

/// <summary>
/// 异步队列
/// </summary>
public static class AsyncQueue
{
    public static IAsyncQueue<T> Create<T>() => new AsyncQueue<T>();
}

不直接公开 AsyncQueue<T> 是考虑到以后方便替换实现类

拓展类

public static class AsyncQueueExtensions
{
    public static Task Clear<T>(this IAsyncQueue<T> aq) => aq.Clear(CancellationToken.None);

    public static Task Clear<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
    {
        var source = new CancellationTokenSource(millisecondsTimeout);
        return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
    }

    public static Task Clear<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
    {
        var source = new CancellationTokenSource(timeout);
        return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
    }

    public static Task<T> Dequeue<T>(this IAsyncQueue<T> aq) => aq.Dequeue(CancellationToken.None);

    public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
    {
        using (var source = new CancellationTokenSource(millisecondsTimeout))
        {
            return await aq.Dequeue(source.Token);
        }
    }

    public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
    {
        using (var source = new CancellationTokenSource(timeout))
        {
            return await aq.Dequeue(source.Token);
        }
    }

    public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item) => aq.Enqueue(item, CancellationToken.None);

    public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, int millisecondsTimeout)
    {
        var source = new CancellationTokenSource(millisecondsTimeout);
        return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
    }

    public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, TimeSpan timeout)
    {
        var source = new CancellationTokenSource(timeout);
        return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
    }


    public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items)
    {
        if (items != null)
        {
            foreach (var item in items)
            {
                await aq.Enqueue(item, CancellationToken.None);
            }
        }
    }

    public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, int millisecondsTimeout)
    {
        if (items != null)
        {
            using (var source = new CancellationTokenSource(millisecondsTimeout))
            {
                foreach (var item in items)
                {
                    await aq.Enqueue(item, CancellationToken.None);
                }
            }
        }
    }

    public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, TimeSpan timeout)
    {
        if (items != null)
        {
            using (var source = new CancellationTokenSource(timeout))
            {
                foreach (var item in items)
                {
                    await aq.Enqueue(item, CancellationToken.None);
                }
            }
        }
    }
}

十一、
现在来测试一下
为了方便观察测试结果,先写一个将结果改为彩色的类,并且是异步的,不影响测试代码

static class ColorConsole
{
    public static void WriteLine(string value, ConsoleColor? backgroundColor = null, ConsoleColor? foregroundColor = null)
    {
        Task.Run(() =>
        {
            lock (typeof(Console))
            {
                Console.ResetColor();
                if (backgroundColor != null)
                {
                    Console.BackgroundColor = backgroundColor.Value;
                }
                if (foregroundColor != null)
                {
                    Console.ForegroundColor = foregroundColor.Value;
                }
                Console.WriteLine(value);
            }
        });
    }
}

发送者:

class Sender
{
    private readonly int _index;
    private readonly IAsyncQueue<string> _queue;
    private readonly ConsoleColor _background;

    public Sender(int index, IAsyncQueue<string> queue, ConsoleColor background)
    {
        _index = index;
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
        _background = background;
    }

    public async Task Send(string message)
    {
        ColorConsole.WriteLine($"{_index}号发送者写入{message}", backgroundColor: _background);
        await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延迟模拟实际场景
        await _queue.Enqueue(message);  // 关键代码
    }
}

接收者

class Receiver
{
    private readonly int _index;
    private readonly IAsyncQueue<string> _queue;
    private readonly ConsoleColor _foreground;

    public Receiver(int index, IAsyncQueue<string> queue, ConsoleColor foreground)
    {
        _index = index;
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
        _foreground = foreground;
    }

    public async Task Receive(CancellationToken token)
    {
        try
        {
            while (true)
            {
                var str = await _queue.Dequeue(token); // 关键代码
                ColorConsole.WriteLine($"{_index}号接收者获取到:{str}", foregroundColor: _foreground);
                await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延迟,模拟实际场景
            }
        }
        catch (OperationCanceledException)
        {
            ColorConsole.WriteLine($"{_index}号接收者关闭", foregroundColor: _foreground);
        }
    }
}

测试类

static void Main(string[] args)
{
    var queue = AsyncQueue.Create<string>(); // 初始化异步队列
    var source = new CancellationTokenSource(); // 初始化取消标志
    var token = source.Token; 
    var senders = Enumerable.Range(0, 3).Select(index => new Sender(index, queue, (ConsoleColor)(index+13))).ToArray(); // 初始化3个发送者
    var receivers = Enumerable.Range(0, 10).Select(index => new Receiver(index, queue, (ConsoleColor)(index + 5))).ToArray(); // 初始化10个接收者

    Parallel.ForEach(receivers, async x => await x.Receive(token)); // 并行启动10个接收者

    Thread.Sleep(1000); // 延迟1秒 等待接收者全部启动完成
    var message = 0;
    // 并行启动3个发送者,每个发送者发送10次,发送内容为从1开始自增的整型数字,也就是1~30
    Parallel.ForEach(senders, async x =>
    {
        for (var i = 0; i < 10; i++)
        {
            await x.Send(Interlocked.Increment(ref message).ToString());
        }
    });

    Console.ReadLine();
    source.Cancel(); // 停止所有接收者
    Console.ReadLine();
}

十二、

由于整个过程都是异步的,所以打印结果并不会是顺序的

运行效果

十三、

github
nuget

十四、

如果可以帮到你,别忘了帮我点一下喜欢,让更多的人看到


最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容

  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,353评论 8 265
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,094评论 1 32
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 这周从时间效率上看做事低效,原因是想托,因为这周的事情很多从家务和工作,换季整理衣物和清洗衣物窗帘,打扫房...
    春暖花开刘宁阅读 208评论 0 3
  • 原作者@锦璱 * 版权归(锦璱年华&锦璱)所有,未经授权请勿转载 * 不知因为何故,今晚特馋自制的咖喱饭,利用下班...
    锦璱年华阅读 511评论 2 6