Django中使用Celery的方法示例

yipeiwu_com6年前Python基础

起步

在 《分布式任务队列Celery使用说明》 中介绍了在 Python 中使用 Celery 来实验异步任务和定时任务功能。本文介绍如何在 Django 中使用 Celery。

安装

pip install django-celery

这个命令使用的依赖是 Celery 3.x 的版本,所以会把我之前安装的 4.x 卸载,不过对功能上并没有什么影响。我们也完全可以仅用Celery在django中使用,但使用 django-celery 模块能更好的管理 celery。

使用

可以把有关 Celery 的配置放到 settings.py 里去,但我比较习惯单独一个文件来放,然后在 settings.py 引入进来:

# celery_config.py
import djcelery
import os

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
djcelery.setup_loader()

BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

# UTC
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_IMPORTS = (
 'app.tasks',
)

# 有些情况可以防止死锁
CELERY_FORCE_EXECV = True

# 设置并发的worker数量
CELERYD_CONCURRENCY = 4

# 任务发送完成是否需要确认,这一项对性能有一点影响
CELERY_ACKS_LATE = True

# 每个worker执行了多少任务就会销毁,防止内存泄露,默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40

# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 15 * 60 # 在15分钟内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程

# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default"

# 设置详细的队列
CELERY_QUEUES = {
 "default": { # 这是上面指定的默认队列
  "exchange": "default",
  "exchange_type": "direct",
  "routing_key": "default"
 },
 "beat_queue": {
  "exchange": "beat_queue",
  "exchange_type": "direct",
  "routing_key": "beat_queue"
 }

}

配置文件中设置了 CELERY_IMPORTS 导入的任务,所以在django app中创建相应的任务文件:

# app/tasks.py
from celery.task import Task
import time

class TestTask(Task):
 name = 'test-task' # 给任务设置个自定义名称

 def run(self, *args, **kwargs):
  print('start test task')
  time.sleep(4)
  print('args={}, kwargs={}'.format(args, kwargs))
  print('end test task')

在 settings.py 添加:

INSTALLED_APPS = [
 # ...
 'djcelery',
]

# Celery
from learn_django.celery_config import *

触发任务或提交任务可以在view中来调用:

# views.py
from django.http import HttpResponse
from app.tasks import TestTask

def test_task(request):
 # 执行异步任务
 print('start do request')
 t = TestTask()
 t.delay()
 print('end do request')
 return HttpResponse('ok')

启动 woker 的命令是:

python manage.py celery worker -l info

再启动django,访问该view,可以看到任务在worker中被消费了。

定时任务

在celery的配置文件 celery_config.py 文件中添加:

CELERYBEAT_SCHEDULE = {
 'task1-every-1-min': { # 自定义名称
  'task': 'test-task', # 与任务中name名称一致
  'schedule': datetime.timedelta(seconds=5),
  'args': (2, 15),
  'options': {
   'queue': 'beat_queue', # 指定要使用的队列
  }
 },

}

通过 options 的 queque 来指定要使用的队列,这里需要单独的队列是因为,如果所有任务都使用同一队列,对于定时任务来说,任务提交后会位于队列尾部,任务的执行时间会靠后,所以对于定时任务来说,使用单独的队列。

启动 beat:

python manage.py celery beat -l info

监控工具 flower

如果celery中的任务执行失败了,有些场景是需要对这些任务进行监控, flower 是基于 Tornado 开发的web应用。安装用 pip install flower ;启动它可以是:

python manage.py celery flower

# python manage.py celery flower --basic_auth=admin:admin

用浏览器访问 http://localhost:5555 即可查看:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

解决Mac下首次安装pycharm无project interpreter的问题

Pycharm本身并不带编译器,所以第一次用需要自己下载编译器插件。 1、首先去 https://www.python.org/downloads/ 这个网址去下载对应的python版本...

python通过配置文件共享全局变量的实例

在使用Python编写的应用的过程中,有时会遇到多个文件之间传递同一个全局变量的情况,此时通过配置文件定义全局变量是一个比较好的选择。 首先配置config.py模块,config需要设...

python实现员工管理系统

python实现员工管理系统

这是一个简易的员工管理系统,实现最简单的功能: 1.登录用户密码验证(错误三次自动退出) 2.支持文本员工的搜索、添加、删除、修改 3.一级层级多个选项、二级层级多个选项,都支持判空...

python 编程之twisted详解及简单实例

python 编程之twisted详解 前言:  我不擅长写socket代码。一是用c写起来比较麻烦,二是自己平时也没有这方面的需求。等到自己真正想了解的时候,才发现自己在这方...

python运用pygame库实现双人弹球小游戏

python运用pygame库实现双人弹球小游戏

使用python pygame库实现一个双人弹球小游戏,两人分别控制一个左右移动的挡板用来拦截小球,小球会在两板间不停弹跳,拦截失败的一方输掉游戏,规则类似于简化版的乒乓球。 因为是第一...