Reactor定制一个生产的WebClient实现示例
1 为什么要用 WebClient
刚开始尝试使用 Spring WebFlux 的时候,很多人都会使用 Mono.fromFuture() 将异步请求转成 Mono 对象,或者 Mono.fromSupplier() 将请求转成 MOno 对象,这两种方式在响应式编程中都是不建议的,都会阻塞当前线程。
1.1 Mono.fromFuture() VS WebClient
Mono.fromFuture()方法和使用 WebClient 调用第三方接口之间存在以下区别:
- 异步 vs. 非阻塞
Mono.fromFuture()方法适用于接收一个 java.util.concurrent.Future 对象,并将其转换为响应式的 Mono。这是一个阻塞操作,因为它会等待 Future 对象完成。而使用 WebClient 调用第三方接口是异步和非阻塞的,它不会直接阻塞应用程序的执行,而是使用事件驱动的方式处理响应。
可扩展性和灵活性:使用 WebClient 可以更灵活地进行配置和处理,例如设置超时时间、请求头、重试机制等。WebClient 还可以与许多其他 Spring WebFlux 组件集成,如 WebSockets、Server-Sent Events 等。而 Mono.fromFuture() 是适用于单个 Future 对象转化为 Mono 的情况,可扩展性较差。
- 错误处理
WebClient 提供了更丰富的错误处理机制,可以通过 onStatus、onError 等方法来处理不同的 HTTP 状态码或异常。同时,WebClient 还提供了更灵活的重试和回退策略。Mono.fromFuture() 方法只能将 Future 对象的结果包装在 Mono 中,不提供特定的错误处理机制。
- 阻塞操作
Mono.fromFuture() 会阻塞。当调用 Mono.fromFuture() 方法将 Future 转换为 Mono 时,它会等待 Future 对象的结果返回。在这个等待的过程中,Mono.fromFuture()方法会阻塞当前的线程。这意味着,如果 Future 的结果在运行过程中没有返回,则当前线程会一直阻塞,直到 Future 对象返回结果或者超时。因此,在使用 Mono.fromFuture() 时需要注意潜在的阻塞风险。另外,需要确保F uture 的任务在后台线程中执行,以免阻塞应用程序的主线程。
1.2 Mono.fromFuture VS Mono.fromSupplier
Mono.fromSupplier() 和 Mono.fromFuture() 都是用于将异步执行的操作转换为响应式的 Mono 对象,但它们的区别在于:
Mono.fromSupplier() 适用于一个提供者/生产者,可以用来表示某个操作的结果,该操作是一些纯计算并且没有阻塞的方法。也就是说,Mono.fromSupplier() 将其参数 (Supplier) 所提供的操作异步执行,并将其结果打包成一个 Mono 对象。
Mono.fromFuture() 适用于一个 java.util.concurrent.Future 对象,将其封装成 Mono 对象。这意味着调用 Mono.fromFuture() 方法将阻塞当前线程,直到异步操作完成返回一个 Future 对象。
因此,Mono.fromSupplier() 与 Mono.fromFuture() 的主要区别在于:
Mono.fromSupplier() 是一个非阻塞的操作,不会阻塞当前线程。这个方法用于执行计算型的任务,返回一个封装了计算结果的 Mono 对象。
Mono.fromFuture() 是阻塞操作,会阻塞当前线程,直到异步操作完毕并返回看,它适用于处理 java.util.concurrent.Future 对象。
需要注意的是,如果 Supplier 提供的操作是阻塞的,则 Mono.fromSupplier() 方法本身也会阻塞线程。但通常情况下,Supplier 提供的操作是纯计算型的,不会阻塞线程。
因此,可以使用 Mono.fromSupplier() 方法将一个纯计算型的操作转换为 Mono 对象,而将一个异步返回结果的操作转换为 Mono 对象时,可以使用 Mono.fromFuture() 方法。
2 定制化自己的 WebClient
2.1 初始化 WebClient
WebClient 支持建造者模式,使用 WebClient 建造者模式支持开发自己的个性化 WebClient,比如支持设置接口调用统一耗时、自定义底层 Http 客户端、调用链路、打印接口返回日志、监控接口耗时等等。
WebClient builder 支持以下方法
interface Builder { /** * 配置请求基础的url,如:baseUrl = "https://abc.go.com/v1";和 uriBuilderFactory 冲突,如果有 uriBuilderFactory ,则忽略 baseUrl */ Builder baseUrl(String baseUrl); /** * URI 请求的默认变量。也和 uriBuilderFactory 冲突,如果有 uriBuilderFactory ,则忽略 defaultUriVariables */ Builder defaultUriVariables(Map<String, ?> defaultUriVariables); /** * 提供一个预配置的UriBuilderFactory实例 */ Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory); /** * 默认 header */ Builder defaultHeader(String header, String... values); /** * 默认cookie */ Builder defaultCookie(String cookie, String... values); /** * 提供一个 consumer 来定制每个请求 */ Builder defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest); /** * 添加一个filter,可以添加多个 */ Builder filter(ExchangeFilterFunction filter); /** * 配置要使用的 ClientHttpConnector。这对于插入或自定义底层HTTP 客户端库(例如SSL)的选项非常有用。 */ Builder clientConnector(ClientHttpConnector connector); /** * Configure the codecs for the {@code WebClient} in the * {@link #exchangeStrategies(ExchangeStrategies) underlying} * {@code ExchangeStrategies}. * @param configurer the configurer to apply * @since 5.1.13 */ Builder codecs(Consumer<ClientCodecConfigurer> configurer); /** * 提供一个预先配置了ClientHttpConnector和ExchangeStrategies的ExchangeFunction。 这是对 clientConnector 的一种替代,并且有效地覆盖了它们。 */ Builder exchangeFunction(ExchangeFunction exchangeFunction); /** * Builder the {@link WebClient} instance. */ WebClient build(); // 其他方法 }
2.2 日志打印及监控
- 打印参数、url、返回
- 参数和返回需要转成json
- 需要打印正常返回日志和异常
- 正常监控、异常监控、总监控以及响应时间
.doOnSuccess(response-> { log.info("get.success, url={}, response={}, param={}", url, response); }) .doOnError(error-> { log.info("get.error, url={}", url, error); // 监控 }) .doFinally(res-> { //监控 })
2.3 返回处理
retrieve() // 声明如何提取响应。例如,提取一个ResponseEntity的状态,头部和身体:
.bodyToMono(clazz) 将返回body内容转成clazz对象,clazz 对象可以自己指定类型。如果碰到有问题的无法转化的,也可以先转成String,然后自己实现一个工具类,将String转成 class 对象。
2.3.1 get
public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) { long start = System.currentTimeMillis(); return webClient.get() .uri(url) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("get.success, url={}, response={}, param={}", url, response); }) .doOnError(error-> { log.info("get.param.error, url={}", url, error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); }
2.3.2 get param 请求
public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) { long start = System.currentTimeMillis(); URI uri = UriComponentsBuilder.fromUriString(url) .queryParams(param) .build() .toUri(); return webClient.get() .uri(uri) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param)); }) .doOnError(error-> { log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { // 监控 or 打印日志 or 耗时 }) .publishOn(customScheduler); }
2.3.3 post json 请求
public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) { final long start = System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_JSON) .cookies(cookies -> cookies.setAll(parameter.getCookies())) .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType())) .headers(headers -> headers.setAll(parameter.getHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody()); }) .doOnError(error-> { log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); }
2.3.4 post form Data 请求
public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) { final long start = System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_FORM_URLENCODED) .cookies(cookies -> cookies.setAll(parameter.getCookies())) .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam())) .headers(headers -> headers.setAll(parameter.getMapHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter)); }) .doOnError(error-> { log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); }
2.4 异常处理
异常返回兜底
onErrorReturn 发现异常返回兜底数据
异常处理
状态码转成异常抛出
.onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))
监控异常
.doOnError(error -> { // log and monitor })
3 完整的 WebClient
package com.geniu.reactor.webclient; import com.geniu.utils.JsonUtil; import io.netty.channel.ChannelOption; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.util.UriComponentsBuilder; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; import reactor.netty.tcp.SslProvider; import reactor.netty.tcp.TcpClient; import java.net.URI; import java.time.Duration; import java.util.function.Function; /** * @Author: prepared * @Date: 2023/8/15 11:05 */ @Slf4j public class CustomerWebClient { public static final CustomerWebClient instance = new CustomerWebClient(); /** * 限制并发数 100 */ Scheduler customScheduler = Schedulers.newParallel("CustomScheduler", 100); private final WebClient webClient; private CustomerWebClient() { final SslContextBuilder sslBuilder = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE); final SslProvider ssl = SslProvider.builder().sslContext(sslBuilder) .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build(); final int cpuCores = Runtime.getRuntime().availableProcessors(); final int selectorCount = Math.max(cpuCores / 2, 4); final int workerCount = Math.max(cpuCores * 2, 8); final LoopResources pool = LoopResources.create("HCofSWC", selectorCount, workerCount, true); final Function<? super TcpClient, ? extends TcpClient> tcpMapper = tcp -> tcp .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.SO_TIMEOUT, 10000) .secure(ssl) .runOn(pool); ConnectionProvider.Builder httpClientOfSWC = ConnectionProvider .builder("HttpClientOfSWC") .maxConnections(100_000) .pendingAcquireTimeout(Duration.ofSeconds(6)); final ConnectionProvider connectionProvider = httpClientOfSWC.build(); final HttpClient hc = HttpClient.create(connectionProvider) .tcpConfiguration(tcpMapper); final Function<HttpClient, HttpClient> hcMapper = rhc -> rhc .compress(true); final WebClient.Builder wcb = WebClient.builder() .clientConnector(new ReactorClientHttpConnector(hcMapper.apply(hc))); // .filter(new TraceRequestFilter()); 可以通过Filter 增加trace追踪 this.webClient = wcb.build(); } public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) { long start = System.currentTimeMillis(); return webClient.get() .uri(url) .accept(MediaType.APPLICATION_JSON) .retrieve() .onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode()))) .bodyToMono(clazz) .doOnSuccess(response-> { log.info("get.success, url={}, response={}, param={}", url, response); }) .doOnError(error-> { log.info("get.param.error, url={}", url, error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); } public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) { long start = System.currentTimeMillis(); URI uri = UriComponentsBuilder.fromUriString(url) .queryParams(param) .build() .toUri(); return webClient.get() .uri(uri) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param)); }) .doOnError(error-> { log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); } public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) { final long start = System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_JSON) .cookies(cookies -> cookies.setAll(parameter.getCookies())) .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType())) .headers(headers -> headers.setAll(parameter.getHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody()); }) .doOnError(error-> { log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); } public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) { final long start = System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_FORM_URLENCODED) .cookies(cookies -> cookies.setAll(parameter.getCookies())) .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam())) .headers(headers -> headers.setAll(parameter.getMapHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response-> { log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter)); }) .doOnError(error-> { log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error); }) .onErrorReturn(defaultClass) .doFinally(res-> { }) .publishOn(customScheduler); } }
以上就是Reactor定制一个生产的WebClient实现示例的详细内容,更多关于Reactor定制生产WebClient的资料请关注脚本之家其它相关文章!
相关文章
Spring应用中使用acutator/refresh刷新属性不生效的问题分析及解决
在Spring应用收到/actuator/refresh的POST请求后,标注了@RefreshScope以及@ConfiguratioinProperties的bean会被Spring容器重新加载,但是,在实际应用中,并没有按照预期被Spring容器加载,本文将讨论导致这种未按预期刷新的一种原因,感兴趣的朋友可以参考下2024-01-01SpringCloud Gateway读取Request Body方式
这篇文章主要介绍了SpringCloud Gateway读取Request Body方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-03-03
最新评论