动态线程池组件4:线程池数据获取

代码概览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package fun.hyperzhu.middleware.dynamic.thread.pool.sdk.trigger.job;

import com.alibaba.fastjson.JSON;
import fun.hyperzhu.middleware.dynamic.thread.pool.sdk.domain.IDynamicThreadPoolService;
import fun.hyperzhu.middleware.dynamic.thread.pool.sdk.domain.model.entity.ThreadPoolConfigEntity;
import fun.hyperzhu.middleware.dynamic.thread.pool.sdk.registry.IRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List;

//线程池数据上报任务
public class ThreadPoolDataReportJob {

private final Logger logger = LoggerFactory.getLogger(ThreadPoolDataReportJob.class);

private final IDynamicThreadPoolService dynamicThreadPoolService;

private final IRegistry registry;

public ThreadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
this.dynamicThreadPoolService = dynamicThreadPoolService;
this.registry = registry;
}

@Scheduled(cron = "0/20 * * * * ?")
public void execReportThreadPoolList() {
List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList();
registry.reportThreadPool(threadPoolConfigEntities);
logger.info("动态线程池,上报线程池信息:{}", JSON.toJSONString(threadPoolConfigEntities));

for (ThreadPoolConfigEntity threadPoolConfigEntity : threadPoolConfigEntities) {
registry.reportThreadPoolConfigParameter(threadPoolConfigEntity);
logger.info("动态线程池,上报线程池配置:{}", JSON.toJSONString(threadPoolConfigEntity));
}
}

}

详细解释

1. 包声明和导入

1
package fun.hyperzhu.middleware.dynamic.thread.pool.sdk.trigger.job;
  • 包声明:定义了这个类所属的包,通常用于组织代码结构。这里的包名表明这是一个中间件(middleware)动态线程池(dynamic thread pool)SDK(软件开发工具包)的一部分,具体位于trigger.job子包下,可能用于触发和管理任务。
1
2
3
4
5
6
7
8
import com.alibaba.fastjson.JSON;
import fun.hyperzhu.middleware.dynamic.thread.pool.sdk.domain.IDynamicThreadPoolService;
import fun.hyperzhu.middleware.dynamic.thread.pool.sdk.domain.model.entity.ThreadPoolConfigEntity;
import fun.hyperzhu.middleware.dynamic.thread.pool.sdk.registry.IRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List;
  • 导入语句:引入了所需的外部类和接口,包括JSON处理库(fastjson)、自定义的服务接口(IDynamicThreadPoolService)、线程池配置实体(ThreadPoolConfigEntity)、注册接口(IRegistry)、日志库(SLF4J)以及Spring的调度注解(@Scheduled)。

2. 类声明和成员变量

1
2
//线程池数据上报任务
public class ThreadPoolDataReportJob {
  • 类声明:定义了一个名为ThreadPoolDataReportJob的公共类,用于线程池数据的上报任务。
1
2
3
java
复制代码
private final Logger logger = LoggerFactory.getLogger(ThreadPoolDataReportJob.class);
  • 日志记录器:使用SLF4J日志框架创建一个日志记录器,用于记录日志信息,帮助调试和监控。
1
2
private final IDynamicThreadPoolService dynamicThreadPoolService;
private final IRegistry registry;
  • 依赖注入

    • IDynamicThreadPoolService:一个接口,用于动态线程池服务,可能包含获取线程池配置等方法。
    • IRegistry:一个接口,用于注册和上报线程池信息。

3. 构造函数

1
2
3
4
java复制代码public ThreadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
this.dynamicThreadPoolService = dynamicThreadPoolService;
this.registry = registry;
}
  • 构造函数:通过构造函数注入IDynamicThreadPoolServiceIRegistry的实现,实现依赖注入,增强类的可测试性和模块化。

4. 定时任务方法

1
2
3
4
5
6
7
8
9
10
11
@Scheduled(cron = "0/20 * * * * ?")
public void execReportThreadPoolList() {
List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList();
registry.reportThreadPool(threadPoolConfigEntities);
logger.info("动态线程池,上报线程池信息:{}", JSON.toJSONString(threadPoolConfigEntities));

for (ThreadPoolConfigEntity threadPoolConfigEntity : threadPoolConfigEntities) {
registry.reportThreadPoolConfigParameter(threadPoolConfigEntity);
logger.info("动态线程池,上报线程池配置:{}", JSON.toJSONString(threadPoolConfigEntity));
}
}
  • @Scheduled 注解

    • 作用:标记这个方法为一个定时任务。
    • **cron 表达式 "0/20 \* \* \* \* ?"**:表示每隔20秒执行一次。
  • 方法功能

    1. 获取线程池配置列表

      1
      List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList();
      • 调用dynamicThreadPoolServicequeryThreadPoolList方法,获取当前所有线程池的配置列表。
    2. 上报线程池信息

      1
      2
      java复制代码registry.reportThreadPool(threadPoolConfigEntities);
      logger.info("动态线程池,上报线程池信息:{}", JSON.toJSONString(threadPoolConfigEntities));
      • 调用registryreportThreadPool方法,将线程池配置列表上报到注册中心或监控系统。
      • 使用日志记录上报的线程池信息,方便后续查看和调试。
    3. 逐个上报线程池配置参数

      1
      2
      3
      4
      for (ThreadPoolConfigEntity threadPoolConfigEntity : threadPoolConfigEntities) {
      registry.reportThreadPoolConfigParameter(threadPoolConfigEntity);
      logger.info("动态线程池,上报线程池配置:{}", JSON.toJSONString(threadPoolConfigEntity));
      }
      • 遍历每个ThreadPoolConfigEntity,调用registryreportThreadPoolConfigParameter方法,逐个上报线程池的具体配置参数。
      • 同时记录每个线程池配置的日志信息。

总体作用

ThreadPoolDataReportJob类的主要作用是作为一个定时任务,定期(每20秒)从动态线程池服务中获取当前所有线程池的配置列表,并将这些信息上报到注册中心或监控系统。同时,通过日志记录上报的详细信息,以便于开发和运维人员监控线程池的状态和配置变化。

使用场景

  • 动态线程池管理:在微服务或大型分布式系统中,线程池的配置可能需要根据实时负载动态调整。通过定时上报线程池配置,可以实现对线程池状态的实时监控和管理。
  • 监控与报警:将线程池的配置信息上报到监控系统后,可以设置相应的监控指标和报警规则,及时发现和处理线程池相关的问题,如线程池过载、配置异常等。
  • 配置同步:在分布式环境中,确保各个服务实例的线程池配置保持一致,避免因配置不一致导致的性能问题或资源浪费。

注意事项

  • 性能影响:虽然每20秒上报一次线程池信息不太可能对系统性能造成显著影响,但在高频率的场景下,需要评估上报操作对系统资源的消耗。
  • 错误处理:当前代码没有显示的错误处理机制。如果queryThreadPoolListreportThreadPool等方法抛出异常,可能导致任务失败。建议添加异常处理,以确保任务的健壮性。
  • 安全性:上报的线程池信息可能包含敏感数据,需要确保传输过程的安全性,如使用加密传输协议等。

定义RedissionClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Bean("dynamicThreadRedissonClient")
public RedissonClient redissonClient(DynamicThreadPoolAutoProperties properties) {
Config config = new Config();
// 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96
config.setCodec(JsonJacksonCodec.INSTANCE);

config.useSingleServer()
.setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
.setPassword(properties.getPassword())
.setConnectionPoolSize(properties.getPoolSize())
.setConnectionMinimumIdleSize(properties.getMinIdleSize())
.setIdleConnectionTimeout(properties.getIdleTimeout())
.setConnectTimeout(properties.getConnectTimeout())
.setRetryAttempts(properties.getRetryAttempts())
.setRetryInterval(properties.getRetryInterval())
.setPingConnectionInterval(properties.getPingInterval())
.setKeepAlive(properties.isKeepAlive())
;

RedissonClient redissonClient = Redisson.create(config);

logger.info("动态线程池,注册器(redis)链接初始化完成。{} {} {}", properties.getHost(), properties.getPoolSize(), !redissonClient.isShutdown());

return redissonClient;
}
  • 这段代码的作用是使用Redisson客户端与Redis服务器交互,配置并返回一个RedissonClient的Spring Bean实例。它使用了Spring的依赖注入和定时调度功能。
  • RedissonClient被用作与Redis交互的工具,RedisRegistry则是一个基于Redis的注册器,用于将动态线程池的信息注册到Redis中,可能用于配置管理、监控或状态跟踪。
  • 通过这种方式,动态线程池系统可以依赖Redis来保持线程池配置的同步或实现分布式环境下的状态管理。

这段代码的功能是通过Spring的@Bean注解和Redisson库,配置并初始化一个RedissonClient,用于与Redis进行交互。具体来说,这段代码是实现了动态线程池系统的Redis注册中心(Registry)配置。接下来,我会逐步解释每一部分的功能:

1. Bean 的定义

1
2
3
@Bean("dynamicThreadRedissonClient")
@Autowired(required = false)
public RedissonClient redissonClient(DynamicThreadPoolAutoProperties properties) {
  • @Bean 注解:表示这个方法返回的对象将被Spring容器管理为一个Bean,Bean的名字是dynamicThreadRedissonClient。这个名字是可选的,如果不指定,Spring会默认使用方法名作为Bean的名字。
  • **@Autowired(required = false)**:表示Spring会尝试自动注入DynamicThreadPoolAutoProperties类中的属性到这个方法中。如果没有找到这个依赖,程序不会抛出异常,因为required被设置为false
  • 方法参数DynamicThreadPoolAutoProperties类封装了动态线程池配置属性,具体属性包括Redis的连接信息(主机、端口、密码等)以及连接池设置。

2. 配置 Redis 的 RedissonClient

1
2
Config config = new Config();
config.setCodec(JsonJacksonCodec.INSTANCE);
  • Config 对象:这是Redisson的配置类,用于设置Redis客户端的各项参数。RedissonClient需要使用该Config对象来初始化。
  • 设置编解码器:使用JsonJacksonCodec.INSTANCE作为Redis数据的序列化和反序列化的编解码器。Redisson允许通过设置不同的编解码器来控制数据如何存储和传输。JsonJacksonCodec使用Jackson库对数据进行序列化和反序列化操作,这种配置使得Redis中的数据以JSON格式存储。

3. 配置 Redis 单节点连接

1
2
3
4
5
6
7
8
9
10
11
config.useSingleServer()
.setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
.setPassword(properties.getPassword())
.setConnectionPoolSize(properties.getPoolSize())
.setConnectionMinimumIdleSize(properties.getMinIdleSize())
.setIdleConnectionTimeout(properties.getIdleTimeout())
.setConnectTimeout(properties.getConnectTimeout())
.setRetryAttempts(properties.getRetryAttempts())
.setRetryInterval(properties.getRetryInterval())
.setPingConnectionInterval(properties.getPingInterval())
.setKeepAlive(properties.isKeepAlive());
  • **useSingleServer()**:使用Redis单节点配置。如果你的Redis是集群模式,可以使用useClusterServers()方法。
  • **setAddress()**:指定Redis服务器的地址,通过读取配置类properties中的hostport来动态配置Redis地址。redis://是Redis连接协议。
  • **setPassword()**:设置Redis的连接密码。
  • **setConnectionPoolSize()**:设置Redis的连接池大小,即客户端可用的最大连接数。
  • **setConnectionMinimumIdleSize()**:设置最小空闲连接数。
  • **setIdleConnectionTimeout()**:设置空闲连接超时时间,超过这个时间未使用的连接将被关闭。
  • **setConnectTimeout()**:设置连接Redis服务器的超时时间。
  • **setRetryAttempts()**:设置连接失败时的重试次数。
  • **setRetryInterval()**:设置连接失败后再次尝试连接的时间间隔。
  • **setPingConnectionInterval()**:设置客户端Ping Redis服务器的时间间隔,用于保持连接的活跃状态。
  • **setKeepAlive()**:是否启用TCP KeepAlive,保持长时间连接。

4. 创建 RedissonClient

1
RedissonClient redissonClient = Redisson.create(config);
  • **Redisson.create(config)**:通过上面配置的Config对象创建一个RedissonClient实例,用于与Redis服务器进行交互。

5. 日志记录

1
2
3
java
复制代码
logger.info("动态线程池,注册器(redis)链接初始化完成。{} {} {}", properties.getHost(), properties.getPoolSize(), !redissonClient.isShutdown());
  • 日志记录:初始化完成后,记录连接到Redis的主机地址、连接池大小、以及Redisson客户端是否已经关闭的信息。redissonClient.isShutdown()用于检查客户端是否关闭。

6. 返回 RedissonClient

1
2
3
java
复制代码
return redissonClient;
  • **返回 RedissonClient**:方法的返回值是RedissonClient对象,它将被Spring容器作为一个Bean管理。这个Bean可以在应用程序的其他地方被自动注入使用。

7. RedisRegistry Bean

1
2
3
java复制代码public IRegistry redisRegistry(RedissonClient redissonClient){
return new RedisRegistry(redissonClient);
}
  • redisRegistry 方法:通过这个方法,定义了一个IRegistry类型的Bean。
  • IRegistry:表示一个注册器接口,可能用于动态线程池相关的信息注册。这里的实现类是RedisRegistry,说明这是一个基于Redis实现的注册器。
  • **构造 RedisRegistry**:将RedissonClient实例传递给RedisRegistry构造函数。通过这个RedissonClientRedisRegistry可以与Redis进行交互,完成注册、更新等操作。