Implementing asynchronous periodic tasks with django + celery + redis · 梁苏莉's blog

 Note: use version 4.2.0 of the Python celery module. I initially installed 4.1.1, but under that version the periodic tasks simply did not run.

  1、Configure celery in settings.py


# 1. To run periodic tasks in django, register the django_celery_beat app
INSTALLED_APPS = [
    # ...
    'django_celery_beat',
]


TIME_ZONE = 'Asia/Shanghai'  # change the default UTC timezone to China Standard Time


# 2. celery configuration
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC=False
CELERY_ANNOTATIONS = {'*': {'rate_limit': '500/s'}}
CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
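
    The DatabaseScheduler above comes from django_celery_beat, which keeps the schedule in database tables, so those tables must be created before beat can use them. A minimal setup step, assuming pip and a standard Django project:

        pip install django-celery-beat
        python manage.py migrate django_celery_beat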
  2、Create celery.py in the directory that has the same name as the project

    For more scheduling options, see the official docs: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules


# -*- coding: utf-8 -*-

from __future__ import absolute_import
import os
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
from kombu import Queue


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')

from django.conf import settings

app = Celery('celery_test')

# Celery settings are defined on a class and loaded below via
# app.config_from_object(); old-style (uppercase) setting names are used.
class Config:
    BROKER_URL = 'redis://1.1.1.3:6379'
    CELERY_RESULT_BACKEND = 'redis://1.1.1.3:6379'
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_ENABLE_UTC = False
    CELERY_TASK_RESULT_EXPIRES = 60 * 60
    CELERY_ANNOTATIONS = {'*': {'rate_limit': '500/s'}}
    # Number of tasks each worker prefetches at a time
    # CELERYD_PREFETCH_MULTIPLIER = 10
    # Recycle a worker process after it has run this many tasks,
    # to guard against memory leaks (same as --maxtasksperchild)
    CELERYD_MAX_TASKS_PER_CHILD = 16
    # Guard against deadlocks
    # CELERYD_FORCE_EXECV = True
    # Re-deliver a task to another worker if no acknowledgement arrives in time
    # CELERY_ACKS_LATE = True
    # CELERY_DISABLE_RATE_LIMITS = True
    # CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'


app.config_from_object(Config)
app.autodiscover_tasks()

# beat (crontab) schedule config
app.conf.update(
    CELERYBEAT_SCHEDULE = {
        # run the add task every 3 minutes
        'every-3-min-add': {
            'task': 'app01.tasks.add',
            'schedule': timedelta(seconds=180)
        },
        # run the minus task every day at 15:20
        'minus-every-day-15:20': {
            'task': 'app01.tasks.minus',
            'schedule': crontab(hour=15, minute=20, day_of_week='*/1'),
        },
    },
)

# Declare a transient queue: delivery_mode=1 keeps messages in memory only,
# faster but lost if the broker restarts
app.conf.task_queues = (Queue('transient', routing_key='transient', delivery_mode=1),)
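
    A few more schedule patterns, taken from the crontab documentation linked above, for quick reference:

from celery.schedules import crontab

crontab()                                        # every minute
crontab(minute=0, hour=0)                        # daily at midnight
crontab(minute=0, hour='*/3')                    # every three hours
crontab(minute=0, hour=0, day_of_week='sunday')  # every Sunday at midnight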
  3、Create tasks.py under any app (django automatically looks for this tasks file in each app)


# -*- coding:utf8 -*-
from __future__ import absolute_import, unicode_literals
from celery import shared_task

# Use @shared_task here instead of @app.task so the task can also be called from other apps
@shared_task
def add():
    print('app01.tasks.add')
    return 222 + 333

@shared_task
def minus():
    print('app01.tasks.minus')
    return 222 - 333
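
    With the tasks defined, they can be queued from anywhere in the project, for example a Django view. A minimal sketch; result.get() needs the redis result backend configured in step 1:

from app01.tasks import add

result = add.delay()                    # queue the task to run asynchronously
result = add.apply_async(countdown=10)  # queue it, but run 10 seconds from now
print(result.get(timeout=30))           # block until the result arrives -> 555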
  4、Add the following to the __init__.py in the directory that has the same name as the project


# -*- coding:utf8 -*-
from __future__ import absolute_import, unicode_literals

# Make sure Django loads the celery app on startup
from .celery import app as celery_app
__all__ = ['celery_app']
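
    Once a worker is running, you can verify that the tasks were discovered and registered:

        celery -A celery_test inspect registered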
  5、Startup scripts (remember to start the celery services)

    1、Start the django app

         python manage.py runserver 0.0.0.0:8000


#!/usr/bin/env bash

source ../env/bin/activate

export DJANGO_SETTINGS_MODULE=celery_test.settings

base_dir=`pwd`
mup_pid() {
    echo `ps -ef | grep -E "(manage.py)(.*):8000" | grep -v grep | awk '{print $2}'`
}
start() {
    python $base_dir/manage.py runserver 0.0.0.0:8000 >> $base_dir/django.log 2>&1 &
    pid=$(mup_pid)
    echo -e "\e[00;31mmup is running (pid: $pid)\e[00m"
}

stop() {
    pid=$(mup_pid)
    echo -e "\e[00;31mmup is stopped (pid: $pid)\e[00m"
    ps -ef | grep -E "(manage.py)(.*):8000" | grep -v grep | awk '{print $2}' | xargs kill -9 &> /dev/null
}

restart(){
    stop
    start
}

# See how we were called.
case "$1" in
  start)
        start
        ;;
  stop)
        stop
        ;;

  restart)
        restart
        ;;

  *)
        echo $"Usage: $0 {start|stop|restart}"
        exit 2
esac
    2、Start the celery worker: each machine can run 8 workers (see the celery multi one-liner after the script below)

         celery -A celery_test worker -l info


#!/bin/bash
source ../env/bin/activate
export C_FORCE_ROOT="true"
base_dir=`pwd`


celery_pid() {
    echo `ps -ef | grep -E "celery -A celery_test worker" | grep -v grep| awk '{print $2}'`
}
start() {
    celery  multi start celery_test -A celery_test -l debug --autoscale=50,5 --logfile=$base_dir/var/celery-%I.log --pidfile=celery_test.pid
}
restart() {
    celery  multi restart celery_test -A celery_test -l debug
}
stop() {
    celery  multi stop celery_test -A celery_test -l debug
}
#restart(){
#    stop
#    start
#}


# See how we were called.
case "$1" in
  start)
        start
        ;;
  restart)
        restart
        ;;
  stop)
        stop
        ;;
  *)
        echo $"Usage: $0 {start|stop|restart}"
        exit 2
esac

#nohup celery -A celery_test worker -l debug --concurrency=10 >> celery.log 2>&1 &  # note: --autoreload was removed in celery 4.x
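
    As a sketch of the "8 workers per machine" note above, celery multi can also start several named workers in one command, assuming the var/ log and pid directory exists:

        celery multi start 8 -A celery_test -l info --pidfile=$base_dir/var/%n.pid --logfile=$base_dir/var/%n%I.log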
    3、Start celery beat to run the periodic tasks

        celery -A celery_test beat -l debug


#!/bin/bash
# celery beat: run the periodic tasks
source ../env/bin/activate
export C_FORCE_ROOT="true"
base_dir=`pwd`


celery_pid() {
    echo `ps -ef | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}'`
}
start() {
    # use the django_celery_beat database scheduler instead:
    #celery -A celery_test beat -l info -S django >> $base_dir/var/celery-cron.log 2>&1 &
    celery -A celery_test beat -l debug >> $base_dir/var/Scheduler.log 2>&1 &
    sleep 3
    pid=$(celery_pid)
    echo -e "e[00;31mcelery is start (pid: $pid)e[00m"
}
restart() {
    pid=$(celery_pid)
    echo -e "e[00;31mcelery is restart (pid: $pid)e[00m"
    ps auxf | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}' | xargs kill -HUP &> /dev/null
}
stop() {
    pid=$(celery_pid)
    echo -e "e[00;31mcelery is stop (pid: $pid)e[00m"
    ps -ef | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}' | xargs kill -TERM &> /dev/null
}


case "$1" in
  start)
        start
        ;;
  restart)
        restart
        ;;
  stop)
        stop
        ;;
  *)
        echo $"Usage: $0 {start|stop|restart}"
        exit 2
esac
    4、Scripts written on Windows may not run on Linux because of their line endings

        Executing such a .sh script on Linux fails with: /bin/sh^M: bad interpreter: No such file or directory

        Fix the file in vim:

        :set ff=unix

        Or convert it with dos2unix:

        dos2unix start-celery.sh
        dos2unix celery-crond.sh
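
        To check whether a script still carries Windows (CRLF) line endings before running it:

        file start-celery.sh    # prints "... with CRLF line terminators" if it still needs converting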

 

  6、Common errors

    1、Celery error: Received unregistered task of type 'XXX' (beat cannot find the corresponding tasks.py module)

        app = Celery('opwf', include=['api_workflow.tasks'])       # explicitly include the tasks module of the api_workflow app
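
        Besides include=, the task module can also be forced in via configuration; a sketch using the old-style setting (e.g. in the Config class from step 2):

        CELERY_IMPORTS = ('api_workflow.tasks',)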