Java中基于DeferredResult的异步服务详解

 更新时间:2023年12月15日 10:02:27   作者:爱喝咖啡的程序员  
这篇文章主要介绍了Java中基于DeferredResult的异步服务详解,DeferredResult字面意思是"延迟结果",它允许Spring MVC收到请求后,立即释放(归还)容器线程,以便容器可以接收更多的外部请求,提升吞吐量,需要的朋友可以参考下

一. 简介

Servlet3.0提供了基于servlet的异步处理api,Spring MVC只是将这些api进行了一系列的封装,从而实现了DeferredResult。

DeferredResult字面意思是"延迟结果",它允许Spring MVC收到请求后,立即释放(归还)容器线程,以便容器可以接收更多的外部请求,提升吞吐量,与此同时,DeferredResult将陷入阻塞,直到我们主动将结果set到DeferredResult,最后,DeferredResult会重新申请容器线程,并将本次请求返回给客户端。

二. 使用

1. 监听器 onTimeout()

当deferredResult被创建出来之后,执行setResult()之前,这之间的时间超过设定值时(比如下方案例中设置为5秒超时),则被判定为超时。

DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 设置超时事件
deferredResult.onTimeout(() -> {
    System.out.println("异步线程执行超时, 异步线程的名称: " + Thread.currentThread().getName());
    deferredResult.setResult("异步线程执行超时");
});

2. 监听器 onError()

当onTimeout()或onCompletion()等回调函数中的代码报错时,则会执行监听器onError()的回调函数。

PS: DeferredResult之外的代码报错不会影响到onError()。

DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 设置异常事件
deferredResult.onError((throwable) -> {
    System.out.println("异步请求出现错误,异步线程的名称: " + Thread.currentThread().getName() + "异常: " + throwable);
    deferredResult.setErrorResult("异步线程执行出错");
});

3. 监听器 onCompletion()

代码任意位置调用了同一个DeferredResult的setResult()后,则会被DeferredResult的onCompletion()监听器捕获到。

Spring会任选一条容器线程来执行onCompletion( )中的代码(由于请求线程已被释放(归还),所以此处可能再次由同一条请求线程来处理,也可能由其他线程来处理)。

DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 设置完成事件
deferredResult.onCompletion(() -> {
    System.out.println("异步线程执行完毕,异步线程的名称: " + Thread.currentThread().getName());
});

完整的代码为:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
public class DemoController {
    // 自定义线程池
    public static ExecutorService exec = Executors.newCachedThreadPool();
    @GetMapping("/demo")
    public DeferredResult<String> demoResult() {
        System.out.println("容器线程: " + Thread.currentThread().getName());
        // 创建DeferredResult对象,设置超时时长 20秒
        DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
        // 设置超时事件
        deferredResult.onTimeout(() -> {
            System.out.println("异步线程执行超时, 异步线程的名称: " + Thread.currentThread().getName());
            // throw new RuntimeException("超时事件报错了!");
            deferredResult.setResult("异步线程执行超时");
        });
        // 设置异常事件
        deferredResult.onError((throwable) -> {
            System.out.println("异步请求出现错误,异步线程的名称: " + Thread.currentThread().getName() + "异常: " + throwable);
            deferredResult.setErrorResult("异步线程执行出错");
        });
        // 设置完成事件
        deferredResult.onCompletion(() -> {
            System.out.println("异步线程执行完毕,异步线程的名称: " + Thread.currentThread().getName());
        });
        exec.execute(() -> {
            System.out.println("[线程池] 异步线程的名称: " + Thread.currentThread().getName());
            deferredResult.setResult("异步线程执行完毕");
        });
        System.out.println("Servlet thread release");
        return deferredResult;
    }
}

三. 拓展

有些业务场景下,我们希望新的请求触发(激活)之前陷入阻塞的请求,此外可以通过不同的key来区分不同的请求。

比如apollo在实现时就利用了DeferredResult。客户端向服务器端发送轮询请求,服务端收到请求后,会立刻释放容器线程,并阻塞本次请求,若apollo托管的配置文件没有发生任何改变,则轮询请求会超时(返回304)。当有新的配置发布时,服务端会调用DeferredResult setResult()方法,进入onCompletion(),并使尚未超时的轮寻请求正常返回(200)。

大概如下: 

@SpringBootApplication
public class DemoApplication implements WebMvcConfigurer {
 
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
 
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Collection;
@RestController
public class ApolloController {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    //guava中的Multimap,多值map,对map的增强,一个key可以保持多个value
    private Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create());
    //模拟长轮询
    @RequestMapping(value = "/watch/{namespace}", method = RequestMethod.GET, produces = "text/html")
    public DeferredResult<String> watch(@PathVariable("namespace") String namespace) {
        logger.info("Request received");
        DeferredResult<String> deferredResult = new DeferredResult<>();
        //当deferredResult完成时(不论是超时还是异常还是正常完成),移除watchRequests中相应的watch key
        deferredResult.onCompletion(new Runnable() {
            @Override
            public void run() {
                System.out.println("remove key:" + namespace);
                watchRequests.remove(namespace, deferredResult);
            }
        });
        watchRequests.put(namespace, deferredResult);
        logger.info("Servlet thread released");
        return deferredResult;
    }
    //模拟发布namespace配置
    @RequestMapping(value = "/publish/{namespace}", method = RequestMethod.GET, produces = "text/html")
    public Object publishConfig(@PathVariable("namespace") String namespace) {
        if (watchRequests.containsKey(namespace)) {
            Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace);
            Long time = System.currentTimeMillis();
            //通知所有watch这个namespace变更的长轮训配置变更结果
            for (DeferredResult<String> deferredResult : deferredResults) {
                deferredResult.setResult(namespace + " changed:" + time);
            }
        }
        return "success";
    }
}

当请求超时的时候会产生AsyncRequestTimeoutException,我们定义一个全局异常捕获类:

 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ControllerAdvice
class GlobalControllerExceptionHandler {
    protected static final Logger logger = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class);
    @ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304状态码
    @ResponseBody
    @ExceptionHandler(AsyncRequestTimeoutException.class) //捕获特定异常
    public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e, HttpServletRequest request) {
        System.out.println("handleAsyncRequestTimeoutException");
    }
}

然后我们通过postman工具发送请求//localhost:8080/watch/mynamespace,请求会挂起,60秒后,DeferredResult超时,客户端正常收到了304状态码,表明在这个期间配置没有变更过。

然后我们在模拟配置变更的情况,再次发起请求//localhost:8080/watch/mynamespace,等待个10秒钟(不要超过60秒),然后调用//localhost:8080/publish/mynamespace,发布配置变更。这时postman会立刻收到response响应结果:  

mynamespace changed:1538880050147

表明在轮训期间有配置变更过。

这里我们用了一个MultiMap来存放所有轮训的请求,Key对应的是namespace,value对应的是所有watch这个namespace变更的异步请求DeferredResult,需要注意的是:在DeferredResult完成的时候记得移除MultiMap中相应的key,避免内存溢出请求。

采用这种长轮询的好处是,相比一直循环请求服务器,实例一多的话会对服务器产生很大的压力,http长轮询的方式会在服务器变更的时候主动推送给客户端,其他时间客户端是挂起请求的,这样同时满足了性能和实时性。

四. DeferredResult与Callable的区别

DeferredResult和Callable都可以在Controller层的方法中直接返回,请求收到后,释放容器线程,在另一个线程中通过异步的方式执行任务,最后将请求返回给客户端。

不同之处在于,使用Callable时,当其它线程中的任务执行完毕后,请求会立刻返回给客户端,而DeferredResult则需要用户在代码中手动set值到DeferredResult,否则即便异步线程中的任务执行完毕,DeferredResult仍然不会向客户端返回任何结果。

到此这篇关于Java中基于DeferredResult的异步服务详解的文章就介绍到这了,更多相关基于DeferredResult的异步服务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • JavaWeb利用struts实现文件下载时改变文件名称

    JavaWeb利用struts实现文件下载时改变文件名称

    这篇文章主要为大家详细介绍了JavaWeb利用struts实现文件下载时改变文件名称的相关资料,需要的朋友可以参考下
    2016-06-06
  • SpringBoot引入swagger报错处理的解决方法

    SpringBoot引入swagger报错处理的解决方法

    这篇文章主要给大家介绍SpringBoot引入swagger是会出现报错的处理解决方法,文中有详细的解决过程,感兴趣的小伙伴可以跟着小编一起来学习吧
    2023-06-06
  • Nacos源码之注册中心的实现详解

    Nacos源码之注册中心的实现详解

    这篇文章主要为大家介绍了Nacos源码之注册中心的实现详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-02-02
  • 详解Java线程池的增长过程

    详解Java线程池的增长过程

    在本篇文章里小编给大家整理的是关于Java线程池的增长过程以及相关知识点,需要的朋友们可以参考下。
    2019-08-08
  • Java流式操作之Collectors工具类操作指南

    Java流式操作之Collectors工具类操作指南

    Collectors是Collector的工具类,类中提供了很多流收集、归约、分组、分区等方法,方便我们直接使用,下面这篇文章主要给大家介绍了关于Java流式操作之Collectors工具类操作的相关资料,需要的朋友可以参考下
    2023-05-05
  • Java设计模式之抽象工厂模式

    Java设计模式之抽象工厂模式

    这篇文章主要为大家详细介绍了Java设计模式之抽象工厂模式的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-03-03
  • spring-security关闭登录框的实现示例

    spring-security关闭登录框的实现示例

    这篇文章主要介绍了spring-security关闭登录框的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-05-05
  • SpringBoot如何使用TestEntityManager进行JPA集成测试

    SpringBoot如何使用TestEntityManager进行JPA集成测试

    TestEntityManager是Spring Framework提供的一个测试框架,它可以帮助我们进行 JPA 集成测试,在本文中,我们将介绍如何使用 TestEntityManager 进行 JPA 集成测试,感兴趣的跟着小编一起来学习吧
    2023-06-06
  • spring-cloud入门之eureka-server(服务发现)

    spring-cloud入门之eureka-server(服务发现)

    本篇文章主要介绍了spring-cloud入门之eureka-server(服务发现),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-01-01
  • Java实现平滑加权轮询算法之降权和提权详解

    Java实现平滑加权轮询算法之降权和提权详解

    所有负载均衡的场景几乎都会用到这个平滑加权轮询算法,下面这篇文章主要给大家介绍了关于Java实现平滑加权轮询算法之降权和提权的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2022-04-04

最新评论