Обзор Async IO в Python 3.7

Модуль asyncio Python 3 предоставляет фундаментальные инструменты для реализации асинхронного ввода-вывода в Python. Он был представлен в Python 3.4, и с каждым последующим второстепенным выпуском модуль значительно развивался. Это руководство содержит общий обзор асинхронной парадигмы и того, как она реализована в Python 3.7. Блокирующий и неблокирующий ввод-вывод Проблема, которую пытается решить асинхронность, - это блокирование ввода-вывода. По умолчанию, когда ваша программа получает доступ к данным из источника ввода-вывода, она ожидает этой операции.

asyncio Python 3 предоставляет фундаментальные инструменты для реализации асинхронного ввода-вывода в Python. Он был представлен в Python 3.4, и с каждым последующим второстепенным выпуском модуль значительно развивался.

Это руководство содержит общий обзор асинхронной парадигмы и того, как она реализована в Python 3.7.

Блокирующий и неблокирующий ввод-вывод

Проблема, которую пытается решить асинхронность, - это блокировка ввода-вывода .

По умолчанию, когда ваша программа обращается к данным из источника ввода-вывода, она ожидает завершения этой операции, прежде чем продолжить выполнение программы.

 with open('myfile.txt', 'r') as file: 
 data = file.read() 
 # Until the data is read into memory, the program waits here 
 print(data) 

Программа заблокирована от продолжения потока выполнения, пока осуществляется доступ к физическому устройству и передаются данные.

Сетевые операции - еще один распространенный источник блокировки:

 # pip install --user requests 
 import requests 
 
 req = requests.get('https://www.stackabuse.com/') 
 
 # 
 # Blocking occurs here, waiting for completion of an HTTPS request 
 # 
 
 print(req.text) 

Во многих случаях задержка, вызванная блокировкой, незначительна. Однако блокировка ввода-вывода очень плохо масштабируется. Если вам нужно дождаться 10 ^10^ чтений файла или сетевых транзакций, производительность снизится.

Многопроцессорность, многопоточность и асинхронность

Стратегии минимизации задержек при блокировке ввода-вывода делятся на три основные категории: многопроцессорность, многопоточность и асинхронность.

Многопроцессорность

Многопроцессорность - это форма параллельных вычислений: инструкции выполняются в перекрывающихся временных рамках на нескольких физических процессорах или ядрах. Каждый процесс, порожденный ядром, требует накладных расходов, включая независимо выделяемый фрагмент памяти (кучу).

Python реализует параллелизм с модулем multiprocessing

Ниже приведен пример программы Python 3, которая порождает четыре дочерних процесса, каждый из которых демонстрирует случайную независимую задержку. Выходные данные показывают идентификатор процесса каждого дочернего процесса, системное время до и после каждой задержки, а также текущее и пиковое распределение памяти на каждом шаге.

 from multiprocessing import Process 
 import os, time, datetime, random, tracemalloc 
 
 tracemalloc.start() 
 children = 4 # number of child processes to spawn 
 maxdelay = 6 # maximum delay in seconds 
 
 def status(): 
 return ('Time: ' + 
 str(datetime.datetime.now().time()) + 
 '\t Malloc, Peak: ' + 
 str(tracemalloc.get_traced_memory())) 
 
 def child(num): 
 delay = random.randrange(maxdelay) 
 print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...") 
 time.sleep(delay) 
 print(f"{status()}\t\tProcess {num}: Done.") 
 
 if __name__ == '__main__': 
 print(f"Parent PID: {os.getpid()}") 
 for i in range(children): 
 proc = Process(target=child, args=(i,)) 
 proc.start() 

Выход:

 Parent PID: 16048 
 Time: 09:52:47.014906 Malloc, Peak: (228400, 240036) Process 0, PID: 16051, Delay: 1 seconds... 
 Time: 09:52:47.016517 Malloc, Peak: (231240, 240036) Process 1, PID: 16052, Delay: 4 seconds... 
 Time: 09:52:47.018786 Malloc, Peak: (231616, 240036) Process 2, PID: 16053, Delay: 3 seconds... 
 Time: 09:52:47.019398 Malloc, Peak: (232264, 240036) Process 3, PID: 16054, Delay: 2 seconds... 
 Time: 09:52:48.017104 Malloc, Peak: (228434, 240036) Process 0: Done. 
 Time: 09:52:49.021636 Malloc, Peak: (232298, 240036) Process 3: Done. 
 Time: 09:52:50.022087 Malloc, Peak: (231650, 240036) Process 2: Done. 
 Time: 09:52:51.020856 Malloc, Peak: (231274, 240036) Process 1: Done. 

Резьба

Многопоточность - это альтернатива многопроцессорности со своими преимуществами и недостатками.

Потоки планируются независимо, и их выполнение может происходить в перекрывающийся период времени. Однако, в отличие от многопроцессорной обработки, потоки существуют полностью в одном процессе ядра и совместно используют одну выделенную кучу.

Потоки Python являются параллельными - несколько последовательностей машинного кода выполняются в перекрывающиеся временные рамки. Но они не являются параллельными - выполнение не происходит одновременно на нескольких физических ядрах.

Основными недостатками потоковой передачи Python являются безопасность памяти и состояние гонки . Все дочерние потоки родительского процесса работают в одном и том же пространстве общей памяти. Без дополнительной защиты один поток может перезаписать совместно используемое значение в памяти, не зная об этом другим потокам. Такое повреждение данных было бы катастрофическим.

Чтобы обеспечить безопасность потоков, реализации CPython используют глобальную блокировку интерпретатора (GIL). GIL - это механизм мьютекса, который предотвращает одновременное выполнение нескольких потоков над объектами Python. Фактически это означает, что в любой момент времени выполняется только один поток.

Вот многопоточная версия примера многопроцессорной обработки из предыдущего раздела. Обратите внимание, что изменилось очень немногое: multiprocessing.Process заменено на threading.Thread . Как указано в выходных данных, все происходит в одном процессе, и объем памяти значительно меньше.

 from threading import Thread 
 import os, time, datetime, random, tracemalloc 
 
 tracemalloc.start() 
 children = 4 # number of child threads to spawn 
 maxdelay = 6 # maximum delay in seconds 
 
 def status(): 
 return ('Time: ' + 
 str(datetime.datetime.now().time()) + 
 '\t Malloc, Peak: ' + 
 str(tracemalloc.get_traced_memory())) 
 
 def child(num): 
 delay = random.randrange(maxdelay) 
 print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...") 
 time.sleep(delay) 
 print(f"{status()}\t\tProcess {num}: Done.") 
 
 if __name__ == '__main__': 
 print(f"Parent PID: {os.getpid()}") 
 for i in range(children): 
 thr = Thread(target=child, args=(i,)) 
 thr.start() 

Выход:

 Parent PID: 19770 
 Time: 10:44:40.942558 Malloc, Peak: (9150, 9264) Process 0, PID: 19770, Delay: 3 seconds... 
 Time: 10:44:40.942937 Malloc, Peak: (13989, 14103) Process 1, PID: 19770, Delay: 5 seconds... 
 Time: 10:44:40.943298 Malloc, Peak: (18734, 18848) Process 2, PID: 19770, Delay: 3 seconds... 
 Time: 10:44:40.943746 Malloc, Peak: (23959, 24073) Process 3, PID: 19770, Delay: 2 seconds... 
 Time: 10:44:42.945896 Malloc, Peak: (26599, 26713) Process 3: Done. 
 Time: 10:44:43.945739 Malloc, Peak: (26741, 27223) Process 0: Done. 
 Time: 10:44:43.945942 Malloc, Peak: (26851, 27333) Process 2: Done. 
 Time: 10:44:45.948107 Malloc, Peak: (24639, 27475) Process 1: Done. 

Асинхронность

Асинхронность - это альтернатива многопоточности для написания параллельных приложений. Асинхронные события происходят по независимым расписаниям, «несинхронно» друг с другом, полностью в одном потоке .

В отличие от потоковой передачи, в асинхронных программах программист контролирует, когда и как происходит добровольное прерывание, что упрощает изоляцию и предотвращение состояний гонки.

Введение в модуль asyncio Python 3.7

В Python 3.7 асинхронные операции предоставляются модулем asyncio

Высокоуровневый и низкоуровневый асинхронный API

Компоненты Asyncio делятся на API высокого уровня (для написания программ) и API низкого уровня (для написания библиотек или фреймворков на основе asyncio ).

Каждая asyncio может быть написана с использованием только высокоуровневых API. Если вы не пишете фреймворк или библиотеку, вам никогда не придется касаться низкоуровневых вещей.

С учетом сказанного, давайте посмотрим на основные высокоуровневые API-интерфейсы и обсудим основные концепции.

Сопрограммы

В общем, сопрограмма (сокращение от кооперативной подпрограммы ) - это функция, разработанная для добровольного вытесняющего многозадачность: она проактивно уступает место другим подпрограммам и процессам, а не принудительно вытесняется ядром. Термин «сопрограмма» был придуман в 1958 году Мелвином Конвеем (известный как «Закон Конвея») для описания кода, который активно поддерживает потребности других частей системы.

В asyncio это добровольное прерывание называется ожиданием .

Awaitables, Async и Await

Любой объект, который можно ожидать (добровольно вытесняемый сопрограммой), называется ожидаемым .

await приостанавливает выполнение текущей сопрограммы и вызывает указанное ожидание.

В Python 3.7 три ожидаемых объекта - это coroutine , task и future .

Asyncio coroutine любая функция Python, определение с префиксом async ключевого слова.

 async def my_coro(): 
 pass 

task asyncio - это объект, который обертывает сопрограмму, предоставляя методы для управления ее выполнением и запрашивая ее статус. Задача может быть создана с помощью asyncio.create_task() или asyncio.gather() .

future asyncio - это объект низкого уровня, который действует как заполнитель для данных, которые еще не были рассчитаны или получены. Он может предоставить пустую структуру, которая будет заполнена данными позже, и механизм обратного вызова, который запускается, когда данные готовы.

Задача наследует все методы, доступные для future , кроме двух, поэтому в Python 3.7 вам никогда не нужно создавать future объект напрямую.

Циклы событий

В asyncio цикл событий управляет планированием и обменом ожидаемыми объектами. Для использования ожидающих требуется цикл событий. Каждая программа asyncio имеет хотя бы один цикл обработки событий. Возможно создание нескольких циклов событий, но использование нескольких циклов событий в Python 3.7 категорически не рекомендуется .

Ссылка на текущий объект цикла получается путем вызова asyncio.get_running_loop() .

Спать

asyncio.sleep(delay) блокируется на секунды delay Это полезно для имитации блокирующего ввода-вывода.

 import asyncio 
 
 async def main(): 
 print("Sleep now.") 
 await asyncio.sleep(1.5) 
 print("OK, wake up!") 
 
 asyncio.run(main()) 
Запуск цикла главного события

Канонической точкой входа в программу asyncio является asyncio.run(main()) , где main() - сопрограмма верхнего уровня.

 import asyncio 
 
 async def my_coro(arg): 
 "A coroutine." 
 print(arg) 
 
 async def main(): 
 "The top-level coroutine." 
 await my_coro(42) 
 
 asyncio.run(main()) 

Вызов asyncio.run() неявно создает и запускает цикл событий. У объекта цикла есть много полезных методов, включая loop.time() , который возвращает значение с плавающей запятой, представляющее текущее время, измеренное внутренними часами цикла.

Примечание . asyncio.run() нельзя вызвать из существующего цикла событий. Следовательно, возможно, что вы увидите ошибки, если вы запускаете программу в контролирующей среде, такой как Anaconda или Jupyter, которая запускает собственный цикл обработки событий. Примеры программ в этом и следующих разделах следует запускать непосредственно из командной строки, выполнив файл python.

Следующая программа печатает строки текста с блокировкой на одну секунду после каждой строки до последней.

 import asyncio 
 
 async def my_coro(delay): 
 loop = asyncio.get_running_loop() 
 end_time = loop.time() + delay 
 while True: 
 print("Blocking...") 
 await asyncio.sleep(1) 
 if loop.time() > end_time: 
 print("Done.") 
 break 
 
 async def main(): 
 await my_coro(3.0) 
 
 asyncio.run(main()) 

Выход:

 Blocking... 
 Blocking... 
 Blocking... 
 Done. 
Задачи

Задача - это ожидаемый объект, который является оболочкой для сопрограммы. Чтобы создать и сразу запланировать задачу, вы можете вызвать следующее:

 asyncio.create_task(coro(args...)) 

Это вернет объект задачи. Создание задачи говорит циклу: «Давай, запустим эту сопрограмму, как только сможешь».

Если вы ожидаете выполнения задачи, выполнение текущей сопрограммы блокируется до тех пор, пока эта задача не будет завершена.

 import asyncio 
 
 async def my_coro(n): 
 print(f"The answer is {n}.") 
 
 async def main(): 
 # By creating the task, it's scheduled to run 
 # concurrently, at the event loop's discretion. 
 mytask = asyncio.create_task(my_coro(42)) 
 
 # If we later await the task, execution stops there 
 # until the task is complete. If the task is already 
 # complete before it is awaited, nothing is awaited. 
 await mytask 
 
 asyncio.run(main()) 

Выход:

 The answer is 42. 

У задач есть несколько полезных методов для управления обернутой сопрограммой. В частности, вы можете запросить отмену задачи, вызвав метод .cancel() Задача будет запланирована для отмены в следующем цикле цикла событий. Отмена не гарантируется: задача может быть завершена до этого цикла, и в этом случае отмена не происходит.

Сбор ожидаемых

Ожидаемые объекты можно собрать как группу, предоставив их в качестве аргумента списка для встроенной сопрограммы asyncio.gather(awaitables) .

asyncio.gather() возвращает ожидаемый объект, представляющий собранные ожидаемые объекты, и поэтому должен иметь префикс await .

Если какой-либо элемент awaitables является сопрограммой, он немедленно назначается как задача.

Сбор - это удобный способ запланировать одновременное выполнение нескольких сопрограмм как задач. Он также связывает собранные задачи некоторыми полезными способами:

  • Когда все собранные задачи завершены, их совокупные возвращаемые значения возвращаются в виде списка, упорядоченного в соответствии с порядком в списке ожидаемых.
  • Любую собранную задачу можно отменить, не отменяя другие задачи.
  • Сам сбор можно отменить, отменив все задания.
Пример: асинхронные веб-запросы с aiohttp

В следующем примере показано, как можно реализовать эти высокоуровневые API-интерфейсы asyncio. Ниже приведена модифицированная версия отличного примера asyncio Скотта Робинсона , обновленная для Python 3.7. Его программа использует aiohttp чтобы собирать самые популярные сообщения Reddit и выводить их на консоль.

Перед запуском приведенного ниже сценария убедитесь, что у вас установлен aiohttp Вы можете загрузить модуль с помощью следующей команды pip:

 $ pip install --user aiohttp 

 import sys 
 import asyncio 
 import aiohttp 
 import json 
 import datetime 
 
 async def get_json(client, url): 
 async with client.get(url) as response: 
 assert response.status == 200 
 return await response.read() 
 
 async def get_reddit_top(subreddit, client, numposts): 
 data = await get_json(client, 'https://www.reddit.com/r/' + 
 subreddit + '/top.json?sort=top&t=day&limit=' + 
 str(numposts)) 
 
 print(f'\n/r/{subreddit}:') 
 
 j = json.loads(data.decode('utf-8')) 
 for i in j['data']['children']: 
 score = i['data']['score'] 
 title = i['data']['title'] 
 link = i['data']['url'] 
 print('\t' + str(score) + ': ' + title + '\n\t\t(' + link + ')') 
 
 async def main(): 
 print(datetime.datetime.now().strftime("%A, %B %d, %I:%M %p")) 
 print('---------------------------') 
 loop = asyncio.get_running_loop() 
 async with aiohttp.ClientSession(loop=loop) as client: 
 await asyncio.gather( 
 get_reddit_top('python', client, 3), 
 get_reddit_top('programming', client, 4), 
 get_reddit_top('asyncio', client, 2), 
 get_reddit_top('dailyprogrammer', client, 1) 
 ) 
 
 asyncio.run(main()) 

Если вы запустите программу несколько раз, вы увидите, что порядок вывода изменится. Это потому, что запросы JSON отображаются по мере их получения, что зависит от времени ответа сервера и промежуточной задержки в сети. В системе Linux вы можете наблюдать это в действии, запустив сценарий с префиксом (например) watch -n 5 , который будет обновлять вывод каждые 5 секунд:

{.ezlazyload .img-responsive}

Другие API высокого уровня

Надеюсь, этот обзор даст вам прочную основу того, как, когда и зачем использовать asyncio. Другие высокоуровневые API-интерфейсы asyncio, не описанные здесь, включают:

  • stream , набор высокоуровневых сетевых примитивов для управления асинхронными TCP-событиями.
  • lock , event , condition , async аналоги примитивов синхронизации, представленных в модуле потоковой передачи.
  • subprocess , набор инструментов для запуска асинхронных подпроцессов, таких как команды оболочки.
  • queue , асинхронный аналог модуля очереди.
  • исключение для обработки исключений в асинхронном коде.

Заключение

Имейте в виду, что даже если ваша программа не требует асинхронности по соображениям производительности, вы все равно можете использовать asyncio если предпочитаете писать в асинхронной парадигме. asyncio представление о том, как, когда и зачем начинать использовать asyncio.

Licensed under CC BY-NC-SA 4.0
comments powered by Disqus