Параллельная обработка в Python

Введение Когда вы запускаете программу на своем компьютере, она запускается в своем собственном «пузыре», который полностью отделен от других программ, которые активны в то же время. Этот «пузырь» называется процессом [https://en.wikipedia.org/wiki/Process_(computing)] и включает в себя все, что необходимо для управления этим вызовом программы. Например, эта так называемая среда процесса включает страницы памяти [https://en.wikipedia.org/wiki/Page_(computer_memory)], которые использует процесс, файл обрабатывает это

Вступление

Когда вы запускаете программу на своем компьютере, она запускается в своем собственном «пузыре», который полностью отделен от других программ, которые активны в то же время. Этот «пузырь» называется процессом и включает в себя все, что необходимо для управления этим вызовом программы.

Например, эта так называемая среда процесса включает в себя страницы памяти, которые использует процесс, обрабатываемый этим процессом файл, права доступа пользователей и групп, а также весь вызов командной строки, включая заданные параметры.

Эта информация хранится в файловой системе процесса вашей системы UNIX / Linux, которая является виртуальной файловой системой и доступна через каталог / proc. Записи сортируются по идентификатору процесса, который уникален для каждого процесса. Пример 1 показывает это для произвольно выбранного процесса с идентификатором # 177.

Пример 1: Информация, доступная процессу

 [email protected] :/proc/177# ls 
 attr cpuset limits net projid_map statm 
 autogroup cwd loginuid ns root status 
 auxv environ map_files numa_maps sched syscall 
 cgroup exe maps oom_adj sessionid task 
 clear_refs fd mem oom_score setgroups timers 
 cmdline fdinfo mountinfo oom_score_adj smaps uid_map 
 comm gid_map mounts pagemap stack wchan 
 coredump_filter io mountstats personality stat 

Структурирование программного кода и данных

Чем сложнее становится программа, тем чаще бывает удобно разбивать ее на более мелкие части. Это относится не только к исходному коду, но и к коду, который выполняется на вашем компьютере. Одним из решений этого является использование подпроцессов в сочетании с параллельным выполнением. За этим стоят следующие мысли:

  • Один процесс охватывает фрагмент кода, который можно запускать отдельно.
  • Некоторые участки кода могут выполняться одновременно, что в принципе допускает распараллеливание.
  • Использование возможностей современных процессоров и операционных систем, например каждого доступного ядра процессора, для сокращения общего времени выполнения программы.
  • Чтобы снизить сложность вашей программы / кода и передать часть работы специализированным агентам, выступающим в качестве подпроцессов.

Использование подпроцессов требует от вас переосмысления способа выполнения вашей программы, от линейного к параллельному. Это похоже на изменение вашей рабочей точки зрения в компании с обычного работника на менеджера - вам нужно будет следить за тем, кто что делает, сколько времени занимает один шаг и каковы зависимости между промежуточными результатами.

Это поможет вам разделить код на более мелкие части, которые могут быть выполнены агентом, специализирующимся только на этой задаче. Если это еще не сделано, подумайте о том, как структурирован ваш набор данных, чтобы его могли эффективно обрабатывать отдельные агенты. Это приводит к следующим вопросам:

  • Почему вы хотите распараллеливать код? Имеет ли смысл думать об этом в вашем конкретном случае и с точки зрения усилий?
  • Ваша программа предназначена для запуска только один раз или она будет запускаться регулярно с аналогичным набором данных?
  • Можете ли вы разделить свой алгоритм на несколько этапов выполнения?
  • Допускают ли ваши данные вообще распараллеливание? Если еще нет, то каким образом нужно адаптировать организацию ваших данных?
  • Какие промежуточные результаты ваших вычислений зависят друг от друга?
  • Какое изменение оборудования необходимо для этого?
  • Есть ли узкое место в аппаратном обеспечении или алгоритме, и как можно избежать или минимизировать влияние этих факторов?
  • Какие еще побочные эффекты распараллеливания могут возникнуть?

Возможный вариант использования - это основной процесс и демон, работающий в фоновом режиме (ведущий / ведомый), ожидающий активации. Кроме того, это может быть главный процесс, запускающий рабочие процессы, выполняемые по запросу. На практике основной процесс - это процесс подачи, который управляет двумя или более агентами, которым подаются части данных, и которые выполняют вычисления для данной части.

Имейте в виду, что распараллеливание требует больших затрат и времени из-за накладных расходов на подпроцессы, которые необходимы вашей операционной системе. По сравнению с выполнением двух или более задач линейным способом, выполняя это параллельно, вы можете сэкономить от 25 до 30 процентов времени на подпроцесс, в зависимости от вашего варианта использования. Например, две задачи, каждая из которых занимает 5 секунд, требуют в сумме 10 секунд при последовательном выполнении и могут потребовать в среднем около 8 секунд на многоядерной машине при распараллеливании. 3 из этих 8 секунд могут быть потеряны из-за накладных расходов, что ограничит ваши улучшения скорости.

Запуск функции параллельно с Python

Python предлагает четыре возможных способа справиться с этим. Во-первых, вы можете выполнять функции параллельно, используя модуль многопроцессорности. Во-вторых, альтернативой процессам являются потоки. Технически это легкие процессы, выходящие за рамки данной статьи. Для дальнейшего чтения вы можете взглянуть на модуль потоковой передачи Python. В-третьих, вы можете вызывать внешние программы с помощью метода system() os или методов, предоставляемых subprocess , и впоследствии собирать результаты.

Модуль multiprocessing предлагает хороший набор методов для параллельного выполнения подпрограмм. Сюда входят процессы, пулы агентов, очереди и каналы.

Листинг 1 работает с пулом из пяти агентов, которые обрабатывают блок из трех значений одновременно. Значения количества агентов и chunksize выбраны произвольно в демонстрационных целях. Отрегулируйте эти значения в соответствии с количеством ядер в вашем процессоре.

Метод Pool.map() требует трех параметров - функции, вызываемой для каждого элемента набора данных, самого набора данных и chunksize . В листинге 1 мы используем функцию с именем square вычисляет квадрат заданного целочисленного значения. Кроме того, можно не указывать chunksize Если не задан явно, chunksize по умолчанию равен 1.

Обратите внимание, что порядок выполнения агентов не гарантируется, но набор результатов находится в правильном порядке. Он содержит квадратные значения в соответствии с порядком элементов исходного набора данных.

Листинг 1: Параллельное выполнение функций

 from multiprocessing import Pool 
 
 def square(x): 
 # calculate the square of the value of x 
 return x*x 
 
 if __name__ == '__main__': 
 
 # Define the dataset 
 dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14] 
 
 # Output the dataset 
 print ('Dataset: ' + str(dataset)) 
 
 # Run this with a pool of 5 agents having a chunksize of 3 until finished 
 agents = 5 
 chunksize = 3 
 with Pool(processes=agents) as pool: 
 result = pool.map(square, dataset, chunksize) 
 
 # Output the result 
 print ('Result: ' + str(result)) 

Выполнение этого кода должно привести к следующему результату:

 $ python3 pool_multiprocessing.py 
 Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14] 
 Result: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196] 

Примечание . Для этих примеров мы будем использовать Python 3.

Запуск нескольких функций с использованием очереди

Как структура данных, очередь очень распространена и существует несколькими способами. Он организован как « первым пришел - первым обслужен» (FIFO) или «последним пришел - первым обслужен» (LIFO) / стек , а также с приоритетами и без них (очередь с приоритетами). Структура данных реализована как массив с фиксированным количеством записей или как список, содержащий переменное количество отдельных элементов.

В листингах 2.1–2.7 мы используем очередь FIFO. Он реализован в виде списка, который уже предоставлен соответствующим классом из модуля multiprocessing Кроме того, time который используется для имитации рабочей нагрузки.

Листинг 2.1: Используемые модули

 import multiprocessing 
 from time import sleep 

Затем определяется рабочая функция ( листинг 2.2 ). Эта функция фактически представляет агента и требует трех аргументов. Имя процесса указывает, что это за процесс, а tasks и results относятся к соответствующей очереди.

Внутри функция работника является бесконечным во while цикла. И tasks и results - это очереди, которые определены в основной программе. tasks.get() возвращает текущую задачу из очереди задач для обработки. Значение задачи меньше 0 покидает while цикл, и возвращает значение -1. Любое другое значение задачи выполнит вычисление (квадрат) и вернет это значение. Возврат значения в основную программу реализован как results.put() . Это добавляет вычисленное значение в конец очереди results

Листинг 2.2: Рабочая функция

 # define worker function 
 def calculate(process_name, tasks, results): 
 print('[%s] evaluation routine starts' % process_name) 
 
 while True: 
 new_value = tasks.get() 
 if new_value < 0: 
 print('[%s] evaluation routine quits' % process_name) 
 
 # Indicate finished 
 results.put(-1) 
 break 
 else: 
 # Compute result and mimic a long-running task 
 compute = new_value * new_value 
 sleep(0.02*new_value) 
 
 # Output which process received the value 
 # and the calculation result 
 print('[%s] received value: %i' % (process_name, new_value)) 
 print('[%s] calculated value: %i' % (process_name, compute)) 
 
 # Add result to the queue 
 results.put(compute) 
 
 return 

Следующим шагом является основной цикл (см. Листинг 2.3 ). Сначала определяется менеджер для межпроцессного взаимодействия (IPC). Затем добавляются две очереди - одна для задач, а другая для результатов.

Листинг 2.3: IPC и очереди

 if __name__ == "__main__": 
 # Define IPC manager 
 manager = multiprocessing.Manager() 
 
 # Define a list (queue) for tasks and computation results 
 tasks = manager.Queue() 
 results = manager.Queue() 

Выполнив эту настройку, мы определяем пул процессов с четырьмя рабочими процессами (агентами). Мы используем класс multiprocessing.Pool() и создаем его экземпляр. Затем мы определяем пустой список процессов (см. Листинг 2.4 ).

Листинг 2.4: Определение пула процессов

 # Create process pool with four processes 
 num_processes = 4 
 pool = multiprocessing.Pool(processes=num_processes) 
 processes = [] 

На следующем шаге мы запускаем четыре рабочих процесса (агента). Для простоты они называются от «P0» до «P3». Создание четырех рабочих процессов выполняется с помощью multiprocessing.Process() . Это связывает каждый из них с рабочей функцией, а также с задачей и очередью результатов. Наконец, мы добавляем вновь инициализированный процесс в конец списка процессов и запускаем новый процесс с помощью new_process.start() (см. Листинг 2.5 ).

Листинг 2.5: Подготовка рабочих процессов

 # Initiate the worker processes 
 for i in range(num_processes): 
 
 # Set process name 
 process_name = 'P%i' % i 
 
 # Create the process, and connect it to the worker function 
 new_process = multiprocessing.Process(target=calculate, args=(process_name,tasks,results)) 
 
 # Add new process to the list of processes 
 processes.append(new_process) 
 
 # Start the process 
 new_process.start() 

Наши рабочие процессы ждут работы. Мы определяем список задач, которые в нашем случае являются произвольно выбранными целыми числами. Эти значения добавляются в список задач с помощью tasks.put() . Каждый рабочий процесс ожидает выполнения задач и выбирает следующую доступную задачу из списка задач. Этим занимается сама очередь (см. Листинг 2.6 ).

Листинг 2.6: Подготовка очереди задач

 # Fill task queue 
 task_list = [43, 1, 780, 256, 142, 68, 183, 334, 325, 3] 
 for single_task in task_list: 
 tasks.put(single_task) 
 
 # Wait while the workers process 
 sleep(5) 

Через некоторое время мы хотим, чтобы наши агенты закончили. Каждый рабочий процесс реагирует на задачу со значением -1. Он интерпретирует это значение как сигнал завершения и после этого умирает. Вот почему мы помещаем в очередь задач столько -1, сколько у нас запущенных процессов. Перед смертью завершающийся процесс помещает -1 в очередь результатов. Это должно быть сигналом подтверждения для основного цикла, что агент завершает работу.

В основном цикле мы читаем из этой очереди и подсчитываем число -1. Основной цикл завершается, как только мы посчитаем столько подтверждений завершения, сколько у нас есть процессов. В противном случае выводим результат расчета из очереди.

Листинг 2.7: Завершение и вывод результатов

 # Quit the worker processes by sending them -1 
 for i in range(num_processes): 
 tasks.put(-1) 
 
 # Read calculation results 
 num_finished_processes = 0 
 while True: 
 # Read result 
 new_result = results.get() 
 
 # Have a look at the results 
 if new_result == -1: 
 # Process has finished 
 num_finished_processes += 1 
 
 if num_finished_processes == num_processes: 
 break 
 else: 
 # Output result 
 print('Result:' + str(new_result)) 

В примере 2 показан вывод программы Python. Запуская программу несколько раз, вы можете заметить, что порядок, в котором запускаются рабочие процессы, столь же непредсказуем, как и сам процесс, выбирающий задачу из очереди. Однако после завершения порядок элементов очереди результатов совпадает с порядком элементов очереди задач.

Пример 2

 $ python3 queue_multiprocessing.py 
 [P0] evaluation routine starts 
 [P1] evaluation routine starts 
 [P2] evaluation routine starts 
 [P3] evaluation routine starts 
 [P1] received value: 1 
 [P1] calculated value: 1 
 [P0] received value: 43 
 [P0] calculated value: 1849 
 [P0] received value: 68 
 [P0] calculated value: 4624 
 [P1] received value: 142 
 [P1] calculated value: 20164 
 result: 1 
 result: 1849 
 result: 4624 
 result: 20164 
 [P3] received value: 256 
 [P3] calculated value: 65536 
 result: 65536 
 [P0] received value: 183 
 [P0] calculated value: 33489 
 result: 33489 
 [P0] received value: 3 
 [P0] calculated value: 9 
 result: 9 
 [P0] evaluation routine quits 
 [P1] received value: 334 
 [P1] calculated value: 111556 
 result: 111556 
 [P1] evaluation routine quits 
 [P3] received value: 325 
 [P3] calculated value: 105625 
 result: 105625 
 [P3] evaluation routine quits 
 [P2] received value: 780 
 [P2] calculated value: 608400 
 result: 608400 
 [P2] evaluation routine quits 

Примечание . Как упоминалось ранее, ваш вывод может не точно совпадать с показанным выше, поскольку порядок выполнения непредсказуем.

Использование метода os.system ()

Метод system() является частью модуля os , который позволяет выполнять внешние программы командной строки в отдельном процессе от вашей программы Python. Метод system() - это блокирующий вызов, и вам нужно дождаться завершения вызова и возврата. Как фетишист UNIX / Linux вы знаете, что команду можно запускать в фоновом режиме и записывать вычисленный результат в выходной поток, который перенаправляется в такой файл (см. Пример 3 ):

Пример 3: Команда с перенаправлением вывода

 $ ./program >> outputfile & 

В программе Python вы просто инкапсулируете этот вызов, как показано ниже:

Листинг 3: Простой системный вызов с использованием модуля os

 import os 
 
 os.system("./program >> outputfile &") 

Этот системный вызов создает процесс, который выполняется параллельно вашей текущей программе Python. Получение результата может стать немного сложным, потому что этот вызов может завершиться после завершения вашей программы Python - вы никогда не узнаете.

Использование этого метода намного дороже, чем предыдущие методы, которые я описал. Во-первых, накладные расходы намного больше (переключение процесса), а во-вторых, данные записываются в физическую память, например на диск, что занимает больше времени. Хотя это лучший вариант, у вас ограниченная память (например, с ОЗУ), и вместо этого вы можете записывать большие выходные данные на твердотельный диск.

Использование модуля подпроцесса

Этот модуль предназначен для замены os.system() и os.spawn() . Идея подпроцесса состоит в том, чтобы упростить процессы порождения, общаться с ними через каналы и сигналы и собирать вывод, который они производят, включая сообщения об ошибках.

Начиная с Python 3.5, подпроцесс содержит метод subprocess.run() для запуска внешней команды, которая является оболочкой для базового класса subprocess.Popen() В качестве примера мы запускаем команду UNIX / Linux df -h чтобы узнать, сколько дискового пространства все еще доступно в разделе /home вашей машины. В программе Python вы выполняете этот вызов, как показано ниже ( листинг 4 ).

Листинг 4: Базовый пример запуска внешней команды

 import subprocess 
 
 ret = subprocess.run(["df", "-h", "/home"]) 
 print(ret) 

Это основной вызов, очень похожий на команду df -h /home , выполняемую в терминале. Обратите внимание, что параметры разделены списком, а не одной строкой. Результат будет аналогичен Примеру 4 . По сравнению с официальной документацией Python для этого модуля, он выводит результат вызова stdout в дополнение к возвращаемому значению вызова.

В примере 4 показан результат нашего вызова. Последняя строка вывода показывает успешное выполнение команды. Вызов subprocess.run() возвращает экземпляр класса CompletedProcess который имеет два атрибута с именами args (аргументы командной строки) и returncode (возвращаемое значение команды).

Пример 4: Запуск сценария Python из листинга 4

 $ python3 diskfree.py 
 Filesystem Size Used Avail Capacity iused ifree %iused Mounted on 
 /dev/sda3 233Gi 203Gi 30Gi 88% 53160407 7818407 87% /home 
 CompletedProcess(args=['df', '-h', '/home'], returncode=0) 

Чтобы подавить вывод на стандартный stdout и перехватить вывод и возвращаемое значение для дальнейшей оценки, вызов subprocess.run() должен быть немного изменен. Без дальнейших изменений subprocess.run() отправляет вывод выполненной команды на stdout который является каналом вывода базового процесса Python. Чтобы получить вывод, мы должны изменить это и установить канал вывода на предварительно определенное значение subprocess.PIPE . В листинге 5 показано, как это сделать.

Листинг 5: Захват вывода в конвейер

 import subprocess 
 
 # Call the command 
 output = subprocess.run(["df", "-h", "/home"], stdout=subprocess.PIPE) 
 
 # Read the return code and the output data 
 print ("Return code: %i" % output.returncode) 
 print ("Output data: %s" % output.stdout) 

Как объяснялось ранее, subprocess.run() возвращает экземпляр класса CompletedProcess . В листинге 5 этот экземпляр представляет собой переменную с простым именем output . Код возврата команды сохраняется в атрибуте output.returncode , а вывод, выводимый на стандартный stdout можно найти в атрибуте output.stdout . Имейте в виду, что это не касается обработки сообщений об ошибках, потому что мы не меняли для этого выходной канал.

Заключение

Параллельная обработка данных - прекрасная возможность использовать мощь современного оборудования. Python предоставляет вам доступ к этим методам на очень сложном уровне. Как вы уже видели ранее, multiprocessing и subprocess позволяет легко погрузиться в эту тему.

Благодарности

Автор благодарит Герольда Рупрехта за поддержку и критику при подготовке этой статьи.

Licensed under CC BY-NC-SA 4.0
comments powered by Disqus