Асинхронное программирование
Генераторы — откуда растёт async
Чтобы понять, как работает async/await, нужно сначала разобраться в генераторах. Не потому что «так принято», а потому что asyncio буквально построен на механике генераторов. Без этого фундамента await выглядит как магия, а с ним — как логичное продолжение идей, заложенных ещё в Python 2.5.
Генератор — функция с yield вместо return. Вместо одного результата она выдаёт последовательность значений, приостанавливаясь между ними:
def countdown(n):
while n > 0:
yield n
n -= 1
for i in countdown(5):
print(i, end=" ")
# 5 4 3 2 1
yield делает две вещи одновременно: возвращает значение наружу и приостанавливает выполнение генератора до следующего вызова next(). Запомните эту механику — именно она потом станет основой для await.
Вызов функции-генератора не запускает её тело. Он создаёт объект-генератор. Код начинает работать только при первом next():
def countdown(n):
print("Counting down from", n)
while n > 0:
yield n
n -= 1
x = countdown(10)
print(x) # <generator object countdown at 0x...>
print(next(x)) # Counting down from 10 \n 10
print(next(x)) # 9
Генераторы как пайплайн
Генераторы можно складывать в цепочки. Один генератор тянет данные из другого через for, и так далее:
def sub_gen():
yield 1.1
yield 1.2
def gen():
yield 1
for i in sub_gen():
yield i
yield 2
for x in gen():
print(x)
# 1, 1.1, 1.2, 2
Вот как выглядит собственная реализация chain — функции, которая объединяет несколько итерируемых объектов в один поток:
def chain(*iterables):
for it in iterables:
for i in it:
yield i
s = 'ABC'
r = range(3)
print(list(chain(s, r)))
# ['A', 'B', 'C', 0, 1, 2]
Генераторы как корутины (PEP 342)
В Python 2.5 генераторы научились не только отдавать значения, но и принимать их (PEP 342). Появился метод .send() — он отправляет значение внутрь генератора на место yield. Плюс .throw() и .close() для исключений. Так родились классические корутины:
def accumulator():
total = 0
while True:
value = yield total # приостанавливаемся и ждём value
total += value
coro = accumulator()
next(coro) # запускаем до первого yield → 0
print(coro.send(5)) # отправляем 5, получаем сумму 5
print(coro.send(10)) # отправляем 10, получаем сумму 15
Что тут происходит: accumulator после запуска останавливается на yield и ждёт. Каждый send() возобновляет её, передаёт значение, и она снова засыпает на следующем yield. Получается кооперативная многозадачность: корутина сама решает, когда уступить управление.
Классическая корутина — это генератор, используемый по-другому. «Объект сопрограммы» физически является объектом генератора. Путаница между этими двумя применениями — одна из главных причин, почему потом придумали отдельный синтаксис async/await.
Корутина vs генератор
Чтобы почувствовать разницу, возьмём замыкание для скользящего среднего:
def make_averager():
total, counter = 0, 0
def averager(new_value):
nonlocal total, counter
total += new_value
counter += 1
return total / counter
return averager
g = make_averager()
print(g(10)) # 10.0
print(g(10)) # 10.0
print(g(40)) # 20.0
print(g(10)) # 17.5
А теперь то же самое, но как корутина:
def averager():
total = 0.0
count = 0
average = 0.0
while True:
term = yield average
total += term
count += 1
average = total / count
coro_avg = averager()
next(coro_avg) # 0.0
print(coro_avg.send(10)) # 10.0
print(coro_avg.send(30)) # 20.0
print(coro_avg.send(5)) # 15.0
Генераторы производят данные для итерации. Сопрограммы потребляют данные. Это разные вещи, и смешивать их в голове — верный путь к путанице.
yield from (PEP 380)
В Python 3.3 (PEP 380) появился yield from — способ делегировать работу другому генератору без ручного цикла for. Заодно генераторам разрешили возвращать значение через return:
def sub_gen():
yield 1.1
yield 1.2
def gen():
yield 1
yield from sub_gen() # вместо for i in sub_gen(): yield i
yield 2
for x in gen():
print(x)
# 1, 1.1, 1.2, 2
С возвратом значения из субгенератора:
def sub_gen():
yield 1.1
yield 1.2
return 'Done!'
def gen():
yield 1
result = yield from sub_gen()
print('<--', result)
yield 2
for x in gen():
print(x)
# 1, 1.1, 1.2, <-- Done!, 2
Зачем это нужно: обход дерева
Без yield from обход дерева произвольной глубины выглядит так — и это тупик, потому что количество вложенных циклов нужно знать заранее:
def sub_tree(cls):
for sub_cls in cls.__subclasses__():
yield sub_cls.__name__, 1
for sub_sub_cls in sub_cls.__subclasses__():
yield sub_sub_cls.__name__, 2
for sub_sub_sub_cls in sub_sub_cls.__subclasses__():
yield sub_sub_sub_cls.__name__, 3
# А если глубина 10? 20?
С yield from — рекурсия на произвольную глубину:
def tree(cls, level=0):
yield cls.__name__, level
for sub_cls in cls.__subclasses__():
yield from tree(sub_cls, level + 1)
def display(cls):
for cls_name, level in tree(cls):
indent = ' ' * 4 * level
print(f'{indent}{cls_name}')
if __name__ == '__main__':
display(BaseException)
# BaseException
# Exception
# TypeError
# StopAsyncIteration
# ...
# GeneratorExit
# SystemExit
# KeyboardInterrupt
yield from убрал кучу шаблонного кода при объединении генераторов. И именно он стал строительным блоком для первых реализаций async I/O в Python 3.4.
От генераторов к asyncio (Python 3.4)
В Python 3.4 в стандартную библиотеку вошёл модуль asyncio (PEP 3156, проект Tulip). Синтаксиса async/await ещё не было, поэтому корутины помечали декоратором @asyncio.coroutine, а ожидание делали через yield from:
import asyncio
@asyncio.coroutine
def old_style_coro():
data = yield from asyncio.sleep(1, result="data")
return data
@asyncio.coroutine deprecated в Python 3.8, удалён в 3.11. Этот код не работает в современном Python — он тут как исторический контекст.
При yield from управление возвращалось event loop, и тот мог переключиться на другую корутину. По сути yield from тут делал то же, что се йчас делает await — просто синтаксис был корявый.
До asyncio в Python были Twisted и Tornado — фреймворки с callbacks и генераторными корутинами. asyncio собрал эти идеи в стандартную библиотеку.
async/await (Python 3.5+)
Python 3.5 добавил отдельный синтаксис для корутин (PEP 492). Сравните:
# Python 3.4 — удалён в 3.11
@asyncio.coroutine
def fetch_page(url):
resp = yield from aiohttp.request('GET', url) # псевдокод
data = yield from resp.text()
return data
# Python 3.5+ ✅
async def fetch_page(url):
async with aiohttp.ClientSession() as session:
resp = await session.get(url)
data = await resp.text()
return data
Что дал новый синтаксис: компилятор теперь видит, где корутина, а где генератор. Ошибки ловятся раньше. Код читается яснее — видно, где именно происходит переключение. Генераторные корутины удалены из Python 3.11+.
Правила нативных корутин:
async defвсегда определяет корутину, даже если внутри нетawaitawaitможно использовать только внутриasync def- Вызов
async def-функции возвращает coroutine-объект, а не результат — его нужноawait'ить
Проблема: блокирующий I/O
Теперь, когда мы понимаем механику корутин, зачем всё это нужно на практике?
Обычная программа ждёт завершения каждой I/O-операции, прежде чем перейти к следующей:
import requests
def fetch_all(urls):
for url in urls:
requests.get(url) # ждём 1-2 сек
# ничего не делаем пока ждём!
# 10 URL × 2 сек = 20 секунд
А что если запустить все запросы одновременно и ждать все сразу? Именно это делает asyncio.
Модели I/O
Выполнение I/O состоит из двух шагов: проверка готовности устройства (блокирующая или нет) и передача данных (синхронная или асинхронная). Комбинации дают три модели:
Синхронный, блокирующий — вы сами везёте коробки и стоите в пробке. Ничего не делаете, пока не доедете.
Синхронный, неблокирующий — вы проверяете пробки перед выездом и едете только когда свободно. Между проверками занимаетесь другим.
Асинхронный, неблокирующий — вы наняли перевозчика. Он спрашивает, что везти, а вы свободны. Когда всё готово — вас уведомят.
На деле «неблокирующими» мы называем операции, которые можно раздробить на мелкие кусочки. Благодаря этому появляется возможность переключаться между задачами.
Asyncio: как это работает
Идея простая. Есть event loop — цикл обработки событий — и корутины с I/O-операциями. Мы передаём корутины event loop, он их запускает. Когда корутина делает await, она засыпает, и loop переключается на другую задачу. Когда I/O готов — loop будит нужную корутину. Всё в одном потоке.
import asyncio
async def main():
print("Привет...")
await asyncio.sleep(1) # уступаем управление на 1 сек
print("...мир!")
asyncio.run(main()) # точка входа — создаёт loop, запускает, закрывает
asyncio.run() — единственный рекомендуемый способ запуска event loop (начиная с Python 3.7).
Event loop изнутри
Под капотом event loop использует мультиплексирование I/O — механизм ОС, который уведомляет, когда сокет готов к чтению/записи или таймер истёк. На Linux это epoll, на macOS — kqueue, на Windows — IOCP. Отсюда разные р еализации loop: SelectorEventLoop (Linux) и ProactorEventLoop (Windows).
Event loop API
import asyncio
# Рекомендуемый способ
asyncio.run(coro()) # создаёт loop, запускает, закрывает
# Из корутины
asyncio.get_running_loop() # текущий loop; RuntimeError если нет
asyncio.create_task(coro()) # планирует корутину как Task
# Low-level (редко нужно)
loop.run_until_complete(coro()) # запускает loop до завершения
loop.run_forever() # до вызова stop()
loop.stop() / loop.close()
asyncio.get_event_loop() в Python 3.12 выдаёт DeprecationWarning, если нет текущего loop. В Python 3.14 бросает RuntimeError. Система политик event loop (set_event_loop_policy() и т.д.) deprecated в 3.14, удаление в 3.16. Проблемы с RuntimeError: Event loop is closed на Windows были характерны для Python 3.8-3.10 — в 3.12+ asyncio.run() работает без костылей.
Spinner: кооперативная многозадачность на практике
Лучший способ понять asyncio — увидеть, как две задачи работают в одном потоке. Классический пример: spinner крутится в консоли, пока в фоне идёт вычисление.
import asyncio
import itertools
async def spin(msg: str) -> None:
for char in itertools.cycle(r"\|/-"):
status = f"\r{char} {msg}"
print(status, flush=True, end="")
try:
await asyncio.sleep(0.1) # уступаем управление
except asyncio.CancelledError:
break
blanks = " " * len(status)
print(f"\r{blanks}\r", end="")
async def slow() -> int:
await asyncio.sleep(3)
return 42
async def supervisor() -> int:
spinner = asyncio.create_task(spin("thinking!"))
print(f"spinner object: {spinner}")
result = await slow()
spinner.cancel()
return result
def main() -> None:
result = asyncio.run(supervisor())
print(f"Answer: {result}")
if __name__ == "__main__":
main()
create_task() планирует spin как отдельную задачу. Пока supervisor ждёт slow() (3 секунды), event loop переключается на spin каждые 0.1 секунды. Когда slow() возвращает результат, cancel() выбрасывает CancelledError в spin, и она завершается.
Альтернативный подход — остановка через asyncio.Event (явная сигнализация вместо отмены):
import itertools
import asyncio
from asyncio import Event
async def spin(msg: str, done: Event) -> None:
for char in itertools.cycle(r"\|/-"):
status = f"\r{char} {msg}"
print(status, end="", flush=True)
if done.is_set():
break
await asyncio.sleep(0.1)
blanks = " " * len(status)
print(f"\r{blanks}\r", end="")
async def slow() -> int:
await asyncio.sleep(3)
return 42
async def supervisor() -> int:
done = Event()
spinner = asyncio.create_task(spin("thinking!", done))
result = await slow()
done.set()
await spinner
return result
def main() -> None:
result = asyncio.run(supervisor())
print(f"Answer: {result}")
if __name__ == "__main__":
main()
cancel() — идиоматичный asyncio-подход. Event полезен, когда отмена нежелательна и нужна явная координация.
Три способа запуска корутины
| Способ | Откуда | Что делает |
|---|---|---|
asyncio.run(coro()) | Обычная функция | Точка входа, блокирует до завершения |
asyncio.create_task(coro()) | Из корутины | Планирует задачу, возвращает Task |
await coro() | Из корутины | Приостанавливает текущую, ждёт результат |
import asyncio
async def fetch_data(url: str) -> str:
await asyncio.sleep(1)
return f"data from {url}"
async def main():
# await — последовательно (~2 сек)
r1 = await fetch_data("url1")
r2 = await fetch_data("url2")
# create_task + gather — паралле льно (~1 сек)
t1 = asyncio.create_task(fetch_data("url1"))
t2 = asyncio.create_task(fetch_data("url2"))
results = await asyncio.gather(t1, t2)
asyncio.run(main())
Event loop работает в одном потоке. time.sleep(), requests.get() и любой блокирующий вызов замораживает весь loop. Внутри корутин — только асинхронные аналоги.
Task, Future и координация задач
Task и Future
Future — низкоуровневый объект, который представляет результат асинхронной операции. У него есть методы result(), done(), cancel(), cancelled(). Напрямую с Future работают редко — обычно через Task.
Task наследует Future и оборачивает корутину. Когда вы вызываете asyncio.create_task(), event loop начинает выполнять корутину в фоне. Результат можно забрать через await task или task.result():
import asyncio
async def fetch(n):
await asyncio.sleep(n)
return f"result-{n}"
async def main():
task = asyncio.create_task(fetch(2))
# task уже работает в фоне, пока мы тут
print(task.done()) # False — ещё не завершён
result = await task # ждём завершения
print(result) # "result-2"
print(task.done()) # True
asyncio.run(main())
У Task есть полезные методы:
task.cancel()— отменяет задачу (выбрасываетCancelledErrorв точкеawait)task.cancelled()— проверяет, была ли задача отмененаtask.done()—Trueесли задача завершена (успешно, с ошибкой или отменена)task.result()— возвращает результат или выбрасывает исключение
asyncio.gather() — запуск нескольких задач и ожидание всех
gather принимает набор корутин или Task, запускает их параллельно и возвращает список результатов в том же порядке:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: factorial({number}), i={i}")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
return f
async def main():
# Все три задачи работают параллельно
results = await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
print(results) # [2, 6, 24] — порядок сохранён
asyncio.run(main())
Если одна из корутин упадёт с ошибкой, gather по умолчанию бросит это исключение, а остальные задачи продолжат работать (не отменяются!). Чтобы получить ошибки как объекты вместо исключения:
results = await asyncio.gather(
coro_ok(), coro_fail(), coro_ok(),
return_exceptions=True
)
# results = ["ok", ValueError("fail"), "ok"]
for r in results:
if isinstance(r, Exception):
print(f"Ошибка: {r}")