Java动态线程池插件dynamic-tp集成zookeeper

 更新时间:2023年03月02日 09:48:27   作者:Redick01  
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等

前言

dynamic-tp是一个轻量级的动态线程池插件,它是一个基于配置中心的动态线程池,线程池的参数可以通过配置中心配置进行动态的修改,在配置中心的支持上最开始的时候支持NacosApollo,由于笔者公司用的配置中心是Zookeeper,所以就想着扩展支持Zookeeper,在了解源码支持发现dynamic-tp的扩展能力做的很好,提供了扩展接口,只要我开发对应的配置中心模块即可,最终笔者实现了Zookeeper的支持并贡献到社区。接下来我通过源码解析方式介绍下Zookeeper配置中心的接入。

配置刷新

dynamic-tp提供了一个刷新配置的接口Refresher,抽象类AbstractRefresher实现刷新配置接口的刷新配置方法refresh,该方法能根据配置类型内容和配置解析配置并刷新动态线程池的相关配置,由DtpRegistry负责刷新线程池配置,事件发布订阅模式操作Web容器参数,代码如下:

public interface Refresher {
    /**
     * Refresh with specify content.
     * @param content content
     * @param fileType file type
     */
    void refresh(String content, ConfigFileTypeEnum fileType);
}
@Slf4j
public abstract class AbstractRefresher implements Refresher {
    @Resource
    private DtpProperties dtpProperties;
    @Resource
    private ApplicationEventMulticaster applicationEventMulticaster;
    @Override
    public void refresh(String content, ConfigFileTypeEnum fileTypeEnum) {
        if (StringUtils.isBlank(content) || Objects.isNull(fileTypeEnum)) {
            return;
        }
        try {
            // 根据配置内容和配置类型将配置内容转成Map
            val prop = ConfigHandler.getInstance().parseConfig(content, fileTypeEnum);
            doRefresh(prop);
        } catch (IOException e) {
            log.error("DynamicTp refresh error, content: {}, fileType: {}",
                    content, fileTypeEnum, e);
        }
    }
    private void doRefresh(Map<Object, Object> properties) {
        // 将Map中的配置转换成DtpProperties
        ConfigurationPropertySource sources = new MapConfigurationPropertySource(properties);
        Binder binder = new Binder(sources);
        ResolvableType type = ResolvableType.forClass(DtpProperties.class);
        Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);
        binder.bind(MAIN_PROPERTIES_PREFIX, target);
        // 刷新动态线程池配置
        DtpRegistry.refresh(dtpProperties);
        // 发布刷新实现,该事件用于控制Web容器线程池参数控制
        publishEvent();
    }
    private void publishEvent() {
        RefreshEvent event = new RefreshEvent(this, dtpProperties);
        applicationEventMulticaster.multicastEvent(event);
    }
}

Zookeeper配置中心接入扩展实现

基于AbstractRefresher就可以实现Zookeeper配置中心的扩展了,Zookeeper的扩展实现继承AbstractRefresherZookeeper的扩展实现只需要监听配置中心的配置变更即可拿到配置内容,然后通过refresh刷新配置即可。代码如下:

ZookeeperRefresher继承AbstractRefresher,实现InitializingBeanafterPropertiesSet方法逻辑从配置DtpProperties获取Zookeeper的配置信息,CuratorFrameworkFactory创建客户端,设置监听器,这里有两种监听器,一个是连接监听ConnectionStateListener,一个是节点变动监听CuratorListener,出发监听后loadNode负责从Zookeeper获取配置文件配置并组装配置内容,然后通过refresh刷新配置,注意,Zookeeper配置目前配置类型仅支持properties

@Slf4j
public class ZookeeperRefresher extends AbstractRefresher implements InitializingBean {
    @Resource
    private DtpProperties dtpProperties;
    private CuratorFramework curatorFramework;
    @Override
    public void afterPropertiesSet() throws Exception {
        DtpProperties.Zookeeper zookeeper = dtpProperties.getZookeeper();
        curatorFramework = CuratorFrameworkFactory.newClient(zookeeper.getZkConnectStr(),
                new ExponentialBackoffRetry(1000, 3));
        String nodePath = ZKPaths.makePath(ZKPaths.makePath(zookeeper.getRootNode(),
                zookeeper.getConfigVersion()), zookeeper.getNode());
        final ConnectionStateListener connectionStateListener = (client, newState) -> {
            if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) {
                loadNode(nodePath);
            }};
        final CuratorListener curatorListener = (client, curatorEvent) -> {
            final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
            if (null != watchedEvent) {
                switch (watchedEvent.getType()) {
                    case NodeChildrenChanged:
                    case NodeDataChanged:
                        loadNode(nodePath);
                        break;
                    default:
                        break;
                }
            }};
        curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
        curatorFramework.getCuratorListenable().addListener(curatorListener);
        curatorFramework.start();
        log.info("DynamicTp refresher, add listener success, nodePath: {}", nodePath);
    }
    /**
     * load config and refresh
     * @param nodePath config path
     */
    public void loadNode(String nodePath) {
        try {
            final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
            final List<String> children = childrenBuilder.watched().forPath(nodePath);
            StringBuilder content = new StringBuilder();
            children.forEach(c -> {
                String n = ZKPaths.makePath(nodePath, c);
                final String nodeName = ZKPaths.getNodeFromPath(n);
                final GetDataBuilder data = curatorFramework.getData();
                String value = "";
                try {
                    value = new String(data.watched().forPath(n), StandardCharsets.UTF_8);
                } catch (Exception e) {
                    log.error("zk config value watched exception.", e);
                }
                content.append(nodeName).append("=").append(value).append("\n");
            });
            refresh(content.toString(), ConfigFileTypeEnum.PROPERTIES);
        } catch (Exception e) {
            log.error("load zk node error, nodePath is {}", nodePath, e);
        }
    }
}

总结

dynamic-tp对应支持配置中心的扩展能力做的非常好,笔者通过Zookeeper客户端CuratorFramework设置监听的方式进行接入,主要监听CuratorFramework客户端连接建立和断开的事件和节点变动的事件实现了动态线程池参数的更新。

到此这篇关于Java动态线程池插件dynamic-tp集成zookeeper的文章就介绍到这了,更多相关Java dynamic-tp内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java实现File转换MultipartFile格式的例子

    Java实现File转换MultipartFile格式的例子

    本文主要介绍了Java实现File转换MultipartFile格式的例子,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • Java Lambda表达式常用的函数式接口

    Java Lambda表达式常用的函数式接口

    这篇文章主要介绍了Java Lambda表达式常用的函数式接口,文章基于Java Lambda表达式展开对常用的函数式接口的介绍,具有一的的参考价值需要的小伙伴可以参考一下
    2022-04-04
  • java JSP开发之Spring中Bean的使用

    java JSP开发之Spring中Bean的使用

    这篇文章主要介绍了java JSP开发之Spring中Bean的使用的相关资料,在Spring中,bean的生命周期就比较复杂,这里就详细介绍下,需要的朋友可以参考下
    2017-08-08
  • Python文件高级操作函数之文件信息获取与目录操作

    Python文件高级操作函数之文件信息获取与目录操作

    这篇文章主要介绍了Python文件高级操作函数之文件信息获取与目录操作,在Python中,内置了文件(File)对象。在使用文件对象时,首先需要通过内置的open()方法创建一个文件对象,然后通过该对象提供的方法进行一些基本文件操作,需要的朋友可以参考下
    2023-05-05
  • MyBatis 引入映射器的方法

    MyBatis 引入映射器的方法

    本文通过实例代码给大家分享mybatis 引入映射器的方法,非常不错,具有参考借鉴价值,需要的朋友参考下吧
    2017-09-09
  • 解决java.lang.Error: Unresolved compilation problems:问题

    解决java.lang.Error: Unresolved compilation pro

    这篇文章主要介绍了解决java.lang.Error: Unresolved compilation problems:问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • Java即将引入新对象类型来解决内存使用问题

    Java即将引入新对象类型来解决内存使用问题

    这篇文章主要介绍了Java即将引入新对象类型来解决内存使用问题,文章通过围绕主题的相关资料展开详细内容,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-05-05
  • SpringBoot Redis清除所有的key的实现方法

    SpringBoot Redis清除所有的key的实现方法

    本文主要介绍了SpringBoot Redis清除所有的key的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-05-05
  • Java自动释放锁的三种实现方案

    Java自动释放锁的三种实现方案

    在笔者面试过程时,经常会被问到各种各样的锁,如乐观锁、读写锁等等,非常繁多,下面这篇文章主要给大家介绍了关于Java自动释放锁的三种实现方案,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-06-06
  • RocketMQ实现随缘分BUG小功能示例详解

    RocketMQ实现随缘分BUG小功能示例详解

    这篇文章主要为大家介绍了RocketMQ实现随缘分BUG小功能示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08

最新评论