Non-Blocking Scheduled Tasks in Python
We often need to run some kind of automatic update script inside a Python API web server.
One example is automatically fetching and updating your machine learning models when they change in a remote location, with as little disruption as possible.
The automatic update task also needs to be non-blocking, so the server keeps handling requests while the update runs.
Given these requirements, we can use either the threading or the multiprocessing module to make the task non-blocking.
We use the threading module because the task needs access to variables shared with another object, the machine learning model in this case. Threads share the memory of their process, whereas separate processes do not.
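To make the shared-memory point concrete, here is a minimal sketch. The names `ModelHolder` and `refresh` are made up for illustration and are not part of the class shown later; the only thing it demonstrates is that a change made inside a thread is visible to the main thread afterwards.

```python
import threading


class ModelHolder:
    """Hypothetical stand-in for an object that owns a model."""

    def __init__(self):
        self.model = "old-model"


def refresh(holder):
    # Runs in the worker thread and mutates the object owned by the main thread.
    holder.model = "new-model"


holder = ModelHolder()
worker = threading.Thread(target=refresh, args=(holder,))
worker.start()
worker.join()  # wait for the worker so the result is deterministic

print(holder.model)  # new-model
```

With multiprocessing, the child process would work on its own copy of `holder`, so the same update would need some explicit way of getting back to the parent.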
Below is a scheduled task class that uses the threading module's Thread and Event APIs to do this.
import threading
import typing
from datetime import timedelta


class ScheduledTask(threading.Thread):
    def __init__(self, time_interval_seconds: int, scheduled_fn: typing.Callable, *args, **kwargs):
        threading.Thread.__init__(self)
        # Daemon thread: it does not keep the process alive on shutdown.
        self.daemon = True
        self.stopped = threading.Event()
        self.time_interval_seconds = timedelta(seconds=time_interval_seconds)
        self.scheduled_fn = scheduled_fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # wait() returns False when the timeout expires, so the function runs
        # once per interval until the event is set.
        while not self.stopped.wait(self.time_interval_seconds.total_seconds()):
            self.scheduled_fn(*self.args, **self.kwargs)


class MachineLearningModel:
    def __init__(self):
        self.model = self._fetch_model()
        # Re-fetch the model every 100 seconds in a background thread.
        self.automatic_update_task = ScheduledTask(100, self._automatic_update_model)
        self.automatic_update_task.start()

    def _fetch_model(self):
        """Code to fetch model from remote repo"""
        pass

    def _automatic_update_model(self):
        self.model = self._fetch_model()

    def predict(self, input_data):
        """Code to perform prediction"""
        pass


model_object = MachineLearningModel()
# model_object.predict(...)
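The class above never sets its `stopped` event, so the loop only ends when the process exits. As a rough sketch of how the same pattern could be stopped on demand, here is a self-contained variant. The name `StoppableTask` and its `stop()` method are my own additions for illustration, and the 0.05 second interval is only there to keep the demo short.

```python
import threading
import time


class StoppableTask(threading.Thread):
    """Hypothetical variant of the scheduled task with an explicit stop()."""

    def __init__(self, interval_seconds, fn, *args, **kwargs):
        super().__init__(daemon=True)
        self.stopped = threading.Event()
        self.interval_seconds = interval_seconds
        self.fn = fn
        self.fn_args = args
        self.fn_kwargs = kwargs

    def run(self):
        # wait() returns False on timeout (run the task again) and
        # True as soon as stop() sets the event (leave the loop).
        while not self.stopped.wait(self.interval_seconds):
            self.fn(*self.fn_args, **self.fn_kwargs)

    def stop(self):
        self.stopped.set()


calls = []
task = StoppableTask(0.05, calls.append, "tick")
task.start()
time.sleep(0.3)       # the main thread is free to do other work meanwhile
task.stop()
task.join(timeout=1)  # the loop exits without waiting for the next interval

print(len(calls) > 0, task.is_alive())
```

Because the loop waits on the event instead of calling `time.sleep`, `stop()` wakes it up immediately rather than after the remaining interval. Note that if the scheduled function raises an exception, the thread ends and no further runs happen, so a real update function may want its own error handling.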
That's it.
Thanks for reading.