Параллелизм в Java: платформа Executor Framework

Введение С увеличением количества ядер, доступных в процессорах в настоящее время, в сочетании с постоянно растущей потребностью в достижении большей пропускной способности, многопоточные API становятся довольно популярными. Java предоставляет свою собственную многопоточную структуру, называемую Executor Framework [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html]. Что такое Executor Framework? Executor Framework содержит набор компонентов, которые используются для эффективного управления рабочими процессами.

Вступление

С увеличением количества ядер, доступных в процессорах в настоящее время, в сочетании с постоянно растущей потребностью в достижении большей пропускной способности, многопоточные API-интерфейсы становятся довольно популярными. Java предоставляет свою собственную многопоточную структуру, называемую Executor Framework .

Что такое Executor Framework?

Executor Framework содержит набор компонентов, которые используются для эффективного управления рабочими потоками. Executor API отделяет выполнение задачи от фактической задачи, которая должна выполняться через Executors . Этот дизайн - одна из реализаций паттерна " производитель-потребитель".

java.util.concurrent.Executors предоставляют фабричные методы, которые используются для создания ThreadPools потоков рабочих потоков.

Чтобы использовать Executor Framework, нам нужно создать один такой пул потоков и отправить ему задачу для выполнения. Работа Executor Framework состоит в том, чтобы запланировать и выполнить отправленные задачи и вернуть результаты из пула потоков.

В голову приходит основной вопрос: зачем нам нужны такие пулы потоков, если мы можем создавать объекты java.lang.Thread или реализовывать Runnable / Callable для достижения параллелизма?

Ответ сводится к двум основным фактам:

  1. Создание нового потока для новой задачи приводит к накладным расходам на создание и разрыв потока. Управление жизненным циклом этого потока значительно увеличивает время выполнения.
  2. Добавление нового потока для каждого процесса без какого-либо регулирования приводит к созданию большого количества потоков. Эти потоки занимают память и вызывают потерю ресурсов. ЦП начинает тратить слишком много времени на переключение контекстов, когда каждый поток выгружается и другой поток входит для выполнения.

Все эти факторы снижают пропускную способность системы. Пулы потоков решают эту проблему, поддерживая потоки в рабочем состоянии и повторно используя потоки. Любые дополнительные задачи, которые не могут быть обработаны потоками в пуле, хранятся в Queue . Когда какой-либо из потоков освобождается, он берет следующую задачу из этой очереди. Эта очередь задач по существу не ограничена для готовых исполнителей, предоставляемых JDK.

Типы исполнителей

Теперь, когда у нас есть хорошее представление о том, что такое исполнитель, давайте также рассмотрим различные типы исполнителей.

SingleThreadExecutor

Этот исполнитель пула потоков имеет только один поток. Он используется для последовательного выполнения задач. Если поток умирает из-за исключения при выполнении задачи, создается новый поток, который заменяет старый поток, а последующие задачи выполняются в новом.

 ExecutorService executorService = Executors.newSingleThreadExecutor() 

FixedThreadPool (n)

Как видно из названия, это пул потоков с фиксированным числом потоков. Задачи, отправленные исполнителю, выполняются n потоками, и если есть другие задачи, они сохраняются в LinkedBlockingQueue . Это число обычно является общим количеством потоков, поддерживаемых базовым процессором.

 ExecutorService executorService = Executors.newFixedThreadPool(4); 

CachedThreadPool

Этот пул потоков в основном используется там, где нужно выполнить множество краткосрочных параллельных задач. В отличие от фиксированного пула потоков, количество потоков этого пула исполнителей не ограничено. Если все потоки заняты выполнением некоторых задач и приходит новая задача, пул создаст и добавит новый поток к исполнителю. Как только один из потоков освободится, он займется выполнением новых задач. Если поток остается бездействующим в течение шестидесяти секунд, он завершается и удаляется из кеша.

Однако, если управление выполняется неправильно или если задачи недолговечны, в пуле потоков будет много активных потоков. Это может привести к нехватке ресурсов и, как следствие, падению производительности.

 ExecutorService executorService = Executors.newCachedThreadPool(); 

ScheduledExecutor

Этот исполнитель используется, когда у нас есть задача, которую необходимо запускать через определенные промежутки времени, или если мы хотим отложить выполнение определенной задачи.

 ScheduledExecutorService scheduledExecService = Executors.newScheduledThreadPool(1); 

Задачи можно запланировать в ScheduledExecutor используя один из двух методов scheduleAtFixedRate или scheduleWithFixedDelay .

 scheduledExecService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 

 scheduledExecService.scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit) 

Основное различие между этими двумя методами заключается в их интерпретации задержки между последовательными выполнениями запланированного задания.

scheduleAtFixedRate выполняет задачу с фиксированным интервалом независимо от того, когда закончилась предыдущая задача.

scheduleWithFixedDelay запустит обратный отсчет задержки только после завершения текущей задачи.

Понимание будущего объекта

Доступ к результату задачи, представленной для выполнения исполнителю, можно получить с помощью java.util.concurrent.Future возвращенного исполнителем. Будущее можно рассматривать как обещание, данное вызывающему исполнителем.

 Future<String> result = executorService.submit(callableTask); 

Задача, отправленная исполнителю, как и выше, является асинхронной, то есть выполнение программы не ожидает завершения выполнения задачи, чтобы перейти к следующему шагу. Вместо этого, всякий раз, когда выполнение задачи завершается, оно устанавливается исполнителем Future

Вызывающий может продолжить выполнение основной программы, а когда потребуется результат отправленной задачи, он может вызвать .get() для этого объекта Future Если задача завершена, результат немедленно возвращается вызывающей стороне, в противном случае вызывающая сторона блокируется до тех пор, пока ее выполнение не будет завершено исполнителем и не будет вычислен результат.

Если вызывающая сторона не может позволить себе ждать бесконечно перед получением результата, это ожидание также может быть рассчитано по времени. Это достигается с помощью Future.get(long timeout, TimeUnit unit) который TimeoutException если результат не возвращается в установленный период времени. Вызывающий может обработать это исключение и продолжить дальнейшее выполнение программы.

Если при выполнении задачи возникает исключение, вызов метода get вызовет исключение ExecutionException .

В отношении результата, возвращаемого методом Future.get() , важно то, что он возвращается только в том случае, если отправленная задача реализует java.util.concurrent.Callable . Если задача реализует Runnable , вызов .get() вернет значение null после завершения задачи.

Другой важный метод - это метод Future.cancel(boolean mayInterruptIfRunning) . Этот метод используется для отмены выполнения отправленной задачи. Если задача уже выполняется, исполнитель попытается прервать выполнение задачи, если mayInterruptIfRunning передан как true .

Пример: создание и выполнение простого исполнителя

Теперь создадим задачу и попробуем выполнить ее в исполнителе фиксированного пула:

 public class Task implements Callable<String> { 
 
 private String message; 
 
 public Task(String message) { 
 this.message = message; 
 } 
 
 @Override 
 public String call() throws Exception { 
 return "Hello " + message + "!"; 
 } 
 } 

Класс Task реализует Callable и параметризован для типа String Также объявлено, что оно выбрасывает Exception . Эта возможность генерировать исключение для исполнителя и исполнителя, возвращающего это исключение обратно вызывающей стороне, имеет большое значение, поскольку она помогает вызывающей стороне узнать статус выполнения задачи.

Теперь выполним эту задачу:

 public class ExecutorExample { 
 public static void main(String[] args) { 
 
 Task task = new Task("World"); 
 
 ExecutorService executorService = Executors.newFixedThreadPool(4); 
 Future<String> result = executorService.submit(task); 
 
 try { 
 System.out.println(result.get()); 
 } catch (InterruptedException | ExecutionException e) { 
 System.out.println("Error occured while executing the submitted task"); 
 e.printStackTrace(); 
 } 
 
 executorService.shutdown(); 
 } 
 } 

Здесь мы создали FixedThreadPool с количеством потоков 4, поскольку эта демонстрация разработана на четырехъядерном процессоре. Количество потоков может превышать количество ядер процессора, если выполняемые задачи выполняют значительные операции ввода-вывода или тратят время на ожидание внешних ресурсов.

Мы создали экземпляр Task и передаем его исполнителю для выполнения. Результат возвращается Future , который затем выводится на экран.

ExecutorExample и проверим его вывод:

 Hello World! 

Как и ожидалось, задача добавляет приветствие «Hello» и возвращает результат через объект Future

И наконец, мы вызываем завершение работы executorService чтобы завершить все потоки и вернуть ресурсы обратно в ОС.

Метод .shutdown() ожидает завершения текущих задач, переданных исполнителю. Однако, если требуется немедленно завершить работу исполнителя, не дожидаясь, мы можем вместо этого .shutdownNow()

Любые задачи, ожидающие выполнения, будут возвращены в объекте java.util.List

Мы также можем создать ту же задачу, реализовав интерфейс Runnable

 public class Task implements Runnable{ 
 
 private String message; 
 
 public Task(String message) { 
 this.message = message; 
 } 
 
 public void run() { 
 System.out.println("Hello " + message + "!"); 
 } 
 } 

Когда мы реализуем runnable, здесь есть пара важных изменений.

  1. Результат выполнения задачи не может быть возвращен из метода run() Следовательно, мы печатаем прямо отсюда.
  2. Метод run() не настроен на выдачу проверенных исключений.

Заключение

Многопоточность становится все более популярной, поскольку тактовую частоту процессора трудно увеличить. Однако управление жизненным циклом каждого потока очень сложно из-за его сложности.

В этой статье мы продемонстрировали эффективную, но простую многопоточную среду Executor Framework и объяснили ее различные компоненты. Мы также рассмотрели различные примеры создания задач отправки и выполнения в исполнителе.

Как всегда, код этого примера можно найти на GitHub .

comments powered by Disqus