Spring Cloud Feign组件实例解析
这篇文章主要介绍了Spring Cloud Feign组件实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
采用Spring Cloud微服务框架后,经常会涉及到服务间调用,服务间调用采用了Feign组件。
由于之前有使用dubbo经验。dubbo的负载均衡策略(轮训、最小连接数、随机轮训、加权轮训),dubbo失败策略(快速失败、失败重试等等),
所以Feign负载均衡策略的是什么? 失败后是否会重试,重试策略又是什么?带这个疑问,查了一些资料,最后还是看了下代码。毕竟代码就是一切
Spring boot集成Feign的大概流程:
1、利用FeignAutoConfiguration自动配置。并根据EnableFeignClients 自动注册产生Feign的代理类。
2、注册方式利用FeignClientFactoryBean,熟悉Spring知道FactoryBean 产生bean的工厂,有个重要方法getObject产生FeignClient容器bean
3、同时代理类中使用hystrix做资源隔离,Feign代理类中 构造 RequestTemplate ,RequestTemlate要做的向负载均衡选中的server发送http请求,并进行编码和解码一系列操作。
下面只是粗略的看了下整体流程,先有整体再有细节吧,下面利用IDEA看下细节:
一、Feign失败重试
SynchronousMethodHandler的方法中的处理逻辑:
@Override public Object invoke(Object[] argv) throws Throwable { RequestTemplate template = buildTemplateFromArgs.create(argv); Retryer retryer = this.retryer.clone(); while (true) { try { return executeAndDecode(template); } catch (RetryableException e) { retryer.continueOrPropagate(e); if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue; } } }
- 上面的逻辑很简单。构造 template 并去进行服务间的http调用,然后对返回结果进行解码
- 当抛出 RetryableException 后,异常逻辑是否重试? 重试多少次? 带这个问题,看了retryer.continueOrPropagate(e);
具体逻辑如下:
public void continueOrPropagate(RetryableException e) { if (attempt++ >= maxAttempts) { throw e; } long interval; if (e.retryAfter() != null) { interval = e.retryAfter().getTime() - currentTimeMillis(); if (interval > maxPeriod) { interval = maxPeriod; } if (interval < 0) { return; } } else { interval = nextMaxInterval(); } try { Thread.sleep(interval); } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } sleptForMillis += interval; }
- 当重试次数大于默认次数5时候,直接抛出异常,不在重试
- 否则每隔一段时间 默认值最大1ms 后重试一次。
这就Feign这块的重试这块的粗略逻辑,由于之前工作中一直使用dubbo。同样是否需要将生产环境中重试操作关闭?
思考:之前dubbo生产环境的重试操作都会关闭。原因有几个:
- 一般第一次失败,重试也会失败,极端情况下不断的重试,会占用大量dubbo连接池,造成连接池被打满,影响核心功能
- 也是比较重要的一点原因,重试带来的业务逻辑的影响,即如果接口不是幂等的,重试会带来业务逻辑的错误,引发问题
二、Feign负载均衡策略
那么负载均衡的策略又是什么呢?由上图中可知 executeAndDecode(template)
Object executeAndDecode(RequestTemplate template) throws Throwable { Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { response = client.execute(request, options); // ensure the request is set. TODO: remove in Feign 10 response.toBuilder().request(request).build(); } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start)); } throw errorExecuting(request, e); } long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); boolean shouldClose = true; try { if (logLevel != Logger.Level.NONE) { response = logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime); // ensure the request is set. TODO: remove in Feign 10 response.toBuilder().request(request).build(); } if (Response.class == metadata.returnType()) { if (response.body() == null) { return response; } if (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) { shouldClose = false; return response; } // Ensure the response body is disconnected byte[] bodyData = Util.toByteArray(response.body().asInputStream()); return response.toBuilder().body(bodyData).build(); } if (response.status() >= 200 && response.status() < 300) { if (void.class == metadata.returnType()) { return null; } else { return decode(response); } } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) { return decode(response); } else { throw errorDecoder.decode(metadata.configKey(), response); } } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime); } throw errorReading(request, response, e); } finally { if (shouldClose) { ensureClosed(response.body()); } } }
概括的说主要做了两件事:发送HTTP请求,解码响应数据
想看的负载均衡应该在11行 response = client.execute(request, options); 而client的实现方式有两种 Default、LoadBalancerFeignClient
猜的话应该是LoadBalancerFeignClient,带这个问题去看源码(其实个人更喜欢带着问题看源码,没有目的一是看很难将复杂的源码关联起来,二是很容易迷失其中)
果然通过一番查找发现 Client 实例就是LoadBalancerFeignClient,而设置这个Client就是通过上面说的FeignClientFactoryBean的getObject方法中设置的,具体不说了
下面重点看LoadBalancerFeignClient execute(request, options)
@Override public Response execute(Request request, Request.Options options) throws IOException { try { URI asUri = URI.create(request.url()); String clientName = asUri.getHost(); URI uriWithoutHost = cleanUrl(request.url(), clientName); FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this.delegate, request, uriWithoutHost); IClientConfig requestConfig = getClientConfig(options, clientName); return lbClient(clientName).executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null) { throw io; } throw new RuntimeException(e); } }
通过几行代码比较重要的点RibbonRequest ,原来Feign负载均衡还是通过Ribbon实现的,那么Ribbo又是如何实现负载均衡的呢?
public Observable<T> submit(final ServerOperation<T> operation) { final ExecutionInfoContext context = new ExecutionInfoContext(); if (listenerInvoker != null) { try { listenerInvoker.onExecutionStart(); } catch (AbortExecutionException e) { return Observable.error(e); } } final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); // Use the load balancer Observable<T> o = (server == null ? selectServer() : Observable.just(server)) .concatMap(new Func1<Server, Observable<T>>() { @Override // Called for each server being selected public Observable<T> call(Server server) { context.setServer(server); final ServerStats stats = loadBalancerContext.getServerStats(server); // Called for each attempt and retry Observable<T> o = Observable .just(server) .concatMap(new Func1<Server, Observable<T>>() { @Override public Observable<T> call(final Server server) { context.incAttemptCount(); loadBalancerContext.noteOpenConnection(stats); if (listenerInvoker != null) { try { listenerInvoker.onStartWithServer(context.toExecutionInfo()); } catch (AbortExecutionException e) { return Observable.error(e); } } final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start(); return operation.call(server).doOnEach(new Observer<T>() { private T entity; @Override public void onCompleted() { recordStats(tracer, stats, entity, null); // TODO: What to do if onNext or onError are never called? } @Override public void onError(Throwable e) { recordStats(tracer, stats, null, e); logger.debug("Got error {} when executed on server {}", e, server); if (listenerInvoker != null) { listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo()); } } @Override public void onNext(T entity) { this.entity = entity; if (listenerInvoker != null) { listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo()); } } private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) { tracer.stop(); loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler); } }); } }); if (maxRetrysSame > 0) o = o.retry(retryPolicy(maxRetrysSame, true)); return o; } }); if (maxRetrysNext > 0 && server == null) o = o.retry(retryPolicy(maxRetrysNext, false)); return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() { @Override public Observable<T> call(Throwable e) { if (context.getAttemptCount() > 0) { if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), e); } else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED, "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), e); } } if (listenerInvoker != null) { listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo()); } return Observable.error(e); } }); }
通过上面代码分析,发现Ribbon和Hystrix一样都是利用了rxjava看来有必要掌握下rxjava了又。这里面 比较重要的就是17行,
selectServer() 方法选择指定的Server,负载均衡的策略主要是有ILoadBalancer接口不同实现方式:
- BaseLoadBalancer采用的规则为RoundRobinRule 轮训规则
- DynamicServerListLoadBalancer继承了BaseLoadBalancer,主要运行时改变Server列表
- NoOpLoadBalancer 什么操作都不做
- ZoneAwareLoadBalancer 功能主要是根据区域Zone分组的实例列表
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。
相关文章
Hystrix Dashboard断路监控仪表盘的实现详细介绍
这篇文章主要介绍了Hystrix Dashboard断路监控仪表盘的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2022-09-09彻底解决IDEA中SpringBoot热部署无效的问题(推荐)
这篇文章主要介绍了彻底解决IDEA中SpringBoot热部署无效的问题,本文给大家带来问题原因分析通过图文实例相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧2020-09-09springboot HandlerIntercepter拦截器修改request body数据的操作
这篇文章主要介绍了springboot HandlerIntercepter拦截器修改request body数据的操作,具有很好的参考价值,希望对大家有所帮助。2021-06-06
最新评论