python 基于Apscheduler實現定時任務
在工作場景遇到了這么一個場景,就是需要定期去執行一個緩存接口,用于同步設備配置。首先想到的就是Linux上的crontab,可以定期,或者間隔一段時間去執行任務。但是如果你想要把這個定時任務作為一個模塊集成到Python項目中,或者想持久化任務,顯然crontab不太適用。Python的APScheduler模塊能夠很好的解決此類問題,所以專門寫這篇文章,從簡單入門開始記錄關于APScheduler最基礎的使用場景,以及解決持久化任務的問題,最后結合其他框架深層次定制定時任務模塊這幾個點入手。
簡單介紹先簡單介紹一下Apscheduler模塊包含的四種組件:
Trigger觸發器 Job作業 Excutor執行器 Scheduler調度器大概了解了Apscheduler包含的幾種概念,現在先來看一下一個簡單的示例:
# -*- coding: utf-8 -*-from apscheduler.schedulers.blocking import BlockingSchedulerimport timedef hello(): print(time.strftime('%c'))if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(hello, ’interval’, seconds=5) scheduler.start()
示例的輸出:
Thu Dec 3 16:01:20 2020Thu Dec 3 16:01:25 2020Thu Dec 3 16:01:30 2020Thu Dec 3 16:01:35 2020Thu Dec 3 16:01:40 2020..........
這個簡單的示例,我們用上面提到幾種組件分析一下運行邏輯:
首先是Scheduler調度器,這個示例使用的BlockingScheduler調度器,在官方文檔中的解釋是,BlockingScheduler適合當你的這個定時任務程序是唯一運行的程序;換言之,則是BlockingScheduler調度器是一個阻塞調度器,當程序運行這種調度器,進程則會阻塞,無法執行其他操作; 其次是Job作業和觸發器,這兩個放在一起講是因為,在定義作業的時候,你就需要選擇一個觸發器,這里選擇的是interval觸發器,這種觸發器會以固定時間間隔運行作業。換言之,為調度器添加一個hello的工作,并以每5秒的時間間隔執行任務。 最后就是執行器,默認是ThreadPoolExcutor執行器,他們將任務中可調用對象交給線程池執行操作,等完成操作后,執行器會通知調度程序。內置的三種Trigger觸發器類型:
date:特定時間僅運行一次作業 interval: 固定的時間間隔內運行一次作業 cron: 在一天內特定的時間定期運行作業常見的Scheduler調度器:
BlockingScheduler: 調度程序是流程中唯一運行的東西 BackgroundScheduler: 調度程序在應用程序內部的后臺運行時使用 AsyncIOScheduler: 應用程序使用asyncio模塊 GeventScheduler: 應用程序使用gevent模塊 TornadoScheduler:構建Tornado應用程序時使用 TwistedScheduler: 構建Tornado應用程序時使用 QtScheduler: 在構建QT應用程序時使用常見的JobStore:
MemoryJobStore MongoDBJobStore SQLAlchemyJobStore RedisJobStore 進階使用通過上面一個簡單的示例了解大概的工作流程,以及各個組件在整個流程中的作用,以下的示例是Flask Web框架結合使用Apscheduler定時器,定時執行任務。
# -*- coding: utf-8 -*-from flask import Flask, Blueprint, requestfrom apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.jobstores.redis import RedisJobStoreimport timeapp = Flask(__name__)executors = {'default': ThreadPoolExecutor(5)}default_redis_jobstore = RedisJobStore(db=2, jobs_key='apschedulers.default_jobs', run_times_key='apschedulers.default_run_times', host = ’127.0.0.1’, port = 6379 )scheduler = BackgroundScheduler(executors=executors)scheduler.add_jobstore(default_redis_jobstore)scheduler.start()def say_hello(): print(time.strftime('%c'))@app.route('/get_job', methods=[’GET’])def get_job(): if scheduler.get_job('say_hello_test'): return 'YES' else: return 'NO'@app.route('/start_job', methods=['GET'])def start_job(): if not scheduler.get_job('say_hello_test'): scheduler.add_job(say_hello, 'interval', seconds=5, id='say_hello_test') return 'Start Scuessfully!' else: return 'Started Failed' @app.route('/remove_job', methods=['GET'])def remove_job(): if scheduler.get_job('say_hello_test'): scheduler.remove_job('say_hello_test') return 'Delete Successfully!' else: return 'Delete Failed'if __name__ == '__main__': app.run(host='127.0.0.1', port=8787, debug=True) 先分析Jobstore,這里使用的是RedisJobstore,將任務序列化存入到Redis數據庫中。這里順便提一下,為什么需要設置作業存儲器,原因是當調度器程序崩潰時,仍然能夠保留作業,當然選擇什么作業存儲器,可以根據具體的工作場景,目前主流的mysql,mongodb,redis,SQLite基本都支持; 然后再看看Scheduler,這里使用的時BackgroundScheduler,因為這里要求調度程序不能阻塞flask程序的正常接收請求,所以選在BackgrounScheduler讓它在開始執行任務時是在后臺運行的,不會阻塞主線程; 最后看看工作的邏輯,這里get_job獲取作業的狀態,查看作業是否存在,start_job則是先判斷作業是否啟動,然后再決定啟動操作,remove_job則是停止作業。而這里的作業定義則是通過interval觸發器,每五秒執行一次say_hello任務;總結
最后總結一下,首先你要設置一個作業存儲器用于在調度程序崩潰重新恢復時,還能夠在作業存儲器中獲取到作業繼續執行;然后你需要設置一個執行器,這個根據作業的類型,比如時一個CPU密集型的任務,那就可以用進程池執行器,默認是用線程池執行器;最后創建配置調度器,啟動調度,可以在啟動前添加作業,也可以在啟動后添加,刪除,獲取作業。(在這里需要明白的一點就是應用程序不會直接去操作作業存儲器,作業或者執行器,而是調度器提供適當的接口來處理這些接口。)
ApScheduler是一個不錯的定時任務庫,能夠動態的添加刪除,同時也支持不同的觸發器類型,這也是它的優勢,相反一些如果是靜態任務,其實可以用如linux的crontab工具去做定時任務。有關這方面的記錄還會持續更新,如果有什么問題,可以提出來,大家一起探討。
以上就是python Apscheduler的使用方法的詳細內容,更多關于python Apscheduler的資料請關注好吧啦網其它相關文章!
相關文章: