通用的SpringBoot项目下线程池的配置和使用,异步任务的创建和使用等。
📚 1. 线程池配置
1.1. 参数配置
resouces/appliccation.yml
1
2
3
4
5async:
executor:
thread:
keepAliveSeconds: 60
namePrefix: async-service-
1.2. 开启线程池
- 从运行时拿Cpu核心数,提高可迁移性
baseServiceExecutor
用拒绝的策略,防止请求过载downloadServiceExecutor
用等待的策略,保证所有下载线程都会被执行若有多个线程池,最好定义一些Bean(name = “executorName”)的线程池名字,方便后续使用
@Qualifier("executorName")
指定线程池config/ExecutorConfiguration.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56package com.xxx.xxx.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
public class ExecutorConfiguration {
public static final int cpuNum = Runtime.getRuntime().availableProcessors();
private int corePoolSize;
private int maxPoolSize;
private int queueCapacity;
private int keepAliveSeconds;
private String namePrefix;
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
return new PropertySourcesPlaceholderConfigurer();
}
public ThreadPoolTaskExecutor baseServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(cpuNum + 1);
executor.setMaxPoolSize((cpuNum + 1) * 2);
executor.setQueueCapacity(cpuNum * 20);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setThreadNamePrefix(namePrefix + "base");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
return executor;
}
public ThreadPoolTaskExecutor downloadServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(cpuNum * 2);
executor.setMaxPoolSize(cpuNum * 4);
executor.setQueueCapacity(cpuNum * 20);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setThreadNamePrefix(namePrefix + "download");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}启动类增加支持Async的注解
1
用到线程池的服务接口增加注解,并标记使用哪个线程池
1
Method annotated with @Async should return 'void' or "Future-like" type
,异步调用不能有返回值或者要用带Future
的返回,可以用CompletableFuture<T>
的结构封装原来的返回类型T
📡 2. 接口中的异步任务改写
2.1. 引入线程池和注入
controller/BaseController.java
1
2
3
4
5
6
7private final ThreadPoolTaskExecutor baseServiceExecutor;
public BaseController(XXX xxx, { ThreadPoolTaskExecutor baseServiceExecutor)
this.xxx = xxx;
this.baseServiceExecutor = baseServiceExecutor;
}
2.2. 接口内多任务的异步编排
- 接口请求成功马上返回结果,可以是查询的结果也可以是空,根据需求;如果需要返回的是多线程服务的查询,return的Result也要用Future结构封装
- 执行异步的下载任务,防止阻塞接口的响应
CompletableFuture
的runasync()
和supplyasync()
的区别runasync()
方法接收一个Runnable
类型的参数,不会返回任何结果supplyasync()
方法接收一个Supplier
类型的参数,需要返回结果
CompletableFuture
的thenCompose()
和thenApply()
的区别thenCompose()
用来连接两个CompletableFuture
,是生成一个新的CompletableFuture
thenApply()
转换的是泛型中的类型,是同一个CompletableFuture
1 |
|
🔎 3. 并行下载请求
- 网络请求下载任务要关注一些异常的捕获,直接抛出将漏掉部分下载内容
- 为了统计下载任务总数,用了原子类来计数
AtomicLong
,和使用CountDownLatch
统计预估下载总线程数量,再用latch.countDown()
和latch.wait()
等待所有线程结束统一返回
1 | package com.***.***.utils; |