动态线程池组件3:采集线程池配置信息

需要继续创建一个测试模块

image-20240905165340366

创建两个线程的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Slf4j
@EnableAsync
@Configuration
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
public class ThreadPoolConfig {

@Bean("threadPoolExecutor01")
public ThreadPoolExecutor threadPoolExecutor01(ThreadPoolConfigProperties properties) {
// 实例化策略
RejectedExecutionHandler handler;
switch (properties.getPolicy()){
case "AbortPolicy":
handler = new ThreadPoolExecutor.AbortPolicy();
break;
case "DiscardPolicy":
handler = new ThreadPoolExecutor.DiscardPolicy();
break;
case "DiscardOldestPolicy":
handler = new ThreadPoolExecutor.DiscardOldestPolicy();
break;
case "CallerRunsPolicy":
handler = new ThreadPoolExecutor.CallerRunsPolicy();
break;
default:
handler = new ThreadPoolExecutor.AbortPolicy();
break;
}

// 创建线程池
return new ThreadPoolExecutor(properties.getCorePoolSize(),
properties.getMaxPoolSize(),
properties.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(properties.getBlockQueueSize()),
Executors.defaultThreadFactory(),
handler);
}

@Bean("threadPoolExecutor02")
public ThreadPoolExecutor threadPoolExecutor02(ThreadPoolConfigProperties properties) {
...
//同上
}

// 创建线程池
return new ThreadPoolExecutor(properties.getCorePoolSize(),
...
}

}

线程配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Data
@ConfigurationProperties(prefix = "thread.pool.executor.config", ignoreInvalidFields = true)
public class ThreadPoolConfigProperties {

/** 核心线程数 */
private Integer corePoolSize = 20;
/** 最大线程数 */
private Integer maxPoolSize = 200;
/** 最大等待时间 */
private Long keepAliveTime = 10L;
/** 最大队列数 */
private Integer blockQueueSize = 5000;
/*
* AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* DiscardPolicy:直接丢弃任务,但是不会抛出异常
* DiscardOldestPolicy:将最早进入队列的任务删除,之后再尝试加入队列的任务被拒绝
* CallerRunsPolicy:如果任务添加线程池失败,那么主线程自己执行该任务
* */
private String policy = "AbortPolicy";

}

构建线程池信息获取

image-20240905165718566

通过这段代码可以在Service中获取得到线程池的各个信息

1
2
3
4
5
6
7
8
9
Set<String> threadPoolKeys = threadPoolExecutorMap.keySet();
for (String threadPoolKey:threadPoolKeys){
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolKey);
int poolSize = threadPoolExecutor.getPoolSize();
int corePoolSize = threadPoolExecutor.getCorePoolSize();
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
String simpleName = queue.getClass().getSimpleName();

}

同时在domain中定义线程池服务的接口:

1
2
3
4
5
6
7
8
9
10
11
12
IDynamicThreadPoolService接口 
定义了与动态线程池管理相关的服务方法方法说明
1. List<ThreadPoolConfigEntity> queryThreadPoolList()
功能:获取所有线程池的配置信息。
返回:一个包含所有线程池配置的实体对象的列表。
2. ThreadPoolConfigEntity queryThreadPoolConfigByName(String threadPoolName)
功能:根据线程池的名称查询具体的线程池配置。
参数:threadPoolName,线程池的名称。
返回:指定名称的线程池配置实体。
3. void updateThreadPoolConfig(ThreadPoolConfigEntity threadPoolConfigEntity)
功能:更新线程池的配置信息。
参数:threadPoolConfigEntity,包含更新后的线程池配置的实体对象。

最后创建名为 DynamicThreadPoolService 的类,该类提供了管理和查询 Java 线程池的功能。该服务允许通过应用名称 (applicationName) 和线程池名来查询和更新线程池的配置信息。我们来逐步解释其各个部分的功能和逻辑。

  1. 类成员变量
1
2
3
java复制代码private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolService.class);
private final String applicationName;
private final Map<String, ThreadPoolExecutor> threadPoolExecutorMap;
  • logger:用于记录日志,借助 SLF4J 和 Logback 等工具进行日志输出。
  • applicationName:存储当前应用的名称,作为线程池配置的一部分,可能用于区分不同应用的线程池。
  • threadPoolExecutorMap:这是一个存储多个线程池实例的映射表,key 是线程池的名字,value 是 ThreadPoolExecutor 对象。它允许通过名称获取具体的线程池实例。
  1. 构造函数
1
2
3
4
java复制代码public DynamicThreadPoolService(String applicationName, Map<String, ThreadPoolExecutor> threadPoolExecutorMap) {
this.applicationName = applicationName;
this.threadPoolExecutorMap = threadPoolExecutorMap;
}
  • 构造函数用于初始化 DynamicThreadPoolService,将应用名称和线程池映射传入。
  • applicationNamethreadPoolExecutorMap 都是必须的,因为线程池信息是通过这两个字段管理的。
  1. queryThreadPoolList() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@Override
public List<ThreadPoolConfigEntity> queryThreadPoolList() {
Set<String> threadPoolBeanNames = threadPoolExecutorMap.keySet();
List<ThreadPoolConfigEntity> threadPoolVOS = new ArrayList<>(threadPoolBeanNames.size());
for (String beanName : threadPoolBeanNames) {
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(beanName);
ThreadPoolConfigEntity threadPoolConfigVO = new ThreadPoolConfigEntity(applicationName, beanName);
threadPoolConfigVO.setCorePoolSize(threadPoolExecutor.getCorePoolSize());
threadPoolConfigVO.setMaximumPoolSize(threadPoolExecutor.getMaximumPoolSize());
threadPoolConfigVO.setActiveCount(threadPoolExecutor.getActiveCount());
threadPoolConfigVO.setPoolSize(threadPoolExecutor.getPoolSize());
threadPoolConfigVO.setQueueType(threadPoolExecutor.getQueue().getClass().getSimpleName());
threadPoolConfigVO.setQueueSize(threadPoolExecutor.getQueue().size());
threadPoolConfigVO.setRemainingCapacity(threadPoolExecutor.getQueue().remainingCapacity());
threadPoolVOS.add(threadPoolConfigVO);
}
return threadPoolVOS;
}
  • 该方法返回所有已注册线程池的配置信息。
  • 它遍历 threadPoolExecutorMap 中的所有线程池,获取线程池名称 (beanName) 和对应的 ThreadPoolExecutor 实例。
  • 对每个线程池,创建一个 ThreadPoolConfigEntity 对象,并从 ThreadPoolExecutor 中获取各类配置参数(如核心线程数、最大线程数、活动线程数、队列类型等)。
  • 最终返回包含所有线程池配置信息的列表。
  1. queryThreadPoolConfigByName() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Override
public ThreadPoolConfigEntity queryThreadPoolConfigByName(String threadPoolName) {
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolName);
if (null == threadPoolExecutor) return new ThreadPoolConfigEntity(applicationName, threadPoolName);

ThreadPoolConfigEntity threadPoolConfigVO = new ThreadPoolConfigEntity(applicationName, threadPoolName);
threadPoolConfigVO.setCorePoolSize(threadPoolExecutor.getCorePoolSize());
threadPoolConfigVO.setMaximumPoolSize(threadPoolExecutor.getMaximumPoolSize());
threadPoolConfigVO.setActiveCount(threadPoolExecutor.getActiveCount());
threadPoolConfigVO.setPoolSize(threadPoolExecutor.getPoolSize());
threadPoolConfigVO.setQueueType(threadPoolExecutor.getQueue().getClass().getSimpleName());
threadPoolConfigVO.setQueueSize(threadPoolExecutor.getQueue().size());
threadPoolConfigVO.setRemainingCapacity(threadPoolExecutor.getQueue().remainingCapacity());

if (logger.isDebugEnabled()) {
logger.info("动态线程池,配置查询 应用名:{} 线程名:{} 池化配置:{}", applicationName, threadPoolName, JSON.toJSONString(threadPoolConfigVO));
}

return threadPoolConfigVO;
}
  • 该方法根据给定的 threadPoolName 查询某个特定线程池的配置信息。
  • 如果找不到指定名称的线程池,返回一个只包含应用名和线程池名的空配置信息对象。
  • 如果找到对应的线程池,则从 ThreadPoolExecutor 中提取相关信息(核心线程数、最大线程数等),并封装到 ThreadPoolConfigEntity 中。
  • 如果日志级别为 DEBUG,它会记录查询到的线程池配置信息。
  1. updateThreadPoolConfig() 方法
1
2
3
4
5
6
7
8
9
java复制代码@Override
public void updateThreadPoolConfig(ThreadPoolConfigEntity threadPoolConfigEntity) {
if (null == threadPoolConfigEntity || !applicationName.equals(threadPoolConfigEntity.getAppName())) return;
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolConfigEntity.getThreadPoolName());
if (null == threadPoolExecutor) return;

threadPoolExecutor.setCorePoolSize(threadPoolConfigEntity.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(threadPoolConfigEntity.getMaximumPoolSize());
}
  • 该方法用于更新线程池的配置,尤其是核心线程数和最大线程数。
  • 首先,它检查传入的 ThreadPoolConfigEntity 是否为空,或应用名是否匹配。
  • 然后根据 threadPoolNamethreadPoolExecutorMap 获取对应的 ThreadPoolExecutor,并更新核心线程数和最大线程数。