Перейти к основному содержимому

Концепции многозадачности

📺 Слайды к лекции

Материалы в разработке

Эта страница находится в процессе подготовки и не является финальной версией. Содержание будет дорабатываться и обновляться в течение текущего семестра.

Процессы и потоки

Определение

Процесс — экземпляр выполняющейся программы с собственным адресным пространством. Поток (thread) — единица выполнения внутри процесса, разделяющая память с другими потоками того же процесса.

ХарактеристикаПроцессыПотоки
ПамятьИзолированнаяОбщая
СозданиеДорогоеДешёвое
Обмен даннымиIPC (pipes, queues)Прямой доступ
GILНе влияетБлокирует CPU-bound

GIL (Global Interpreter Lock)

GIL — глобальная блокировка интерпретатора CPython, которая позволяет выполнять байткод Python только одному потоку в каждый момент времени.

Важно

GIL делает потоки Python бесполезными для CPU-bound задач, но они остаются полезными для I/O-bound задач (сетевые запросы, чтение файлов), потому что GIL освобождается при ожидании I/O.

gil_demo.py
import threading
import time

def cpu_bound(n):
"""CPU-bound: потоки НЕ ускорят"""
total = 0
for i in range(n):
total += i * i
return total

def io_bound(url):
"""I/O-bound: потоки УСКОРЯТ"""
import urllib.request
return urllib.request.urlopen(url).read()

Модуль threading

threading_example.py
import threading
import time

def worker(name, delay):
print(f"{name} started")
time.sleep(delay)
print(f"{name} finished")

# Создание потоков
t1 = threading.Thread(target=worker, args=("Thread-1", 2))
t2 = threading.Thread(target=worker, args=("Thread-2", 1))

t1.start()
t2.start()

t1.join() # дождаться завершения
t2.join()

Daemon-потоки

daemon.py
t = threading.Thread(target=worker, args=("Daemon", 10), daemon=True)
t.start()
# Программа завершится, не дожидаясь daemon-потока

Модуль multiprocessing

Для CPU-bound задач используйте процессы — каждый процесс имеет свой GIL:

multiprocessing_example.py
from multiprocessing import Process, Pool

def heavy_computation(n):
return sum(i * i for i in range(n))

# Один процесс
p = Process(target=heavy_computation, args=(10**7,))
p.start()
p.join()

# Пул процессов
with Pool(4) as pool:
results = pool.map(heavy_computation, [10**6] * 8)
print(sum(results))
Практический совет

multiprocessing.Pool удобен для параллельной обработки данных. Но помните: данные между процессами передаются через сериализацию (pickle), что может быть медленным для больших объектов.

concurrent.futures

Унифицированный API для потоков и процессов:

futures.py
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def download(url):
import urllib.request
return urllib.request.urlopen(url).read()

# ThreadPoolExecutor — для I/O-bound
urls = ["https://python.org"] * 5
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(download, url) for url in urls]
for future in futures:
data = future.result()
print(f"Downloaded {len(data)} bytes")

# ProcessPoolExecutor — для CPU-bound
def compute(n):
return sum(i**2 for i in range(n))

with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(compute, [10**6] * 8))

as_completed

as_completed.py
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(download, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
data = future.result()
print(f"{url}: {len(data)} bytes")
except Exception as e:
print(f"{url}: error {e}")

Примитивы синхронизации

Lock и RLock

lock.py
import threading

counter = 0
lock = threading.Lock()

def increment():
global counter
for _ in range(100000):
with lock:
counter += 1

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # Всегда 400000 благодаря lock
Частая ошибка

Без Lock операция counter += 1 не атомарна — потоки могут читать устаревшее значение, создавая race condition.

Semaphore и Event

sync.py
# Semaphore — ограничение одновременного доступа
sem = threading.Semaphore(3) # макс. 3 потока
with sem:
# критическая секция
pass

# Event — сигнализация между потоками
event = threading.Event()
event.wait() # блокировка до set()
event.set() # разблокировать все ожидающие
event.clear() # сбросить

Queue для межпоточного обмена

queue_example.py
from queue import Queue
import threading

q = Queue()

def producer():
for i in range(5):
q.put(f"item-{i}")

def consumer():
while True:
item = q.get()
print(f"Consumed: {item}")
q.task_done()

threading.Thread(target=producer).start()
threading.Thread(target=consumer, daemon=True).start()
q.join() # дождаться обработки всех элементов

Deadlocks и как их избежать

Частая ошибка

Deadlock возникает, когда два потока ждут ресурсы друг друга:

lock_a = threading.Lock()
lock_b = threading.Lock()

# Поток 1: lock_a -> lock_b
# Поток 2: lock_b -> lock_a — DEADLOCK!

Решение: всегда захватывайте блокировки в одном и том же порядке.

Когда использовать потоки vs процессы

ЗадачаРешениеПочему
Сетевые запросыThreadPoolExecutorI/O-bound, GIL не мешает
Обработка файловThreadPoolExecutorI/O-bound
Числовые вычисленияProcessPoolExecutorCPU-bound, обходим GIL
GUI + фоновая работаthreading.ThreadUI не блокируется