<>今天介绍一下生产队列中常用的队列配置,怎么设置队列中任务使得任务之间有个优先级关系,怎么动态指定任务发送的队列。

<>一、Celery队列配置简介
CELERY_QUEUES = ( Queue("celery", Exchange("celery"), routing_key="celery") )
celery是设置的队列名
Exchange 是交换机的名称
routing_key 交换机跟队列交流的key
简单讲一下流程就是 celery服务端从rabbitmq中指定的交换机拿到对应的数据,然后发送给有这个交换机的指定的队列。

这个是celery默认的队列设置。

现在我们加多一个队列,设置这个队列具备优先级属性。
# 优先级队列设置 CELERY_ACKS_LATE = True CELERYD_PREFETCH_MULTIPLIER = 1 CELERY_QUEUES
= ( Queue("celery", Exchange("celery"), routing_key="celery"), Queue(
"celery_demo", Exchange("celery_demo"), routing_key="celery_demo",
queue_arguments={'x-max-priority': 9}) )
可以看到队列配置加多了两个属性,以及设置了队列优先级数值queue_arguments最高为9。
注意CELERY_ACKS_LATE,CELERYD_PREFETCH_MULTIPLIER这两个属性一定要加到配置里面,不然优先级属性不会生效。

<>二、动态指定任务队列,设置优先级

在提交任务的时候apply_async设置队列指定为优先级队列celery_demo,并且设置优先级为5
from celery_test.task_register import * from celery import group from celery
import chord sig = add.s(1, 1) sig.apply_async(queue='celery_demo',priority=5)
<>三、执行 任务,对比结果

为了更加方便进行对比,我们对注册一个任务,睡眠10s再进行相加
@app.task def add_time(x, y): time.sleep(10) return x + y
<>1、启动服务端监听任务


可以看到这个服务端监听了两个队列,默认的celery队列以及有优先级属性的celery_demo,从RabbitMQ也可以看出来,具有pri的是则是设置了优先级的队列。

<>2、执行任务

为了方便进行对比优先级有没有生效,我们这里设置了三个任务,分别如下
sig = add_time.s(1, 1) sig.apply_async(queue='celery') sig = add_time.s(2, 2)
sig.apply_async(queue='celery') sig = add_time.s(3, 3) sig.apply_async(queue=
'celery')
先看在默认队列的执行结果

可以看到这三个任务都是顺序执行,哪个任务先提交的就先执行哪个任务,这也符合没有优先级的时候。
接着我们将这三个任务放到具有优先级队列celery_demo
并且设置三个任务的优先级参数
sig = add_time.s(1, 1) sig.apply_async(queue='celery_demo',priority=5) sig =
add_time.s(2, 2) sig.apply_async(queue='celery_demo',priority=7) sig = add_time.
s(3, 3) sig.apply_async(queue='celery_demo',priority=8)

可以看到先执行了任务1,然后执行了任务3,最后执行了任务2
为什么会这样呢,不应该是优先级最高的任务3最先执行吗?
这里我们要考虑一个问题,在当前我们的队列是空的,也就是在队列非阻塞的情况下,当然是哪个任务先到先提交,哪个任务就先执行。
接着任务1在执行的时候sleep了10s,也就是把队列阻塞了10s中,这时候队列中还有任务2,任务3。
这时候任务2.任务3就要进行排序了,怎么排序呢,就是根据priority设置的数值,谁大谁优先执行。
这种只会在设置了优先级队列的时候才会进行这种排序,不然都是按照任务提交的顺序进行。

可能会有人会问如果提交到优先级队列,但是不设置priority,会默认最高级还是最低级呢?
OK,我们看一下结果
sig = add_time.s(1, 1) sig.apply_async(queue='celery_demo',priority=5) sig =
add_time.s(2, 2) sig.apply_async(queue='celery_demo') sig = add_time.s(3, 3) sig
.apply_async(queue='celery_demo',priority=1)

从结果上来看队列会默认没有设置优先级的最晚执行,按照顺序执行。

<>Celery队列的一些基础设置,优先级设置,动态指定队列就先讲到这里,如果有什么问题可以评论告诉我哦。

我是一只前进的蚂蚁,希望能一起前行。

如果对您有一点帮助,您的三连就是我创作的最大的动力,感谢!

下一篇继续讲一下Celery怎么创建并行任务,工作流任务,工作链任务。

注:如果本篇博客有任何错误和建议,欢迎各位指出,不胜感激!!!

技术
©2019-2020 Toolsou All rights reserved,
2021年2月程序员工资统计,平均15144元初识MySQL之综合复习篇(干货)Faster RCNN系列算法原理讲解(笔记)谷歌称居家办公影响工作效率!2021 年将回归线下办公C语言控制台小游戏,打砖块GDOI2019 游记CSS架构设计Python基础知识整理笔记2019年终总结——工作第二年用C++跟你聊聊“原型模式” (复制/拷贝构造函数)