dubbo服务引用之创建Invoker流程详解

 更新时间:2023年08月16日 11:09:33   作者:包月星  
这篇文章主要为大家介绍了dubbo服务引用二之创建Invoker流程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

1、创建Invoker流程

1.1、收集引用参数

ReferenceConfig#init方法的结尾处,调用createProxy方法,采集的参数集合作为入参传递到该方法中。

1.2、创建Invoker

@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        ...
        // 创建服务代理
        return (T) proxyFactory.getProxy(invoker);
    }

方法主要逻辑就是

  • 1、默认情况下如果本地有服务暴露,则引用本地服务.
  • 2、用户写死了引用的URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
  • 3、通过注册中心配置拼装URL,List<URL> us = loadRegistries(false); 用户配置了几个注册中心,就会产生几个URL

不管走哪种引用类型,都会执行下面的核心代码

invoker = refprotocol.refer(interfaceClass, url);

refprotocol是一个Protocol接口,getAdaptiveExtension返回的是一个Protocol$Adpative,在该类的源码及其生产方法。

private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

又看到了Protocol这个接口

@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();
    @Adaptive
    <T> Exporter<T> export(Invoker<T> var1) throws RpcException;
    @Adaptive
    <T> Invoker<T> refer(Class<T> var1, URL var2) throws RpcException;
    void destroy();
}

主要任务是,暴露远程服务、引用远程服务、释放协议【释放暴露于引用服务时占用的资源】,dubbo支持多种协议,http,thrift,RMI等,真是通过dubbo的SPI机制,才可以灵活的在这些协议来回切换。

我们第二种为例讲一下核心逻辑,同服务暴露时的一样,因为Dubbo的AOP机制,在获RegistryProtocol时,会经过两个Wrapper类的包装

这个地方也不例外,但是两个Wrapper类的ProtocolFilterWrapper,ProtocolListenerWrapper并无实际的业务逻辑,我们直接跳过。

执行refprotocol.refer(interfaceClass, url)即执行RegistryProtocol#refer代码。

@SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

1.2.1、 连接zookeeper

Registry registry = registryFactory.getRegistry(url);

在服务发布的时候,已经讲过了,其主要核心作用就是连接zookeeper服务器,并返回一个ZookeeperRegistry实例。

在RegistryProtocol#doRefer方法中,通过ZookeeperRegistry#register的执行,创建引用服务的consumers节点。创建如下节点:

/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F192.168.43.156%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D15775%26side%3Dconsumer%26timestamp%3D1525073802234

1.2.2、 监听节点

1.2.3、 加入集群

cluster.join(directory)

cluster也是一个带有Adaptive注解的扩展类,默认实现时FailoverCluster

@SPI(FailoverCluster.NAME)
public interface Cluster {
    /**
     * Merge the directory invokers to a virtual invoker.
     *
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}

进入FailoverCluster#join,返回FailoverClusterInvoker,一个可以失败转移的Invoker,

public class FailoverCluster implements Cluster {
    public final static String NAME = "failover";
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}

FailoverClusterInvoker源码中,它实现了父类中的一个模板子方法doInvoke。
父类AbstractClusterInvoker的invoke方法,

public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance;
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

方法list(invocation)返回了List<Invoker<T>> invokers,这些Invoker就是实际与服务交互的对象,

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }

我们在构造FailoverClusterInvoker时,传入的Directory实现类是RegistryDirectory,即AbstractDirectory#list方法。

1.2.4、 核心类DubboInvoker

DubboInvoker 最终Invoker执行的方法是:

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

调用invoker,白话描述就是:
将通过远程通信将Invocation信息传递给服务器端,服务器端接收到该Invocation信息后,找到对应的本地Invoker,然后通过反射执行相应的方法,将方法的返回值再通过远程通信将结果传递给客户端。

这里分3种情况:

  • 执行方法不需要返回值:直接调用ExchangeClient.send()方法
  • 执行方法的结果需要异步返回:使用ExchangeClient.request()方法返回一个ResponseFuture对象,通过RpcContext中的ThreadLocal使ResponseFuture和当前线程绑定,未等服务端响应结果就直接返回,然后服务端通过ProtocolFilterWrapper.buildInvokerChain()方法会调用Filter.invoke()方法,即FutureFilter.invoker()->asyncCallback(),会获取RpcContext的ResponseFuture对象,异步返回结果
  • 执行方法的结果需要同步返回:使用ExchangeClient.request()方法,返回一个ResponseFuture,一直阻塞到服务端返回响应结果

返回FailoverClusterInvoker

以上就是dubbo服务引用二之创建Invoker的详细内容,更多关于dubbo服务引用创建Invoker的资料请关注脚本之家其它相关文章!

相关文章

  • JAVA模拟新增顺序表及单链表

    JAVA模拟新增顺序表及单链表

    这篇文章主要介绍了JAVA模拟新增顺序表及单链表,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • java对象转成byte数组的3种方法

    java对象转成byte数组的3种方法

    这篇文章主要为大家详细介绍了java对象转成byte数组的3种方法,具有一定的参考价值,感兴趣的朋友可以参考一下
    2018-06-06
  • SpringBoot集成JWT实现Token登录验证的示例代码

    SpringBoot集成JWT实现Token登录验证的示例代码

    随着技术的发展,分布式web应用的普及,通过session管理用户登录状态成本越来越高,因此慢慢发展成为token的方式做登录身份校验,本文就来介绍一下SpringBoot集成JWT实现Token登录验证的示例代码,感兴趣的可以了解一下
    2023-12-12
  • SpringBoot中如何进行统一异常处理

    SpringBoot中如何进行统一异常处理

    大家好,本篇文章主要讲的是SpringBoot中如何进行统一异常处理,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下
    2022-02-02
  • 详解Mybatis的分页插件

    详解Mybatis的分页插件

    这篇文章主要介绍了详解Mybatis的分页插件,在 Mybatis中,如何对数据进行分页是一个非常常见的问题,现在,我们可以通过使用 Mybatis 的分页插件来实现对数据的分页,需要的朋友可以参考下
    2023-05-05
  • Java 爬虫数据异步加载如何解决

    Java 爬虫数据异步加载如何解决

    这篇文章主要介绍了Java 爬虫遇上数据异步加载,试试这两种办法!问题如何解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • Java实现经典捕鱼达人游戏的示例代码

    Java实现经典捕鱼达人游戏的示例代码

    《捕鱼达人》是一款以深海狩猎为题材的休闲竞技游戏。本文将利用Java实现这一经典的游戏,文中采用了swing技术进行了界面化处理,需要的可以参考一下
    2022-02-02
  • Java中Lombok的@Builder注解注意事项

    Java中Lombok的@Builder注解注意事项

    这篇文章主要介绍了Java中Lombok的@Builder注解注意事项,使用Lombok也会造成很多问题,尤其@Builder 有个很大的坑,已经见过好几次由于使用@Builder注解导致默认值失效的问题,如果测试时没有在意这个问题,就很容易引发线上问题,需要的朋友可以参考下
    2023-12-12
  • Java中捕获线程异常的几种方式总结

    Java中捕获线程异常的几种方式总结

    这篇文章主要介绍了Java中捕获线程异常的几种方式总结,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11
  • SpringBoot集成Quartz实现持久化定时接口调用任务

    SpringBoot集成Quartz实现持久化定时接口调用任务

    Quartz是功能强大的开源作业调度库,几乎可以集成到任何 Java 应用程序中,从最小的独立应用程序到最大的电子商务系统,本文将通过代码示例给大家介绍SpringBoot集成Quartz实现持久化定时接口调用任务,需要的朋友可以参考下
    2023-07-07

最新评论