本文實(shí)例為大家分享了celery動(dòng)態(tài)設(shè)置定時(shí)任務(wù)的具體代碼,供大家參考,具體內(nèi)容如下
首先celery是一種異步任務(wù)隊(duì)列,如果還不熟悉這個(gè)開(kāi)源軟件的請(qǐng)先看看官方文檔,快速入門(mén)。
這里講的動(dòng)態(tài)設(shè)置定時(shí)任務(wù)的方法不使用數(shù)據(jù)庫(kù)保存定時(shí)任務(wù)的信息,所以是項(xiàng)目重啟后定時(shí)任務(wù)配置就會(huì)丟失,如果想保存成永久配置,可以考慮保存到數(shù)據(jù)庫(kù)、redis或者使用pickle、json保存成文件,在項(xiàng)目啟動(dòng)時(shí)自動(dòng)載入。
先來(lái)看一下celery的beat運(yùn)行過(guò)程。
上圖是beat的主要組成結(jié)構(gòu),beat中包含了一個(gè)service對(duì)象,service中包含了一個(gè)scheduler對(duì)象,scheduler中包含了一個(gè)schedule字典,schedule中key對(duì)應(yīng)的的value才是真正的定時(shí)任務(wù),是整個(gè)beat中最小的單元。
首先分別介紹一下各個(gè)對(duì)象和它們運(yùn)行的過(guò)程,beat是celery.apps.beat.Beat類(lèi)創(chuàng)建的對(duì)象,調(diào)用beat.run()方法就可以啟動(dòng)beat,下面是beat.run()方法的源碼。
def run(self): print(str(self.colored.cyan( 'celery beat v{0} is starting.'.format(VERSION_BANNER)))) self.init_loader() self.set_process_title() self.start_scheduler()
重點(diǎn)是在run()方法里調(diào)用了start_scheduler()方法,而start_scheduler()方法本質(zhì)上是創(chuàng)建了一個(gè)service對(duì)象(celery.beat.Service類(lèi)),并調(diào)用service.start()方法,下面是beat.start_scheduler()方法的源碼。
def start_scheduler(self): if self.pidfile: platforms.create_pidlock(self.pidfile) service = self.Service( app=self.app, max_interval=self.max_interval, scheduler_cls=self.scheduler_cls, schedule_filename=self.schedule, ) print(self.banner(service)) self.setup_logging() if self.socket_timeout: logger.debug('Setting default socket timeout to %r', self.socket_timeout) socket.setdefaulttimeout(self.socket_timeout) try: self.install_sync_handler(service) service.start() except Exception as exc: logger.critical('beat raised exception %s: %r', exc.__class__, exc, exc_info=True) raise
調(diào)用了service.start()之后,會(huì)進(jìn)入一個(gè)死循環(huán),先使用self.scheduler.tick()獲取下一個(gè)任務(wù)a的定時(shí)點(diǎn)到現(xiàn)在時(shí)間的間隔,然后進(jìn)入睡眠,睡眠結(jié)束之后判斷如果self.scheduler里的下一個(gè)任務(wù)a可以執(zhí)行,就立即執(zhí)行,并獲取self.scheduler里的下下一個(gè)任務(wù)b的定時(shí)點(diǎn)到現(xiàn)在時(shí)間的間隔,進(jìn)入下一次循環(huán)。下面是service.start()的源碼。
def start(self, embedded_process=False): info('beat: Starting...') debug('beat: Ticking with max interval->%s', humanize_seconds(self.scheduler.max_interval)) signals.beat_init.send(sender=self) if embedded_process: signals.beat_embedded_init.send(sender=self) platforms.set_process_title('celery beat') try: while not self._is_shutdown.is_set(): interval = self.scheduler.tick() if interval and interval > 0.0: debug('beat: Waking up %s.', humanize_seconds(interval, prefix='in ')) time.sleep(interval) if self.scheduler.should_sync(): self.scheduler._do_sync() except (KeyboardInterrupt, SystemExit): self._is_shutdown.set() finally: self.sync()
service.scheduler默認(rèn)是celery.beat.PersistentScheduler類(lèi)的實(shí)例對(duì)象,而celery.beat.PersistentScheduler其實(shí)是celery.beat.Scheduler的子類(lèi),所以scheduler.schedule是celery.beat.Scheduler類(lèi)中的字典,保存的是celery.beat.ScheduleEntry類(lèi)型的對(duì)象。ScheduleEntry的實(shí)例對(duì)象保存了定時(shí)任務(wù)的名稱(chēng)、參數(shù)、定時(shí)信息、過(guò)期時(shí)間等信息。celery.beat.Scheduler類(lèi)實(shí)現(xiàn)了對(duì)schedule的更新方法即update_from_dict(self, dict_)方法。下面是update_from_dict(self, dict_)方法的源碼。
def _maybe_entry(self, name, entry): if isinstance(entry, self.Entry): entry.app = self.app return entry return self.Entry(**dict(entry, name=name, app=self.app)) def update_from_dict(self, dict_): self.schedule.update({ name: self._maybe_entry(name, entry) for name, entry in items(dict_) })
可以看到update_from_dict(self, dict_)方法實(shí)際上是向schedule中更新了self.Entry的實(shí)例對(duì)象,而self.Entry從celery.beat.Scheduler的源碼知道是celery.beat.ScheduleEntry。
到這里整個(gè)流程就粗略的介紹完了,基本過(guò)程是這個(gè)樣子。
但是從前面start_scheduler()的源碼可以看到,beat在內(nèi)部創(chuàng)建一個(gè)service之后,就直接進(jìn)入死循環(huán)了,所以從外面無(wú)法拿到service對(duì)象,就不能對(duì)service里的scheduler對(duì)象操作,就不能對(duì)scheduler的schedule字典操作,所以就無(wú)法在beat運(yùn)行的過(guò)程中動(dòng)態(tài)添加定時(shí)任務(wù)。
前面介紹完原理,現(xiàn)在來(lái)講一下解決思路。主要思路就是讓start_scheduler方法中創(chuàng)建的service暴露出來(lái)。所以就想到手寫(xiě)一個(gè)類(lèi)去繼承Beat,重寫(xiě)start_scheduler()方法。
import socket from celery import platforms from celery.apps.beat import Beat class MyBeat(Beat): ''' 繼承Beat 添加一個(gè)獲取service的方法 ''' def start_scheduler(self): if self.pidfile: platforms.create_pidlock(self.pidfile) # 修改了獲取service的方式 service = self.get_service() print(self.banner(service)) self.setup_logging() if self.socket_timeout: logger.debug('Setting default socket timeout to %r', self.socket_timeout) socket.setdefaulttimeout(self.socket_timeout) try: self.install_sync_handler(service) service.start() except Exception as exc: logger.critical('beat raised exception %s: %r', exc.__class__, exc, exc_info=True) raise def get_service(self): ''' 這個(gè)是自定義的 目的是為了把service暴露出來(lái),方便對(duì)service的scheduler操作,因?yàn)槎〞r(shí)任務(wù)信息都存放在service.scheduler里 :return: ''' service = getattr(self, "service", None) if service is None: service = self.Service( app=self.app, max_interval=self.max_interval, scheduler_cls=self.scheduler_cls, schedule_filename=self.schedule, ) setattr(self, "service", service) return self.service
在MyBeat類(lèi)中添加一個(gè)get_service()方法,如果beat沒(méi)有servic對(duì)象就創(chuàng)建一個(gè),如果有就直接返回,方便對(duì)service的scheduler操作。
然后在此基礎(chǔ)上實(shí)現(xiàn)對(duì)定時(shí)任務(wù)的增刪改查操作。
def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', **kwargs): ''' 創(chuàng)建或更新定時(shí)任務(wù) :param task_name: 定時(shí)任務(wù)名稱(chēng) :param cron_task: task名稱(chēng) :param minute: 以下是時(shí)間 :param hour: :param day_of_week: :param day_of_month: :param month_of_year: :param kwargs: :return: ''' service = beat.get_service() scheduler = service.scheduler entries = dict() entries[task_name] = { 'task': cron_task, 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month, month_of_year=month_of_year, **kwargs), 'options': {'expires': 3600}} scheduler.update_from_dict(entries) def del_cron_task(task_name: str): ''' 刪除定時(shí)任務(wù) :param task_name: :return: ''' service = beat.get_service() scheduler = service.scheduler if scheduler.schedule.get(task_name, None) is not None: del scheduler.schedule[task_name] def get_cron_task(): ''' 獲取當(dāng)前所有定時(shí)任務(wù)的配置 :return: ''' service = beat.get_service() scheduler = service.scheduler ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()] return ret
但是僅僅是這樣還不能解決問(wèn)題,從前面的serive.start()的源碼看到,beat啟動(dòng)后會(huì)進(jìn)入一個(gè)死循環(huán),如果直接在主線程啟動(dòng)beat,必然會(huì)阻塞在死循環(huán)中,所以需要為beat創(chuàng)建一個(gè)子線程,這樣才影響主線程的其他操作。
flag = False beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None, loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler', scheduler_cls=None, # XXX use scheduler redirect_stdouts=None, redirect_stdouts_level=None) # 設(shè)置主動(dòng)啟動(dòng)beat是為了避免使用celery -A celery_demo worker 命令重復(fù)啟動(dòng)worker def run(): ''' 啟動(dòng)Beat :return: ''' beat.run() def new_thread(): ''' 創(chuàng)建一個(gè)線程啟動(dòng)Beat 最多只能創(chuàng)建一個(gè) :return: ''' global flag if not flag: t = threading.Thread(target=run, daemon=True) t.start() # 啟動(dòng)成功2s后才能操作定時(shí)任務(wù) 否則可能會(huì)報(bào)錯(cuò) time.sleep(2) flag = True
可能看到上面的代碼有人會(huì)想,為什么不在主程序加載完成就啟動(dòng)為beat創(chuàng)建一個(gè)子線程,還非要寫(xiě)個(gè)函數(shù)等待主動(dòng)調(diào)用?這是因?yàn)槔缭谑褂胐jango+celery組合時(shí),一般啟動(dòng)django和啟動(dòng)celery woker是兩個(gè)獨(dú)立的進(jìn)程,如果讓django在加載代碼的時(shí)候自動(dòng)啟動(dòng)beat的子線程,那么在使用celery -A demo_name worker 啟動(dòng)celery時(shí),會(huì)重新加載一邊django的代碼,因?yàn)閏elery需要掃描每個(gè)app下的tasks.py文件,加載異步任務(wù)函數(shù),這時(shí)啟動(dòng)celery woker就會(huì)也啟動(dòng)一個(gè)beat子線程,可能會(huì)造成定時(shí)任務(wù)重復(fù)執(zhí)行的情況。所以在這里設(shè)置成主動(dòng)開(kāi)啟beat子線程,目的就是為了celery worker啟動(dòng)不重復(fù)創(chuàng)建beat線程。
完整的代碼如下:
import socket import time import threading from celery import platforms from celery.schedules import crontab from celery.apps.beat import Beat from celery.utils.log import get_logger from celery_demo import celery_app logger = get_logger('celery.beat') flag = False class MyBeat(Beat): ''' 繼承Beat 添加一個(gè)獲取service的方法 ''' def start_scheduler(self): if self.pidfile: platforms.create_pidlock(self.pidfile) # 修改了獲取service的方式 service = self.get_service() print(self.banner(service)) self.setup_logging() if self.socket_timeout: logger.debug('Setting default socket timeout to %r', self.socket_timeout) socket.setdefaulttimeout(self.socket_timeout) try: self.install_sync_handler(service) service.start() except Exception as exc: logger.critical('beat raised exception %s: %r', exc.__class__, exc, exc_info=True) raise def get_service(self): ''' 這個(gè)是自定義的 目的是為了把service暴露出來(lái),方便對(duì)service的scheduler操作,因?yàn)槎〞r(shí)任務(wù)信息都存放在service.scheduler里 :return: ''' service = getattr(self, "service", None) if service is None: service = self.Service( app=self.app, max_interval=self.max_interval, scheduler_cls=self.scheduler_cls, schedule_filename=self.schedule, ) setattr(self, "service", service) return self.service beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None, loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler', scheduler_cls=None, # XXX use scheduler redirect_stdouts=None, redirect_stdouts_level=None) # 設(shè)置主動(dòng)啟動(dòng)beat是為了避免使用celery -A celery_demo worker 命令重復(fù)啟動(dòng)worker def run(): ''' 啟動(dòng)Beat :return: ''' beat.run() def new_thread(): ''' 創(chuàng)建一個(gè)線程啟動(dòng)Beat 最多只能創(chuàng)建一個(gè) :return: ''' global flag if not flag: t = threading.Thread(target=run, daemon=True) t.start() # 啟動(dòng)成功2s后才能操作定時(shí)任務(wù) 否則可能會(huì)報(bào)錯(cuò) time.sleep(2) flag = True def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', **kwargs): ''' 創(chuàng)建或更新定時(shí)任務(wù) :param task_name: 定時(shí)任務(wù)名稱(chēng) :param cron_task: task名稱(chēng) :param minute: 以下是時(shí)間 :param hour: :param day_of_week: :param day_of_month: :param month_of_year: :param kwargs: :return: ''' service = beat.get_service() scheduler = service.scheduler entries = dict() entries[task_name] = { 'task': cron_task, 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month, month_of_year=month_of_year, **kwargs), 'options': {'expires': 3600}} scheduler.update_from_dict(entries) def del_cron_task(task_name: str): ''' 刪除定時(shí)任務(wù) :param task_name: :return: ''' service = beat.get_service() scheduler = service.scheduler if scheduler.schedule.get(task_name, None) is not None: del scheduler.schedule[task_name] def get_cron_task(): ''' 獲取當(dāng)前所有定時(shí)任務(wù)的配置 :return: ''' service = beat.get_service() scheduler = service.scheduler ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()] return ret
另外還可以參考我的github,相關(guān)的注釋在代碼里寫(xiě)的較為清晰。
注意:使用這種方式添加/刪除定時(shí)任務(wù)只是保存在內(nèi)存中的,項(xiàng)目重啟后就會(huì)丟失。如果想要持久化,可以參照上面的方法,把相關(guān)信息保存到數(shù)據(jù)庫(kù)或其他可持久保存文件中,在beat線程啟動(dòng)時(shí)加載相關(guān)任務(wù)信息,在對(duì)定時(shí)任務(wù)修改做增刪改時(shí)及時(shí)修改數(shù)據(jù)庫(kù)或文件中內(nèi)容。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
標(biāo)簽:梅州 昆明 西寧 錫林郭勒盟 文山 懷化 石家莊 浙江
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《celery實(shí)現(xiàn)動(dòng)態(tài)設(shè)置定時(shí)任務(wù)》,本文關(guān)鍵詞 celery,實(shí)現(xiàn),動(dòng)態(tài),設(shè)置,定時(shí),;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。