Spring Cloud Feign内部实现代码细节

 更新时间:2021年05月21日 10:42:04   作者:Javachichi  
Feign 的英文表意为“假装,伪装,变形”, 是一个http请求调用的轻量级框架,可以以Java接口注解的方式调用Http请求,而不用像Java中通过封装HTTP请求报文的方式直接调用。接下来通过本文给大家分享Spring Cloud Feign内部实现代码细节,感兴趣的朋友一起看看吧

1. 概述

Feign用于服务间调用,它的内部实现是一个包含Ribbon(负载均衡)的**JDK-HttpURLConnection(Http)**调用。虽然调用形式是类似于RPC,但是实际调用是Http,这也是为什么Feign被称为伪RPC调用的原因。
内部调用过程如下:

在这里插入图片描述

2. 代码细节

1) BaseLoadBalancer.java配置初始化

重点功能: 1. 初始化负载均衡策略 2. 初始化取服务注册列表调度策略

void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
    ...
    // 每隔30s Ping一次
    int pingIntervalTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerPingInterval,
                    Integer.parseInt("30")));
    // 每次最多Ping 2s
    int maxTotalPingTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                    Integer.parseInt("2")));
    setPingInterval(pingIntervalTime);
    setMaxTotalPingTime(maxTotalPingTime);

    // cross associate with each other
    // i.e. Rule,Ping meet your container LB
    // LB, these are your Ping and Rule guys ...
    // 设置负载均衡规则
    setRule(rule);
    // 初始化取服务注册列表调度策略
    setPing(ping);

    setLoadBalancerStats(stats);
    rule.setLoadBalancer(this);
    ...
}

2) 负载均衡策略初始化

重点功能: 1. 默认使用轮询策略

BaseLoadBalancer.java

public void setRule(IRule rule) {
    if (rule != null) {
        this.rule = rule;
    } else {
        /* default rule */
        // 默认使用轮询策略
        this.rule = new RoundRobinRule();
    }
    if (this.rule.getLoadBalancer() != this) {
        this.rule.setLoadBalancer(this);
    }
}

RoundRobinRule.java

private AtomicInteger nextServerCyclicCounter;

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }
        // 轮询重点算法
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextServerCyclicCounter.get();
        int next = (current + 1) % modulo;
        if (nextServerCyclicCounter.compareAndSet(current, next))
            return next;
    }
}

3) 初始化取服务注册列表调度策略

重点功能: 1. 设置轮询间隔为30s 一次

注意: 这里没有做实际的Ping,只是获取缓存的注册列表的alive服务,原因是为了提高性能

BaseLoadBalancer.java

public void setPing(IPing ping) {
    if (ping != null) {
        if (!ping.equals(this.ping)) {
            this.ping = ping;
            setupPingTask(); // since ping data changed
        }
    } else {
        this.ping = null;
        // cancel the timer task
        lbTimer.cancel();
    }
}

void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    // 这里虽然默认设置是10s一次,但是在初始化的时候,设置了30s一次
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}

class Pinger {

    private final IPingStrategy pingerStrategy;

    public Pinger(IPingStrategy pingerStrategy) {
        this.pingerStrategy = pingerStrategy;
    }

    public void runPinger() throws Exception {
        if (!pingInProgress.compareAndSet(false, true)) { 
            return; // Ping in progress - nothing to do
        }
        
        // we are "in" - we get to Ping

        Server[] allServers = null;
        boolean[] results = null;

        Lock allLock = null;
        Lock upLock = null;

        try {
            /*
             * The readLock should be free unless an addServer operation is
             * going on...
             */
            allLock = allServerLock.readLock();
            allLock.lock();
            allServers = allServerList.toArray(new Server[allServerList.size()]);
            allLock.unlock();

            int numCandidates = allServers.length;
            results = pingerStrategy.pingServers(ping, allServers);

            final List<Server> newUpList = new ArrayList<Server>();
            final List<Server> changedServers = new ArrayList<Server>();

            for (int i = 0; i < numCandidates; i++) {
                boolean isAlive = results[i];
                Server svr = allServers[i];
                boolean oldIsAlive = svr.isAlive();

                svr.setAlive(isAlive);

                if (oldIsAlive != isAlive) {
                    changedServers.add(svr);
                    logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                        name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                }

                if (isAlive) {
                    newUpList.add(svr);
                }
            }
            upLock = upServerLock.writeLock();
            upLock.lock();
            upServerList = newUpList;
            upLock.unlock();

            notifyServerStatusChangeListener(changedServers);
        } finally {
            pingInProgress.set(false);
        }
    }
}

private static class SerialPingStrategy implements IPingStrategy {

    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
        int numCandidates = servers.length;
        boolean[] results = new boolean[numCandidates];

        logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

        for (int i = 0; i < numCandidates; i++) {
            results[i] = false; /* Default answer is DEAD. */
            try {
                // NOTE: IFF we were doing a real ping
                // assuming we had a large set of servers (say 15)
                // the logic below will run them serially
                // hence taking 15 times the amount of time it takes
                // to ping each server
                // A better method would be to put this in an executor
                // pool
                // But, at the time of this writing, we dont REALLY
                // use a Real Ping (its mostly in memory eureka call)
                // hence we can afford to simplify this design and run
                // this
                // serially
                if (ping != null) {
                    results[i] = ping.isAlive(servers[i]);
                }
            } catch (Exception e) {
                logger.error("Exception while pinging Server: '{}'", servers[i], e);
            }
        }
        return results;
    }
}

4) 最后拼接完整URL使用JDK-HttpURLConnection进行调用

SynchronousMethodHandler.java(io.github.openfeign:feign-core:10.10.1/feign.SynchronousMethodHandler.java)

Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    Request request = this.targetRequest(template);
    if (this.logLevel != Level.NONE) {
        this.logger.logRequest(this.metadata.configKey(), this.logLevel, request);
    }

    long start = System.nanoTime();

    Response response;
    try {
        response = this.client.execute(request, options);
        response = response.toBuilder().request(request).requestTemplate(template).build();
    } catch (IOException var13) {
        if (this.logLevel != Level.NONE) {
            this.logger.logIOException(this.metadata.configKey(), this.logLevel, var13, this.elapsedTime(start));
        }

        throw FeignException.errorExecuting(request, var13);
    }
    ...
}

LoadBalancerFeignClient.java

@Override
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        URI asUri = URI.create(request.url());
        String clientName = asUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);
        FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                this.delegate, request, uriWithoutHost);

        IClientConfig requestConfig = getClientConfig(options, clientName);
        return lbClient(clientName)
                .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
    }
    catch (ClientException e) {
        IOException io = findIOException(e);
        if (io != null) {
            throw io;
        }
        throw new RuntimeException(e);
    }
}

AbstractLoadBalancerAwareClient.java

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    // 获取真实访问URL
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
    
}

FeignLoadBalancer.java

@Override
public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
        throws IOException {
    Request.Options options;
    if (configOverride != null) {
        RibbonProperties override = RibbonProperties.from(configOverride);
        options = new Request.Options(override.connectTimeout(this.connectTimeout),
                override.readTimeout(this.readTimeout));
    }
    else {
        options = new Request.Options(this.connectTimeout, this.readTimeout);
    }
    Response response = request.client().execute(request.toRequest(), options);
    return new RibbonResponse(request.getUri(), response);
}

feign.Client.java

@Override
public Response execute(Request request, Options options) throws IOException {
  HttpURLConnection connection = convertAndSend(request, options);
  return convertResponse(connection, request);
}

Response convertResponse(HttpURLConnection connection, Request request) throws IOException {
  // 使用HttpURLConnection 真实进行Http调用
  int status = connection.getResponseCode();
  String reason = connection.getResponseMessage();

  if (status < 0) {
    throw new IOException(format("Invalid status(%s) executing %s %s", status,
        connection.getRequestMethod(), connection.getURL()));
  }

  Map<String, Collection<String>> headers = new LinkedHashMap<>();
  for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) {
    // response message
    if (field.getKey() != null) {
      headers.put(field.getKey(), field.getValue());
    }
  }

  Integer length = connection.getContentLength();
  if (length == -1) {
    length = null;
  }
  InputStream stream;
  if (status >= 400) {
    stream = connection.getErrorStream();
  } else {
    stream = connection.getInputStream();
  }
  return Response.builder()
      .status(status)
      .reason(reason)
      .headers(headers)
      .request(request)
      .body(stream, length)
      .build();
}

拓展干货阅读:一线大厂面试题、高并发等主流技术资料

以上就是Spring Cloud Feign内部实现代码细节的详细内容,更多关于Spring Cloud Feign的资料请关注脚本之家其它相关文章!

相关文章

  • Java 括号匹配问题案例详解

    Java 括号匹配问题案例详解

    这篇文章主要介绍了Java 括号匹配问题案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • ReentrantLock源码详解--公平锁、非公平锁

    ReentrantLock源码详解--公平锁、非公平锁

    ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,表示能够对共享资源能够重复加锁,即当前线程获取该锁再次获取不会被阻塞。下面我们来深入了解一下它吧
    2019-06-06
  • Springboot使用jxls实现同sheet多个列表展示

    Springboot使用jxls实现同sheet多个列表展示

    这篇文章主要介绍了Springboot使用jxls实现同sheet多个列表展示,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • idea启动命令过长的问题及解决

    idea启动命令过长的问题及解决

    当IDEA启动命令过长时,可以通过修改workspace.xml文件或调整启动类配置来解决,方案一是在.idea文件或项目目录中修改workspace.xml;方案二是通过运行配置(run->edit)来保存启动设置,这两种方法都可以有效缩短命令长度,解决启动错误
    2024-09-09
  • spring cloud config 配置中心快速实现过程解析

    spring cloud config 配置中心快速实现过程解析

    这篇文章主要介绍了spring cloud config 配置中心快速实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-08-08
  • mybatis-plus 自定义 Service Vo接口实现数据库实体与 vo 对象转换返回功能

    mybatis-plus 自定义 Service Vo接口实现数据库实体与 vo

    这篇文章主要介绍了mybatis-plus 自定义 Service Vo接口实现数据库实体与 vo 对象转换返回功能,本文通过实例图文相结合给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
    2024-08-08
  • java并发编程死锁定义及如何避免死锁

    java并发编程死锁定义及如何避免死锁

    这篇文章主要为大家介绍了java并发编程中死锁的详细说明及如何避免死锁的方法,有需要的朋友可以借鉴参考下希望能够有所帮助,祝大家多多进步
    2022-02-02
  • Spring Boot使用Log4j2的实例代码

    Spring Boot使用Log4j2的实例代码

    这篇文章主要介绍了Spring Boot使用Log4j2的实例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • Java线程优先级变量及功能

    Java线程优先级变量及功能

    这篇文章主要介绍了Java线程优先级变量及功能,关于优先级的问可能有两个或更多线程被分配了相同的优先级,那么它们的执行取决于操作系统,更多相关介绍,需要的小伙伴可以参考一下
    2022-06-06
  • 使用EasyExcel实现百万级别数据导出的代码示例

    使用EasyExcel实现百万级别数据导出的代码示例

    近期需要开发一个将百万数据量MySQL8的数据导出到excel的功能,所以本文讲给大家介绍了基于EasyExcel实现百万级别数据导出,文中通过代码示例讲解的非常详细,需要的朋友可以参考下
    2023-12-12

最新评论