ETJava Beta | Java    注册   登录
  • 搜索:
  • 面试官:如何实现线程池任务编排?

    发表于      阅读(1)     博客类别:Crawler     转自:https://www.cnblogs.com/vipstone/p/18404617
    如有侵权 请联系我们删除  (页面底部联系我们)  

    任务编排(Task Orchestration)是指管理和控制多个任务的执行流程,确保它们按照预定的顺序正确执行

    1.为什么需要任务编排?

    在复杂的业务场景中,任务间通常存在依赖关系,也就是某个任务会依赖另一个任务的执行结果,在这种情况下,我们需要通过任务编排,来确保任务按照正确的顺序进行执行。

    例如,以下任务的执行顺序:

    其中,任务二要等任务一执行完才能执行,而任务四要等任务二和任务三全部执行完才能执行。

    2.任务编排实现

    任务编排和控制的主要手段有以下:

    • Future
    • CompletableFuture
    • CountDownLatch
    • Semaphore
    • CyclicBarrier

    但如果是全局线程池,想要实现精准的任务编排,只能使用 Future 或 CompletableFuture。

    2.1 Future 任务编排

    使用 Future 实现上述 4 个任务的编排(任务二要等任务一执行完才能执行,而任务四要等任务二和任务三全部执行完才能执行):

    import java.util.concurrent.*;
    import java.util.Arrays;
    
    public class TaskOrchestrator {
        public static void main(String[] args) {
            // 创建一个线程池来执行任务
            ExecutorService executor = Executors.newFixedThreadPool(5);
    
            // 定义任务一
            Future<String> taskOneResult = executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(2000); // 模拟耗时操作
                    return "Task One Result";
                }
            });
    
            // 定义任务二,依赖任务一
            Future<String> taskTwoResult = executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    String result = taskOneResult.get(); // 阻塞等待任务一完成
                    Thread.sleep(1000); // 模拟耗时操作
                    return "Task Two Result, got: " + result;
                }
            });
    
            // 定义任务三
            Future<String> taskThreeResult = executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(1500); // 模拟耗时操作
                    return "Task Three Result";
                }
            });
    
            // 定义任务四,依赖任务二和任务三
            Future<String> taskFourResult = executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    String taskTwoOutput = taskTwoResult.get(); // 阻塞等待任务二完成
                    String taskThreeOutput = taskThreeResult.get(); // 阻塞等待任务三完成
                    Thread.sleep(500); // 模拟耗时操作
                    return "Task Four Result, got: " + taskTwoOutput + " and " + taskThreeOutput;
                }
            });
    
            // 打印最终结果
            try {
                System.out.println("Final Result: " + taskFourResult.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    

    2.2 CompletableFuture 任务编排

    CompletableFutrue 提供的方法有很多,但最常用和最实用的核心方法只有以下几个:

    接下来,使用 CompletableFuture 实现上述 4 个任务的编排(任务二要等任务一执行完才能执行,而任务四要等任务二和任务三全部执行完才能执行):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureExample {
    
        public static void main(String[] args) {
            // 任务一:返回 "Task 1 result"
            CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
                try {
                    // 模拟耗时操作
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                return "Task 1 result";
            });
            // 任务二:依赖任务一,返回 "Task 2 result" + 任务一的结果
            CompletableFuture<String> task2 = task1.handle((result1, throwable) -> {
                try {
                    // 模拟耗时操作
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                return "Task 2 result " + result1;
            });
            // 任务三:和任务一、任务二并行执行,返回 "Task 3 result"
            CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
                try {
                    // 模拟耗时操作
                    Thread.sleep(800); // 任务三可能比任务二先完成
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                return "Task 3 result";
            });
            // 任务四:依赖任务二和任务三,等待它们都完成后执行,返回 "Task 4 result" + 任务二和任务三的结果
            CompletableFuture<String> task4 = CompletableFuture.allOf(task2, task3).handle((res, throwable) -> {
                try {
                    // 这里不需要显式等待,因为 allOf 已经保证了它们完成
                    return "Task 4 result with " + task2.get() + " and " + task3.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            // 获取任务四的结果并打印
            String finalResult = task4.join();
            System.out.println(finalResult);
        }
    }
    

    课后思考

    Future 和 CompletableFutrue 有什么关系?CompletableFutrue 底层是如何实现的?

    本文已收录到我的面试小站 www.javacn.site,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。