Перейти к основному содержимому

Асинхронное программирование

📺 Слайды к лекции

Генераторы — откуда растёт async

Чтобы понять, как работает async/await, нужно сначала разобраться в генераторах. Не потому что «так принято», а потому что asyncio буквально построен на механике генераторов. Без этого фундамента await выглядит как магия, а с ним — как логичное продолжение идей, заложенных ещё в Python 2.5.

Генератор — функция с yield вместо return. Вместо одного результата она выдаёт последовательность значений, приостанавливаясь между ними:

generators.py
def countdown(n):
while n > 0:
yield n
n -= 1

for i in countdown(5):
print(i, end=" ")
# 5 4 3 2 1
примечание

yield делает две вещи одновременно: возвращает значение наружу и приостанавливает выполнение генератора до следующего вызова next(). Запомните эту механику — именно она потом станет основой для await.

Вызов функции-генератора не запускает её тело. Он создаёт объект-генератор. Код начинает работать только при первом next():

generator_object.py
def countdown(n):
print("Counting down from", n)
while n > 0:
yield n
n -= 1

x = countdown(10)
print(x) # <generator object countdown at 0x...>
print(next(x)) # Counting down from 10 \n 10
print(next(x)) # 9

Генераторы как пайплайн

Генераторы можно складывать в цепочки. Один генератор тянет данные из другого через for, и так далее:

pipeline.py
def sub_gen():
yield 1.1
yield 1.2

def gen():
yield 1
for i in sub_gen():
yield i
yield 2

for x in gen():
print(x)
# 1, 1.1, 1.2, 2

Вот как выглядит собственная реализация chain — функции, которая объединяет несколько итерируемых объектов в один поток:

chain.py
def chain(*iterables):
for it in iterables:
for i in it:
yield i

s = 'ABC'
r = range(3)
print(list(chain(s, r)))
# ['A', 'B', 'C', 0, 1, 2]

Генераторы как корутины (PEP 342)

В Python 2.5 генераторы научились не только отдавать значения, но и принимать их (PEP 342). Появился метод .send() — он отправляет значение внутрь генератора на место yield. Плюс .throw() и .close() для исключений. Так родились классические корутины:

classic_coroutine.py
def accumulator():
total = 0
while True:
value = yield total # приостанавливаемся и ждём value
total += value

coro = accumulator()
next(coro) # запускаем до первого yield → 0
print(coro.send(5)) # отправляем 5, получаем сумму 5
print(coro.send(10)) # отправляем 10, получаем сумму 15

Что тут происходит: accumulator после запуска останавливается на yield и ждёт. Каждый send() возобновляет её, передаёт значение, и она снова засыпает на следующем yield. Получается кооперативная многозадачность: корутина сама решает, когда уступить управление.

примечание

Классическая корутина — это генератор, используемый по-другому. «Объект сопрограммы» физически является объектом генератора. Путаница между этими двумя применениями — одна из главных причин, почему потом придумали отдельный синтаксис async/await.

Корутина vs генератор

Чтобы почувствовать разницу, возьмём замыкание для скользящего среднего:

averager_closure.py
def make_averager():
total, counter = 0, 0

def averager(new_value):
nonlocal total, counter
total += new_value
counter += 1
return total / counter

return averager

g = make_averager()
print(g(10)) # 10.0
print(g(10)) # 10.0
print(g(40)) # 20.0
print(g(10)) # 17.5

А теперь то же самое, но как корутина:

averager_coroutine.py
def averager():
total = 0.0
count = 0
average = 0.0
while True:
term = yield average
total += term
count += 1
average = total / count

coro_avg = averager()
next(coro_avg) # 0.0
print(coro_avg.send(10)) # 10.0
print(coro_avg.send(30)) # 20.0
print(coro_avg.send(5)) # 15.0
warning

Генераторы производят данные для итерации. Сопрограммы потребляют данные. Это разные вещи, и смешивать их в голове — верный путь к путанице.

yield from (PEP 380)

В Python 3.3 (PEP 380) появился yield from — способ делегировать работу другому генератору без ручного цикла for. Заодно генераторам разрешили возвращать значение через return:

yield_from.py
def sub_gen():
yield 1.1
yield 1.2

def gen():
yield 1
yield from sub_gen() # вместо for i in sub_gen(): yield i
yield 2

for x in gen():
print(x)
# 1, 1.1, 1.2, 2

С возвратом значения из субгенератора:

yield_from_return.py
def sub_gen():
yield 1.1
yield 1.2
return 'Done!'

def gen():
yield 1
result = yield from sub_gen()
print('<--', result)
yield 2

for x in gen():
print(x)
# 1, 1.1, 1.2, <-- Done!, 2

Зачем это нужно: обход дерева

Без yield from обход дерева произвольной глубины выглядит так — и это тупик, потому что количество вложенных циклов нужно знать заранее:

tree_nested.py
def sub_tree(cls):
for sub_cls in cls.__subclasses__():
yield sub_cls.__name__, 1
for sub_sub_cls in sub_cls.__subclasses__():
yield sub_sub_cls.__name__, 2
for sub_sub_sub_cls in sub_sub_cls.__subclasses__():
yield sub_sub_sub_cls.__name__, 3
# А если глубина 10? 20?

С yield from — рекурсия на произвольную глубину:

tree_yield_from.py
def tree(cls, level=0):
yield cls.__name__, level
for sub_cls in cls.__subclasses__():
yield from tree(sub_cls, level + 1)

def display(cls):
for cls_name, level in tree(cls):
indent = ' ' * 4 * level
print(f'{indent}{cls_name}')

if __name__ == '__main__':
display(BaseException)
# BaseException
# Exception
# TypeError
# StopAsyncIteration
# ...
# GeneratorExit
# SystemExit
# KeyboardInterrupt

yield from убрал кучу шаблонного кода при объединении генераторов. И именно он стал строительным блоком для первых реализаций async I/O в Python 3.4.

От генераторов к asyncio (Python 3.4)

В Python 3.4 в стандартную библиотеку вошёл модуль asyncio (PEP 3156, проект Tulip). Синтаксиса async/await ещё не было, поэтому корутины помечали декоратором @asyncio.coroutine, а ожидание делали через yield from:

asyncio_34.py
import asyncio

@asyncio.coroutine
def old_style_coro():
data = yield from asyncio.sleep(1, result="data")
return data
Только для Python 3.4-3.10

@asyncio.coroutine deprecated в Python 3.8, удалён в 3.11. Этот код не работает в современном Python — он тут как исторический контекст.

При yield from управление возвращалось event loop, и тот мог переключиться на другую корутину. По сути yield from тут делал то же, что сейчас делает await — просто синтаксис был корявый.

примечание

До asyncio в Python были Twisted и Tornado — фреймворки с callbacks и генераторными корутинами. asyncio собрал эти идеи в стандартную библиотеку.

async/await (Python 3.5+)

Python 3.5 добавил отдельный синтаксис для корутин (PEP 492). Сравните:

old_vs_new.py
# Python 3.4 — удалён в 3.11
@asyncio.coroutine
def fetch_page(url):
resp = yield from aiohttp.request('GET', url) # псевдокод
data = yield from resp.text()
return data

# Python 3.5+ ✅
async def fetch_page(url):
async with aiohttp.ClientSession() as session:
resp = await session.get(url)
data = await resp.text()
return data

Что дал новый синтаксис: компилятор теперь видит, где корутина, а где генератор. Ошибки ловятся раньше. Код читается яснее — видно, где именно происходит переключение. Генераторные корутины удалены из Python 3.11+.

Правила нативных корутин:

  • async def всегда определяет корутину, даже если внутри нет await
  • await можно использовать только внутри async def
  • Вызов async def-функции возвращает coroutine-объект, а не результат — его нужно await'ить

Проблема: блокирующий I/O

Теперь, когда мы понимаем механику корутин, зачем всё это нужно на практике?

Обычная программа ждёт завершения каждой I/O-операции, прежде чем перейти к следующей:

blocking.py
import requests

def fetch_all(urls):
for url in urls:
requests.get(url) # ждём 1-2 сек
# ничего не делаем пока ждём!

# 10 URL × 2 сек = 20 секунд

А что если запустить все запросы одновременно и ждать все сразу? Именно это делает asyncio.

Модели I/O

Выполнение I/O состоит из двух шагов: проверка готовности устройства (блокирующая или нет) и передача данных (синхронная или асинхронная). Комбинации дают три модели:

Синхронный, блокирующий — вы сами везёте коробки и стоите в пробке. Ничего не делаете, пока не доедете.

Синхронный, неблокирующий — вы проверяете пробки перед выездом и едете только когда свободно. Между проверками занимаетесь другим.

Асинхронный, неблокирующий — вы наняли перевозчика. Он спрашивает, что везти, а вы свободны. Когда всё готово — вас уведомят.

На деле «неблокирующими» мы называем операции, которые можно раздробить на мелкие кусочки. Благодаря этому появляется возможность переключаться между задачами.

Asyncio: как это работает

Идея простая. Есть event loop — цикл обработки событий — и корутины с I/O-операциями. Мы передаём корутины event loop, он их запускает. Когда корутина делает await, она засыпает, и loop переключается на другую задачу. Когда I/O готов — loop будит нужную корутину. Всё в одном потоке.

first_async.py
import asyncio

async def main():
print("Привет...")
await asyncio.sleep(1) # уступаем управление на 1 сек
print("...мир!")

asyncio.run(main()) # точка входа — создаёт loop, запускает, закрывает

asyncio.run() — единственный рекомендуемый способ запуска event loop (начиная с Python 3.7).

Event loop изнутри

Под капотом event loop использует мультиплексирование I/O — механизм ОС, который уведомляет, когда сокет готов к чтению/записи или таймер истёк. На Linux это epoll, на macOS — kqueue, на Windows — IOCP. Отсюда разные реализации loop: SelectorEventLoop (Linux) и ProactorEventLoop (Windows).

Event loop API

event_loop_api.py
import asyncio

# Рекомендуемый способ
asyncio.run(coro()) # создаёт loop, запускает, закрывает

# Из корутины
asyncio.get_running_loop() # текущий loop; RuntimeError если нет
asyncio.create_task(coro()) # планирует корутину как Task

# Low-level (редко нужно)
loop.run_until_complete(coro()) # запускает loop до завершения
loop.run_forever() # до вызова stop()
loop.stop() / loop.close()
Что изменилось в Python 3.12-3.14

asyncio.get_event_loop() в Python 3.12 выдаёт DeprecationWarning, если нет текущего loop. В Python 3.14 бросает RuntimeError. Система политик event loop (set_event_loop_policy() и т.д.) deprecated в 3.14, удаление в 3.16. Проблемы с RuntimeError: Event loop is closed на Windows были характерны для Python 3.8-3.10 — в 3.12+ asyncio.run() работает без костылей.

Spinner: кооперативная многозадачность на практике

Лучший способ понять asyncio — увидеть, как две задачи работают в одном потоке. Классический пример: spinner крутится в консоли, пока в фоне идёт вычисление.

spinner_cancel.py
import asyncio
import itertools

async def spin(msg: str) -> None:
for char in itertools.cycle(r"\|/-"):
status = f"\r{char} {msg}"
print(status, flush=True, end="")
try:
await asyncio.sleep(0.1) # уступаем управление
except asyncio.CancelledError:
break
blanks = " " * len(status)
print(f"\r{blanks}\r", end="")

async def slow() -> int:
await asyncio.sleep(3)
return 42

async def supervisor() -> int:
spinner = asyncio.create_task(spin("thinking!"))
print(f"spinner object: {spinner}")
result = await slow()
spinner.cancel()
return result

def main() -> None:
result = asyncio.run(supervisor())
print(f"Answer: {result}")

if __name__ == "__main__":
main()

create_task() планирует spin как отдельную задачу. Пока supervisor ждёт slow() (3 секунды), event loop переключается на spin каждые 0.1 секунды. Когда slow() возвращает результат, cancel() выбрасывает CancelledError в spin, и она завершается.

Альтернативный подход — остановка через asyncio.Event (явная сигнализация вместо отмены):

spinner_event.py
import itertools
import asyncio
from asyncio import Event

async def spin(msg: str, done: Event) -> None:
for char in itertools.cycle(r"\|/-"):
status = f"\r{char} {msg}"
print(status, end="", flush=True)
if done.is_set():
break
await asyncio.sleep(0.1)
blanks = " " * len(status)
print(f"\r{blanks}\r", end="")

async def slow() -> int:
await asyncio.sleep(3)
return 42

async def supervisor() -> int:
done = Event()
spinner = asyncio.create_task(spin("thinking!", done))
result = await slow()
done.set()
await spinner
return result

def main() -> None:
result = asyncio.run(supervisor())
print(f"Answer: {result}")

if __name__ == "__main__":
main()

cancel() — идиоматичный asyncio-подход. Event полезен, когда отмена нежелательна и нужна явная координация.

Три способа запуска корутины

СпособОткудаЧто делает
asyncio.run(coro())Обычная функцияТочка входа, блокирует до завершения
asyncio.create_task(coro())Из корутиныПланирует задачу, возвращает Task
await coro()Из корутиныПриостанавливает текущую, ждёт результат
three_ways.py
import asyncio

async def fetch_data(url: str) -> str:
await asyncio.sleep(1)
return f"data from {url}"

async def main():
# await — последовательно (~2 сек)
r1 = await fetch_data("url1")
r2 = await fetch_data("url2")

# create_task + gather — параллельно (~1 сек)
t1 = asyncio.create_task(fetch_data("url1"))
t2 = asyncio.create_task(fetch_data("url2"))
results = await asyncio.gather(t1, t2)

asyncio.run(main())
осторожно

Event loop работает в одном потоке. time.sleep(), requests.get() и любой блокирующий вызов замораживает весь loop. Внутри корутин — только асинхронные аналоги.

Task, Future и координация задач

Task и Future

Future — низкоуровневый объект, который представляет результат асинхронной операции. У него есть методы result(), done(), cancel(), cancelled(). Напрямую с Future работают редко — обычно через Task.

Task наследует Future и оборачивает корутину. Когда вы вызываете asyncio.create_task(), event loop начинает выполнять корутину в фоне. Результат можно забрать через await task или task.result():

task_basics.py
import asyncio

async def fetch(n):
await asyncio.sleep(n)
return f"result-{n}"

async def main():
task = asyncio.create_task(fetch(2))

# task уже работает в фоне, пока мы тут
print(task.done()) # False — ещё не завершён

result = await task # ждём завершения
print(result) # "result-2"
print(task.done()) # True

asyncio.run(main())

У Task есть полезные методы:

  • task.cancel() — отменяет задачу (выбрасывает CancelledError в точке await)
  • task.cancelled() — проверяет, была ли задача отменена
  • task.done()True если задача завершена (успешно, с ошибкой или отменена)
  • task.result() — возвращает результат или выбрасывает исключение

asyncio.gather() — запуск нескольких задач и ожидание всех

gather принимает набор корутин или Task, запускает их параллельно и возвращает список результатов в том же порядке:

gather.py
import asyncio

async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: factorial({number}), i={i}")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
return f

async def main():
# Все три задачи работают параллельно
results = await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
print(results) # [2, 6, 24] — порядок сохранён

asyncio.run(main())

Если одна из корутин упадёт с ошибкой, gather по умолчанию бросит это исключение, а остальные задачи продолжат работать (не отменяются!). Чтобы получить ошибки как объекты вместо исключения:

gather_exceptions.py
results = await asyncio.gather(
coro_ok(), coro_fail(), coro_ok(),
return_exceptions=True
)
# results = ["ok", ValueError("fail"), "ok"]
for r in results:
if isinstance(r, Exception):
print(f"Ошибка: {r}")

asyncio.wait() — гибкий контроль над группой задач

wait возвращает два множества: (done, pending). В отличие от gather, позволяет задать условие завершения и таймаут:

wait.py
import asyncio

async def fetch(n):
await asyncio.sleep(n)
return f"result-{n}"

async def main():
tasks = {asyncio.create_task(fetch(i)) for i in [3, 1, 2]}

# Ждём ВСЕ (по умолчанию)
done, pending = await asyncio.wait(tasks)
for task in done:
print(task.result())

# Ждём ПЕРВЫЙ завершившийся
tasks = {asyncio.create_task(fetch(i)) for i in [3, 1, 2]}
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
print(f"Первый: {done.pop().result()}") # result-1
print(f"Ещё работают: {len(pending)}") # 2

# Ждём с таймаутом
tasks = {asyncio.create_task(fetch(i)) for i in [3, 1, 2]}
done, pending = await asyncio.wait(tasks, timeout=1.5)
print(f"Успели: {len(done)}, не успели: {len(pending)}")

asyncio.run(main())

Варианты return_when:

  • FIRST_COMPLETED — вернуться, как только хоть одна задача завершится
  • FIRST_EXCEPTION — вернуться при первом исключении (или когда все завершатся)
  • ALL_COMPLETED — ждать все (по умолчанию)
warning

wait принимает только Task/Future, не голые корутины. Оборачивайте через create_task(). Незавершённые задачи из pending продолжают работать — не забудьте их отменить, если они больше не нужны.

asyncio.as_completed() — обработка по мере готовности

as_completed возвращает итератор, который отдаёт результаты в порядке завершения, а не в порядке запуска. Удобно, когда нужно обработать первый готовый результат, не дожидаясь остальных:

as_completed.py
import asyncio

async def fetch(url, delay):
await asyncio.sleep(delay)
return f"{url}: {delay}s"

async def main():
coros = [fetch("fast.io", 1), fetch("slow.io", 3), fetch("mid.io", 2)]

async for earliest in asyncio.as_completed(coros):
result = await earliest
print(result)
# Выведет в порядке завершения:
# fast.io: 1s
# mid.io: 2s
# slow.io: 3s

asyncio.run(main())

asyncio.sleep() и asyncio.shield()

asyncio.sleep(delay) — приостанавливает корутину на delay секунд, уступая управление event loop. Это не time.sleep() — loop свободен и может выполнять другие задачи. sleep(0) — способ явно уступить управление, не ожидая ничего.

asyncio.shield(aw) — защищает awaitable от отмены. Если внешнюю задачу отменяют, операция внутри shield продолжает работать:

shield.py
async def critical_operation():
await asyncio.sleep(5) # важная операция, которую нельзя прерывать
return "done"

async def main():
task = asyncio.create_task(asyncio.shield(critical_operation()))
await asyncio.sleep(1)
task.cancel() # внешний Task отменён, но critical_operation продолжает работать

asyncio.run(main())

Когда что использовать

ЗадачаИнструмент
Запустить несколько корутин, дождаться всех, получить список результатовgather
Запустить несколько, обработать первый готовыйas_completed
Запустить несколько, гибко контролировать (таймаут, первый завершившийся)wait
Запустить несколько с гарантией завершения и отменой при ошибкеTaskGroup
Защитить операцию от отменыshield

aiohttp: настоящий сетевой I/O

Теория — это хорошо, но вот как asyncio используется в реальности:

aiohttp_example.py
import aiohttp
import asyncio

async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()

async def main():
urls = [
"https://python.org",
"https://docs.python.org",
"https://pypi.org",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, html in zip(urls, results):
print(f"{url}: {len(html)} chars")

asyncio.run(main()) # 3 запроса за ~1 сек вместо ~3
осторожно

Не используйте requests в async коде. Он блокирует event loop. Берите aiohttp или httpx с async-API.

Что выбрать: asyncio, потоки или процессы

decision.py
if io_bound:
if many_concurrent_connections: # сотни/тысячи
print("Use Asyncio")
else:
print("Use Threads")
else: # cpu_bound
print("Multi Processing")

asyncio хорош, когда нужно держать тысячи одновременных соединений — скрейпинг, API-шлюзы, чат-серверы. Для десятка потоков I/O обычный threading проще.

Критерийasynciothreadingmultiprocessing
ПереключениеКооперативное (await)Вытесняющее (GIL)Вытесняющее (ОС)
GILОдин поток, не мешаетОграничивает CPUСвой на процесс
Накладные расходыМинимальныеСредниеВысокие
Лучше дляI/O (много соединений)I/O (немного потоков)CPU-bound
Race conditionsРедки (только на await)ЧастыеНет (изоляция)
Free-threaded Python

В Python 3.13 появился режим без GIL (PEP 703). В Python 3.14 он уже не экспериментальный (PEP 779). Потоки постепенно смогут выполнять CPU-bound код параллельно — картина меняется.

asyncio.to_thread() — мост к блокирующему коду

Иногда нужно вызвать блокирующую функцию из async кода. asyncio.to_thread() (Python 3.9+) запускает её в отдельном потоке, не замораживая loop:

to_thread.py
import asyncio
import time

def blocking_io():
time.sleep(2)
return "done"

async def main():
result = await asyncio.to_thread(blocking_io)
print(result)

asyncio.run(main())

Незаменимо для legacy-кода: requests, файловый I/O, ORM без async-поддержки. По сути это мост между threading из прошлой лекции и asyncio.

TaskGroup — структурированная конкурентность (Python 3.11+)

TaskGroup — замена gather() с гарантиями. Все задачи завершатся при выходе из блока, а при ошибке в одной — остальные автоматически отменяются:

taskgroup.py
import asyncio

async def fetch(url: str) -> str:
await asyncio.sleep(1)
return f"data from {url}"

async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch("url1"))
task2 = tg.create_task(fetch("url2"))
task3 = tg.create_task(fetch("url3"))
# сюда попадём только когда все задачи завершены
print(task1.result(), task2.result(), task3.result())

asyncio.run(main())

Почему это лучше gather():

  • Ошибка в задаче → автоотмена остальных (в gather они продолжают работать)
  • Ошибки собираются в ExceptionGroup, обрабатываются через except* (PEP 654)
  • Ни одна задача не «потеряется» — все гарантированно завершены при выходе
taskgroup_errors.py
import asyncio

async def may_fail(n: int) -> str:
if n == 2:
raise ValueError(f"Ошибка в задаче {n}")
await asyncio.sleep(n)
return f"ok-{n}"

async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(may_fail(1))
tg.create_task(may_fail(2)) # упадёт
tg.create_task(may_fail(3)) # будет отменена
except* ValueError as eg:
for exc in eg.exceptions:
print(f"Поймали: {exc}")

asyncio.run(main())

Для нового кода берите TaskGroup вместо gather().

Обработка ошибок в asyncio

Главный подводный камень

Если вы создали Task и никто не вызвал await task или task.result(), исключение из этого таска пропадёт. Ну, не совсем — при сборке мусора Python выведет предупреждение в лог. Которое вы не заметите. Это классический fire-and-forget антипаттерн.

error_handling.py
import asyncio

async def may_fail():
raise ValueError("что-то пошло не так")

async def main():
# ❌ Исключение пропадёт
asyncio.create_task(may_fail())

# ✅ gather с return_exceptions — ошибки как объекты в списке
results = await asyncio.gather(
may_fail(), may_fail(),
return_exceptions=True
)
for r in results:
if isinstance(r, Exception):
print(f"Ошибка: {r}")

# ✅ TaskGroup — самый безопасный вариант (Python 3.11+)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(may_fail())
except* ValueError as eg:
for exc in eg.exceptions:
print(f"ValueError: {exc}")

asyncio.run(main())

asyncio.timeout() (Python 3.11+)

Контекстный менеджер для таймаутов — замена asyncio.wait_for():

timeout.py
import asyncio

async def slow_operation() -> str:
await asyncio.sleep(10)
return "done"

async def main():
try:
async with asyncio.timeout(5):
result = await slow_operation()
except TimeoutError:
print("Не уложились!")

# Без исключения — проверка через cm.expired()
async with asyncio.timeout(5) as cm:
await asyncio.sleep(1)
if cm.expired():
print("Таймаут!")
else:
print("Успели!")

asyncio.run(main())

Асинхронные контекстные менеджеры

Для ресурсов, которые нужно асинхронно открыть и закрыть, есть __aenter__ / __aexit__:

async_context.py
class AsyncResource:
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, *exc):
await self.disconnect()

async with AsyncResource() as res:
await res.do_work()

На практике чаще используется @asynccontextmanager из contextlib:

async_contextmanager.py
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_connection(url: str):
conn = await connect(url)
try:
yield conn
finally:
await conn.close()

async with managed_connection("postgresql://...") as conn:
await conn.execute("SELECT 1")

Асинхронные генераторы

async_gen.py
async def fetch_pages(urls):
async with aiohttp.ClientSession() as session:
for url in urls:
async with session.get(url) as resp:
yield await resp.text()

async def main():
async for page in fetch_pages(urls):
process(page) # обрабатываем по мере получения

Полезны для стриминга данных из базы, пагинированных API, потока событий (WebSocket, SSE) — везде, где данные приходят порциями с ожиданием I/O.

Синхронизация в asyncio

async_sync.py
import asyncio

# Lock — только одна корутина в критической секции
lock = asyncio.Lock()
async with lock:
...

# Semaphore — ограничение параллелизма (макс. 10 запросов)
sem = asyncio.Semaphore(10)
async with sem:
await fetch(url)

# Event — сигнализация между задачами
event = asyncio.Event()
await event.wait()
event.set()

# Barrier (Python 3.11+) — точка синхронизации для N задач
barrier = asyncio.Barrier(3)
await barrier.wait() # продолжим когда 3 задачи дойдут сюда

Semaphore особенно полезен на практике — ограничивает одновременные HTTP-запросы, чтобы не положить сервер.

Полезные ссылки

Книги

Статьи

Видео

Философия async

Документация

Примеры кода

PEP

  • PEP 342 — Coroutines via Enhanced Generators (Python 2.5)
  • PEP 380 — Syntax for Delegating to a Subgenerator (Python 3.3)
  • PEP 3156 — Asynchronous IO Support Rebooted (Python 3.4)
  • PEP 492 — Coroutines with async and await syntax (Python 3.5)
  • PEP 654 — Exception Groups and except* (Python 3.11)
  • PEP 703 — Making the Global Interpreter Lock Optional (Python 3.13+)