celery筆記八之數(shù)據(jù)庫操作定時任務
本文首發(fā)于公眾號:Hunter后端原文鏈接:celery筆記八之數(shù)據(jù)庫操作定時任務
前面我們介紹定時任務是在 celery.py 中的 app.conf.beat_schedule
定義,這一篇筆記我們介紹一下如何在 Django 系統(tǒng)中的表里來操作這些任務。
我們先通過 app.conf.beat_schedule 定義定時任務:
(資料圖片僅供參考)
app.conf.beat_schedule = { "add-every-60-seconds": { "task": "blog.tasks.add", "schedule": 60, "args": (16, 16), }, "schedule_minus": { "task": "blog.tasks.minus", "schedule": crontab(minute=5, hour=2), "args": (12, 24), },}
如果我們就這樣啟動 Django 系統(tǒng),worker 和 beat 服務,系統(tǒng)的定時任務就只有一個,寫死在系統(tǒng)里。
當然,我們也可以使用一些 celery 的函數(shù)來手動向系統(tǒng)里添加定時任務,但是我們有一個更好的方法來管理操作這些定時任務,那就是將這些定時任務寫入到數(shù)據(jù)庫中,來進行增刪改查操作,定制開發(fā)。
將定時任務寫入數(shù)據(jù)庫,我們需要進行以下幾步操作:
安裝依賴INSTALLED_APP添加模塊執(zhí)行migrate安裝依賴
通過 pip 安裝一個 django-celery-beat 依賴:
pip3 install django-celery-beat
INSTALLED_APP添加模塊
安裝后,要正常使用還需要將其添加到 settings.py 的 INSTALLED_APPS 中:
# settings.pyINSTALLED_APPS = [ ..., "django_celery_beat",]
執(zhí)行migrate
接下來我們執(zhí)行 migrate 操作將需要創(chuàng)建的表寫入數(shù)據(jù)庫:
python3 manage.py migrate
可以看到如下輸出:
Running migrations: Applying django_celery_beat.0001_initial... OK Applying django_celery_beat.0002_auto_20161118_0346... OK Applying django_celery_beat.0003_auto_20161209_0049... OK Applying django_celery_beat.0004_auto_20170221_0000... OK Applying django_celery_beat.0005_add_solarschedule_events_choices... OK Applying django_celery_beat.0006_auto_20180322_0932... OK Applying django_celery_beat.0007_auto_20180521_0826... OK Applying django_celery_beat.0008_auto_20180914_1922... OK Applying django_celery_beat.0006_auto_20180210_1226... OK Applying django_celery_beat.0006_periodictask_priority... OK Applying django_celery_beat.0009_periodictask_headers... OK Applying django_celery_beat.0010_auto_20190429_0326... OK Applying django_celery_beat.0011_auto_20190508_0153... OK Applying django_celery_beat.0012_periodictask_expire_seconds... OK Applying django_celery_beat.0013_auto_20200609_0727... OK Applying django_celery_beat.0014_remove_clockedschedule_enabled... OK Applying django_celery_beat.0015_edit_solarschedule_events_choices... OK
然后可以看到在 Django 系統(tǒng)對應的數(shù)據(jù)庫里新增了幾張表,表的介紹及使用我們在后面再介紹。
2、beat 的啟動在啟動 beat 前,我們需要對時區(qū)進行設置,前面我們介紹過在 Django 和 celery 中都需要設置成北京時間:
TIME_ZONE = "Asia/Shanghai"USE_TZ = False# celery 時區(qū)設置 CELERY_TIMEZONE = "Asia/Shanghai"CELERY_ENABLE_UTC = FalseDJANGO_CELERY_BEAT_TZ_AWARE = False
啟動 beat 我們需要添加參數(shù)將數(shù)據(jù)指定存儲在數(shù)據(jù)庫中,可以在啟動 beat 的時候添加參數(shù):
celery -A hunter beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
也可以通過 app.conf.beat_scheduler 指定值:
app.conf.beat_scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
然后直接啟動 beat:
celery -A hunter beat -l INFO
3、表介紹在執(zhí)行完 migrate 之后系統(tǒng)會多出幾張表,都是定時任務相關的表:
django_celery_beat_clockedscheduledjango_celery_beat_crontabscheduledjango_celery_beat_intervalscheduledjango_celery_beat_solarscheduledjango_celery_beat_periodictaskdjango_celery_beat_periodictasks其中 django_celery_beat_clockedschedule 和 django_celery_beat_solarschedule 暫時不介紹
django_celery_beat_crontabschedule
是我們的周期任務表,比如我們上面定義的:
"schedule_minus": { "task": "blog.tasks.minus", "schedule": crontab(minute=5, hour=2), "args": (12, 24), },
執(zhí)行 celery 的 beat 后,會在該表新增一條數(shù)據(jù),表的字段就是我們設置的 crontab() 里的值,包括 minute,hour,day_of_week,day_off_month,month_of_year 和 timezone。
除了 timezone 字段,前面的字段如何定義和使用上一篇筆記中已經(jīng)詳細介紹過,timezone 字段則是我們在 settings.py 里定義的時區(qū)信息。
django_celery_beat_intervalschedule
這張表的數(shù)據(jù)是我們定義的間隔時間任務的表,比如每隔多少秒,多少分鐘執(zhí)行一次。
該表只有 id, every 和 period 字段,every 表示的是時間的間隔,填寫的數(shù)字,period 則是單位,可選項有:
microseconds:毫秒seconds:秒minutes:分鐘hours:小時days:天我們在定義間隔任務的時候,除了直接使用數(shù)字表示秒之外,還可以使用 datetime.timedelta() 來定義其他時間,比如:
from datetime import timedeltaapp.conf.beat_schedule = { "add-every-60-seconds": { "task": "blog.tasks.add", "schedule": timedelta(minutes=5), "args": (16, 16), },}
但是當我們啟動 beat 的時候,系統(tǒng)在寫入數(shù)據(jù)庫的時候還是會自動為我們將其轉化為秒數(shù),比如 minutes=5,會給我們加入的數(shù)據(jù)是:
every=300, period="seconds"
django_celery_beat_periodictask
這張表其實是對前面幾張表的任務的一個匯總,
crontab_id,interval_id 等外鍵字段來判斷是屬于哪張表的定時/周期任務last_run_at 上次運行時間total_run_count 總運行次數(shù)name 表示任務名稱task 字段表示任務來源等還有參數(shù),隊列等信息。
每一條在 django_celery_beat_crontabschedule 和 django_celery_beat_intervalschedule 表中的數(shù)據(jù)都必須在該表中有一個匯總的信息記錄才可以正常運行。
也就是說在前面的兩張表中可以添加各種任務執(zhí)行的策略,然后在 django_celery_beat_periodictask 中有一個數(shù)據(jù)指向該策略,就可以使用該策略進行周期任務的執(zhí)行。
其中,name 字段上是有唯一鍵的,但是 task 可以重復寫入,這也就意味著我們可以針對同一個 task 制定不同的定時策略。
django_celery_beat_periodictasks
這個表就一條數(shù)據(jù),保存的是系統(tǒng)上一次執(zhí)行任務的時間。
4、手動操作定時任務接下來我們自己定義兩個周期任務,一個是 blog.tasks.add 函數(shù),每隔20s運行一次,另一個是 blog.tasks.minus 函數(shù),每天晚上 23點15分執(zhí)行一次。
我們首先還是運行 beat 和 worke,然后在 python3 manage.py shell 中執(zhí)行下面的代碼:
import jsonfrom django_celery_beat.models import IntervalSchedule, CrontabSchedule, PeriodicTasktwenty_second_interval, _ = IntervalSchedule.objects.get_or_create(every=20, period=IntervalSchedule.SECONDS)eleven_clock_crontab, _ = CrontabSchedule.objects.get_or_create(minute=18, hour=23)PeriodicTask.objects.get_or_create( interval_id=twenty_second_interval.id, name="twenty_second_interval", task="blog.tasks.add", args=json.dumps([1, 2]),)PeriodicTask.objects.get_or_create( crontab_id=eleven_clock_crontab.id, name="eleven_clock_crontab", task="blog.tasks.minus", args=json.dumps([8, 2]),)
然后可以看到運行 beat 的 shell 中或者日志文件有輸出下面的信息:
DatabaseScheduler: Schedule changed.
其實就是系統(tǒng)監(jiān)測了 PeriodicTask 表,發(fā)現(xiàn)它的數(shù)據(jù)有變化就會重新更改一次,當 beat 服務啟動,系統(tǒng)會去 PeriodicTask 表里獲取數(shù)據(jù)。
如果這些任務的數(shù)據(jù)有更改,系統(tǒng)就會檢測到然后發(fā)出 Schedule changed
的信息。
我這邊測試了 name、enabled、one_off、args 等字段,發(fā)現(xiàn)修改后系統(tǒng)都會捕獲到任務的變化。
其中,one_off 字段的含義是該任務僅執(zhí)行一次。
如果想獲取更多后端相關文章,可掃碼關注閱讀: