Python服务器编程:使用django-celery进行任务队列
随着Web应用程序的日益流行和用户数量的增加,现代Web应用程序需要通过处理复杂且时间敏感的任务来保持生产力和稳定性。从电子商务网站上的订单处理和系统日志文件的处理到计算机视觉和自然语言处理的高级应用,这些任务需要独立的进程来处理。
常规做法是使用 cron 或者类似的作业调度器,但存在以下问题:
- 难以动态管理和分配任务。
- 难以重试失败的任务。
- 无法轻松地将任务分发到多个服务器。
- 无法跟踪和监控作业和任务的状态。
因此,为了解决这些问题,我们需要一种任务队列服务。
在Python生态系统中,Celery是最常用的任务队列。它是一种面向分布式系统设计的任务队列,适用于高并发,高吞吐量的Web应用程序。
在本文中,我们将介绍如何使用Celery和Django开发任务队列服务。我们将使用Django-Celery作为Celery的Django集成。
- 安装相关依赖
首先,我们需要将Celery和Django-Celery的依赖项安装到项目中。您可以使用pip工具来安装它们。
pip install celery django-celery
- 配置Celery
在我们开始使用Celery之前,我们需要配置Celery。为此,请创建一个名为celery.py的文件,该文件应该位于您的项目的根目录中。该文件的内容如下:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'example.settings')
app = Celery('example')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
注意:如果您希望使用在settings.py文件中指定的配置来配置Celery,请将'example.settings'替换为实际的Django项目名称。
- 配置Django
现在,我们需要在settings.py文件中配置Django,以便它支持Celery。
# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# app注册
INSTALLED_APPS = (
...
'django_celery_results',
'django_celery_beat',
...
)
在这里,我们配置了两个关键设置。(1)CELERY_BROKER_URL – 这告诉Celery使用Redis作为其中间件服务。(2)INSTALLED_APPS – 我们需要在我们的应用程序中注册Django-Celery的两个应用程序。
- 创建一个任务
现在我们已经配置好了Celery和Django,我们可以开始定义一些任务了。我们将创建一个示例任务来演示任务的结构和语法。在app/tasks.py文件中,添加以下内容。
from django.core.mail import send_mail
from celery import shared_task
from datetime import datetime
@shared_task
def send_email_task():
subject = 'Celery Email Demo'
message = 'This email is sent using celery!'
from_email = 'demo@example.com'
recipient_list = ['recipient@example.com']
send_mail(subject, message, from_email, recipient_list)
print('Email Sent Successfully')
return None
@shared_task
def print_time():
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)
return None
在这里,我们定义了两个任务。它们分别是send_email_task和print_time任务。注意这一点,我们将任务修饰为shared_task装饰器。这使得我们的任务可以在任何地方访问,使它们可以被多个进程调用。
- 启动Worker进程
现在,我们已经定义了任务,我们需要启动worker进程并告诉它们执行哪些任务。
打开一个终端窗口,并输入以下命令:
$ celery -A example worker --loglevel=info
注意,example代表Django项目的名称。在这里,我们使用--loglevel = info来控制worker的日志级别。
- 通过Django管理界面调度任务
Django-Celery支持在Django Admin界面中管理和调度任务。我们需要注册Django-Celery的两个应用程序。我们可以在admin.py文件中添加以下代码。
from django.contrib import admin
from django_celery_beat.admin import PeriodicTaskAdmin, IntervalScheduleAdmin
from django_celery_results.models import TaskResult
from django_celery_results.admin import TaskResultAdmin
from core.tasks import send_email_task, print_time
class Tasks(admin.ModelAdmin):
actions = ['send_email_task', 'print_time']
def send_email_task(self, request, queryset):
send_email_task.delay()
send_email_task.short_description = "Send Email Task"
def print_time(self, request, queryset):
print_time.delay()
print_time.short_des
.........................................................