重庆注册公司代办机构北京seo方法
参考文档:Win10下Celery4.2.1基于redis的部署与错误
1.安装
1.1window 安装 Redis
由于 Redis 并没有官方支持 Windows,
下载方法1 https://github.com/MSOpenTech/redis 中下载 Redis 包。
下载方法2(我用的这个,我下载的msi可安装的): https://github.com/MicrosoftArchive/redis/releases
在cmd命令行窗口中输入 redis-server redis.windows.conf 启动redis
redis可视化工具安装: https://redisdesktop.com/download
1.2.安装好python后安装 Celery 相关库
# pip intall celery
# pip install celery-with-redis
2.配置\启动worker
2.1配置文件
首先是clelry的运行配置文件celeryconfig.py(当然你也可以从命令行传入)
#celeryconfig.py
BROKER_URL = 'redis://localhost:6379/0' #redis的配置,如果是rabbitmq就不是这个配置
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'CELERY_REDIS_MAX_CONNECTIONS = 4
CELERYD_CONCURRENCY = 4
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 5} # 5minCELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json'] # Ignore other content
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
2.2 创建celery 任务列表,注册1个任务
创建一个Celery Application用来定义任务列表。实例化一个Celery对象app,然后通过@app.task 装饰器注册一个 task。
app.task函数来produce 任务到rabbitmq或者在redis 或者数据库。
#celery_worker.py
from celery import Celeryapp = Celery('celery_work')
app.config_from_object('celeryconfig')@app.task
def add(x, y):return x + y
2.3运行 worker,启动Celery Worker服务,来开始监听并执行任务
1)在celery_worker.py文件里面启动一个叫做app的Celery Application,编写一个app.task函数来produce 任务到rabbitmq或者在redis 或者数据库。(指的是前面的代码)
2)在每个worker里面通过命令启动worker消费任务(指的是celery worker的命令)
在 celery_worker.py 文件所在目录运行
$ celery worker -A celery_worker.app -l INFO
或者
celery -A celery_worker worker --loglevel=info -P solo
或者
celery -A celery_server worker --loglevel=info
#这种方式默认是多进程启动worker。celery -A celery_server worker --loglevel=info -P solo
#这种方式默认是单进程启动worker。#使用协程的方式启动。当然首先你需要安装eventlet。(pip install eventlet)
celery -A celery_server worker --loglevel=info -P eventlet
这个命令会开启一个在前台运行的 worker,解释这个命令的意义:
worker: 运行 worker 模块。
-A: –app=APP, 指定使用的 Celery 实例。
-l: –loglevel=INFO, 指定日志级别,可选:DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL
其它常用的选项:
-P: –pool=prefork, 并发模型,可选:prefork (默认,multiprocessing), eventlet, gevent, threads.
-c: –concurrency=10, 并发级别,prefork 模型下就是子进程数量,默认等于 CPU 核心数
完整的命令行选项可以这样查看:
$ celery worker --help
2.4 为什么python3.7版本启动worker报错
celery -A celery_worker worker --loglevel=info -P solo 执行报错如下(截取最后几行):
$ celery worker -A tasks.app -l INFO
from celery.utils import timer2File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/utils/timer2.py", line 19from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger^
SyntaxError: invalid syntax
https://stackoverflow.com/questions/53466160/unable-to-start-celery-worker-instance-syntax-error
原因是版本冲突。要降低版本
参考文档也有说这个
2.5 降低版本到python3.5启动worker服务成功
启动worker服务了,此后该worker会一直等待任务并执行。具体启动有3种,我用的是第2种
celery -A celery_server worker --loglevel=info
#这种方式默认是多进程启动worker。celery -A celery_server worker --loglevel=info -P solo
#这种方式默认是单进程启动worker。#使用协程的方式启动。当然首先你需要安装eventlet。(pip install eventlet)
celery -A celery_server worker --loglevel=info -P eventlet
2.6 启动work服务成功控制台信息
celery -A celery_worker worker --loglevel=info -P solo
D:\Python\Python35\Scripts\celery-test>celery -A celery_worker worker --loglevel=info -P solo-------------- celery@DESKTOP-8MJ35H4 v3.1.26.post2 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.17763-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celery_work:0x20008c91438
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ----
--- ***** ----- [queues]-------------- .> celery exchange=celery(direct) key=celery[tasks]. celery_worker.add[2019-02-20 20:17:16,340: INFO/MainProcess] Connected to redis://localhost:6379/0
[2019-02-20 20:17:19,399: INFO/MainProcess] mingle: searching for neighbors
[2019-02-20 20:17:21,423: INFO/MainProcess] mingle: all alone
[2019-02-20 20:17:27,489: WARNING/MainProcess] celery@DESKTOP-8MJ35H4 ready.
3.调用task 代码
我们创建一个简单的 Python 程序,来触发 add这个任务。
具体代码如下:
#task_test.py
from celery_worker import add
result = add.delay(4, 4)result.ready()#获取任务执行状态
result.get(timeout=1, propagate=False) # 获取任务执行结果
result.traceback # 获取任务执行异常时的堆栈信息
add.delay(1,2)
或者
add.apply_async(args=(1,2))
上面两种调用方式等价,delay() 方法是 apply_async() 方法的简写。这个调用会把 add 操作放入到队列里,然后立即返回一个 AsyncResult 对象。如果关心处理结果,需要给 app 配置 CELERY_RESULT_BACKEND,指定一个存储后端保存任务的返回值。
redis 3.2.0时,任务调用run报错
控制台的error信息(截取最后几行),百度的结构,说是redis版本太高,我报错时候的版本是:redis 3.2.0
pip install redis==2.10.6
File "d:\python\python35\lib\site-packages\kombu\transport\redis.py", line 146, in appendpipe.zadd(self.unacked_index_key, delivery_tag, time()) \File "d:\python\python35\lib\site-packages\redis\client.py", line 2320, in zaddfor pair in iteritems(mapping):File "d:\python\python35\lib\site-packages\redis\_compat.py", line 109, in iteritemsreturn iter(x.items())
AttributeError: 'str' object has no attribute 'items'
降低了redis版本=2.10.6后,任务调用代码执行成功
pycharm执行结果截图
(注意:前面redis版本问题抛的error,pycharm 看不出来,要去dos里面的控制台看error信息)
项目实战参考这篇
任务队列在 Web 服务里的应用
在 Web2.0 后的时代,社交网站、搜索引擎的的迅猛发展对 Web 服务的后台管理系统提出了更高的需求。考虑几个常见的使用场景:
- 社交网站的用户在其主页发布了一组新的照片,这条新鲜事需要适时地推送至该用户的所有好友。该网站的活跃用户有千万级别,在同一时刻会有非常多的“新鲜事推送”任务需要处理,并且每个用户的好友数会达到 1000+的级别。出于用户体验的考虑,用户发布照片的这个操作需要在较短时间内得到反馈。
- 在文献搜索系统的主页,用户可以查到当前一小时内最热门的十大文献,并且能够直接访问该文献。该文献管理系统所管理的文献数量非常多,达到 PB 的级别。处于用户体验的考虑,用户获得十大热门文献这个动作需要在较短时间内获得反馈。
考虑对于高并发大用户量的 Web 服务系统,对于场景一和场景二中的需求,如果在请求处理周期内完成这些任务,然后再返回结果,这种传统的做法会导致用户等待的时间过长。同时 Web 服务管理后台对任务处理能力也缺乏扩展性。
在这种场景下,任务队列是有效的解决方案。在一个任务队列系统中,“将新鲜事推送至用户 A 的所有好友”或者“查询当前最热门的十大文献”这种查询或者计算工作可以被当成一个“任务”。在任务队列系统中,一般有任务生产者、任务处理中间方以及任务消费者三方。其中任务生产者负责生产任务,比如“将新鲜事推送至用户 A 的所有好友”这一任务的发起方就可以称作任务生产者。任务处理中间方负责接收任务生产者的任务处理请求,对任务进行调度,最后将任务分发给任务消费者来进行处理。任务消费者就是执行任务的一方,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。在生产方、消费者和任务处理中间方之间一般使用消息传递的方式来进行通信。
在任务队列系统框架中,任务消费者可以跨越不同的服务节点,可以动态地增加节点来增加系统的任务处理能力,非常适合高并发、需要横向扩展的 Web 服务后台。
修改配置文件 ,加上配置, task每10秒执行1次
Celery 的调度器的配置是在 CELERYBEAT_SCHEDULE 这个全局变量上配置的,我们可以将配置写在一个独立的 Python 模块,在定义 Celery 对象的时候加载这个模块。我们将add这个任务定义为每 100 秒执行一次。
from datetime import timedeltaCELERYBEAT_SCHEDULE = {'select_populate_book': {'task': 'celery_worker.add','schedule': timedelta(seconds=100),},
}
启动 Celery beat
启动 Celery beat 调度器,Celery beat 会周期性地执行在 CELERYBEAT_SCHEDULE 中定义的任务,即周期性地查询当前一小时最热门的书籍。