celery的异步任务设计建议

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
笔者在学习这个框架的时候,发现它的许多用法代表了分布式系统设计的一些有意义的策略和方法,选取了Celery User Guide的Task章节做如下翻译。

原文地址:http://docs.celeryproject.org/en/master/userguide/tasks.html
翻译者:茶客furu声
邮箱:shawntai.ds@gmail.com

小建议与最佳实践

忽略掉你不需要的任务结果

如果你不关心任务的执行结果,请确保ignore_result
是关闭的状态,因为存储结果将耗费时间并且占用资源。

@app.task(ignore_result=True) def mytask():
     something()

另外,任务结果的设置还可以由task_ignore_result来指定为全局设置。

更多的优化建议

你将在Optimizing Guide看到更多的优化建议。

避免创建同步的子任务

让一个任务等待另一个任务是十分低效的,如果并行工作池的资源耗尽了,还将造成任务死锁。
建议采用异步任务的方式,比如可以使用回调函数来实现。

  • 不好的方式:
@app.task def update_page_info(url):
     page = fetch_page.delay(url).get()
     info = parse_page.delay(url, page).get()
     store_page_info.delay(url, info)
 @app.task def fetch_page(url):
     return myhttplib.get(url)
 @app.task def parse_page(url, page):
     return myparser.parse_document(page)
 @app.task def store_page_info(url, info):
     return PageInfo.objects.create(url, info)
  • 好的方式:
def update_page_info(url):
     # fetch_page -> parse_page -> store_page
     chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
     chain() @app.task() def fetch_page(url):
     return myhttplib.get(url)
 @app.task() def parse_page(page):
     return myparser.parse_document(page)
 @app.task(ignore_result=True)
 def store_page_info(info, url):
     PageInfo.objects.create(url=url, info=info)

这里我将不同的任务通过签名signature()连接起来,创建一个任务链。在Canvas: Designing Work-flows你可以看到关于任务链和其他强大的结构的使用方法。
默认的,celery不允许你运行同步任务,但在罕见的极端情况下你可能必须这么做?!揪妗坎唤ㄒ榻尤挝褡龀赏缴杓?。

@app.task def update_page_info(url):
     page = fetch_page.delay(url).get(disable_sync_subtasks=False)
     info = parse_page.delay(url, page).get(disable_sync_subtasks=False)
     store_page_info.delay(url, info)
 @app.task def fetch_page(url):
     return myhttplib.get(url)
 @app.task def parse_page(url, page):
     return myparser.parse_document(page)
 @app.task def store_page_info(url, info):
     return PageInfo.objects.create(url, info)

性能与策略

颗粒度

任务颗粒度是指每个子任务所需的计算量。总体上说,把大的问题拆分成很多个小的任务,比很少的几个长任务要好一点。因为采用小任务的方式,你可以并行执行很多小任务,并且这些小任务不会因为执行时间太长而阻塞了执行进程,使它无法运行其他正在等待的任务。
但是,执行任务是有开销的,比如需要发送一个消息、数据不在本地(译者注:需要到远端请求数据或将数据存储到远端)等。所以如果任务粒度被拆分的过细,这些开销很可能将使你得不偿失。

参考
Art of Concurrency 这本书有一个章节介绍了关于任务开销的问题[AOC1]。

数据位置

任务执行进程离数据越近越好。最好是在本地的内存里,最差的情况,可别是从另一个大陆传输过来啊。如果数据离你很远,你可以考虑在那个地方跑另一个任务执行进程。如果这都做不到,那就把常用的数据缓存起来,或者预先把一些常用数据读过来。
在任务执行器之间共享数据的最简单的办法,就是使用一个分布式缓存系统,例如memcached。

参考
由Jim Gray写的Distributed Computing Economics这篇文章是介绍数据位置这个课题的很棒的介绍。

状态

由于celery是一个分布式系统,你不知道任务在哪个进程、或者在什么机器上执行,你甚至不知道任务是否将及时被运行。
古老的异步名言告诉我们,“要靠每个任务来负责维护整个世界”,意思是说,在某个任务要执行的时候,这个世界可能看起来已经变化了,因此任务端要肩负起保证这个世界是他应有的样子的责任。例如,如果你有一个任务需要对一个搜索引擎进行重新索引,并且这个重新索引的过程需要最多5分钟来执行,那么必须由任务端来负责这个事情,而不是调用端。
假设以下的场景,你有一片文章,并且有一个任务是自动添加缩略语的扩写:

class Article(models.Model):
     title = models.CharField()
     body = models.TextField()

 @app.task
 def expand_abbreviations(article):
     article.body.replace('MyCorp', 'My Corporation')
     article.save()

首先,有一个作者创建了一篇文章并且保存了它,然后他点击按钮来启动这个自动添加缩略语扩写的任务。

>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)

刚好,这个队列十分繁忙,这个任务在2分钟内不会被执行到。这个时候,另一个作者修改了这篇文章,但这个任务仍然带着旧的文章主体作为参数,所以当最后这个任务被运行的时候,文章就被覆盖为旧版本了。
解决这个争抢问题十分简单,只需要使用文章id代替文章主体或文章对象作为任务参数,并且在执行任务之前对文章重新读取就可以了。

@app.task
 def expand_abbreviations(article_id):
     article = Article.objects.get(id=article_id)
     article.body.replace('MyCorp', 'My Corporation')
     article.save()
>>> expand_abbreviations.delay(article_id)

另外,因为发送大体积的消息是十分昂贵的,采用以上的方法还可以有额外的性能提升。

数据库事务

来看看另一个例子:

from django.db import transaction

@transaction.commit_on_success
def create_article(request):
     article = Article.objects.create()
     expand_abbreviations.delay(article.pk)

这是一个Django视图,它在数据库创建了一个文章对象,然后将主键传给任务去执行。这个任务定义时使用了commit_on_success这个修饰器,作用是,当返回视图的时候将执行事务提交,或者视图产生异常的时候将执行事务回滚。
如果在事务完成提交之前,任务启动了,那么此时将造成争抢的情况。而这个时候数据库对象还没有被创建出来。
解决的方案是,在所有的事务被成功提交之后,使用on_commit回调函数来创建你的celery任务。

from django.db.transaction import on_commit

def create_article(request):
     article = Article.objects.create()
     on_commit(lambda: expand_abbreviations.delay(article.pk))

示例

让我们找一个真实世界的例子:某个博客的评论发表时,需要经过垃圾过滤,当评论创建的时候,垃圾过滤器将在后台运行,执行过滤操作,这样用户就不用干等着评论过滤执行完成了。
我有一个基于Django的博客应用,它允许用户给博客文章发标评论。我将介绍一下这个博客的模型/视图和任务??椤?/p>

  • blog/models.py
    评论??榭雌鹄聪裾庋?/li>
from django.db import models
from django.utils.translation import ugettext_lazy as _

class Comment(models.Model):
     name = models.CharField(_('name'), max_length=64)
     email_address = models.EmailField(_('email address'))
     homepage = models.URLField(_('home page'),                                blank=True, verify_exists=False)
     comment = models.TextField(_('comment'))
     pub_date = models.DateTimeField(_('Published date'),                                     editable=False, auto_add_now=True)
     is_spam = models.BooleanField(_('spam?'),                                   default=False, editable=False)

    class Meta:
         verbose_name = _('comment')
         verbose_name_plural = _('comments')

在视图部分,当评论发表的时候,我先将评论写进数据库,然后在启动垃圾过滤任务。

  • blog/views.py
from django import forms 
from django.http import HttpResponseRedirect 
from django.template.context import RequestContext 
from django.shortcuts import get_object_or_404, render_to_response 

from blog import tasks 
from blog.models import Comment

class CommentForm(forms.ModelForm):

     class Meta:
         model = Comment

def add_comment(request, slug, template_name='comments/create.html'):
     post = get_object_or_404(Entry, slug=slug)
     remote_addr = request.META.get('REMOTE_ADDR')

     if request.method == 'post':
         form = CommentForm(request.POST, request.FILES)
         if form.is_valid():
             comment = form.save()
             # Check spam asynchronously.             
             tasks.spam_filter.delay(comment_id=comment.id, remote_addr=remote_addr)
             return HttpResponseRedirect(post.get_absolute_url())

     else:
         form = CommentForm()

     context = RequestContext(request, {'form': form})
     return render_to_response(template_name, context_instance=context)

为了过滤垃圾评论,我用上了Akismet,这个服务以往是用在开源博客wordpress上的。Akismet对个人用户免费,但商用时需要付费。你必须注册到他们的服务来获取API key。为了调用这个Akismet,我使用了Michael Foord写的akismet.py库。

  • blog/tasks.py
from celery import Celery 
from akismet import Akismet 
from django.core.exceptions import ImproperlyConfigured 
from django.contrib.sites.models import Site 
from blog.models import Comment 
app = Celery(broker='amqp://') 
@app.task 
def spam_filter(comment_id, remote_addr=None):
     logger = spam_filter.get_logger()
     logger.info('Running spam filter for comment %s', comment_id)
     comment = Comment.objects.get(pk=comment_id)
     current_domain = Site.objects.get_current().domain
     akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))
     if not akismet.verify_key():
         raise ImproperlyConfigured('Invalid AKISMET_KEY')
     is_spam = akismet.comment_check(user_ip=remote_addr, comment_content=comment.comment, comment_author=comment.name, comment_author_email=comment.email_address)
     if is_spam:
         comment.is_spam = True
         comment.save()
     return is_spam
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容