python异步实现定时任务和周期任务的方法

yipeiwu_com5年前Python基础

一. 如何调用

def f1(arg1, arg2):
  print('f1', arg1, arg2)
 
 
def f2(arg1):
  print('f2', arg1)
 
 
def f3():
  print('f3')
 
 
def f4():
  print('周期任务', int(time.time()))
 
 
timer = TaskTimer()
# 把任务加入任务队列
timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30执行
timer.join_task(f2, [3], timing=14) # 每天14:00执行
timer.join_task(f3, [], timing=15) # 每天15:00执行
timer.join_task(f4, [], interval=10) # 每10秒执行1次
# 开始执行(此时才会创建线程)
timer.start()

f1~f4是我们需要定时执行的函数。

首先创建TaskTimer对象(TaskTimer的代码在下面)。调用join_task函数,把需要执行的函数加入到任务队列。最后调用start,任务就开始执行了。

join_task参数:

fun:需要执行的函数

arg:fun的参数,如果没有就传一个空列表

interval:如果有此参数,说明任务是周期任务,单位为秒(注意interval最少5秒)

timing:如果有此参数,说明任务是定时任务,单位为时

注意:interval和timing只能选填1个

二. 源码

import datetime
import time
from threading import Thread
from time import sleep
 
 
class TaskTimer:
  __instance = None
 
  def __new__(cls, *args, **kwargs):
    """
    单例模式
    """
    if not cls.__instance:
      cls.__instance = object.__new__(cls)
    return cls.__instance
 
  def __init__(self):
    if not hasattr(self, 'task_queue'):
      setattr(self, 'task_queue', [])
 
    if not hasattr(self, 'is_running'):
      setattr(self, 'is_running', False)
 
  def write_log(self, level, msg):
    cur_time = datetime.datetime.now()
    with open('./task.log', mode='a+', encoding='utf8') as file:
      s = "[" + str(cur_time) + "][" + level + "]  " + msg
      print(s)
      file.write(s + "\n")
 
  def work(self):
 
    """
    处理任务队列
    """
    while True:
      for task in self.task_queue:
        if task['interval']:
          self.cycle_task(task)
        elif task['timing']:
          self.timing_task(task)
 
      sleep(5)
 
  def cycle_task(self, task):
    """
    周期任务
    """
    if task['next_sec'] <= int(time.time()):
      try:
        task['fun'](*task['arg'])
        self.write_log("正常", "周期任务:" + task['fun'].__name__ + " 已执行")
      except Exception as e:
        self.write_log("异常", "周期任务:" + task['fun'].__name__ + " 函数内部异常:" + str(e))
      finally:
        task['next_sec'] = int(time.time()) + task['interval']
 
  def timing_task(self, task):
    """
    定时任务
    """
    # 今天已过秒数
    today_sec = self.get_today_until_now()
 
    # 到了第二天,就重置任务状态
    if task['today'] != self.get_today():
      task['today'] = self.get_today()
      task['today_done'] = False
 
    # 第一次执行
    if task['first_work']:
      if today_sec >= task['task_sec']:
        task['today_done'] = True
        task['first_work'] = False
      else:
        task['first_work'] = False
 
    # 今天还没有执行
    if not task['today_done']:
      if today_sec >= task['task_sec']: # 到点了,开始执行任务
        try:
          task['fun'](*task['arg'])
          self.write_log("正常", "定时任务:" + task['fun'].__name__ + " 已执行")
        except Exception as e:
          self.write_log("异常", "定时任务:" + task['fun'].__name__ + " 函数内部异常:" + str(e))
        finally:
          task['today_done'] = True
          if task['first_work']:
            task['first_work'] = False
 
  def get_today_until_now(self):
    """
    获取今天凌晨到现在的秒数
    """
    i = datetime.datetime.now()
    return i.hour * 3600 + i.minute * 60 + i.second
 
  def get_today(self):
    """
    获取今天的日期
    """
    i = datetime.datetime.now()
    return i.day
 
  def join_task(self, fun, arg, interval=None, timing=None):
    """
    interval和timing只能存在1个
    :param fun: 你要调用的任务
    :param arg: fun的参数
    :param interval: 周期任务,单位秒
    :param timing: 定时任务,取值:[0,24)
    """
    # 参数校验
    if (interval != None and timing != None) or (interval == None and timing == None):
      raise Exception('interval和timing只能选填1个')
 
    if timing and not 0 <= timing < 24:
      raise Exception('timing的取值范围为[0,24)')
 
    if interval and interval < 5:
      raise Exception('interval最少为5')
 
    # 封装一个task
    task = {
      'fun': fun,
      'arg': arg,
      'interval': interval,
      'timing': timing,
    }
    # 封装周期或定时任务相应的参数
    if timing:
      task['task_sec'] = timing * 3600
      task['today_done'] = False
      task['first_work'] = True
      task['today'] = self.get_today()
    elif interval:
      task['next_sec'] = int(time.time()) + interval
 
    # 把task加入任务队列
    self.task_queue.append(task)
 
    self.write_log("正常", "新增任务:" + fun.__name__)
 
  def start(self):
    """
    开始执行任务
    返回线程标识符
    """
    if not self.is_running:
      thread = Thread(target=self.work)
 
      thread.start()
 
      self.is_running = True
 
      self.write_log("正常", "TaskTimer已开始运行!")
 
      return thread.ident
 
    self.write_log("警告", "TaskTimer已运行,请勿重复启动!")

以上这篇python异步实现定时任务和周期任务的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

Python使用scrapy采集时伪装成HTTP/1.1的方法

本文实例讲述了Python使用scrapy采集时伪装成HTTP/1.1的方法。分享给大家供大家参考。具体如下: 添加下面的代码到 settings.py 文件 复制代码 代码如下:DOW...

利用python如何处理nc数据详解

前言 这两天帮一个朋友处理了些 nc 数据,本以为很简单的事情,没想到里面涉及到了很多的细节和坑,无论是“知难行易”还是“知易行难”都不能充分的说明问题,还是“知行合一”来的更靠谱些,...

Python使用flask框架操作sqlite3的两种方式

本文实例讲述了Python使用flask框架操作sqlite3的两种方式。分享给大家供大家参考,具体如下: 方式一:raw_sql import sqlite3 from flask...

人工神经网络算法知识点总结

人工神经网络算法知识点总结

人工神经网络的许多算法已在智能信息处理系统中获得广泛采用,尤为突出是是以下4种算法:ART网络、LVQ网络、Kohonen网络Hopfield网络,下面就具体介绍一下这这四种算法: 1....

在Python中利用pickle保存变量的实例

在Python中利用pickle保存变量的实例

在工作中出于某些原因,我们可能需要将变量保存下来,这样下次就可以直接去赋值而不用重新执行某些重复耗时的操作了,这里我们用到了Python的pickle包来做变量的存储和变量加载,大家注意...