Mybatis流式查询之ResultHanlde问题
前言
正常访问数据库的查询操作,都是根据查询sql一次性返回查询结果。
但如果遇到目标数据量过大、且需要全量查询、不能分页、或者内存不想被返回的结果占用过多等需求时(例如导出excel),就可能需要流式查询。
1.准备工作
1.1.Mybaits的jar包引入
注:idea必须配置build,否则扫描不到src下的xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>relife</artifactId> <groupId>org.example</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>relife-object</artifactId> <dependencies> <!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>compile</scope> </dependency> <dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>2.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.20</version> </dependency> <!-- https://mvnrepository.com/artifact/org.mybatis/mybatis --> <dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> <version>3.4.6</version> </dependency> </dependencies> <build> <resources> <resource> <directory>src/main/java</directory> <includes> <include>**/*.xml</include> </includes> </resource> </resources> </build> </project>
1.2.实体类User
public class User { private String name; private int age; private String id; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getId() { return id; } public void setId(String id) { this.id = id; } }
1.3.mybatis-config.xml和userMapper.xml
mybatis-config.xml(resource根目录)
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd"> <configuration> <environments default="development"> <environment id="development"> <transactionManager type="JDBC"/> <dataSource type="POOLED"> <property name="driver" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false"/> <property name="username" value="root"/> <property name="password" value="root"/> </dataSource> </environment> </environments> <mappers> <mapper resource="com/relife/mybatis/userMapper.xml"/> </mappers> </configuration>
userMapper.xml
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.relife.object.User"> <select id="selectUser" resultType="com.relife.object.User"> select id,name,age from user </select> </mapper>
2.流式处理
2.1流式逐条处理handle
public class UserResultHandler implements ResultHandler<User> { @Override public void handleResult(ResultContext<? extends User> resultContext) { // 这里获取流式查询每次返回的单条结果 User user = resultContext.getResultObject(); handle(user); } // 串行逐条执行handle private void handle(User user) { System.out.println(user.getId()); } }
适用于导出excel等,,上述写法缺点也很明显,单线程,串行,所有效率慢,但是类似导出excel,符合要求,也不太方便使用多线程。
2.2.流式批量多线程处理handle
为了解决效率问题,有时候会对结果有比如发送请求,更行其他内容等需求时
public class UserResultHandler<T> implements ResultHandler<T> { public final Logger logger = Logger.getLogger(this.getClass()); /** * 线程池线程数 */ private int threadPollNum = 100; public UserResultHandler() { } public UserResultHandler(int threadPollNum) { this.threadPollNum = threadPollNum; } // 线程池 public ExecutorService executorService = Executors.newFixedThreadPool(threadPollNum); // 线程执行结果 public List<Future> futureList = new ArrayList<>(); @Override public void handleResult(ResultContext<? extends T> resultContext) { // 这里获取流式查询每次返回的单条结果 T user = resultContext.getResultObject(); while (futureList.size() > 200) { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } futureList = futureList.stream().filter(future -> !future.isDone()).collect(Collectors.toList()); logger.info("条数:" + resultContext.getResultCount() + "->未完成结果" + futureList.size()); } UserThread ut = new UserThread(user); Future<?> future = executorService.submit(ut); futureList.add(future); } /** * 保证所有线程执行完成,并关闭线程池 */ public void end() { while (futureList.size() != 0) { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } futureList = futureList.stream().filter(future -> !future.isDone()).collect(Collectors.toList()); } executorService.shutdown(); } public static class UserThread<T> implements Runnable { private T obejct; public UserThread(T obejct) { this.obejct = obejct; } @Override public void run() { System.out.println(((User)obejct).getId()); } public T getObejct() { return obejct; } public void setObejct(T obejct) { this.obejct = obejct; } } }
上述写法,可用于需要线程返回值的,或者明确需要线程执行完成,且可以保证不占用过多内存。
其中Thread.sleep()方法,可以自定义时间。
3.测试
3.1.测试案例
public static void main(String[] args) throws IOException { SqlSessionFactoryBuilder builder = new SqlSessionFactoryBuilder(); SqlSessionFactory sqlSessionFactory = builder.build(Resources.getResourceAsStream("mybatis-config.xml")); System.out.println("sqlSessionFactory:" + sqlSessionFactory); SqlSession sqlSession = sqlSessionFactory.openSession(); // 正常查询 List<User> userList5 = sqlSession.selectList("selectUser"); // 流式查询 UserResultHandler userResultHandler = new UserResultHandler(100); sqlSession.select("selectUser", userResultHandler); userResultHandler.end(); sqlSession.close(); }
3.1.正常查询debug调用
调用图
3.2.流式查询debug调用
调用图
4.问题与部分源码解析
4.1加入resulthandle为什么会流式查询?
在DefaultResultSetHandler类中handleResultSet(ResultSetWrapper rsw, ResultMap resultMap, List multipleResults, ResultMapping parentMapping) throws SQLException 方法中出现了调用区别一个303行,一个206行
private void handleResultSet(ResultSetWrapper rsw, ResultMap resultMap, List<Object> multipleResults, ResultMapping parentMapping) throws SQLException { try { if (parentMapping != null) { handleRowValues(rsw, resultMap, null, RowBounds.DEFAULT, parentMapping); } else { // 有的话就使用传入的ResultHandle,没有就用默认的DefaultResultHandler if (resultHandler == null) { DefaultResultHandler defaultResultHandler = new DefaultResultHandler(objectFactory); handleRowValues(rsw, resultMap, defaultResultHandler, rowBounds, null); // 指定handle时,把结果集放到multipleResults中 multipleResults.add(defaultResultHandler.getResultList()); } else { // 非指定handle时,multipleResults中不会有结果 handleRowValues(rsw, resultMap, resultHandler, rowBounds, null); } } } finally { // issue #228 (close resultsets) closeResultSet(rsw.getResultSet()); } }
这个if判断直接导致了最后实际调用callResultHandler时的区别。
private void callResultHandler(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext, Object rowValue) { resultContext.nextResultObject(rowValue); // 加入handle的会调用自己的代码实现,没有就用默认的DefaultResultHandler里的方法 ((ResultHandler<Object>) resultHandler).handleResult(resultContext); }
4.2.流式查询为什么没有返回值?
这个可以从两个方向探讨
- 一个是比较直接的,即什么时候返回值变成了void,见DefaultSqlSession
- 一个是为什么会这样,因为自定义实现是,没有把结果加入结果集,见DefaultResultSetHandler
1.DefaultSqlSession
// 正常查询 @Override public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) { try { MappedStatement ms = configuration.getMappedStatement(statement); return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER); } catch (Exception e) { throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } } // 流式查询 @Override public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) { try { MappedStatement ms = configuration.getMappedStatement(statement); // 此方法有返回结果,但是为空,不需要返回(见下边方法) executor.query(ms, wrapCollection(parameter), rowBounds, handler); } catch (Exception e) { throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } }
2.DefaultResultSetHandler
@Override public List<Object> handleResultSets(Statement stmt) throws SQLException { ErrorContext.instance().activity("handling results").object(mappedStatement.getId()); // 初始化返回值 final List<Object> multipleResults = new ArrayList<Object>(); int resultSetCount = 0; ResultSetWrapper rsw = getFirstResultSet(stmt); List<ResultMap> resultMaps = mappedStatement.getResultMaps(); int resultMapCount = resultMaps.size(); validateResultMapsCount(rsw, resultMapCount); while (rsw != null && resultMapCount > resultSetCount) { ResultMap resultMap = resultMaps.get(resultSetCount); // 返回值会在调用方法时,被赋值,从上述4.1可看出,只有不传值时才会把结果放入multipleResults handleResultSet(rsw, resultMap, multipleResults, null); rsw = getNextResultSet(stmt); cleanUpAfterHandlingResultSet(); resultSetCount++; } String[] resultSets = mappedStatement.getResultSets(); if (resultSets != null) { while (rsw != null && resultSetCount < resultSets.length) { ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]); if (parentMapping != null) { String nestedResultMapId = parentMapping.getNestedResultMapId(); ResultMap resultMap = configuration.getResultMap(nestedResultMapId); handleResultSet(rsw, resultMap, null, parentMapping); } rsw = getNextResultSet(stmt); cleanUpAfterHandlingResultSet(); resultSetCount++; } } return collapseSingleResultList(multipleResults); } // 只有一条结果,就直接取出,多个或空,不处理 private List<Object> collapseSingleResultList(List<Object> multipleResults) { return multipleResults.size() == 1 ? (List<Object>) multipleResults.get(0) : multipleResults; }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
Spring Cloud + Nacos + Seata整合过程(分布式事务解决方案)
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务,这篇文章主要介绍了Spring Cloud + Nacos + Seata整合过程(分布式事务解决方案),需要的朋友可以参考下2022-03-03详解java中this.getClass()和super.getClass()的实例
这篇文章主要介绍了详解java中this.getClass()和super.getClass()的实例的相关资料,需要的朋友可以参考下2017-08-08
最新评论