python Celery定时任务的示例

yipeiwu_com6年前Python基础

本文介绍了python Celery定时任务的示例,分享给大家,具体如下:

配置

启用Celery的定时任务需要设置CELERYBEAT_SCHEDULE 。


Celery的定时任务都由celery beat来进行调度。celery beat默认按照settings.py之中的时区时间来调度定时任务。

创建定时任务

一种创建定时任务的方式是配置CELERYBEAT_SCHEDULE:

#每30秒调用task.add
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
  'add-every-30-seconds': {
    'task': 'tasks.add',
    'schedule': timedelta(seconds=30),
    'args': (16, 16)
  },
}
#crontab任务
#每周一7:30调用task.add
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
  # Executes every Monday morning at 7:30 A.M
  'add-every-monday-morning': {
    'task': 'tasks.add',
    'schedule': crontab(hour=7, minute=30, day_of_week=1),
    'args': (16, 16),
  },
}

使用数据库存储定时任务

使用数据库存储定时任务需要设置CELERYBEAT_SCHEDULE如下:

import datetime
import json
from djcelery import models as celery_models
from django.utils import timezone
#创建任务
def create_task(name, task, task_args, crontab_time):
  '''
  name # 任务名字
  task # 执行的任务 "myapp.tasks.add"
  task_args # 任务参数 {"x":1, "Y":1}

  crontab_time # 定时任务时间 格式:
  {
    'month_of_year': 9 # 月份
    'day_of_month': 5 # 日期
    'hour': 01 # 小时
    'minute':05 # 分钟
  }
  '''

  # task任务, created是否定时创建
  task, created = celery_models.PeriodicTask.objects.
              get_or_create(name=name,task=task)
  # 获取 crontab
  crontab = celery_models.CrontabSchedule.objects.
              filter(**crontab_time).first()
  if crontab is None:
  # 如果没有就创建,有的话就继续复用之前的crontab
    crontab = celery_models.CrontabSchedule.objects.
              create(**crontab_time)
  task.crontab = crontab # 设置crontab
  task.enabled = True # 开启task
  task.kwargs = json.dumps(task_args) # 传入task参数
  expiration = timezone.now() + datetime.timedelta(day=1)
  task.expires = expiration # 设置任务过期时间为现在时间的一天以后
  task.save()
  return True 

#关闭任务
def disable_task(name):
'''
关闭任务
'''
  try:
    task = celery_models.PeriodicTask.objects.get(name=name)
    task.enabled = False # 设置关闭
    task.save()
    return True
  except celery_models.PeriodicTask.DoesNotExist:
    return True

启动beat

执行定时任务时, Celery会通过celery beat进程来完成。Celery beat会保持运行, 一旦到了某一定时任务需要执行时, Celery beat便将其加入到queue中. 不像worker进程, Celery beat只需要一个即可。而且为了避免有重复的任务被发送出去,所以Celery beat仅能有一个。

启动:

python manage.py celery beat --loglevel=info

其实还有一种简单的启动方式worker和beat一起启动:

python manage.py celery worker --loglevel=info --beat

定时删除

由于很多任务都是一次执行完就不需要,留在数据库里就是垃圾数据了有没有办法清除。方法肯定有因为django-celery本身就有定时任务功能我们加个任务就解决了。好我们看代码:在django app目录中打开taske.py加入如下代码

from djcelery import models as celery_models
from django.utils import timezone
@task()
def delete():
  '''
  删除任务
  从models中过滤出过期时间小于现在的时间然后删除
  '''
  return celery_models.PeriodicTask.objects.filter(
              expires__lt=timezone.now()).delete()

创建任务脚本里设置了 expires 1天以后过期,这样在filter的时候就能当做条件把过期的任务找到并且删除。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

Python Threading 线程/互斥锁/死锁/GIL锁

导入线程包 import threading 准备函数线程,传参数 t1 = threading.Thread(target=func,args=(args,)) 类继承线程,创建线程对...

python中property属性的介绍及其应用详解

Python的property属性的功能是:property属性内部进行一系列的逻辑计算,最终将计算结果返回。 使用property修饰的实例方法被调用时,可以把它当做实例属性一样 pr...

python绘制评估优化算法性能的测试函数

python绘制评估优化算法性能的测试函数

测试函数主要是用来评估优化算法特性的,这里我用python3绘制了部分测试函数的图像。具体的测试函数可以结合维基百科来了解。想要显示某个测试函数的图片把代码结尾对应的注释去掉即可,具体代...

github配置使用指南

1.建立项目,进入项目文件夹 2.初始化ssh key 参见官网指南.(本机生成一对key,public key传到官网sshkey下面) https://help.github.com...

6行Python代码实现进度条效果(Progress、tqdm、alive-progress​​​​​​​和PySimpleGUI库)

6行Python代码实现进度条效果(Progress、tqdm、alive-progress​​​​​​​和PySimpleGUI库)

在项目开发过程中加载、启动、下载项目难免会用到进度条,如何使用Python实现进度条呢? 这里为小伙伴们分享四种Python实现进度条的库:Progress库、tqdm库、alive-p...