Вступление
С увеличением количества ядер, доступных в процессорах в настоящее время, в сочетании с постоянно растущей потребностью в достижении большей пропускной способности, многопоточные API-интерфейсы становятся довольно популярными. Java предоставляет свою собственную многопоточную структуру, называемую Executor Framework .
Что такое Executor Framework?
Executor Framework содержит набор компонентов, которые используются для
эффективного управления рабочими потоками. Executor API отделяет
выполнение задачи от фактической задачи, которая должна выполняться
через Executors
. Этот дизайн - одна из реализаций паттерна "
производитель-потребитель".
java.util.concurrent.Executors
предоставляют фабричные методы, которые
используются для создания ThreadPools
потоков рабочих потоков.
Чтобы использовать Executor Framework, нам нужно создать один такой пул потоков и отправить ему задачу для выполнения. Работа Executor Framework состоит в том, чтобы запланировать и выполнить отправленные задачи и вернуть результаты из пула потоков.
В голову приходит основной вопрос: зачем нам нужны такие пулы потоков,
если мы можем создавать объекты java.lang.Thread
или реализовывать
Runnable
/ Callable
для достижения параллелизма?
Ответ сводится к двум основным фактам:
- Создание нового потока для новой задачи приводит к накладным расходам на создание и разрыв потока. Управление жизненным циклом этого потока значительно увеличивает время выполнения.
- Добавление нового потока для каждого процесса без какого-либо регулирования приводит к созданию большого количества потоков. Эти потоки занимают память и вызывают потерю ресурсов. ЦП начинает тратить слишком много времени на переключение контекстов, когда каждый поток выгружается и другой поток входит для выполнения.
Все эти факторы снижают пропускную способность системы. Пулы потоков
решают эту проблему, поддерживая потоки в рабочем состоянии и повторно
используя потоки. Любые дополнительные задачи, которые не могут быть
обработаны потоками в пуле, хранятся в Queue
. Когда какой-либо из
потоков освобождается, он берет следующую задачу из этой очереди. Эта
очередь задач по существу не ограничена для готовых исполнителей,
предоставляемых JDK.
Типы исполнителей
Теперь, когда у нас есть хорошее представление о том, что такое исполнитель, давайте также рассмотрим различные типы исполнителей.
SingleThreadExecutor
Этот исполнитель пула потоков имеет только один поток. Он используется для последовательного выполнения задач. Если поток умирает из-за исключения при выполнении задачи, создается новый поток, который заменяет старый поток, а последующие задачи выполняются в новом.
ExecutorService executorService = Executors.newSingleThreadExecutor()
FixedThreadPool (n)
Как видно из названия, это пул потоков с фиксированным числом потоков.
Задачи, отправленные исполнителю, выполняются n
потоками, и если есть
другие задачи, они сохраняются в LinkedBlockingQueue
. Это число
обычно является общим количеством потоков, поддерживаемых базовым
процессором.
ExecutorService executorService = Executors.newFixedThreadPool(4);
CachedThreadPool
Этот пул потоков в основном используется там, где нужно выполнить множество краткосрочных параллельных задач. В отличие от фиксированного пула потоков, количество потоков этого пула исполнителей не ограничено. Если все потоки заняты выполнением некоторых задач и приходит новая задача, пул создаст и добавит новый поток к исполнителю. Как только один из потоков освободится, он займется выполнением новых задач. Если поток остается бездействующим в течение шестидесяти секунд, он завершается и удаляется из кеша.
Однако, если управление выполняется неправильно или если задачи недолговечны, в пуле потоков будет много активных потоков. Это может привести к нехватке ресурсов и, как следствие, падению производительности.
ExecutorService executorService = Executors.newCachedThreadPool();
ScheduledExecutor
Этот исполнитель используется, когда у нас есть задача, которую необходимо запускать через определенные промежутки времени, или если мы хотим отложить выполнение определенной задачи.
ScheduledExecutorService scheduledExecService = Executors.newScheduledThreadPool(1);
Задачи можно запланировать в ScheduledExecutor
используя один из двух
методов scheduleAtFixedRate
или scheduleWithFixedDelay
.
scheduledExecService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduledExecService.scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit)
Основное различие между этими двумя методами заключается в их интерпретации задержки между последовательными выполнениями запланированного задания.
scheduleAtFixedRate
выполняет задачу с фиксированным интервалом
независимо от того, когда закончилась предыдущая задача.
scheduleWithFixedDelay
запустит обратный отсчет задержки только после
завершения текущей задачи.
Понимание будущего объекта
Доступ к результату задачи, представленной для выполнения исполнителю,
можно получить с помощью java.util.concurrent.Future
возвращенного
исполнителем. Будущее можно рассматривать как обещание, данное
вызывающему исполнителем.
Future<String> result = executorService.submit(callableTask);
Задача, отправленная исполнителю, как и выше, является асинхронной, то
есть выполнение программы не ожидает завершения выполнения задачи, чтобы
перейти к следующему шагу. Вместо этого, всякий раз, когда выполнение
задачи завершается, оно устанавливается исполнителем Future
Вызывающий может продолжить выполнение основной программы, а когда
потребуется результат отправленной задачи, он может вызвать .get()
для
этого объекта Future
Если задача завершена, результат немедленно
возвращается вызывающей стороне, в противном случае вызывающая сторона
блокируется до тех пор, пока ее выполнение не будет завершено
исполнителем и не будет вычислен результат.
Если вызывающая сторона не может позволить себе ждать бесконечно перед
получением результата, это ожидание также может быть рассчитано по
времени. Это достигается с помощью
Future.get(long timeout, TimeUnit unit)
который TimeoutException
если результат не возвращается в установленный период времени.
Вызывающий может обработать это исключение и продолжить дальнейшее
выполнение программы.
Если при выполнении задачи возникает исключение, вызов метода get
вызовет исключение ExecutionException
.
В отношении результата, возвращаемого методом Future.get()
, важно то,
что он возвращается только в том случае, если отправленная задача
реализует java.util.concurrent.Callable
. Если задача реализует
Runnable
, вызов .get()
вернет значение null
после завершения
задачи.
Другой важный метод - это метод
Future.cancel(boolean mayInterruptIfRunning)
. Этот метод используется
для отмены выполнения отправленной задачи. Если задача уже выполняется,
исполнитель попытается прервать выполнение задачи, если
mayInterruptIfRunning
передан как true
.
Пример: создание и выполнение простого исполнителя
Теперь создадим задачу и попробуем выполнить ее в исполнителе фиксированного пула:
public class Task implements Callable<String> {
private String message;
public Task(String message) {
this.message = message;
}
@Override
public String call() throws Exception {
return "Hello " + message + "!";
}
}
Класс Task
реализует Callable
и параметризован для типа String
Также объявлено, что оно выбрасывает Exception
. Эта возможность
генерировать исключение для исполнителя и исполнителя, возвращающего это
исключение обратно вызывающей стороне, имеет большое значение, поскольку
она помогает вызывающей стороне узнать статус выполнения задачи.
Теперь выполним эту задачу:
public class ExecutorExample {
public static void main(String[] args) {
Task task = new Task("World");
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> result = executorService.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
System.out.println("Error occured while executing the submitted task");
e.printStackTrace();
}
executorService.shutdown();
}
}
Здесь мы создали FixedThreadPool
с количеством потоков 4, поскольку
эта демонстрация разработана на четырехъядерном процессоре. Количество
потоков может превышать количество ядер процессора, если выполняемые
задачи выполняют значительные операции ввода-вывода или тратят время на
ожидание внешних ресурсов.
Мы создали экземпляр Task
и передаем его исполнителю для выполнения.
Результат возвращается Future
, который затем выводится на экран.
ExecutorExample
и проверим его вывод:
Hello World!
Как и ожидалось, задача добавляет приветствие «Hello» и возвращает
результат через объект Future
И наконец, мы вызываем завершение работы executorService
чтобы
завершить все потоки и вернуть ресурсы обратно в ОС.
Метод .shutdown()
ожидает завершения текущих задач, переданных
исполнителю. Однако, если требуется немедленно завершить работу
исполнителя, не дожидаясь, мы можем вместо этого .shutdownNow()
Любые задачи, ожидающие выполнения, будут возвращены в объекте
java.util.List
Мы также можем создать ту же задачу, реализовав интерфейс Runnable
public class Task implements Runnable{
private String message;
public Task(String message) {
this.message = message;
}
public void run() {
System.out.println("Hello " + message + "!");
}
}
Когда мы реализуем runnable, здесь есть пара важных изменений.
- Результат выполнения задачи не может быть возвращен из метода
run()
Следовательно, мы печатаем прямо отсюда. - Метод
run()
не настроен на выдачу проверенных исключений.
Заключение
Многопоточность становится все более популярной, поскольку тактовую частоту процессора трудно увеличить. Однако управление жизненным циклом каждого потока очень сложно из-за его сложности.
В этой статье мы продемонстрировали эффективную, но простую многопоточную среду Executor Framework и объяснили ее различные компоненты. Мы также рассмотрели различные примеры создания задач отправки и выполнения в исполнителе.
Как всегда, код этого примера можно найти на GitHub .