Spring Cloud Gateway重试机制原理解析
重试,我相信大家并不陌生。在我们调用Http接口的时候,总会因为某种原因调用失败,这个时候我们可以通过重试的方式,来重新请求接口。
生活中这样的事例很多,比如打电话,对方正在通话中啊,信号不好啊等等原因,你总会打不通,当你第一次没打通之后,你会打第二次,第三次…第四次就通了。
重试也要注意应用场景,读数据的接口比较适合重试的场景,写数据的接口就需要注意接口的幂等性了。还有就是重试次数如果太多的话会导致请求量加倍,给后端造成更大的压力,设置合理的重试机制才是最关键的。
今天我们来简单的了解下Spring Cloud Gateway中的重试机制和使用。
使用讲解
RetryGatewayFilter是Spring Cloud Gateway对请求重试提供的一个GatewayFilter Factory。
配置方式:
spring: cloud: gateway: routes: - id: fsh-house uri: lb://fsh-house predicates: - Path=/house/** filters: - name: Retry args: retries: 3 series: - SERVER_ERROR statuses: - OK methods: - GET - POST exceptions: - java.io.IOException
配置讲解
配置类源码:org.springframework.cloud.gateway.filter.factory.RetryGatewayFilterFactory.RetryConfig:
public static class RetryConfig { private int retries = 3; private List<Series> series = toList(Series.SERVER_ERROR); private List<HttpStatus> statuses = new ArrayList<>(); private List<HttpMethod> methods = toList(HttpMethod.GET); private List<Class<? extends Throwable>> exceptions = toList(IOException.class); // ..... }
retries:重试次数,默认值是3次
series:状态码配置(分段),符合的某段状态码才会进行重试逻辑,默认值是SERVER_ERROR,值是5,也就是5XX(5开头的状态码),共有5个值:
public enum Series { INFORMATIONAL(1), SUCCESSFUL(2), REDIRECTION(3), CLIENT_ERROR(4), SERVER_ERROR(5); }
statuses:状态码配置,和series不同的是这边是具体状态码的配置,取值请参考:org.springframework.http.HttpStatus
methods:指定哪些方法的请求需要进行重试逻辑,默认值是GET方法,取值如下:
public enum HttpMethod { GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS, TRACE; }
exceptions:指定哪些异常需要进行重试逻辑,默认值是java.io.IOException
代码测试
就写个接口,在接口中记录请求次数,然后抛出一个异常模拟500,通过网关访问这个接口,如果你配置了重试次数是3,那么接口中会输出4次结果才是对的,证明重试生效了。
AtomicInteger ac = new AtomicInteger(); @GetMapping("/data") public HouseInfo getData(@RequestParam("name") String name) { if (StringUtils.isBlank(name)) { throw new RuntimeException("error"); } System.err.println(ac.addAndGet(1)); return new HouseInfo(1L, "上海", "虹口", "XX小区"); }
源码欣赏
@Override public GatewayFilter apply(RetryConfig retryConfig) { // 验证重试配置格式是否正确 retryConfig.validate(); Repeat<ServerWebExchange> statusCodeRepeat = null; if (!retryConfig.getStatuses().isEmpty() || !retryConfig.getSeries().isEmpty()) { Predicate<RepeatContext<ServerWebExchange>> repeatPredicate = context -> { ServerWebExchange exchange = context.applicationContext(); // 判断重试次数是否已经达到了配置的最大值 if (exceedsMaxIterations(exchange, retryConfig)) { return false; } // 获取响应的状态码 HttpStatus statusCode = exchange.getResponse().getStatusCode(); // 获取请求方法类型 HttpMethod httpMethod = exchange.getRequest().getMethod(); // 判断响应状态码是否在配置中存在 boolean retryableStatusCode = retryConfig.getStatuses().contains(statusCode); if (!retryableStatusCode && statusCode != null) { // null status code might mean a network exception? // try the series retryableStatusCode = retryConfig.getSeries().stream() .anyMatch(series -> statusCode.series().equals(series)); } // 判断方法是否包含在配置中 boolean retryableMethod = retryConfig.getMethods().contains(httpMethod); // 决定是否要进行重试 return retryableMethod && retryableStatusCode; }; statusCodeRepeat = Repeat.onlyIf(repeatPredicate) .doOnRepeat(context -> reset(context.applicationContext())); } //TODO: support timeout, backoff, jitter, etc... in Builder Retry<ServerWebExchange> exceptionRetry = null; if (!retryConfig.getExceptions().isEmpty()) { Predicate<RetryContext<ServerWebExchange>> retryContextPredicate = context -> { if (exceedsMaxIterations(context.applicationContext(), retryConfig)) { return false; } // 异常判断 for (Class<? extends Throwable> clazz : retryConfig.getExceptions()) { if (clazz.isInstance(context.exception())) { return true; } } return false; }; // 使用reactor extra的retry组件 exceptionRetry = Retry.onlyIf(retryContextPredicate) .doOnRetry(context -> reset(context.applicationContext())) .retryMax(retryConfig.getRetries()); } return apply(statusCodeRepeat, exceptionRetry); } public boolean exceedsMaxIterations(ServerWebExchange exchange, RetryConfig retryConfig) { Integer iteration = exchange.getAttribute(RETRY_ITERATION_KEY); //TODO: deal with null iteration return iteration != null && iteration >= retryConfig.getRetries(); } public void reset(ServerWebExchange exchange) { //TODO: what else to do to reset SWE? exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_ALREADY_ROUTED_ATTR); } public GatewayFilter apply(Repeat<ServerWebExchange> repeat, Retry<ServerWebExchange> retry) { return (exchange, chain) -> { if (log.isTraceEnabled()) { log.trace("Entering retry-filter"); } // chain.filter returns a Mono<Void> Publisher<Void> publisher = chain.filter(exchange) //.log("retry-filter", Level.INFO) .doOnSuccessOrError((aVoid, throwable) -> { // 获取已经重试的次数,默认值为-1 int iteration = exchange.getAttributeOrDefault(RETRY_ITERATION_KEY, -1); // 增加重试次数 exchange.getAttributes().put(RETRY_ITERATION_KEY, iteration + 1); }); if (retry != null) { // retryWhen returns a Mono<Void> // retry needs to go before repeat publisher = ((Mono<Void>)publisher).retryWhen(retry.withApplicationContext(exchange)); } if (repeat != null) { // repeatWhen returns a Flux<Void> // so this needs to be last and the variable a Publisher<Void> publisher = ((Mono<Void>)publisher).repeatWhen(repeat.withApplicationContext(exchange)); } return Mono.fromDirect(publisher); }; }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。
相关文章
Spring Boot与Spark、Cassandra系统集成开发示例
本文演示以Spark作为分析引擎,Cassandra作为数据存储,而使用Spring Boot来开发驱动程序的示例。对spring boot 与spark cassandra集成开发示例代码感兴趣的朋友跟着脚本之家小编一起学习吧2018-02-02SpringBoot框架集成ElasticSearch实现过程示例详解
这篇文章主要为大家介绍了SpringBoot如何集成ElasticSearch的实现过程示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步2021-11-11
最新评论