Frameworks like FastAPI are built from the ground up on top of asynchronous event loop. It makes I/O (input/output) operations execute in parallel (almost). But things get complicated when a third-party library that you really need doesn't support an async interface.
What is wrong with that you may ask? When you execute a blocking (synchronous) function inside an asynchronous event loop it stops the whole loop. The entire application becomes unresponsive until the blocking operation is completed.
This article discusses different options to execute synchronous functions in an asynchronous event loop.
A short introduction to asynchronous programming
In traditional synchronous programming, a function executes from start to finish without interruption. However, in the case of I/O-bound operations like network requests or file reads, we often need to wait for some external resource to become available before proceeding with data-handling execution.
This is where asyncio with an async/await design pattern comes in. Asynchronous functions, also called coroutines, are a special type of function that can pause execution at specific points and resume it later when needed. These “specific points” are called yield or await, which indicate the places where the coroutine should pause execution while waiting for an external resource and allow other tasks to run.
When a coroutine yields, it returns control back to the event loop (more on this in a moment) allowing other asynchronous functions to execute while waiting for I/O operations to complete. This is known as cooperative multitasking or coroutine-based concurrency, where multiple coroutines can yield and resume their execution without blocking each other. This allows for efficient handling of I/O-bound operations, such as making HTTP requests while using only one thread on a single CPU core.
Here’s an example:
#!/usr/bin/env python3
# pip install aiohttp==3.9.5
import asyncio
import aiohttp
async def fetch_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.text()
return data
if __name__ == '__main__':
# Create a coroutine from the async function
# The coroutine does not run until it is passed to the event loop
coroutine = fetch_data('https://example.com')
# Run the coroutine in the event loop and get the response
response = asyncio.run(coroutine)
print(response)
In this example, the fetch_data function is an asynchronous coroutine that uses the aiohttp library to make a GET request and retrieve some data. The async with statement ensures that the session is properly closed when we’re done.
In this comprehensive guide, we explore the asynchronous design pattern in Python using the asyncio event loop. We began by introducing the basics of asyncio and how to write an asynchronous function, namely a coroutine that can be executed in the event loop.
Next we highlight the advantages of asynchronicity, including improved performance, responsiveness, and scalability. However, we also acknowledge the challenges of incorporating synchronous code into an asynchronous environment. This can lead to blocking operations, poor resource utilization and decreased overall performance. To mitigate these issues, we present solutions using the standard Python library, such as concurrent.futures and asyncio. You will get a comprehensive understanding of asynchronicity in Python, including its benefits, challenges and best practices. Whether you’re new to asyncio or looking to improve your existing knowledge, we hope this article will equip you with the skills and insights necessary to tackle the complexity of asynchronous programming.
Understanding asynchronous functions in Python
You might be familiar with the infamous GIL (Global Interpreter Lock). This is a core feature in Python that makes multithreading slow. The GIL is a kind of scheduler. It allows execution of only one thread at a time. It looks like all the threads work simultaneously, but in fact they don't. So how does asyncio work to achieve asynchronicity?
As always the devil is in the details. Asyncio utilizes threads to execute coroutines. And it is supposed to be used for I/O operations, hence async-IO. The illusion of concurrency comes from the fact that I/O operations are slower than GIL, allowing switching between threads.
A single threaded event loop is constantly checking for callbacks from coroutines in a loop. And if a callback (event) returns, the task is over and the program can move on to the next line of code.
But what actually is an event loop? It is a central component that manages the execution of coroutines. It is responsible for:
- Scheduling: Deciding which coroutine should run next based on their readiness (i.e. whether they’ve yielded or not).
- Yielding/Await: Pausing the current coroutine and allowing another one to execute.
- Resuming: Restarting a paused coroutine when its dependencies are met (i.e. the I/O operation is done).
In Python, the asyncio standard library provides an event loop implementation that allows you to write asynchronous code using coroutines. It shines the most when dealing with I/O-bound operations like network requests or file reads. The time spent waiting for external resources to become available is often much longer than the actual computation required. The system spends most of its time idle (waiting for I/O) making it ideal for parallel execution.
You can spawn multiple HTTP requests at the same time. And since the function will spend most of the runtime waiting for data transfer through the network, the event loop can jump to another coroutine. Then it will jump to another, and another, until all of them return a result. It doesn’t matter in which order they complete because the event loop is callback-driven, executing functions only when the data is ready to process.
Event loop efficiency comes from:
- allowing other tasks to run while waiting for I/O to complete,
- reducing the number of threads or processes needed to handle concurrent requests,
- and management of multiple concurrent connections to reduce the overall latency and improve responsiveness.
But the event loop is not a silver bullet that solves all the problems with concurrency. While it shines with I/O-bound operations it completely flops with CPU-bound ones. When an atomic CPU-bound operation is loaded into CPU registers, there is no way to pause it from the Python application level. The computation has to finish and return control back to the application. You can split the operation into smaller chunks by calling yield in-between. The yield keyword creates a breakpoint that returns control back to the application. But then it will resume from this breakpoint and block the event loop again. There will be no performance gain since all the chunks are blocking operations. The goal is to run the code in such a way that it won't block the event loop. No matter if it is one atomic operation or a number of small chunks. We will deal with this later in the article. But firstly, let’s discuss the struggle with blocking I/O operation.
The sync vs. async struggle: challenges
While asynchronous programming offers many benefits, there are still situations where you have no choice and you need to execute synchronous I/O functions. Often, when you rely on a library that doesn’t provide a designated async interface. This can be problematic when executing in the event loop. It will block other tasks from running until the operation completes. But why?
The blocking situation occurs when a greedy function doesn’t return the control back to the event loop. It doesn’t yield/await which allows it to pause further execution and resume later.
Here’s an example of how synchronous functions can block other tasks:
#!/usr/bin/env python3
import asyncio
import time
def blocking_operation() -> None:
# Simulate a long-running blocking operation
# using synchronous time.sleep
time.sleep(2)
async def with_blocking_operation() -> None:
# Simulate a non-blocking operation using asyncio.sleep
# and then call the blocking operation
await asyncio.sleep(1)
blocking_operation()
async def without_blocking_operation() -> None:
# Simulate a non-blocking operation
# using asyncio.sleep with a total duration equal to with_blocking_operation
await asyncio.sleep(3)
async def main() -> None:
print("Starting concurrent execution with blocking operation")
start = time.perf_counter()
await asyncio.gather(with_blocking_operation(), with_blocking_operation())
stop = time.perf_counter()
print(f"Done in {stop - start:.2f} seconds.")
print("Starting concurrent execution without blocking operation")
start = time.perf_counter()
await asyncio.gather(without_blocking_operation(), without_blocking_operation())
stop = time.perf_counter()
print(f"Done in {stop - start:.2f} seconds.")
if __name__ == '__main__':
# Create a coroutine from the async function
coroutine = main()
# Run the coroutine in the event loop
asyncio.run(coroutine)
And the output of the script:
(.venv) python blocking_operation.py
Starting concurrent execution with blocking operation
Done in 5.00 seconds.
Starting concurrent execution without blocking operation
Done in 3.00 seconds.
The blocking operation blocks the event loop for two seconds every time which adds up during the execution. Meanwhile, the non-blocking operations that individually took the same amount of time were able to finish at the same time; effectively improving the execution time by two seconds in total.
How to sync up async worlds?
Now that you’ve seen the consequences of using blocking functions within an asynchronous event loop, let’s explore some solutions to handle them effectively. There are a few ways of wrapping blocking functions with non-blocking tasks.
The solution depends on the operation type. The I/O-bound operations are easier to handle. We can offload them straight to the event loop as a future. The event loop implementation offers an easy interface to do so, while CPU-bound operations must be handled a bit differently. Let’s focus on I/O operations first.
The available options are:
- run_in_executor(): This can be used for both CPU-intensive and I/O-bound tasks.
- to_thread(): This runs a blocking function in a separate thread, and can only be used to run I/O-bound functions.
Let’s modify the previous example to use the to_thread() method:
#!/usr/bin/env python3
import asyncio
import time
def blocking_operation() -> None:
# Simulate a long-running blocking operation
# using synchronous time.sleep
time.sleep(3)
async def non_blocking_operation() -> None:
# Simulate a non-blocking operation
# using non-blocking asyncio.sleep
await asyncio.sleep(3)
async def main() -> None:
print("Starting concurrent execution with blocking operation")
blocking_in_thread = asyncio.to_thread(blocking_operation)
non_blocking = non_blocking_operation()
start = time.perf_counter()
await asyncio.gather(blocking_in_thread, non_blocking)
stop = time.perf_counter()
print(f"Done in {stop - start:.2f} seconds.")
if __name__ == '__main__':
# Create a coroutine from the async function
coroutine = main()
# Run the coroutine in the event loop
asyncio.run(coroutine)
And the output of the script:
(.venv) python blocking_operation_async.py
Starting concurrent execution with blocking operation
Done in 3.00 seconds.
As you can see, both the blocking and non-blocking operations finished at the same time. Wrapping the blocking function with asyncio.to_thread offloads the execution to a separate thread that is outside of the event loop. The asyncio library keeps track of the blocking function and returns from it when it is done.
Now let’s move on to the last and most robust tool we have to handle synchronous functions: the run_in_executor() interface together with the ThreadPoolExecutor and ProcessPoolExecutor; two powerful tools for managing concurrent tasks.
The ThreadPoolExecutor is a class from the concurrent.futures module in the standard library. It allows you to manage a pool of worker threads, which can be used to execute I/O-bound tasks concurrently. While the ProcessPoolExecutor is another class from the concurrent.futures module that allows managing a pool of worker processes. If you need to execute tasks independently, without sharing memory or resources, and most importantly the tasks are CPU-bound operations, you should always lean towards executing them in the process pool.
Let’s modify the last example one more time.
#!/usr/bin/env python3
import asyncio
import time
import concurrent.futures
def blocking_io_operation() -> None:
# Simulate a long-running blocking operation
# using synchronous time.sleep
time.sleep(3)
def blocking_cpu_operation() -> None:
# Simulate a long-running CPU-bound operation
# using a busy-wait loop
x = 0
for i in range(10**7):
x += i
async def main() -> None:
# Get the default event loop.
# It will be created if it doesn't exist yet.
loop = asyncio.get_running_loop()
print("Starting default pool executor blocking I/O operation")
# In the first example we use the default executor
# created for the event loop
future1 = loop.run_in_executor(None, blocking_cpu_operation)
future2 = loop.run_in_executor(None, blocking_cpu_operation)
future3 = loop.run_in_executor(None, blocking_cpu_operation)
# wait for all futures to finish
start = time.perf_counter()
await asyncio.gather(future1, future2, future3)
stop = time.perf_counter()
print(f"Done in {stop - start:.2f} seconds.")
print("Starting thread pool executor blocking I/O operation")
with concurrent.futures.ThreadPoolExecutor() as pool:
# In the second example we create a new thread pool executor
# and pass it to run_in_executor
future1 = loop.run_in_executor(pool, blocking_io_operation)
future2 = loop.run_in_executor(pool, blocking_io_operation)
future3 = loop.run_in_executor(pool, blocking_io_operation)
# wait for all futures to finish
start = time.perf_counter()
await asyncio.gather(future1, future2, future3)
stop = time.perf_counter()
print(f"Done in {stop - start:.2f} seconds.")
print("Starting default pool executor blocking CPU operation")
with concurrent.futures.ProcessPoolExecutor() as pool:
# In the third example we create a new process pool executor
# and pass it to run_in_executor
future1 = loop.run_in_executor(pool, blocking_cpu_operation)
future2 = loop.run_in_executor(pool, blocking_cpu_operation)
future3 = loop.run_in_executor(pool, blocking_cpu_operation)
# wait for all futures to finish
start = time.perf_counter()
await asyncio.gather(future1, future2, future3)
stop = time.perf_counter()
print(f"Done in {stop - start:.2f} seconds.")
if __name__ == '__main__':
# Create a coroutine from the async function
coroutine = main()
# Run the coroutine in the event loop
asyncio.run(coroutine)
And the execution result:
(.venv) python blocking_operation_in_executor.py
Starting default pool executor blocking I/O operation
Done in 3.00 seconds.
Starting thread pool executor blocking I/O operation
Done in 3.00 seconds.
Starting default pool executor blocking CPU operation
Done in 0.38 seconds.
In the above examples we run three I/O-bound operations in the default thread pool delivered by the asyncio loop, and three I/O-bound operations in the custom thread pool. All of them run concurrently and finish in three seconds.
Because the event loop has its own default thread pool you might get the impression that asyncio as a whole works as a thread pool. And you would be right. Well... almost. The asyncio event loop itself runs in the main thread of the Python process. But it can spawn a thread pool on demand when asynchronous code is executed or when a function is called in a ThreadPoolExecutor context. There is no magic in it. By default, asyncio spawns 32 threads or CPU count + 4; whichever is lower. It is possible to create a custom thread pool with more threads but increasing the number of workers beyond a certain point may not yield better performance due to the overhead from thread management and context switching. So, be careful when you create custom thread pools to not oversaturate the machine.
The last example was a CPU-bound calculation that was offloaded to a separate process pool to utilize multiprocessing. By default, the maximum number of worker processes is set to the number of CPU cores available. The process pool can also be configured with a custom number of worker processes. So again, be careful to not oversaturate the machine.
The above example showcases how to run blocking functions, but does it really execute concurrently when we mix synchronous and asynchronous operations? Let’s find out!
#!/usr/bin/env python3
import asyncio
import time
import concurrent.futures
async def non_blocking_operation() -> None:
# Simulate a non-blocking operation
# using non-blocking asyncio.sleep
await asyncio.sleep(3)
def blocking_cpu_operation() -> None:
# Simulate a long-running CPU-bound operation
# using a busy-wait loop
x = 0
for i in range(10**7):
x += i
async def main() -> None:
# Get the default event loop.
# It will be created if it doesn't exist yet.
loop = asyncio.get_running_loop()
print("Starting process pool executor blocking CPU operation")
# Create io_bound_operation in the event loop
io_bound_operation = non_blocking_operation()
with concurrent.futures.ProcessPoolExecutor() as pool:
# Create cpu_bound_operation in the process pool executor
cpu_bound_operation = loop.run_in_executor(pool, blocking_cpu_operation)
start = time.perf_counter()
await asyncio.gather(cpu_bound_operation, io_bound_operation)
stop = time.perf_counter()
print(f"Done in {stop - start:.2f} seconds.")
if __name__ == '__main__':
# Create a coroutine from the async function
coroutine = main()
# Run the coroutine in the event loop
asyncio.run(coroutine)
And the result:
(.venv) python mixed_operation_in_executor.py
Starting default pool executor blocking I/O operation
Done in 3.01 seconds
The results speak for themselves. The CPU-bound operation run in a separate process pool finished quicker than the simulated, long non-blocking I/O operation. The code was executed concurrently within the asynchronous environment.
But there is a significant downside to using thread pools. From the documentation : "Almost all asyncio objects are not thread-safe, which is typically not a problem unless there is code that works with them from outside of a Task or a callback." What does this mean in practice?
When asyncio spawns a thread pool, each of the threads is a different OS thread. Therefore one task running in the pool can't safely access a shared object from another task.
This can be seen when two separate threads try to read the same stream of data and update a shared resource. Let’s see it in action.
import asyncio
import io
import concurrent.futures
import time
def load_data(resource, f):
data = f.read()
resource["data"] = data
def process_data(resource):
while resource["data"] is None: # wait for data to be loaded
if resource["data"]:
print("Received data, processing")
time.sleep(1) # simulate processing
else:
print("No data to process")
async def main():
# prepare a file-like object that will simulate a stream of data
f = io.StringIO("I can fit triangles into squares")
# prepare a resource that will be shared between tasks
resource = {"type": "article", "data": None}
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
# spawn process data task that will wait for data to process
loop.run_in_executor(pool, process_data, resource)
# spawn two tasks that will load data concurrently
loop.run_in_executor(pool, load_data, resource, f)
loop.run_in_executor(pool, load_data, resource, f)
if __name__ == "__main__":
asyncio.run(main())
And the result:
(.venv) python threadsafe.py
No data to process
No data to process
No data to process
No data to process
The data was never processed. The first thread read all the bytes from the stream and the second one got access to an empty stream. The resource was updated by two threads at almost the same time, with the data and with the empty string. At the end, the processing loop got an empty string which yields “No data to process”.
A simple resource lock will fix it.
import asyncio
import io
import concurrent.futures
import threading
import time
def load_data(resource, f, lock):
data = f.read()
with lock:
resource["data"] = data
def process_data(resource, lock):
while True:
with lock:
if resource["data"] is None: # wait for data to be loaded
print("Waiting for data to process")
continue
if resource["data"]:
print("Received data, processing")
time.sleep(1) # simulate processing
break
async def main():
# prepare a file-like object that will simulate a stream of data
f = io.StringIO("I can fit triangles into squares")
# prepare a resource that will be shared between tasks
resource = {"type": "article", "data": None}
# create a lock to synchronize access to the shared resource
lock = threading.Lock()
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
# spawn process data task that will wait for data to process
loop.run_in_executor(pool, process_data, resource, lock)
# spawn two tasks that will load data concurrently
loop.run_in_executor(pool, load_data, resource, f, lock)
loop.run_in_executor(pool, load_data, resource, f, lock)
if __name__ == "__main__":
asyncio.run(main())
And the result:
(.venv) python threadsafe_lock.py
Waiting for data to process
Waiting for data to process
Waiting for data to process
Waiting for data to process
… and 31 more lines
Received data, processing
This time the processing function had a chance to get the data and process it.
The above are examples of working with separate OS threads inside an asyncio event loop. But asyncio itself is also not thread-safe. Awaiting tasks from different OS threads is not allowed and nor is sharing coroutines between threads. But safe sharing of data between threads is out of the scope of this article.
Key takeaways and recommendations
In this article, we’ve explored the challenges and consequences of using synchronous functions within an asynchronous event loop. We’ve also discussed various strategies for managing CPU-bound and I/O-bound blocking functions.
Synchronous code will always block other tasks when run inside an event loop. No matter if this is an I/O or CPU-bound operation. It will always execute sequentially and stop the whole application until it completes.
Asynchronous programming allows multiple tasks to run concurrently, improving performance and responsiveness. When your application carries out a lot of I/O operations it mostly waits idle until the data returns. This allows scheduling of multiple I/O operations at once, handling the data when ready while waiting for other tasks.
When working with synchronous functions in an asynchronous environment, it’s essential to use the right strategies to ensure efficient and effective tasks management. In the case of synchronous I/O functions you can spawn them in a separate thread using a asyncio.to_thread() function and it will work right off the bat. But when dealing with CPU-bound blocking functions, consider using a separate ProcessPoolExecutor to offload the CPU-intensive calculations to another processor.
Asyncio itself is not thread-safe and when dealing with a thread pool or process pool executors, manual synchronization of shared resources is needed.
By following these recommendations and understanding the challenges of synchronous functions in asynchronous environments, you'll be well-equipped to write efficient and effective code that takes advantage of Python's concurrency features.
In the end, it is possible to fit triangles into squares.