Java ExecutorService

原文:https://www.studytonight.com/java-examples/java-executorservice

ExecutorService是用于在后台并发执行异步任务的接口。ExecutorService 维护一个线程池,并将任务分配给这些线程。当没有空闲线程可用时,它还维护一个队列来存储任务。

在本教程中,我们将学习如何在 Java 中使用 ExecutorService。

创建执行器服务

我们可以使用 Executors 类的三种不同工厂方法来创建一个 ExecutorService。

  • NewsingLeadExecutor()方法创建一个具有单线程的执行器。
  • NewFixedThreadPool(int poolSize)方法用 PoolSize 提到的固定线程数创建一个执行器。
  • newScheduledThreadPool(int pool size)方法创建一个具有上述线程池大小的执行器,该执行器可以在给定延迟后调度任务。它还可以定期执行任务。

下面的代码演示了这些方法的工作原理。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo
{
    public static void main(String[] args) throws InterruptedException
    {
        Runnable task = new Runnable() {
            public void run() {
                System.out.println("Performing a Task...");
            }
        };        
        ExecutorService exeService1 = Executors.newSingleThreadExecutor();
        exeService1.execute(task);

        ExecutorService exeService2 = Executors.newFixedThreadPool(5);
        exeService2.execute(task);

        ExecutorService exeService3 = Executors.newScheduledThreadPool(5);    
        exeService3.execute(task);
    }
}

执行任务... 执行任务... 执行任务...

我们也可以选择一个执行器服务实现,比如线程池执行器类或者调度线程池执行器类。

将任务分配给执行器服务

我们可以使用 execute()、submit()、invokeAny()或 invokeAll()方法来使用 ExecutorService 分配和执行任务。

执行()方法

execute()方法将一个 Runnable 任务作为参数,并异步执行它。它不会返回任何内容,我们也无法检查任务状态。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo
{
    public static void main(String[] args) throws InterruptedException
    {
        Runnable task = new Runnable() {
            public void run() {
                System.out.println("Performing a Task...");
            }
        };

        ExecutorService exeService = Executors.newSingleThreadExecutor();

        exeService.execute(task);
    }
}

执行任务...

提交()方法

submit()方法可以执行一个 Runnable 或 Callable 任务。与 execute()不同,它将以 Future 对象的形式返回任务状态。我们可以在 Future 对象上使用 get()方法来检查任务状态。如果可运行任务成功执行,并且调用()方法的结果为可调用任务,get()方法将返回 null。

对于可运行任务:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Demo
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        Runnable task = new Runnable() {
            public void run() {
                System.out.println("Performing a Runnable Task...");
            }
        };        
        ExecutorService exeService = Executors.newSingleThreadExecutor();

        Future status = exeService.submit(task);
        System.out.println("Status: " + status.get());
    }
}

执行可运行任务... 状态:空

对于可调用任务

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Demo
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        Callable task = new Callable() {
            public Object call() throws Exception{
                System.out.println("Performing a Callable Task...");
                return "Callable Task Performed";
            }
        };

        ExecutorService exeService = Executors.newSingleThreadExecutor();

        Future status = exeService.submit(task);
        System.out.println("Status: " + status.get());
    }
}

执行可调用任务... 状态:可调用任务已执行

invokeAny()方法

invokeAny()方法获取一组可调用任务并执行它们。它将返回任何成功执行的任务的状态。状态通过使用未来对象返回。

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        //Creating callable tasks
        Callable task1 = new Callable() {
            public Object call() throws Exception{
                System.out.println("Performing a Callable Task1...");
                return "Callable Task1 Performed";
            }
        };        
        Callable task2 = new Callable() {
            public Object call() throws Exception{
                System.out.println("Performing a Callable Task2...");
                return "Callable Task2 Performed";
            }
        };        
        Callable task3 = new Callable() {
            public Object call() throws Exception{
                System.out.println("Performing a Callable Task3...");
                return "Callable Task3 Performed";
            }
        };
        //Creating a list of callable tasks
        ArrayList<Callable<String>> callableTasks = new ArrayList<>();
        callableTasks.add(task1);
        callableTasks.add(task2);
        callableTasks.add(task3);

        ExecutorService exeService = Executors.newSingleThreadExecutor();
        String status = exeService.invokeAny(callableTasks);
        System.out.println("Status: " + status);
    }
}

执行可调用任务 1... 执行可调用任务 2... 执行可调用任务 3... 状态:可调用任务 1 已执行

invokeAll()方法还获取可调用任务的集合并执行它们。但是与 invokeAny()方法不同,它将返回一个包含所有已执行任务状态的列表。状态通过使用未来对象返回。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Demo
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        //Creating callable tasks
        Callable task1 = new Callable() {
            public Object call() throws Exception{
                return "Callable Task1 Performed";
            }
        };

        Callable task2 = new Callable() {
            public Object call() throws Exception{
                return "Callable Task2 Performed";
            }
        };

        Callable task3 = new Callable() {
            public Object call() throws Exception{
                return "Callable Task3 Performed";
            }
        };
        //Creating a list of callable tasks
        ArrayList<Callable<String>> callableTasks = new ArrayList<>();
        callableTasks.add(task1);
        callableTasks.add(task2);
        callableTasks.add(task3);

        ExecutorService exeService = Executors.newSingleThreadExecutor();
        List<Future<String>> status = exeService.invokeAll(callableTasks);

        //Printing the status
        for(Future f : status)
            System.out.println(f.get());
    }
}

可调用任务 1 已执行 可调用任务 2 已执行 可调用任务 3 已执行

关闭执行器服务

当 ExecutorService 没有更多任务要执行时,它不会自动关闭。它将继续等待未来可能出现的新任务。执行器服务内部的活动线程将继续运行,并且不允许 JVM 停止。

关机()方法

我们可以使用 shutdown()方法来停止 ExecutorService。此方法将阻止 ExecutorService 接受新任务。当所有现有任务完成后,执行器服务将关闭。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Demo
{
    public static void main(String[] args) throws InterruptedException
    {
        Runnable task1 = new Runnable() {
            public void run() {
                System.out.println("Performing a Task1...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.print(e);
                }
            }
        };
        Runnable task2 = new Runnable() {
            public void run() {
                System.out.println("Performing a Task2...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.print(e);
                }
            }
        };
        Runnable task3 = new Runnable() {
            public void run() {
                System.out.println("Performing a Task3...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.print(e);
                }
            }
        };

        ExecutorService exeService = Executors.newSingleThreadExecutor();

        exeService.execute(task1);
        exeService.execute(task2);
        exeService.execute(task3);

        exeService.shutdown();//All Tasks will be executed
    }
}

执行任务 1... 执行任务 2... 执行任务 3...

关闭现在()方法

shutdownNow()方法用于立即停止正在运行的任务。这种方法会尽最大努力停止所有正在运行的任务,但不能保证。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Demo
{
    public static void main(String[] args) throws InterruptedException
    {
        Runnable task1 = new Runnable() {
            public void run() {
                System.out.println("Performing a Task1...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.print(e);
                }
            }
        };
        Runnable task2 = new Runnable() {
            public void run() {
                System.out.println("Performing a Task2...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.print(e);
                }
            }
        };
        Runnable task3 = new Runnable() {
            public void run() {
                System.out.println("Performing a Task3...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.print(e);
                }
            }
        };

        ExecutorService exeService = Executors.newSingleThreadExecutor();

        exeService.execute(task1);
        exeService.execute(task2);
        exeService.execute(task3);

        exeService.shutdownNow();//Only a few Tasks will be executed
    }
}

执行任务 1... java.lang .断续异常:睡眠中断

预警终端()方法

关闭执行器服务的推荐方法是将 shut down()和 shutdownNow()方法与 awaitTermination()方法一起使用。

首先,ExecutorService 将停止使用 shutdown()或 shutdownNow()方法接受新任务。然后,awaitTermination()要么等待所有任务完成执行,要么等待超时发生,无论哪个先发生。awaitTermination()方法的常见实现如下所示。

public static void shutdownUsingAwaitTermination(ExecutorService exeService)
{
    exeService.shutdown();
    try {
        //Wait till timeout for existing tasks to end
        if(!exeService.awaitTermination(10, TimeUnit.SECONDS))//Wait till timeout for existing tasks to end
            exeService.shutdownNow();//Cancel executing tasks

        if(!exeService.awaitTermination(20, TimeUnit.SECONDS))//Wait for the tasks to be cancelled
            System.err.println("ExecutorService did not terminate");
        }
    catch(Exception e){
        exeService.shutdownNow();
    }
}

调度执行服务接口

ScheduledExecutorService 可以在一定延迟后运行任务。它还可以以固定的速率或在固定的延迟后重复运行任务。

让我们在初始延迟 5 秒后执行一个可运行的任务。我们将为此使用 schedule() 命令。

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo
{
    public static void main(String[] args) throws InterruptedException
    {
        Runnable runnableTask = new Runnable() {
            public void run()
            {
                System.out.println("Task Performed At: " + new Date());
            }
        };

        System.out.println("Current Date and Time: " + new Date());
        long delay  = 5;
        ScheduledExecutorService exeService = Executors.newSingleThreadScheduledExecutor();
        exeService.schedule(runnableTask, delay, TimeUnit.SECONDS);
    }
}

当前日期和时间:2021 年 8 月 05 日星期四 11:22:12 IST 任务执行时间:2021 年 8 月 05 日星期四 11:22:17 IST

让我们每 5 秒钟重复一次某个任务。为此使用 scheduleAtFixedRate() 方法。请注意,重复周期将根据绝对时间工作。下一个任务的开始不受上一个任务终止时间的影响。

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo
{
    public static void main(String[] args) throws InterruptedException
    {
        Runnable runnableTask = new Runnable() {
            public void run()
            {
                System.out.println("Task Performed At: " + new Date());
            }
        };        
        System.out.println("Current Date and Time: " + new Date());
        long delay  = 1;
        long repeatPeriod = 5;
        ScheduledExecutorService exeService = Executors.newSingleThreadScheduledExecutor();
        exeService.scheduleAtFixedRate(runnableTask, delay, repeatPeriod, TimeUnit.SECONDS);
    }
}

当前日期和时间:2021 年 8 月 05 日星期四 11:23:29 IST 任务执行时间:2021 年 8 月 05 日星期四 11:23:30 IST 任务执行时间:2021 年 8 月 05 日星期四 11:23:35 IST 任务执行时间:2021 年 8 月 05 日星期四 11:23:40 IST 任务执行时间:2021 年 8 月 05 日星期四 11:23:45 IST

如果我们想在一个任务的终止和下一个任务的开始之间引入一个固定的延迟,那么我们将使用 scheduleWithFixedDelay() 方法。

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo
{
    public static void main(String[] args) throws InterruptedException
    {
        Runnable runnableTask = new Runnable() {
            public void run()
            {
                System.out.println("Task Performed At: " + new Date());
            }
        };        
        System.out.println("Current Date and Time: " + new Date());
        long delay  = 1;
        long repeatPeriod = 5;
        ScheduledExecutorService exeService = Executors.newSingleThreadScheduledExecutor();
        exeService.scheduleWithFixedDelay(runnableTask, delay, repeatPeriod, TimeUnit.SECONDS);
    }
}

当前日期和时间:2021 年 8 月 05 日星期四 11:24:23 IST 任务执行时间:2021 年 8 月 05 日星期四 11:24:24 IST 任务执行时间:2021 年 8 月 05 日星期四 11:24:29 IST 任务执行时间:2021 年 8 月 05 日星期四 11:24:34 IST

摘要

Java 中的 ExecutorService 用于异步执行任务。初始化 ExecutorService 有几种不同的方法。在使用 newFixedThreadPool()方法初始化 ExecutorService 时,我们应该始终使用适当的池大小。较小的池大小会增加任务的等待时间,而较大的池大小会导致性能开销。

我们还可以使用不同的方法来执行任务,如 execute()或 submit()。我们也应该谨慎使用未来界面。尝试对取消的任务使用 get()方法将导致取消异常。在 Future 接口的 get()方法中使用超时也是一个好主意。请记住在使用后关闭执行器服务。