python - celery beat 調度如何運行期間動態添加任務?
問題描述
我嘗試過django-celery-beat,在admin后臺添加任務,可以實現動態添加任務但要重啟celery beat才生效,請問,有其他方試嗎?
問題解答
回答1:無法動態添加,必須重啟 beat。
ask 回答過原因了 #3493
回答2:有個思路,你可以考慮,我目前也在嘗試這個方法,處于摸石過河階段。celery是支持定時任務,但是不符合我的需求,我需要像linux下的crontab這樣動態添加定時任務,我也看了django-celery-beat,因為用的是Flask,發現不值得參考實現,所以一直在看文檔搜資料,終于被我找到一種方式,celery的apply_async這個函數非常有用,它有個eta參數,它的簡化使用是countdown,但是eta的威力是很巨大的,因為它只接受datetime對象,比如你給定一個任務在2017-05-02 20:0:0執行,你可以這樣使用:
job.apply_async(args=args, kwarg=kwargs, eta=datetime(2017,5,2,20,0,0))
是不是很少用,假如我有一個任務需要每天晚上八點執行,我可以利用這個eta參數實現。偽代碼如下:
時間規則 = ’每天晚上八點執行’第一次調用任務,先計算最近的執行時間,作為eta的參數,調用apply_async函數,然后第一次任務執行成功,得到上次任務的eta參數值,在天的值上加一,然后把新的執行時間作為eta的參數再次調用apply_async函數,這里省略了很多判斷,自行腦補。循環往復,是不是一直按每天晚上八點執行。
這里有個非常重要的點是如何在任務執行成功的時候計算下一次的執行時間,做法如下
class MyTask(Task): def on_success(self, retval, task_id, args, kwargs):print ’task done: {0}’.format(retval)return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo):print ’task fail, reason: {0}’.format(exc)return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)@app.task(base=MyTask)def add(x, y): return x + y
它提供了任務執行成功和失敗的函數,我們只要在此基礎上重寫就可以了,我說的只是最核心的部分,具體怎么做有很多方法,
相關文章:
1. 解決Android webview設置cookie和cookie丟失的問題2. angular.js - angularjs如何傳遞id給另一個視圖 根據id獲取json數據?3. android - Windows系統下運行react-native App時,報下面的錯誤?4. python - 我想把下面代碼中的多余空白行去除該怎么做,如何用正則實現5. Navicat for mysql 中以json格式儲存的數據存在大量反斜杠,如何去除?6. 我設置的背景怎么顯示不出來7. android - 安卓webview無法全屏播放iframe騰訊、優酷視頻8. python - tweepy 庫 連接Twitter API 報錯9. java - mysql緩存問題10. python - 如何對列表中的列表進行頻率統計?
