Last week, we discussed a quick weekend project on building a word game. That project was sparked by a conversation suggesting I integrate LLM functionality into my experimental chatbot, BigMeow. I'm now working on another chatbot project and reusing some code from BigMeow. When I first built BigMeow, I struggled a lot, largely because there was so little information on asynchronous programming in Python. Using the Telegram Bot API as an example, I hope to provide some helpful insights for those interested in this area.
A cute photo generated by Copilot on the topic
From Synchronous Simplicity to Asynchronous Challenges
The previous incarnation of BigMeow was built with the synchronous version of python-telegram-bot. It was a simple bot that reported the current petrol price in Malaysia, which is updated every week. Because it was synchronous, it was very simple: I just needed to copy-pasta from the tutorial, then add handlers to fetch and send petrol prices.
I don't follow language development trends closely. Pre-ES6 JavaScript gave me my first experience of writing code where things happen asynchronously. We could set a piece of code to run on a timer with setTimeout or setInterval, or fetch a page with jQuery.ajax (this was before the Fetch API). However, the surrounding code carried on immediately, with no direct way to know when the operation had finished or to get hold of its result; everything had to be handled in callbacks.
I began async programming with JavaScript. Photo by Claudio Schwarz on Unsplash
It took me a while to rewire my brain for this asynchronous style of execution, where things may happen outside the normal flow of execution and extra care is needed when using returned values. Code simply has to be written differently, with a different mindset. Eventually, Promise was built into the language (in ES6), and the async-await syntax followed.
By the time async-await arrived in ES2017, I had already moved on to Python full-time. Most libraries I use daily are still synchronous to this day, though some have started exploring asynchronous support. The Telegram library I mentioned earlier was the first of them to announce a complete switch to the asynchronous world.
Async-await syntax brings the familiarity of synchronous programming, so it shouldn’t be a big deal, right?
Diving into Asynchronous Development: Webhooks, Queues, and AsyncIO
Messages to be put in a queue. Photo by Joanna Kosinska on Unsplash
That may be the case in other languages, but not so much in Python. The experience has improved over time, but I still found myself suddenly having to learn about asyncio, event loops, coroutines, and so on. To be fair, I had learned about greenlets at work before this, but never really had a chance to apply that knowledge, and asyncio felt completely different anyway.
Anyway, enough reminiscing. This is not intended to be the ultimate guide to asynchronous programming, but the pragmatic quick-start guide I wish I had back then. Assuming we are in a properly managed project (using a tool like poetry or uv), let's start with a new module, telegram.py, for our Telegram bot, and add python-telegram-bot to the project's dependencies. Keep this module inside your application package (for example yourapp/telegram.py, where yourapp is a placeholder package name): a top-level telegram.py would shadow the python-telegram-bot library, which is itself imported as telegram.
```python
import asyncio
from threading import Event

from telegram import Update
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

application = ApplicationBuilder().token("Your telegram token").build()


async def run(exit_event: Event) -> None:
    async with application:
        await setup()
        # Starting the bot
        await application.start()
        # Receive messages
        # Wait for exit signal
        await asyncio.to_thread(exit_event.wait)
        # Stopping the bot
        await application.stop()


async def setup() -> None:
    pass
```
We will build a web application to host a webhook that receives messages from users interacting with our bot. Before that, however, we need a queue to relay messages from our webhook to our bot. Since both the bot and the web application reference this queue, let's define it in a separate settings.py. We also need a secret token so that only legitimate requests from Telegram are accepted, so let's define it there too.
```python
import multiprocessing

# Relays raw updates from the web process to the bot process
telegram_queue = multiprocessing.Queue()

# Telegram accepts 1-256 characters from A-Z, a-z, 0-9, "_" and "-" (no spaces)
TELEGRAM_SECRET_TOKEN = "replace-with-a-random-token"
```
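Telegram is picky about the secret token: the Bot API only accepts 1 to 256 characters drawn from A-Z, a-z, 0-9, _ and -, so a value containing spaces would be rejected when registering the webhook. One way to produce a suitable value, sketched below with hypothetical helper names (make_secret_token, is_valid_secret_token), is Python's secrets.token_urlsafe, whose output uses exactly that alphabet. Generate it once and paste it into settings.py rather than generating it at import time, so that every process is guaranteed to see the same value.

```python
import re
import secrets

# Telegram's rule for secret_token: 1-256 characters from A-Z, a-z, 0-9, "_" and "-"
SECRET_TOKEN_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,256}")


def make_secret_token(nbytes: int = 32) -> str:
    # token_urlsafe only emits A-Z, a-z, 0-9, "-" and "_", so it satisfies the rule above
    return secrets.token_urlsafe(nbytes)


def is_valid_secret_token(token: str) -> bool:
    # fullmatch ensures the whole string (not just a part of it) follows the rule
    return SECRET_TOKEN_PATTERN.fullmatch(token) is not None


if __name__ == "__main__":
    token = make_secret_token()
    print(token, is_valid_secret_token(token))
```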
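The web application and the bot will run in separate processes, hence the process-safe queue. Be aware that sharing a module-level multiprocessing.Queue like this relies on the child processes inheriting it through fork, the default start method on Linux up to Python 3.13; under spawn, the default on macOS and Windows, each process re-imports settings.py and ends up with its own, unconnected queue. Next, we use FastAPI (add fastapi[standard] to the list of dependencies) to build the web application, with the following code in web.py.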
```python
import asyncio
from functools import partial
from threading import Event
from typing import Annotated

import uvicorn
from fastapi import FastAPI, Header, Request

from yourapp import settings

app = FastAPI()


async def run(exit_event: Event) -> None:
    # Configure the server; passing the app object avoids re-importing this module
    server = uvicorn.Server(
        uvicorn.Config(
            app,
            host="0.0.0.0",
            port=80,  # binding to port 80 usually needs elevated privileges
            log_level="info",
        )
    )
    # Start the webserver, keeping a reference so the task is not garbage-collected
    serve_task = asyncio.create_task(server.serve())
    await asyncio.to_thread(exit_event.wait)
    # Ask uvicorn to leave its main loop, then wait for its graceful shutdown
    server.should_exit = True
    await serve_task
```
Both FastAPI and python-telegram-bot are asynchronous, so most of our functions are marked async. The run coroutines are the entry points of the bot and the web application, and both expect an Event object that signals when to exit.
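Both run coroutines share the same waiting pattern: threading.Event.wait() blocks, so it is handed to asyncio.to_thread, and the event loop stays free to serve other tasks in the meantime. The self-contained sketch below shows that idea with a hypothetical heartbeat task standing in for the bot or web server, and a threading.Timer standing in for whatever eventually sets the event (later in this article, a signal handler).

```python
import asyncio
import threading
import time


async def heartbeat(ticks: list[float]) -> None:
    # Hypothetical stand-in for real work (serving requests, handling updates)
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def run_until_exit(exit_event: threading.Event) -> int:
    ticks: list[float] = []
    worker = asyncio.create_task(heartbeat(ticks))
    # Event.wait() blocks, so it goes to a worker thread instead of the event loop
    await asyncio.to_thread(exit_event.wait)
    worker.cancel()
    return len(ticks)


def main() -> int:
    exit_event = threading.Event()
    # Simulate an external shutdown request arriving after 0.3 seconds
    threading.Timer(0.3, exit_event.set).start()
    return asyncio.run(run_until_exit(exit_event))


if __name__ == "__main__":
    print(main())
```

The heartbeat keeps ticking while run_until_exit waits, which is exactly what lets the bot and the web server keep working until they are told to stop.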
Next, we implement the actual webhook in web.py:
```python
import secrets

from fastapi import HTTPException

# Keep references to fire-and-forget tasks so they are not garbage-collected mid-flight
background_tasks: set[asyncio.Task] = set()


@app.post("/webhook/telegram", include_in_schema=False)
async def telegram_webhook(
    request: Request,
    x_telegram_bot_api_secret_token: Annotated[str | None, Header()] = None,
) -> None:
    # Only accept requests carrying the secret token registered with Telegram;
    # compare_digest does an exact, constant-time comparison
    if x_telegram_bot_api_secret_token is None or not secrets.compare_digest(
        x_telegram_bot_api_secret_token, settings.TELEGRAM_SECRET_TOKEN
    ):
        raise HTTPException(status_code=403)
    # Pass the message to the relay queue without holding up the response
    task = asyncio.create_task(
        asyncio.to_thread(
            partial(settings.telegram_queue.put, await request.json()),
        )
    )
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
```
In the code above, we authenticate each webhook request by checking that the X-Telegram-Bot-Api-Secret-Token header carries the token defined in settings.py. Once that check passes, we pass the incoming data to our process-safe queue. Putting data into the queue can block (for instance, when a bounded queue is full), so we use asyncio.to_thread to run the operation in a separate thread and keep the event loop free.
Having seen some code, we can now talk a little about asynchronous programming in Python. As shown above, async-await does bring the familiarity of synchronous programming. In fact, we could simply put await in front of every async function call, and the code would still work, albeit possibly less efficiently.
The beauty of asynchronous programming is that we can schedule certain parts of the program to run concurrently. Putting await before a call suspends the current coroutine until the result is ready. However, sometimes we only need to make sure a function runs, or we only care about its result much later. That is when we schedule it with asyncio.create_task.
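To make the difference between awaiting and scheduling concrete, here is a small sketch with a hypothetical fake_io coroutine that merely sleeps. Awaiting two calls back to back takes roughly the sum of their delays, whereas creating both tasks first lets their waits overlap. Exact timings depend on the machine, so treat them as approximate.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    # Hypothetical stand-in for an I/O-bound call such as an HTTP request
    await asyncio.sleep(delay)
    return name


async def sequential() -> list[str]:
    # Each await finishes before the next call starts: roughly 0.2 s in total
    return [await fake_io("a", 0.1), await fake_io("b", 0.1)]


async def concurrent() -> list[str]:
    # Both coroutines are scheduled first, then awaited: roughly 0.1 s in total
    task_a = asyncio.create_task(fake_io("a", 0.1))
    task_b = asyncio.create_task(fake_io("b", 0.1))
    return [await task_a, await task_b]


async def main() -> dict[str, float]:
    timings = {}
    for fn in (sequential, concurrent):
        start = time.perf_counter()
        await fn()
        timings[fn.__name__] = time.perf_counter() - start
    return timings


if __name__ == "__main__":
    print(asyncio.run(main()))
```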
The web application part is practically done, so we can move on to the bot in telegram.py. Our webhook relays messages sent by users to a queue. To consume that queue, we implement the following coroutine functions (functions defined with async def; calling one returns a coroutine object).
```python
import queue
from contextlib import suppress
from functools import partial
from typing import Awaitable, Callable

from yourapp import settings


async def loop_queue(coro_func: Callable[[], Awaitable[None]]) -> None:
    # Run the given coroutine function forever, one call at a time
    while True:
        await coro_func()


async def message_consume() -> None:
    # queue.Empty is raised when nothing arrives within the timeout
    with suppress(queue.Empty):
        await application.update_queue.put(
            Update.de_json(
                # The blocking get runs in a worker thread, off the event loop
                await asyncio.to_thread(
                    partial(
                        settings.telegram_queue.get,
                        timeout=10,
                    )
                ),
                application.bot,
            )
        )
```
In message_consume, we first retrieve a message from settings.telegram_queue. The blocking get runs in a worker thread, so the event loop stays free either way; the timeout is there so that the thread never waits forever, because asyncio.run waits for the default executor's threads (which asyncio.to_thread uses) when shutting down, and a thread stuck on an empty queue would hold up the exit. If nothing arrives before the timeout, we suppress the resulting queue.Empty error and move on. Otherwise, we parse the message with Update.de_json and pass it to the application's update queue for further processing. The helper coroutine loop_queue keeps message_consume free of excessive nesting.
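The relay step can be tried without Telegram at all. In this sketch, a plain queue.Queue stands in for settings.telegram_queue and an asyncio.Queue stands in for application.update_queue (both substitutions are assumptions made for illustration); relay_once is a hypothetical helper that mirrors message_consume minus the Update.de_json parsing. A second call with nothing waiting simply times out and is suppressed.

```python
import asyncio
import queue
from contextlib import suppress


async def relay_once(source: queue.Queue, sink: asyncio.Queue, timeout: float) -> None:
    # Block in a worker thread (not on the event loop) for up to `timeout` seconds
    with suppress(queue.Empty):
        item = await asyncio.to_thread(source.get, timeout=timeout)
        await sink.put(item)


async def main() -> list[dict]:
    source: queue.Queue = queue.Queue()  # stands in for settings.telegram_queue
    sink: asyncio.Queue = asyncio.Queue()  # stands in for application.update_queue
    source.put({"update_id": 1})
    await relay_once(source, sink, timeout=0.1)  # moves the waiting item
    await relay_once(source, sink, timeout=0.1)  # nothing waiting: times out quietly
    return [sink.get_nowait() for _ in range(sink.qsize())]


if __name__ == "__main__":
    print(asyncio.run(main()))
```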
We only need to make sure the consumer loop is scheduled, so we wrap loop_queue(message_consume) in asyncio.create_task inside the run coroutine:
```python
async def run(exit_event: Event) -> None:
    async with application:
        await setup()
        # Starting the bot
        await application.start()
        # Receive messages (keep a reference so the task is not garbage-collected)
        consumer = asyncio.create_task(loop_queue(message_consume))
        # Wait for exit signal
        await asyncio.to_thread(exit_event.wait)
        # Stop consuming, then stop the bot
        consumer.cancel()
        await application.stop()
```
Next, we define how the bot responds to incoming messages by revising the setup coroutine.
```python
async def setup() -> None:
    # Register the webhook with Telegram; awaiting it surfaces any API error here
    await application.bot.set_webhook(
        # Placeholder: the public URL of the FastAPI route, with no trailing slash
        "https://<your-domain>/webhook/telegram",
        allowed_updates=Update.ALL_TYPES,
        secret_token=settings.TELEGRAM_SECRET_TOKEN,
    )
    application.add_handlers(
        (
            CommandHandler("start", update_handle_welcome),
            # Other handlers
        )
    )
```
The first statement registers the webhook with Telegram; it needs the URL of our endpoint and the secret token that Telegram will send along with each request. The second statement registers the bot's behavior: for now, it responds to the /start command. The handler is implemented as follows:
```python
import logging

logger = logging.getLogger(__name__)


async def update_handle_welcome(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    # Explicit check instead of assert, which is stripped when Python runs with -O
    if update.effective_chat is None:
        logger.error("Update %s has no effective chat", update.update_id)
        return
    await context.bot.send_message(update.effective_chat.id, "Welcome to my bot")
```
Now, every time a user starts a conversation with our bot, it responds with a friendly welcome message. Adding another handler is easy. For instance, to echo back whatever text users send us, we simply append a MessageHandler to the tuple in the application.add_handlers call.
```python
async def setup() -> None:
    ...
    application.add_handlers(
        (
            CommandHandler("start", update_handle_welcome),
            # Other handlers
            # Echo plain text; ~filters.COMMAND leaves commands to the CommandHandlers
            MessageHandler(filters.TEXT & ~filters.COMMAND, update_handle_text),
        )
    )
    ...


async def update_handle_text(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    message = update.message
    if update.effective_chat is None or message is None or message.text is None:
        logger.error("Update %s has no text message to echo", update.update_id)
        return
    # Reply with the same text, quoting the original message when possible
    await context.bot.send_message(
        update.effective_chat.id,
        message.text,
        reply_to_message_id=message.message_id,
        allow_sending_without_reply=True,
    )
```
Finally, we run the bot and the web application in parallel, each in its own process, by submitting them to a ProcessPoolExecutor.
```python
import asyncio
import threading
from concurrent.futures import ProcessPoolExecutor

from yourapp.telegram import run as tg_run
from yourapp.web import run as web_run


def process_run(func, pexit_event: threading.Event) -> None:
    # Each worker process runs its own event loop
    asyncio.run(func(pexit_event))


if __name__ == "__main__":
    # One worker per component; the default pool size can be 1 on a single-CPU machine
    with ProcessPoolExecutor(max_workers=2) as executor:
        executor.submit(process_run, tg_run, exit_event)
        executor.submit(process_run, web_run, exit_event)
```
Ah, I forgot about the exit_event we pass to the run coroutines. Both applications are expected to run indefinitely. In the past, I've seen tutorials add an infinite loop at the end so that the application does not exit prematurely. Instead, we can pass a process-safe Event object and use it to tell both applications to shut down cleanly when required, for example when we stop the program with CTRL+C or the process receives SIGTERM.
```python
import multiprocessing
import signal
from functools import partial


def shutdown_handler(_signum, _frame, exit_event: threading.Event) -> None:
    # Ask both run() coroutines to shut down cleanly
    exit_event.set()


if __name__ == "__main__":
    # A Manager-backed Event can be pickled by executor.submit and shared across processes
    manager = multiprocessing.Manager()
    exit_event = manager.Event()

    # Note: SIGHUP does not exist on Windows
    for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        signal.signal(s, partial(shutdown_handler, exit_event=exit_event))

    with ProcessPoolExecutor(max_workers=2) as executor:
        executor.submit(process_run, tg_run, exit_event)
        executor.submit(process_run, web_run, exit_event)
```
The final steps are to register the bot with BotFather and replace the placeholders in the code (the webhook URL, the Telegram bot token, and the secret token); then the bot is ready to go.
Lessons Learned and Architectural Insights
Better QOL developing AsyncIO apps now. Photo by Usman Yousaf on Unsplash
Quality of life has definitely improved for developing applications with asyncio. For instance, since asyncio.run arrived in Python 3.7, we no longer have to set up and manage the event loop explicitly. However, it can still be challenging to find tutorials on building applications that combine multiple asyncio libraries, such as this one.
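As a rough illustration of that change, the sketch below runs the same hypothetical greet coroutine twice: once with the manual loop management older code needed, and once with asyncio.run, which creates the loop, runs the coroutine, and closes the loop afterwards. The old-style version uses new_event_loop rather than get_event_loop, since the latter may emit deprecation warnings on recent Python releases.

```python
import asyncio


async def greet(name: str) -> str:
    # Hypothetical coroutine; sleep(0) just yields control to the event loop once
    await asyncio.sleep(0)
    return f"hello, {name}"


def run_old_style() -> str:
    # Manual loop management, as code written before asyncio.run had to do it
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(greet("old"))
    finally:
        loop.close()


def run_new_style() -> str:
    # asyncio.run handles loop creation and cleanup for us
    return asyncio.run(greet("new"))


if __name__ == "__main__":
    print(run_old_style(), run_new_style())
```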
BigMeow started with one goal: running multiple chatbots within the same application, beginning with Telegram and Discord. The early iterations were brute-forced without my knowing much about asynchronous programming in Python. Still, they worked, so I left them alone for a while.
Things started to break when I tried to switch the Telegram bot to receive user messages via a webhook: I couldn't find a way to properly host a web application within the mess I had made. That's when I started looking for articles and tutorials on the topic. One remarkable series I found was written by Lynn Root.
The series is very well written and provides a good introduction to asyncio. It definitely helped me properly learn asynchronous programming in Python. Though it was written for earlier versions of Python, it is still a useful guide for newcomers.
Both BigMeow and my current chatbot project owe quite a lot to Root's tutorial. I hope this article, in turn, helps clear up some doubts for people interested in asynchronous programming.
Some ideas for improvement. Photo by ameenfahmy on Unsplash
The bot we covered today is not perfect, but it is a good enough start for further exploration. Concurrency is not parallelism, so cramming both the bot and the web application into a single process may not be a great idea. Clever scheduling reduces the risk of blocking, but a single event loop still runs only one task at a time, so with too many tasks scheduled (or one task that blocks), lag is inevitable.
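The sketch below illustrates the point about blocking with a hypothetical ticker task that counts how often the event loop gives it a turn. When a 0.3-second wait runs directly on the loop via time.sleep, the ticker is starved; when the same wait is handed to asyncio.to_thread, the ticker keeps running. The counts vary by machine, so only the contrast matters.

```python
import asyncio
import time


async def ticker(count: list[int]) -> None:
    # Counts how often the event loop gets around to running this task
    while True:
        count[0] += 1
        await asyncio.sleep(0.01)


async def measure(blocking_in_loop: bool) -> int:
    count = [0]
    task = asyncio.create_task(ticker(count))
    await asyncio.sleep(0)  # let the ticker take its first step
    if blocking_in_loop:
        time.sleep(0.3)  # blocks the event loop: no other task can run meanwhile
    else:
        await asyncio.to_thread(time.sleep, 0.3)  # loop stays responsive
    task.cancel()
    return count[0]


if __name__ == "__main__":
    print(asyncio.run(measure(True)), asyncio.run(measure(False)))
```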
Running them in separate processes also lets us split them into individual programs later without much change to the code. One of the few changes would likely be replacing the process-safe queue with something that works between independently launched programs, or even a dedicated message queue system.
While this tutorial covers a Telegram bot and a web application, the general design can be adapted to other applications with multiple components. If the application does not need to run indefinitely, the exit_event part can be omitted, and the program ends upon completion.
This article turned out quite a bit longer than I expected, so I should probably end here. It is not hard to build a chatbot, and with LLM APIs becoming more accessible these days, it is easier than ever to integrate chatbots with LLMs.
Lastly, thanks for reading. I shall write again next week.
While a large language model provided editorial assistance to refine the structure and clarity of this article, all ideas, concepts, and code are my own. For project collaborations or job opportunities, please connect with me here on Medium or via my LinkedIn profile.