This post was originally published at https://rudra.dev/posts/setting-up-a-task-scheduler-in-flask
The first thing that comes to mind when considering a task scheduler is a cron job. Since most of today's servers run on Linux machines, setting up a cron job for a periodic task might seem like a good option to many. However, in production, having a crontab is nothing but a pain in the a**. It can be a bit tricky to configure different timezones depending on the location of the server.
The biggest problem with this approach shows up when the application is scaled out to multiple web servers. In that case, instead of running one cron job, we could be running several, which might lead to race conditions. It's also hard to debug if something goes wrong with the task.
With Flask, there are multiple ways to address these problems, and Celery is one of the most popular. Celery handles them quite gracefully. It uses pytz timezones, which helps in calculating timezones and setting the scheduler timings accurately.
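As a rough illustration of the timezone arithmetic a scheduler has to get right, here is a stdlib-only sketch (not Celery's code) that computes the next UTC run time for a job meant to fire at 09:00 local time. The `next_run_utc` helper and the fixed UTC+05:30 zone are hypothetical choices for the example; a fixed offset ignores daylight saving time, which a real timezone database such as pytz takes care of.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical fixed-offset zone for illustration (UTC+05:30, no DST).
IST = timezone(timedelta(hours=5, minutes=30), "IST")


def next_run_utc(now_utc: datetime, hour: int, minute: int, tz: timezone) -> datetime:
    """Return the next UTC instant at which the wall clock in `tz` reads hour:minute."""
    local_now = now_utc.astimezone(tz)
    candidate = local_now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= local_now:
        # Today's slot has already passed in the local zone, so use tomorrow's.
        candidate += timedelta(days=1)
    return candidate.astimezone(timezone.utc)


if __name__ == "__main__":
    now = datetime(2020, 1, 1, 0, 0, tzinfo=timezone.utc)  # 05:30 in IST
    print(next_run_utc(now, 9, 0, IST))  # 2020-01-01 03:30:00+00:00
```

A server whose clock runs in UTC would need a crontab entry of `30 3 * * *` to hit 09:00 IST; a timezone-aware scheduler lets you write the schedule in local time instead.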
Celery uses a message broker (Redis or RabbitMQ) as a central queue shared by all Celery workers, even when they run on different web servers. The beat scheduler publishes each scheduled task to the broker once, and each message is consumed by only one worker, so the task runs only once per schedule as long as a single beat instance is running. This eliminates the race condition.
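To make the delivery model concrete, here is a toy, stdlib-only simulation (not how Celery is implemented): a `queue.Queue` stands in for the Redis broker, threads stand in for workers on different servers, and both function names are hypothetical. A single scheduler publishes each tick once, so every tick runs exactly once no matter how many workers share the queue, whereas a cron job copied onto every server fires once per server.

```python
from __future__ import annotations

import queue
import threading


def run_with_shared_broker(ticks: int, workers: int) -> list[int]:
    """One scheduler publishes each tick once; several workers share the queue."""
    broker: queue.Queue[int | None] = queue.Queue()  # stand-in for Redis/RabbitMQ
    executed: list[int] = []
    lock = threading.Lock()

    def worker() -> None:
        while True:
            tick = broker.get()
            if tick is None:  # shutdown sentinel
                return
            with lock:
                executed.append(tick)

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for tick in range(ticks):  # the single "beat" publishing the schedule
        broker.put(tick)
    for _ in threads:  # one sentinel per worker, queued after every tick
        broker.put(None)
    for t in threads:
        t.join()
    return executed


def run_with_cron_on_every_server(ticks: int, servers: int) -> list[int]:
    """Each server runs its own copy of the cron job, so every tick fires once per server."""
    return [tick for _ in range(servers) for tick in range(ticks)]


if __name__ == "__main__":
    print(sorted(run_with_shared_broker(ticks=5, workers=3)))  # [0, 1, 2, 3, 4]
    print(sorted(run_with_cron_on_every_server(ticks=5, servers=3)))
    # [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]
```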
Celery also supports monitoring real-time events. It includes a beautiful built-in terminal interface that shows all the current events, and Flower, a nice standalone project, provides a web-based tool to administer Celery workers and tasks. Celery also supports asynchronous task execution, which comes in handy for long-running tasks.
Let’s go hacking
```
flask-celery
│
│   app.py
│   docker-compose.yml
│   Dockerfile
│   entrypoint.sh
│   requirements.txt
│
└────────────────────────
```
```dockerfile
FROM python:3.7

# Create a directory named flask
RUN mkdir flask

# Copy everything to the flask folder
COPY . /flask/

# Make flask the working directory
WORKDIR /flask

# Install the Python libraries
RUN pip3 install --no-cache-dir -r requirements.txt

EXPOSE 5000

# Run the entrypoint script
CMD ["bash", "entrypoint.sh"]
```
The packages required for this application are listed in the requirements.txt file.
```
Flask==1.0.2
celery==4.3.0
redis==3.3.11
```
The entry point script goes here.
```sh
#!/bin/sh
flask run --host=0.0.0.0 --port 5000
```
Celery uses a message broker to pass messages between the web app and the Celery workers. Here we will set up a Redis container to act as the message broker.
```yaml
version: "3.7"
services:
  redis:
    container_name: redis_dev_container
    image: redis
    ports:
      - "6379:6379"
  flask_service:
    container_name: flask_dev_container
    restart: always
    image: flask
    build:
      context: ./
      dockerfile: Dockerfile
    depends_on:
      - redis
    ports:
      - "5000:5000"
    volumes:
      - ./:/flask
    environment:
      - FLASK_DEBUG=1
```
Now we are all set to start our little experiment. We have a Redis container running on port 6379 and a Flask container running on localhost:5000. Let's add a simple API to test whether our tiny web application works.
```python
from flask import Flask

app = Flask(__name__)


@app.route("/")
def index_view():
    return "Flask-celery task scheduler!"


if __name__ == "__main__":
    app.run()
```
And voila!
Now we will build a simple timer application that shows the time elapsed since the application started. We need to configure Celery with the Redis server URL, and we will use a separate Redis database to store the time.
```python
from flask import Flask
from celery import Celery
import redis

app = Flask(__name__)

# Add Redis URL configurations
app.config["CELERY_BROKER_URL"] = "redis://redis:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "redis://redis:6379/0"

# Connect Redis db (db 1 keeps the timer separate from Celery's db 0)
redis_db = redis.Redis(
    host="redis", port=6379, db=1, encoding="utf-8", decode_responses=True
)

# Initialize timer in Redis
# Note: this runs whenever app.py is imported (Flask, worker and beat all import it)
redis_db.mset({"minute": 0, "second": 0})

# Add periodic tasks
celery_beat_schedule = {
    "time_scheduler": {
        "task": "app.timer",
        # Run every second
        "schedule": 1.0,
    }
}

# Initialize Celery and update its config
celery = Celery(app.name)
celery.conf.update(
    result_backend=app.config["CELERY_RESULT_BACKEND"],
    broker_url=app.config["CELERY_BROKER_URL"],
    timezone="UTC",
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    beat_schedule=celery_beat_schedule,
)


@app.route("/")
def index_view():
    return "Flask-celery task scheduler!"


@app.route("/timer")
def timer_view():
    time_counter = redis_db.mget(["minute", "second"])
    return f"Minute: {time_counter[0]}, Second: {time_counter[1]}"


@celery.task
def timer():
    second_counter = int(redis_db.get("second")) + 1
    if second_counter >= 60:
        # Reset the counter
        redis_db.set("second", 0)
        # Increment the minute
        redis_db.set("minute", int(redis_db.get("minute")) + 1)
    else:
        # Increment the second
        redis_db.set("second", second_counter)


if __name__ == "__main__":
    app.run()
```
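To check the rollover arithmetic without Redis or Celery, here is the same counter logic with a plain dict as a hypothetical stand-in for the Redis db (`tick` is an illustrative name, not part of the app). With a threshold of 60, the seconds run from 0 to 59 before the minute increments.

```python
def tick(store: dict) -> None:
    """Advance a minute/second counter by one second, mirroring the timer task.

    `store` is a plain dict used as a hypothetical stand-in for the Redis db.
    """
    second_counter = int(store["second"]) + 1
    if second_counter >= 60:
        # A full minute has passed: reset the seconds and bump the minute
        store["second"] = 0
        store["minute"] = int(store["minute"]) + 1
    else:
        store["second"] = second_counter


if __name__ == "__main__":
    store = {"minute": 0, "second": 0}
    for _ in range(125):
        tick(store)
    print(store)  # {'minute': 2, 'second': 5}
```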
Let's update entrypoint.sh to run both the Celery worker and the beat server as background processes.
```sh
#!/bin/sh

# Run Celery worker
celery -A app.celery worker --loglevel=INFO --detach --pidfile=''

# Run Celery Beat
celery -A app.celery beat --loglevel=INFO --detach --pidfile=''

flask run --host=0.0.0.0 --port 5000
```
Our very own timer
The application is only for demonstration purposes. The counter won't be accurate, because the time taken to process each task is not taken into account.
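If an accurate reading mattered, one alternative (not what this post does) would be to store the start time once, for example in Redis, and derive minutes and seconds from the clock when the `/timer` view is requested, so slow or delayed task runs cannot skew the result. A stdlib sketch of that calculation; `format_elapsed` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def format_elapsed(started_at: datetime, now: datetime) -> str:
    """Compute the timer text from two timestamps instead of counting task runs."""
    total_seconds = int((now - started_at).total_seconds())
    minutes, seconds = divmod(total_seconds, 60)
    return f"Minute: {minutes}, Second: {seconds}"


if __name__ == "__main__":
    start = datetime(2020, 1, 1, tzinfo=timezone.utc)
    print(format_elapsed(start, start + timedelta(seconds=125)))  # Minute: 2, Second: 5
```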
Monitoring events
Celery has rich support for monitoring various statistics for tasks, workers and events. To enable and monitor events, we first need a shell inside the container.
```sh
docker exec -it flask_dev_container bash
```
Enable and list all events
```sh
celery -A app.celery control enable_events
celery -A app.celery events
```
This spins up a nice interactive terminal UI listing all the details of the scheduled tasks.
Conclusion
In this post I have used Celery as a better alternative to crontabs, even though Celery's primary purpose is processing task queues. The Celery worker and beat server can each run in their own container, since running background processes in the web container is not regarded as best practice.
Unless you are creating a stupid timer application.
The code used above can be found in the repo.
Adios!