dubbo如何实现consumer从多个group中调用指定group的provider

 更新时间:2023年03月21日 10:30:14   作者:自东向西  
这篇文章主要介绍了dubbo如何实现consumer从多个group中调用指定group的provider问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

背景

在工作中,遇到这样的场景:

有个es索引构建服务,需要从各个业务服务获取索引的信息,从而构建索引,业务服务都实现同一个接口——IndexInfoProvider,通过设置不同的group来达到区分的效果(group就是es索引名)。

索引构建服务在内存维护了一个Map<String, IndexInfoProvider> providerMap,key是索引名——也就是provider的group,value是IndexInfoProvider服务的consumer。

为了图方便,索引构建服务还一次性初始化了所有consumer,假设总共有200个分组,那么Map就会缓存200个consumer,初始化的时候巨慢。如果不一次性初始化,而是按需的话,代码会变得复杂一些,可能需要像双重检验这样的措施。而且因为是缓存,必然也会遇到缓存一致性的问题,例如新增一个索引。

基于这样的问题,就想着,dubbo的一个consumer,能不能按需调用指定分组的provider呢~

过程

为什么必须缓存那么多consumer

为什么有200个分组,就要缓存200个consumer,而不是像我们平时那样,1个就可以呢?

业务服务在实现IndexInfoProvider接口的时候,都指定是分组,而且分组名,就是对应的索引名。

相应的,消费者就必须指定消费的分组,200个索引,自然就需要200个consumer。

consumer只能调用一个group的provider么

在我们日常的使用中,consumer基本都是只能调用一个group的provider。但实际上,dubbo是支持调用多个group的provider的。

这样写是指定a分组和b分组,多个分组之间用英文逗号分隔

这样写,是所有的分组

group="*"能代替Map<String, IndexInfoProvider>么

如果将es索引构建服务的IndexInfoProvider接口的consumer分组设置为group="*",然后在调用接口的时候,根据需要,从一堆provider中筛选出指定group的provider,只调用这些provider,是不是就可以代替Map缓存了?

初步方案:

  • 利用ThreadLocal来指定要调用的分组,在调用方法前设置group到ThreadLocal中。
  • 实现dubbo的负载均衡拓展,只选取指定分组的节点来调用。
  • 调用结束,清理ThreadLocal中的分组信息。

似乎是可行的?

理想很美好,现实很骨感

通过源码和代码debug发现,consumer持有的Invoker,有点像一个责任链。

如果是单分组——!group.contains(",") && !group.equals("*"),那么会是这样:

MockClusterInvoker->FailoverClusterInvoker

如果是多分组,则会变成:

MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker

负载均衡的机制,是得到了FailoverClusterInvoker才会生效

MockClusterInvoker只是一种降级机制,不是导致问题的原因

问题是出在MergeableClusterInvoker,下面是导致问题的代码!!!

代码大概的意思是:消费者没有指定合并策略,那么就会调用第一个有效的服务提供者,如果都是无效,就直接调用第一个。

我是没有指定合并策略的,所以会变成调用第一个有效的服务提供者。根本走不到负载均衡那里。

不指定合并策略不行,那指定呢?指定合并策略会怎样呢?

指定了合并策略,会调用所有invoker,然后用Merger合并结果。

这种很适合这样种场景:

一部分数据在服务A,一部分数据在服务B,需要分别调用A和B,然后把两者的结果集合并

设想着这样的方案:

  • 指定了合并策略,那么所有分组的invoker都会被调用到,那么请求就到FailoverClusterInvoker了,负载均衡spi就生效了
  • 修改一下负载均衡spi,如果目前调用的invoker不是指定分组的,那么就直接返回null
  • 实现自己的合并spi,返回第一个不会null的结果

虽然不是什么优雅的方式,但是,似乎是能做到的???

实际上,行不通,达不到想要的效果。原因是MergeableClusterInvoker内部是用线程池并发调用的,ThreadLocal里的分组信息会丢失

还有机会么

MergeableClusterInvoker类是没有留拓展的余地啦,还有其他机会么?MergeableClusterInvoker的Invokers列表从哪里来的?

Invokers是从directory来的,这里有没有拓展的余地呢?

有的,在AbstractDirectory的list方法

看到这,发现用路由spi,还是有机会的,如果实现动态路由,每次只给MergeableClusterInvoker返回指定分组的Invoker,是不是就可以呢?怎么让routers包含我们自定义的路由spi呢?

需要url上携带了router参数,但是这里的url的参数是由谁决定的?@Reference 注解是没有没有的指定router参数的…

最后debuig,兜兜转转,发现只能靠重写ReferenceConfig类的loadRegistries方法,往url上加上动态路由的参数。

成果

AssignGroupRouterFactory

public class AssignGroupRouterFactory implements RouterFactory {
    public static final String NAME = "assignGroup";

    @Override
    public Router getRouter(URL url) {
        return new AssignGroupRouter(url);
    }
}

AssignGroupRouter

public class AssignGroupRouter implements Router {
    private final URL url;

    public AssignGroupRouter(URL url) {
        this.url = url;
    }

    @Override
    public URL getUrl() {
        return url;
    }

    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        String assignGroup = IndexInfoProviderInvoker.getAssignGroup();
        if (Objects.isNull(assignGroup)) {
            return invokers;
        }
        return invokers.stream()
            .filter(invoker -> {
                URL invokerUrl = invoker.getUrl();
                return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY));
            }).collect(Collectors.toList());
    }

    @Override
    public int compareTo(Router o) {
        return 1;
    }
}


AssignGroupReferenceConfig

public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> {
    @Override
    protected List<URL> loadRegistries(boolean provider) {
        List<URL> urls = super.loadRegistries(provider);
        if (CollectionUtils.isEmpty(urls)) {
            return urls;
        }
        // 指定动态路由,路由方式为assignGroup
        return urls.stream()
            .map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY,
                "true"))
            .collect(Collectors.toList());
    }
}

IndexInfoProviderInvokerProxy

@Component
public class IndexInfoProviderInvokerProxy {
    public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>();

    private final IndexInfoProvider indexInfoProvider;

    public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) {
        this.indexInfoProvider = indexInfoProvider;
    }

    /**
     * 获取文档id 在[start, end) 之间的所有索引文档
     *
     * @param indexType 索引名
     * @param start 起始id
     * @param end 结束id
     * @return 文档列表
     */
    public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) {
        return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end));
    }

    private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) {
        INDEX_TYPE_THREAD_LOCAL.set(indexType);
        T result = supplier.get();
        INDEX_TYPE_THREAD_LOCAL.remove();
        return result;
    }
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Mybatis Plus 大数据游标分页的实现

    Mybatis Plus 大数据游标分页的实现

    使用MyBatis Plus的游标分页,我们可以轻松应对大数据量的场景,本文主要介绍了Mybatis Plus 大数据游标分页的实现,具有一定的参考价值,感兴趣的可以了解一下
    2024-07-07
  • SpringBoot+Websocket实现一个简单的网页聊天功能代码

    SpringBoot+Websocket实现一个简单的网页聊天功能代码

    本篇文章主要介绍了SpringBoot+Websocket实现一个简单的网页聊天功能代码,具有一定的参考价值,有需要的可以了解一下
    2017-08-08
  • 使用idea开发Servlet详细图文教程

    使用idea开发Servlet详细图文教程

    这篇文章主要给大家介绍了关于使用idea开发Servlet的相关资料,将idea添加servlet的过程其实非常简单,只需要按照以下几个步骤即可完成,需要的朋友可以参考下
    2023-10-10
  • 一篇文章超详细的介绍Java继承

    一篇文章超详细的介绍Java继承

    Java中的继承是一种机制,表示为一个对象获取父对象的所有属性和行为,下面这篇文章主要给大家介绍了关于Java继承的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-03-03
  • SpringBoot全局异常处理机制和配置拦截器方式

    SpringBoot全局异常处理机制和配置拦截器方式

    这篇文章主要介绍了SpringBoot全局异常处理机制和配置拦截器方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • 在Spring Boot中使用swagger-bootstrap-ui的方法

    在Spring Boot中使用swagger-bootstrap-ui的方法

    这篇文章主要介绍了在Spring Boot中使用swagger-bootstrap-ui的方法,需要的朋友可以参考下
    2018-01-01
  • Java 多线程并发 ReentrantReadWriteLock详情

    Java 多线程并发 ReentrantReadWriteLock详情

    这篇文章主要介绍了Java多线程并发ReentrantReadWriteLock详情,ReentrantReadWriteLock可重入读写锁。实际使用场景中,我们需要处理的操作本质上是读与写,更多相关资料,感兴趣的小伙伴可以参考一下下面文章内容
    2022-06-06
  • 详解java中的阻塞队列

    详解java中的阻塞队列

    这篇文章主要介绍了java中的阻塞队列的相关知识,文中代码非常详细,供大家参考和学习,感兴趣的朋友可以了解下
    2020-06-06
  • Java多线程编程之访问共享对象和数据的方法

    Java多线程编程之访问共享对象和数据的方法

    这篇文章主要介绍了Java多线程编程之访问共享对象和数据的方法,多个线程访问共享对象和数据的方式有两种情况,本文分别给出代码实例,需要的朋友可以参考下
    2015-05-05
  • mybatisplus驼峰命名映射的问题解决

    mybatisplus驼峰命名映射的问题解决

    本文主要介绍了mybatisplus驼峰命名映射的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07

最新评论