动态线程池组件5:订阅消息并且变更线程池

添加一个 Listener(监听器)模块:订阅线程池配置变更消息,收到消息后调整对应线程池,并将最新数据上报到注册中心。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Message listener that applies a received thread pool configuration and then
 * reports the resulting state through the registry.
 */
public class ThreadPoolConfigAdjustListener implements MessageListener<ThreadPoolConfigEntity> {

    // One logger per class is enough; no need for a mutable per-instance field.
    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolConfigAdjustListener.class);

    private final IDynamicThreadPoolService dynamicThreadPoolService;
    private final IRegistry registry;

    /**
     * @param dynamicThreadPoolService service used to update and query thread pool configurations
     * @param registry                 registry that receives the refreshed thread pool data
     */
    public ThreadPoolConfigAdjustListener(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
        this.dynamicThreadPoolService = dynamicThreadPoolService;
        this.registry = registry;
    }

    /**
     * Applies the received configuration, then reports both the full thread pool
     * list and the current configuration of the adjusted pool.
     *
     * @param charSequence           channel the message arrived on (not used here)
     * @param threadPoolConfigEntity requested thread pool configuration
     */
    @Override
    public void onMessage(CharSequence charSequence, ThreadPoolConfigEntity threadPoolConfigEntity) {
        logger.info("动态线程池,调整线程池配置。线程池名称:{} 核心线程数:{} 最大线程数:{}", threadPoolConfigEntity.getThreadPoolName(), threadPoolConfigEntity.getPoolSize(), threadPoolConfigEntity.getMaximumPoolSize());
        dynamicThreadPoolService.updateThreadPoolConfig(threadPoolConfigEntity);

        // Report the latest data after the update
        List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList();
        registry.reportThreadPool(threadPoolConfigEntities);

        // Re-query by name so the reported parameters come from the service, not from the request
        ThreadPoolConfigEntity threadPoolConfigEntityCurrent = dynamicThreadPoolService.queryThreadPoolConfigByName(threadPoolConfigEntity.getThreadPoolName());
        registry.reportThreadPoolConfigParameter(threadPoolConfigEntityCurrent);
        // Log the entity that was actually reported rather than the incoming request
        logger.info("动态线程池,上报线程池配置:{}", JSON.toJSONString(threadPoolConfigEntityCurrent));
    }

}

然后在配置类中通过 @Bean 注册监听器,并将其添加到 Redis 主题上以开启订阅

1
2
3
4
5
6
7
8
9
10
11
/**
 * Creates the listener bean from the thread pool service and the registry.
 *
 * @param dynamicThreadPoolService service handed to the listener
 * @param registry                 registry handed to the listener
 * @return a new {@code ThreadPoolConfigAdjustListener}
 */
@Bean
public ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener(DynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
    ThreadPoolConfigAdjustListener adjustListener =
            new ThreadPoolConfigAdjustListener(dynamicThreadPoolService, registry);
    return adjustListener;
}

/**
 * Obtains the Redis topic used for configuration adjustments and attaches the
 * given listener to it.
 *
 * @param redissonClient                 client used to look up the topic
 * @param threadPoolConfigAdjustListener listener registered for {@code ThreadPoolConfigEntity} messages
 * @return the topic the listener was added to
 */
@Bean(name = "dynamicThreadPoolRedisTopic")
public RTopic threadPoolConfigAdjustListener(RedissonClient redissonClient, ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener) {
    // Topic key = enum key + "_" + applicationName (a field declared outside this method)
    String topicKey = RegistryEnumVO.DYNAMIC_THREAD_POOL_REDIS_TOPIC.getKey() + "_" + applicationName;
    RTopic adjustTopic = redissonClient.getTopic(topicKey);
    adjustTopic.addListener(ThreadPoolConfigEntity.class, threadPoolConfigAdjustListener);
    return adjustTopic;
}