Вступление
Когда вы запускаете программу на своем компьютере, она запускается в своем собственном «пузыре», который полностью отделен от других программ, которые активны в то же время. Этот «пузырь» называется процессом и включает в себя все, что необходимо для управления этим вызовом программы.
Например, эта так называемая среда процесса включает в себя страницы памяти, которые использует процесс, обрабатываемый этим процессом файл, права доступа пользователей и групп, а также весь вызов командной строки, включая заданные параметры.
Эта информация хранится в файловой системе процесса вашей системы UNIX / Linux, которая является виртуальной файловой системой и доступна через каталог / proc. Записи сортируются по идентификатору процесса, который уникален для каждого процесса. Пример 1 показывает это для произвольно выбранного процесса с идентификатором # 177.
Пример 1: Информация, доступная процессу
[email protected] :/proc/177# ls
attr cpuset limits net projid_map statm
autogroup cwd loginuid ns root status
auxv environ map_files numa_maps sched syscall
cgroup exe maps oom_adj sessionid task
clear_refs fd mem oom_score setgroups timers
cmdline fdinfo mountinfo oom_score_adj smaps uid_map
comm gid_map mounts pagemap stack wchan
coredump_filter io mountstats personality stat
Структурирование программного кода и данных
Чем сложнее становится программа, тем чаще бывает удобно разбивать ее на более мелкие части. Это относится не только к исходному коду, но и к коду, который выполняется на вашем компьютере. Одним из решений этого является использование подпроцессов в сочетании с параллельным выполнением. За этим стоят следующие мысли:
- Один процесс охватывает фрагмент кода, который можно запускать отдельно.
- Некоторые участки кода могут выполняться одновременно, что в принципе допускает распараллеливание.
- Использование возможностей современных процессоров и операционных систем, например каждого доступного ядра процессора, для сокращения общего времени выполнения программы.
- Чтобы снизить сложность вашей программы / кода и передать часть работы специализированным агентам, выступающим в качестве подпроцессов.
Использование подпроцессов требует от вас переосмысления способа выполнения вашей программы, от линейного к параллельному. Это похоже на изменение вашей рабочей точки зрения в компании с обычного работника на менеджера - вам нужно будет следить за тем, кто что делает, сколько времени занимает один шаг и каковы зависимости между промежуточными результатами.
Это поможет вам разделить код на более мелкие части, которые могут быть выполнены агентом, специализирующимся только на этой задаче. Если это еще не сделано, подумайте о том, как структурирован ваш набор данных, чтобы его могли эффективно обрабатывать отдельные агенты. Это приводит к следующим вопросам:
- Почему вы хотите распараллеливать код? Имеет ли смысл думать об этом в вашем конкретном случае и с точки зрения усилий?
- Ваша программа предназначена для запуска только один раз или она будет запускаться регулярно с аналогичным набором данных?
- Можете ли вы разделить свой алгоритм на несколько этапов выполнения?
- Допускают ли ваши данные вообще распараллеливание? Если еще нет, то каким образом нужно адаптировать организацию ваших данных?
- Какие промежуточные результаты ваших вычислений зависят друг от друга?
- Какое изменение оборудования необходимо для этого?
- Есть ли узкое место в аппаратном обеспечении или алгоритме, и как можно избежать или минимизировать влияние этих факторов?
- Какие еще побочные эффекты распараллеливания могут возникнуть?
Возможный вариант использования - это основной процесс и демон, работающий в фоновом режиме (ведущий / ведомый), ожидающий активации. Кроме того, это может быть главный процесс, запускающий рабочие процессы, выполняемые по запросу. На практике основной процесс - это процесс подачи, который управляет двумя или более агентами, которым подаются части данных, и которые выполняют вычисления для данной части.
Имейте в виду, что распараллеливание требует больших затрат и времени из-за накладных расходов на подпроцессы, которые необходимы вашей операционной системе. По сравнению с выполнением двух или более задач линейным способом, выполняя это параллельно, вы можете сэкономить от 25 до 30 процентов времени на подпроцесс, в зависимости от вашего варианта использования. Например, две задачи, каждая из которых занимает 5 секунд, требуют в сумме 10 секунд при последовательном выполнении и могут потребовать в среднем около 8 секунд на многоядерной машине при распараллеливании. 3 из этих 8 секунд могут быть потеряны из-за накладных расходов, что ограничит ваши улучшения скорости.
Запуск функции параллельно с Python
Python предлагает четыре возможных способа справиться с этим. Во-первых,
вы можете выполнять функции параллельно, используя модуль
многопроцессорности.
Во-вторых, альтернативой процессам являются потоки. Технически это
легкие процессы, выходящие за рамки данной статьи. Для дальнейшего
чтения вы можете взглянуть на модуль потоковой
передачи Python.
В-третьих, вы можете вызывать внешние программы с помощью метода
system()
os
или методов, предоставляемых subprocess
, и
впоследствии собирать результаты.
Модуль multiprocessing
предлагает хороший набор методов для
параллельного выполнения подпрограмм. Сюда входят процессы, пулы
агентов, очереди и каналы.
Листинг 1 работает с пулом из пяти агентов, которые обрабатывают блок
из трех значений одновременно. Значения количества агентов и chunksize
выбраны произвольно в демонстрационных целях. Отрегулируйте эти значения
в соответствии с количеством ядер в вашем процессоре.
Метод Pool.map()
требует трех параметров - функции, вызываемой для
каждого элемента набора данных, самого набора данных и chunksize
. В
листинге 1 мы используем функцию с именем square
вычисляет квадрат
заданного целочисленного значения. Кроме того, можно не указывать
chunksize
Если не задан явно, chunksize
по умолчанию равен 1.
Обратите внимание, что порядок выполнения агентов не гарантируется, но набор результатов находится в правильном порядке. Он содержит квадратные значения в соответствии с порядком элементов исходного набора данных.
Листинг 1: Параллельное выполнение функций
from multiprocessing import Pool
def square(x):
# calculate the square of the value of x
return x*x
if __name__ == '__main__':
# Define the dataset
dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
# Output the dataset
print ('Dataset: ' + str(dataset))
# Run this with a pool of 5 agents having a chunksize of 3 until finished
agents = 5
chunksize = 3
with Pool(processes=agents) as pool:
result = pool.map(square, dataset, chunksize)
# Output the result
print ('Result: ' + str(result))
Выполнение этого кода должно привести к следующему результату:
$ python3 pool_multiprocessing.py
Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
Result: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]
Примечание . Для этих примеров мы будем использовать Python 3.
Запуск нескольких функций с использованием очереди
Как структура данных, очередь очень распространена и существует несколькими способами. Он организован как « первым пришел - первым обслужен» (FIFO) или «последним пришел - первым обслужен» (LIFO) / стек , а также с приоритетами и без них (очередь с приоритетами). Структура данных реализована как массив с фиксированным количеством записей или как список, содержащий переменное количество отдельных элементов.
В листингах 2.1–2.7 мы используем очередь FIFO. Он реализован в виде
списка, который уже предоставлен соответствующим классом из модуля
multiprocessing
Кроме того, time
который используется для имитации
рабочей нагрузки.
Листинг 2.1: Используемые модули
import multiprocessing
from time import sleep
Затем определяется рабочая функция ( листинг 2.2 ). Эта функция
фактически представляет агента и требует трех аргументов. Имя процесса
указывает, что это за процесс, а tasks
и results
относятся к
соответствующей очереди.
Внутри функция работника является бесконечным во while
цикла. И
tasks
и results
- это очереди, которые определены в основной
программе. tasks.get()
возвращает текущую задачу из очереди задач для
обработки. Значение задачи меньше 0 покидает while
цикл, и возвращает
значение -1. Любое другое значение задачи выполнит вычисление (квадрат)
и вернет это значение. Возврат значения в основную программу реализован
как results.put()
. Это добавляет вычисленное значение в конец очереди
results
Листинг 2.2: Рабочая функция
# define worker function
def calculate(process_name, tasks, results):
print('[%s] evaluation routine starts' % process_name)
while True:
new_value = tasks.get()
if new_value < 0:
print('[%s] evaluation routine quits' % process_name)
# Indicate finished
results.put(-1)
break
else:
# Compute result and mimic a long-running task
compute = new_value * new_value
sleep(0.02*new_value)
# Output which process received the value
# and the calculation result
print('[%s] received value: %i' % (process_name, new_value))
print('[%s] calculated value: %i' % (process_name, compute))
# Add result to the queue
results.put(compute)
return
Следующим шагом является основной цикл (см. Листинг 2.3 ). Сначала определяется менеджер для межпроцессного взаимодействия (IPC). Затем добавляются две очереди - одна для задач, а другая для результатов.
Листинг 2.3: IPC и очереди
if __name__ == "__main__":
# Define IPC manager
manager = multiprocessing.Manager()
# Define a list (queue) for tasks and computation results
tasks = manager.Queue()
results = manager.Queue()
Выполнив эту настройку, мы определяем пул процессов с четырьмя рабочими
процессами (агентами). Мы используем класс multiprocessing.Pool()
и
создаем его экземпляр. Затем мы определяем пустой список процессов (см.
Листинг 2.4 ).
Листинг 2.4: Определение пула процессов
# Create process pool with four processes
num_processes = 4
pool = multiprocessing.Pool(processes=num_processes)
processes = []
На следующем шаге мы запускаем четыре рабочих процесса (агента). Для
простоты они называются от «P0» до «P3». Создание четырех рабочих
процессов выполняется с помощью multiprocessing.Process()
. Это
связывает каждый из них с рабочей функцией, а также с задачей и очередью
результатов. Наконец, мы добавляем вновь инициализированный процесс в
конец списка процессов и запускаем новый процесс с помощью
new_process.start()
(см. Листинг 2.5 ).
Листинг 2.5: Подготовка рабочих процессов
# Initiate the worker processes
for i in range(num_processes):
# Set process name
process_name = 'P%i' % i
# Create the process, and connect it to the worker function
new_process = multiprocessing.Process(target=calculate, args=(process_name,tasks,results))
# Add new process to the list of processes
processes.append(new_process)
# Start the process
new_process.start()
Наши рабочие процессы ждут работы. Мы определяем список задач, которые в
нашем случае являются произвольно выбранными целыми числами. Эти
значения добавляются в список задач с помощью tasks.put()
. Каждый
рабочий процесс ожидает выполнения задач и выбирает следующую доступную
задачу из списка задач. Этим занимается сама очередь (см. Листинг 2.6
).
Листинг 2.6: Подготовка очереди задач
# Fill task queue
task_list = [43, 1, 780, 256, 142, 68, 183, 334, 325, 3]
for single_task in task_list:
tasks.put(single_task)
# Wait while the workers process
sleep(5)
Через некоторое время мы хотим, чтобы наши агенты закончили. Каждый рабочий процесс реагирует на задачу со значением -1. Он интерпретирует это значение как сигнал завершения и после этого умирает. Вот почему мы помещаем в очередь задач столько -1, сколько у нас запущенных процессов. Перед смертью завершающийся процесс помещает -1 в очередь результатов. Это должно быть сигналом подтверждения для основного цикла, что агент завершает работу.
В основном цикле мы читаем из этой очереди и подсчитываем число -1. Основной цикл завершается, как только мы посчитаем столько подтверждений завершения, сколько у нас есть процессов. В противном случае выводим результат расчета из очереди.
Листинг 2.7: Завершение и вывод результатов
# Quit the worker processes by sending them -1
for i in range(num_processes):
tasks.put(-1)
# Read calculation results
num_finished_processes = 0
while True:
# Read result
new_result = results.get()
# Have a look at the results
if new_result == -1:
# Process has finished
num_finished_processes += 1
if num_finished_processes == num_processes:
break
else:
# Output result
print('Result:' + str(new_result))
В примере 2 показан вывод программы Python. Запуская программу несколько раз, вы можете заметить, что порядок, в котором запускаются рабочие процессы, столь же непредсказуем, как и сам процесс, выбирающий задачу из очереди. Однако после завершения порядок элементов очереди результатов совпадает с порядком элементов очереди задач.
Пример 2
$ python3 queue_multiprocessing.py
[P0] evaluation routine starts
[P1] evaluation routine starts
[P2] evaluation routine starts
[P3] evaluation routine starts
[P1] received value: 1
[P1] calculated value: 1
[P0] received value: 43
[P0] calculated value: 1849
[P0] received value: 68
[P0] calculated value: 4624
[P1] received value: 142
[P1] calculated value: 20164
result: 1
result: 1849
result: 4624
result: 20164
[P3] received value: 256
[P3] calculated value: 65536
result: 65536
[P0] received value: 183
[P0] calculated value: 33489
result: 33489
[P0] received value: 3
[P0] calculated value: 9
result: 9
[P0] evaluation routine quits
[P1] received value: 334
[P1] calculated value: 111556
result: 111556
[P1] evaluation routine quits
[P3] received value: 325
[P3] calculated value: 105625
result: 105625
[P3] evaluation routine quits
[P2] received value: 780
[P2] calculated value: 608400
result: 608400
[P2] evaluation routine quits
Примечание . Как упоминалось ранее, ваш вывод может не точно совпадать с показанным выше, поскольку порядок выполнения непредсказуем.
Использование метода os.system ()
Метод system()
является частью модуля
os , который позволяет
выполнять внешние программы командной строки в отдельном процессе от
вашей программы Python. Метод system()
- это блокирующий вызов, и вам
нужно дождаться завершения вызова и возврата. Как фетишист UNIX / Linux
вы знаете, что команду можно запускать в фоновом режиме и записывать
вычисленный результат в выходной поток, который перенаправляется в такой
файл (см. Пример 3 ):
Пример 3: Команда с перенаправлением вывода
$ ./program >> outputfile &
В программе Python вы просто инкапсулируете этот вызов, как показано ниже:
Листинг 3: Простой системный вызов с использованием модуля os
import os
os.system("./program >> outputfile &")
Этот системный вызов создает процесс, который выполняется параллельно вашей текущей программе Python. Получение результата может стать немного сложным, потому что этот вызов может завершиться после завершения вашей программы Python - вы никогда не узнаете.
Использование этого метода намного дороже, чем предыдущие методы, которые я описал. Во-первых, накладные расходы намного больше (переключение процесса), а во-вторых, данные записываются в физическую память, например на диск, что занимает больше времени. Хотя это лучший вариант, у вас ограниченная память (например, с ОЗУ), и вместо этого вы можете записывать большие выходные данные на твердотельный диск.
Использование модуля подпроцесса
Этот модуль предназначен для замены os.system()
и os.spawn()
. Идея
подпроцесса состоит в том,
чтобы упростить процессы порождения, общаться с ними через каналы и
сигналы и собирать вывод, который они производят, включая сообщения об
ошибках.
Начиная с Python 3.5, подпроцесс содержит метод subprocess.run()
для
запуска внешней команды, которая является оболочкой для базового класса
subprocess.Popen()
В качестве примера мы запускаем команду UNIX /
Linux df -h
чтобы узнать, сколько дискового пространства все еще
доступно в разделе /home
вашей машины. В программе Python вы
выполняете этот вызов, как показано ниже ( листинг 4 ).
Листинг 4: Базовый пример запуска внешней команды
import subprocess
ret = subprocess.run(["df", "-h", "/home"])
print(ret)
Это основной вызов, очень похожий на команду df -h /home
, выполняемую
в терминале. Обратите внимание, что параметры разделены списком, а не
одной строкой. Результат будет аналогичен Примеру 4 . По сравнению с
официальной документацией Python для этого модуля, он выводит результат
вызова stdout
в дополнение к возвращаемому значению вызова.
В примере 4 показан результат нашего вызова. Последняя строка вывода
показывает успешное выполнение команды. Вызов subprocess.run()
возвращает экземпляр класса CompletedProcess
который имеет два
атрибута с именами args
(аргументы командной строки) и returncode
(возвращаемое значение команды).
Пример 4: Запуск сценария Python из листинга 4
$ python3 diskfree.py
Filesystem Size Used Avail Capacity iused ifree %iused Mounted on
/dev/sda3 233Gi 203Gi 30Gi 88% 53160407 7818407 87% /home
CompletedProcess(args=['df', '-h', '/home'], returncode=0)
Чтобы подавить вывод на стандартный stdout
и перехватить вывод и
возвращаемое значение для дальнейшей оценки, вызов subprocess.run()
должен быть немного изменен. Без дальнейших изменений subprocess.run()
отправляет вывод выполненной команды на stdout
который является
каналом вывода базового процесса Python. Чтобы получить вывод, мы должны
изменить это и установить канал вывода на предварительно определенное
значение subprocess.PIPE
. В листинге 5 показано, как это сделать.
Листинг 5: Захват вывода в конвейер
import subprocess
# Call the command
output = subprocess.run(["df", "-h", "/home"], stdout=subprocess.PIPE)
# Read the return code and the output data
print ("Return code: %i" % output.returncode)
print ("Output data: %s" % output.stdout)
Как объяснялось ранее, subprocess.run()
возвращает экземпляр класса
CompletedProcess
. В листинге 5 этот экземпляр представляет собой
переменную с простым именем output
. Код возврата команды сохраняется
в атрибуте output.returncode
, а вывод, выводимый на стандартный
stdout
можно найти в атрибуте output.stdout
. Имейте в виду, что это
не касается обработки сообщений об ошибках, потому что мы не меняли для
этого выходной канал.
Заключение
Параллельная обработка данных - прекрасная возможность использовать мощь
современного оборудования. Python предоставляет вам доступ к этим
методам на очень сложном уровне. Как вы уже видели ранее,
multiprocessing
и subprocess
позволяет легко погрузиться в эту тему.
Благодарности
Автор благодарит Герольда Рупрехта за поддержку и критику при подготовке этой статьи.