Java: Transaction-Aware Read/Write Splitting with Spring Boot and tk.mybatis (Code Example)

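 Updated: 2023-10-07 10:00:31   Author: sunct
This article walks through a code example of transaction-aware read/write splitting in Java with Spring Boot and tk.mybatis. The master database handles transactional inserts, updates, and deletes, the slave databases handle SELECT queries, and database replication propagates the master's changes to the slaves in the cluster.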

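What is read/write splitting?

Read/write splitting rests on a simple principle: the master database handles transactional insert, update, and delete operations (INSERT, UPDATE, DELETE), while the slave databases handle SELECT queries.

Database replication then propagates the changes made by those transactional operations to the slave databases in the cluster.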

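Why split reads and writes?

  • Database writes are comparatively slow (writing 10,000 rows may take about 3 minutes).
  • Database reads are much faster (reading 10,000 rows may take only about 5 seconds).
  • Read/write splitting therefore keeps slow writes from degrading query performance.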

Source code

First, define the read/write data source types

/**
 * Data source type
 *
 * @author sunchangtan
 */
public enum DataSourceType {
    WRITE, READ
}

Define the holder for database connections

import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import org.springframework.core.NamedThreadLocal;
/**
 * Holder for the current thread's database connections
 *
 * @author sunchangtan
 */
public class ConnectionHolder {
    /**
     * Whether the current connection is a read or a write connection
     */
    public final static ThreadLocal<DataSourceType> CURRENT_CONNECTION = new NamedThreadLocal<DataSourceType>("routingdatasource's key") {
        @Override
        protected DataSourceType initialValue() {
            return DataSourceType.WRITE;
        }
    };
    /**
     * All database connections held by the current thread
     */
    public final static ThreadLocal<Map<DataSourceType, Connection>> CONNECTION_CONTEXT = new NamedThreadLocal<Map<DataSourceType, Connection>>("connection map") {
        @Override
        protected Map<DataSourceType, Connection> initialValue() {
            return new HashMap<>();
        }
    };
    /**
     * Forces the write data source when set to true
     */
    public final static ThreadLocal<Boolean> FORCE_WRITE = new NamedThreadLocal<Boolean>("FORCE_WRITE");
}

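Define the holder for the data source

import org.springframework.core.NamedThreadLocal;
/**
 * Holder for the data source type chosen for the current thread
 *
 * @author sunchangtan
 */
public class DataSourceHolder {
	/**
	 * Data source type chosen by the current transaction; null when no transaction has set one
	 */
	public final static ThreadLocal<DataSourceType> CURRENT_DATASOURCE = new NamedThreadLocal<>("routingdatasource's key");
	// No static initializer here: a value set from a static block would only
	// reach the thread that happens to load this class, and the interceptor
	// relies on null to mean "no transaction has chosen a data source".
	public static void setCurrentDataSource(DataSourceType dataSourceType){
		CURRENT_DATASOURCE.set(dataSourceType);
	}
	public static DataSourceType getCurrentDataSource(){
		return CURRENT_DATASOURCE.get();
	}
	public static void clearDataSource() {
		CURRENT_DATASOURCE.remove();
	}
}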
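
Because the holder is backed by a ThreadLocal, a value set on one thread is invisible to every other thread, so a thread that has not entered a transaction reads null. The sketch below illustrates that behaviour with a plain java.lang.ThreadLocal; the class name `ThreadLocalScopeDemo` and its nested `Type` enum are hypothetical stand-ins, not part of the original code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo; uses a plain ThreadLocal instead of Spring's NamedThreadLocal.
class ThreadLocalScopeDemo {
    enum Type { WRITE, READ }

    static final ThreadLocal<Type> CURRENT = new ThreadLocal<>();

    // Sets READ on the calling thread, then reports what a fresh thread observes.
    static Type valueSeenByOtherThread() {
        CURRENT.set(Type.READ);
        AtomicReference<Type> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(CURRENT.get()));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("other thread sees: " + valueSeenByOtherThread()); // null
        System.out.println("this thread sees: " + CURRENT.get());             // READ
        CURRENT.remove(); // clear the value before a pooled thread is reused
    }
}
```

On pooled threads the value must be removed once the work is done, which is what `clearDataSource()` is for.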

Define the data source proxy class, which routes between the read and write databases

import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.logging.Logger;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.Constants;
import org.springframework.jdbc.datasource.ConnectionProxy;
/**
 * Data source proxy; the actual data source is supplied by the DataSourceRouter
 * 
 * @author sunchangtan
 */
public class DataSourceProxy implements DataSource {
	private static final Constants constants = new Constants(Connection.class);
	private static final Log logger = LogFactory.getLog(DataSourceProxy.class);
	private Boolean defaultAutoCommit = Boolean.TRUE;
	private Integer defaultTransactionIsolation = 2;
	private DataSourceRouter dataSourceRouter;
	public DataSourceProxy(DataSourceRouter dataSourceRouter) {
		this.dataSourceRouter = dataSourceRouter;
	}
	public void setDefaultAutoCommit(boolean defaultAutoCommit) {
		this.defaultAutoCommit = defaultAutoCommit;
	}
	public void setDefaultTransactionIsolation(int defaultTransactionIsolation) {
		this.defaultTransactionIsolation = defaultTransactionIsolation;
	}
	public void setDefaultTransactionIsolationName(String constantName) {
		setDefaultTransactionIsolation(constants.asNumber(constantName).intValue());
	}
	protected Boolean defaultAutoCommit() {
		return this.defaultAutoCommit;
	}
	protected Integer defaultTransactionIsolation() {
		return this.defaultTransactionIsolation;
	}
	@Override
	public Connection getConnection() throws SQLException {
		return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(),
				new Class<?>[] { ConnectionProxy.class }, new LazyConnectionInvocationHandler());
	}
	@Override
	public Connection getConnection(String username, String password) throws SQLException {
		return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(),
				new Class<?>[] { ConnectionProxy.class }, new LazyConnectionInvocationHandler(username, password));
	}
	private class LazyConnectionInvocationHandler implements InvocationHandler {
		private String username;
		private String password;
		private Boolean readOnly = Boolean.FALSE;
		private Integer transactionIsolation;
		private Boolean autoCommit;
		private boolean closed = false;
		public LazyConnectionInvocationHandler() {
			this.autoCommit = defaultAutoCommit();
			this.transactionIsolation = defaultTransactionIsolation();
		}
		public LazyConnectionInvocationHandler(String username, String password) {
			this();
			this.username = username;
			this.password = password;
		}
		@Override
		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
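			// Invocation on ConnectionProxy interface coming in...
			// SERIALIZABLE isolation is treated as a marker for "force the write database":
			// the isolation level is reset to the default and FORCE_WRITE is set.
			if (method.getName().equals("setTransactionIsolation") && args != null && (Integer) args[0] == Connection.TRANSACTION_SERIALIZABLE) {
				args[0] = defaultTransactionIsolation();
				ConnectionHolder.FORCE_WRITE.set(Boolean.TRUE);
			}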
			if (method.getName().equals("equals")) {
				// We must avoid fetching a target Connection for "equals".
				// Only consider equal when proxies are identical.
				return (proxy == args[0]);
			} else if (method.getName().equals("hashCode")) {
				// We must avoid fetching a target Connection for "hashCode",
				// and we must return the same hash code even when the target
				// Connection has been fetched: use hashCode of Connection
				// proxy.
				return System.identityHashCode(proxy);
			} else if (method.getName().equals("unwrap")) {
				if (((Class<?>) args[0]).isInstance(proxy)) {
					return proxy;
				}
			} else if (method.getName().equals("isWrapperFor")) {
				if (((Class<?>) args[0]).isInstance(proxy)) {
					return true;
				}
			} else if (method.getName().equals("getTargetConnection")) {
				// Handle getTargetConnection method: return underlying
				// connection.
				return getTargetConnection(method);
			}
			if (!hasTargetConnection()) {
				// No physical target Connection kept yet ->
				// resolve transaction demarcation methods without fetching
				// a physical JDBC Connection until absolutely necessary.
				if (method.getName().equals("toString")) {
					return "Lazy Connection proxy for target DataSource [" + dataSourceRouter.getTargetDataSource() + "]";
				} else if (method.getName().equals("getMetaData")) {
					// NOTE: this opens a physical connection that is never closed here
					return dataSourceRouter.getTargetDataSource().getConnection().getMetaData();
				} else if (method.getName().equals("isReadOnly")) {
					return this.readOnly;
				} else if (method.getName().equals("setReadOnly")) {
					this.readOnly = (Boolean) args[0];
					return null;
				} else if (method.getName().equals("getTransactionIsolation")) {
					if (this.transactionIsolation != null) {
						return this.transactionIsolation;
					}
					// Else fetch actual Connection and check there,
					// because we didn't have a default specified.
				} else if (method.getName().equals("setTransactionIsolation")) {
					this.transactionIsolation = (Integer) args[0];
					return null;
				} else if (method.getName().equals("getAutoCommit")) {
					if (this.autoCommit != null) {
						return this.autoCommit;
					}
					// Else fetch actual Connection and check there,
					// because we didn't have a default specified.
				} else if (method.getName().equals("setAutoCommit")) {
					this.autoCommit = (Boolean) args[0];
					return null;
				} else if (method.getName().equals("commit")) {
					// Ignore: no statements created yet.
					return null;
				} else if (method.getName().equals("rollback")) {
					// Ignore: no statements created yet.
					return null;
				} else if (method.getName().equals("getWarnings")) {
					return null;
				} else if (method.getName().equals("clearWarnings")) {
					return null;
				} else if (method.getName().equals("close")) {
					// Ignore: no target connection yet.
					this.closed = true;
					return null;
				} else if (method.getName().equals("isClosed")) {
					return this.closed;
				} else if (this.closed) {
					// Connection proxy closed, without ever having fetched a
					// physical JDBC Connection: throw corresponding
					// SQLException.
					throw new SQLException("Illegal operation: connection is closed");
				}
			} else {
				if (method.getName().equals("commit")) {
					Map<DataSourceType, Connection> connectionMap = ConnectionHolder.CONNECTION_CONTEXT.get();
					Connection writeCon = connectionMap.get(DataSourceType.WRITE);
					if (writeCon != null) {
						writeCon.commit();
					}
					return null;
				}
				if (method.getName().equals("rollback")) {
					Map<DataSourceType, Connection> connectionMap = ConnectionHolder.CONNECTION_CONTEXT.get();
					Connection writeCon = connectionMap.get(DataSourceType.WRITE);
					if (writeCon != null) {
						writeCon.rollback();
					}
					return null;
				}
				if (method.getName().equals("close")) {
					ConnectionHolder.FORCE_WRITE.set(Boolean.FALSE);
					Map<DataSourceType, Connection> connectionMap = ConnectionHolder.CONNECTION_CONTEXT.get();
					Connection readCon = connectionMap.remove(DataSourceType.READ);
					if (readCon != null) {
						readCon.close();
					}
					Connection writeCon = connectionMap.remove(DataSourceType.WRITE);
					if (writeCon != null) {
						writeCon.close();
					}
					this.closed = true;
					return null;
				}
			}
			// Target Connection already fetched,
			// or target Connection necessary for current operation ->
			// invoke method on target connection.
			try {
				return method.invoke(
						ConnectionHolder.CONNECTION_CONTEXT.get().get(ConnectionHolder.CURRENT_CONNECTION.get()), args);
			} catch (InvocationTargetException ex) {
				throw ex.getTargetException();
			}
		}
		/**
		 * Return whether the proxy currently holds a target Connection.
		 */
		private boolean hasTargetConnection() {
			return (ConnectionHolder.CONNECTION_CONTEXT.get() != null
					&& ConnectionHolder.CONNECTION_CONTEXT.get().get(ConnectionHolder.CURRENT_CONNECTION.get()) != null);
		}
		/**
		 * Return the target Connection, fetching it and initializing it if
		 * necessary.
		 */
		private Connection getTargetConnection(Method operation) throws SQLException {
			// No target Connection held -> fetch one.
			if (logger.isDebugEnabled()) {
				logger.debug("Connecting to database for operation '" + operation.getName() + "'");
			}
			// Fetch physical Connection from DataSource.
			Connection target = (this.username != null)
					? dataSourceRouter.getTargetDataSource().getConnection(this.username, this.password)
					: dataSourceRouter.getTargetDataSource().getConnection();
			// Apply kept transaction settings, if any.
			if (this.readOnly) {
				try {
					target.setReadOnly(this.readOnly);
				} catch (Exception ex) {
					// "read-only not supported" -> ignore, it's just a hint
					// anyway
					logger.debug("Could not set JDBC Connection read-only", ex);
				}
			}
			if (this.transactionIsolation != null && !this.transactionIsolation.equals(defaultTransactionIsolation())) {
				target.setTransactionIsolation(this.transactionIsolation);
			}
			if (DataSourceType.READ == ConnectionHolder.CURRENT_CONNECTION.get()) {
				// Read connections always run in auto-commit mode
				try {
					target.setAutoCommit(true);
				} catch (SQLException e) {
					logger.warn("Could not enable auto-commit on read connection", e);
				}
			}
			if (this.autoCommit != null && this.autoCommit != target.getAutoCommit()) {
				if (DataSourceType.WRITE == ConnectionHolder.CURRENT_CONNECTION.get()) {
					target.setAutoCommit(this.autoCommit);
				}
			}
			return target;
		}
	}
	@Override
	public PrintWriter getLogWriter() throws SQLException {
		return dataSourceRouter.getTargetDataSource().getLogWriter();
	}
	@Override
	public void setLogWriter(PrintWriter out) throws SQLException {
		dataSourceRouter.getTargetDataSource().setLogWriter(out);
	}
	@Override
	public int getLoginTimeout() throws SQLException {
		return dataSourceRouter.getTargetDataSource().getLoginTimeout();
	}
	@Override
	public void setLoginTimeout(int seconds) throws SQLException {
		dataSourceRouter.getTargetDataSource().setLoginTimeout(seconds);
	}
	// ---------------------------------------------------------------------
	// Implementation of JDBC 4.0's Wrapper interface
	// ---------------------------------------------------------------------
	@Override
	@SuppressWarnings("unchecked")
	public <T> T unwrap(Class<T> iface) throws SQLException {
		if (iface.isInstance(this)) {
			return (T) this;
		}
		return dataSourceRouter.getTargetDataSource().unwrap(iface);
	}
	@Override
	public boolean isWrapperFor(Class<?> iface) throws SQLException {
		return (iface.isInstance(this) || dataSourceRouter.getTargetDataSource().isWrapperFor(iface));
	}
	// ---------------------------------------------------------------------
	// Implementation of JDBC 4.1's getParentLogger method
	// ---------------------------------------------------------------------
	@Override
	public Logger getParentLogger() {
		return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
	}
}
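
The core idea of the proxy above is laziness: no physical connection is opened until a call actually needs one. The following is a minimal sketch of that idea using java.lang.reflect.Proxy. The `Session` interface, the `LazyProxyDemo` class, and the `open()` factory are hypothetical stand-ins for java.sql.Connection and the real data source; they are assumptions for illustration, not part of the original code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical demo; Session is a simplified stand-in for java.sql.Connection.
class LazyProxyDemo {
    interface Session {
        String run(String sql);
        void close();
        boolean isClosed();
    }

    // Counts how many physical sessions have been opened.
    static final AtomicInteger OPENED = new AtomicInteger();

    // Stand-in for DataSource.getConnection().
    static Session open() {
        OPENED.incrementAndGet();
        return new Session() {
            @Override public String run(String sql) { return "ran: " + sql; }
            @Override public void close() { }
            @Override public boolean isClosed() { return false; }
        };
    }

    // Returns a proxy that calls the factory only when a method needs a real target.
    static Session lazy(Supplier<Session> factory) {
        InvocationHandler handler = new InvocationHandler() {
            private Session target;   // created on first real use
            private boolean closed;

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (method.getDeclaringClass() == Object.class) {
                    // toString/hashCode/equals are answered without opening a session
                    return method.invoke(this, args);
                }
                switch (method.getName()) {
                    case "isClosed":
                        return closed;
                    case "close":
                        closed = true;
                        if (target != null) {
                            target.close();
                        }
                        return null;
                    default:
                        if (closed) {
                            throw new IllegalStateException("session is closed");
                        }
                        if (target == null) {
                            target = factory.get();
                        }
                        try {
                            return method.invoke(target, args);
                        } catch (InvocationTargetException ex) {
                            throw ex.getTargetException();
                        }
                }
            }
        };
        return (Session) Proxy.newProxyInstance(
                Session.class.getClassLoader(), new Class<?>[] { Session.class }, handler);
    }

    public static void main(String[] args) {
        Session s = lazy(LazyProxyDemo::open);
        System.out.println(s.isClosed() + " opened=" + OPENED.get()); // false opened=0
        System.out.println(s.run("select 1"));                        // ran: select 1
        System.out.println("opened=" + OPENED.get());                 // opened=1
    }
}
```

The article's proxy goes further by keeping one target per data source type in a thread-local map, so the same proxy can forward to either the read or the write connection.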

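Define the routing interface

import javax.sql.DataSource;
/**
 * Data source router
 *
 */
public interface DataSourceRouter {
	/**
	 * Implement routing as needed, for example across read/write-split data sources
	 * or across the data sources of sharded tables.
	 * @return the data source to use for the current operation
	 */
	public DataSource getTargetDataSource();
}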

Implement the base class for read/write routing

import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.lookup.DataSourceLookup;
import org.springframework.jdbc.datasource.lookup.JndiDataSourceLookup;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
/**
 *  Base class for routers that choose between read and write data sources
 *
 * @author sunchangtan
 */
public abstract class AbstractMasterSlaverDataSourceRouter implements DataSourceRouter, InitializingBean {
	// Read-only data sources taken from the configuration;
	// each entry may be a DataSource instance or a JNDI name
	private List<Object> readDataSources;
	private Object writeDataSource;
	private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
	private List<DataSource> resolvedReadDataSources;
	private DataSource resolvedWriteDataSource;
	// Number of read-only data sources, needed for load balancing
	private int readDsSize;
	public List<DataSource> getResolvedReadDataSources() {
		return resolvedReadDataSources;
	}
	public int getReadDsSize() {
		return readDsSize;
	}
	public void setReadDataSoures(List readDataSoures) {
		this.readDataSources = readDataSoures;
	}
	public void setWriteDataSource(Object writeDataSource) {
		this.writeDataSource = writeDataSource;
	}
	public void setDataSourceLookup(DataSourceLookup dataSourceLookup) {
		this.dataSourceLookup = (dataSourceLookup != null ? dataSourceLookup : new JndiDataSourceLookup());
	}
	@Override
	public void afterPropertiesSet() {
		if (writeDataSource == null) {
			throw new IllegalArgumentException("Property 'writeDataSource' is required");
		}
		this.resolvedWriteDataSource = resolveSpecifiedDataSource(writeDataSource);
		if (this.readDataSources == null || this.readDataSources.isEmpty()) {
			throw new IllegalArgumentException("Property 'readDataSources' is required");
		}
		resolvedReadDataSources = new ArrayList<DataSource>(readDataSources.size());
		for (Object item : readDataSources) {
			resolvedReadDataSources.add(resolveSpecifiedDataSource(item));
		}
		readDsSize = readDataSources.size();
	}
	protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
		if (dataSource instanceof DataSource) {
			return (DataSource) dataSource;
		}
		else if (dataSource instanceof String) {
			return this.dataSourceLookup.getDataSource((String) dataSource);
		}
		else {
			throw new IllegalArgumentException(
					"Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
		}
	}
	@Override
	public DataSource getTargetDataSource() {
		if (DataSourceType.WRITE.equals(ConnectionHolder.CURRENT_CONNECTION.get())) {
			return resolvedWriteDataSource;
		} else {
			return loadBalance();
		}
	}
	protected abstract DataSource loadBalance();
}

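Implement simple round-robin routing; other routing strategies can be implemented in the same way

import javax.sql.DataSource;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 
 * Simple load balancing across the read data sources
 *
 */
public class RoundRobinMasterSlaverDataSourceRouter extends AbstractMasterSlaverDataSourceRouter {
	private final AtomicInteger count = new AtomicInteger(0);
	@Override
	protected DataSource loadBalance() {
		// floorMod keeps the index non-negative after the counter overflows;
		// Math.abs(Integer.MIN_VALUE) is itself negative and would yield a negative index
		int index = Math.floorMod(count.incrementAndGet(), getReadDsSize());
		return getResolvedReadDataSources().get(index);
	}
}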
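
A counter-based round-robin has to survive integer overflow of its counter. The sketch below compares two ways of turning a possibly negative counter into a list index; the class and method names are hypothetical and exist only for this illustration.

```java
// Hypothetical demo comparing two index computations for a round-robin counter.
class RoundRobinIndexDemo {
    // Math.abs leaves Integer.MIN_VALUE negative, so the remainder can be negative too.
    static int absIndex(int counter, int size) {
        return Math.abs(counter) % size;
    }

    // Math.floorMod always returns a value in [0, size) for a positive size.
    static int floorModIndex(int counter, int size) {
        return Math.floorMod(counter, size);
    }

    public static void main(String[] args) {
        System.out.println(absIndex(Integer.MIN_VALUE, 3));      // -2
        System.out.println(floorModIndex(Integer.MIN_VALUE, 3)); // 1
        System.out.println(floorModIndex(7, 3));                 // 1
    }
}
```

A negative index passed to `List.get` throws `IndexOutOfBoundsException`, so the `floorMod` form is the safer choice for a long-running service.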

Transaction manager that sends read-only transactions to the read database and read-write transactions to the write database

import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import javax.sql.DataSource;
/**
 *  Transaction management:
 *  read-only transactions go to the read database, read-write transactions to the write database
 *  
 * @author sunchangtan 
 */
public class MasterSlaverDataSourceTransactionManager extends DataSourceTransactionManager {
    public MasterSlaverDataSourceTransactionManager(DataSource dataSource) {
        super(dataSource);
    }
    /**
     * Read-only transactions go to the read database, read-write transactions to the write database
     * @param transaction the transaction object
     * @param definition the transaction definition; its read-only flag selects the data source
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        // Choose the data source before the transaction begins
        boolean readOnly = definition.isReadOnly();
        if(readOnly) {
            DataSourceHolder.setCurrentDataSource(DataSourceType.READ);
        } else {
            DataSourceHolder.setCurrentDataSource(DataSourceType.WRITE);
        }
        super.doBegin(transaction, definition);
    }
    /**
     * Clear the data source stored for the current thread
     * @param transaction the transaction object
     */
    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        super.doCleanupAfterCompletion(transaction);
        DataSourceHolder.clearDataSource();
    }
}

The MyBatis plugin for read/write splitting; it must be registered in mybatis-config.xml

import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.executor.statement.RoutingStatementHandler;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.logging.jdbc.ConnectionLogger;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.DefaultReflectorFactory;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.factory.DefaultObjectFactory;
import org.apache.ibatis.reflection.wrapper.DefaultObjectWrapperFactory;
import org.springframework.jdbc.datasource.ConnectionProxy;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.Properties;
/**
 * Routes each statement to the read or the write data source
 */
@Slf4j
@Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})
public class MasterSlaveInterceptor implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        Connection conn = (Connection) invocation.getArgs()[0];
        conn = unwrapConnection(conn);
        if (conn instanceof ConnectionProxy) {
            // Force the write database
            if (Boolean.TRUE.equals(ConnectionHolder.FORCE_WRITE.get())) {
                if (log.isDebugEnabled()) {
                    log.debug("This transaction is forced to the write database");
                }
                routeConnection(DataSourceType.WRITE, conn);
                return invocation.proceed();
            }
            StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
            MetaObject metaObject = MetaObject.forObject(statementHandler, new DefaultObjectFactory(), new DefaultObjectWrapperFactory(), new DefaultReflectorFactory());
            MappedStatement mappedStatement;
            if (statementHandler instanceof RoutingStatementHandler) {
                mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");
            } else {
                mappedStatement = (MappedStatement) metaObject.getValue("mappedStatement");
            }
            DataSourceType key = DataSourceHolder.getCurrentDataSource();
            if (key == null) {
                // No transaction has chosen a data source: default to the write database
                // and use the read database only for SELECTs that are not selectKey lookups
                key = DataSourceType.WRITE;
                String sql = statementHandler.getBoundSql().getSql().trim();
                // regionMatches, unlike substring(0, 3), does not throw on SQL shorter than three characters
                if (sql.regionMatches(true, 0, "sel", 0, 3)
                        && !mappedStatement.getId().endsWith(".insert!selectKey")
                        && !mappedStatement.getId().endsWith(".insertSelective!selectKey")) {
                    key = DataSourceType.READ;
                }
            }
            if(key == DataSourceType.WRITE) {
                if (log.isDebugEnabled()) {
                    log.debug("Current database: write");
                }
            } else if(key == DataSourceType.READ) {
                if (log.isDebugEnabled()) {
                    log.debug("Current database: read");
                }
            }
            routeConnection(key, conn);
        }
        return invocation.proceed();
    }
    private void routeConnection(DataSourceType key, Connection conn) {
        ConnectionHolder.CURRENT_CONNECTION.set(key);
        // Each thread holds at most one write connection and one read connection
        if (!ConnectionHolder.CONNECTION_CONTEXT.get().containsKey(key)) {
            ConnectionProxy conToUse = (ConnectionProxy) conn;
            conn = conToUse.getTargetConnection();
            ConnectionHolder.CONNECTION_CONTEXT.get().put(key, conn);
        }
    }
    public Object plugin(Object target) {
        if (target instanceof StatementHandler) {
            return Plugin.wrap(target, this);
        } else {
            return target;
        }
    }
    public void setProperties(Properties properties) {
        // NOOP
    }
    /**
     * MyBatis wraps the JDBC Connection with a logging proxy but Spring registers the original connection so it should
     * be unwrapped before calling {@code DataSourceUtils.isConnectionTransactional(Connection, DataSource)}
     *
     * @param connection May be a {@code ConnectionLogger} proxy
     * @return the original JDBC {@code Connection}
     */
    private Connection unwrapConnection(Connection connection) {
        if (Proxy.isProxyClass(connection.getClass())) {
            InvocationHandler handler = Proxy.getInvocationHandler(connection);
            if (handler instanceof ConnectionLogger) {
                return ((ConnectionLogger) handler).getConnection();
            }
        }
        return connection;
    }
}
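
When no transaction has chosen a data source, the plugin falls back to inspecting the SQL text. The sketch below isolates that decision as a pure function so it can be reasoned about on its own. `SqlRouteDemo`, `Target`, and `route` are hypothetical names, and matching any statement id ending in `!selectKey` is a simplification of the two explicit suffixes checked in the plugin.

```java
// Hypothetical demo of the SQL-based fallback; not the plugin itself.
class SqlRouteDemo {
    enum Target { WRITE, READ }

    // Defaults to WRITE; returns READ only for a SELECT that is not a selectKey lookup.
    static Target route(String sql, String statementId) {
        String trimmed = (sql == null) ? "" : sql.trim();
        // Case-insensitive prefix test that is safe for short or empty strings
        boolean isSelect = trimmed.regionMatches(true, 0, "select", 0, 6);
        boolean isSelectKey = statementId != null && statementId.endsWith("!selectKey");
        return (isSelect && !isSelectKey) ? Target.READ : Target.WRITE;
    }

    public static void main(String[] args) {
        System.out.println(route("  select * from t_user", "UserMapper.selectAll"));     // READ
        System.out.println(route("SELECT LAST_INSERT_ID()", "UserMapper.insert!selectKey")); // WRITE
        System.out.println(route("update t_user set name = ?", "UserMapper.update"));    // WRITE
    }
}
```

A prefix test like this also sends statements such as `SELECT ... FOR UPDATE` to the read side, so code that needs a locking read should run inside a read-write transaction instead of relying on the fallback.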

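Define the Spring Boot master/slave data source configuration (the DataSourceMasterConfig and DataSourceSlaveConfig classes it references are not shown in this article)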

import com.sample.dao.dynamic.DataSourceProxy;
import com.sample.dao.dynamic.DataSourceRouter;
import com.sample.dao.dynamic.RoundRobinMasterSlaverDataSourceRouter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Collections;
/**
 * Configuration of the master and slave data sources
 * @author sunchangtan
 */
@EnableConfigurationProperties({DataSourceMasterConfig.class, DataSourceSlaveConfig.class})
@Configuration
public class MasterSlaverDataSourceConfig {
    @Resource
    private DataSourceSlaveConfig dataSourceSlaveConfig;
    @Resource
    private DataSourceMasterConfig dataSourceMasterConfig;
    @Bean
    @ConditionalOnMissingBean
    public DataSourceRouter readRoutingDataSource() {
        RoundRobinMasterSlaverDataSourceRouter proxy = new RoundRobinMasterSlaverDataSourceRouter();
        proxy.setReadDataSoures(Collections.singletonList(dataSourceSlaveConfig.createDataSource()));
        proxy.setWriteDataSource(dataSourceMasterConfig.createDataSource());
        return proxy;
    }
    @Bean
    public DataSource dataSource(DataSourceRouter dataSourceRouter) {
        return new DataSourceProxy(dataSourceRouter);
    }
}

Configure database transactions in Spring Boot

import com.sample.dao.dynamic.MasterSlaverDataSourceTransactionManager;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.interceptor.*;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
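/**
 * Transaction management configuration
 *
 * @author sunchangtan
 * @date 2018/8/30 11:22
 */
@Aspect
@Configuration
public class TransactionManagerConfigurer {
    // Passed to setTimeout(int), which takes seconds; 50000 s is about 13.9 hours, so verify the intended unit
    private static final int TX_METHOD_TIMEOUT = 50000;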
    private static final String AOP_POINTCUT_EXPRESSION = "execution(* com.sample.***.service..*.*(..))";
    @Resource
    private DataSource dataSource;
    @Bean
    public PlatformTransactionManager transactionManager() {
        return new MasterSlaverDataSourceTransactionManager(dataSource);
    }
    /**
     * The transaction advice
     *
     * @return the interceptor that applies transaction attributes by method name
     */
    @Bean
    public TransactionInterceptor txAdvice(PlatformTransactionManager transactionManager) {
        NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource();
        RuleBasedTransactionAttribute readOnlyTx = new RuleBasedTransactionAttribute();
        readOnlyTx.setReadOnly(true);
        // PROPAGATION_SUPPORTS: join the current transaction if one exists, otherwise run
        // non-transactionally. If the query throws, the surrounding transaction can still roll back.
        // With no surrounding transaction, doBegin is not invoked, so routing falls back to the
        // SQL-based check in MasterSlaveInterceptor.
        readOnlyTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
        RuleBasedTransactionAttribute requiredTx = new RuleBasedTransactionAttribute();
        requiredTx.setRollbackRules(Collections.singletonList(new RollbackRuleAttribute(Exception.class)));
        // PROPAGATION_REQUIRED: start a new transaction if none exists, otherwise join the
        // existing one. Inserts, updates, and deletes must run inside a transaction.
        requiredTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        requiredTx.setTimeout(TX_METHOD_TIMEOUT);
        Map<String, TransactionAttribute> txMap = new HashMap<>();
        txMap.put("add*", requiredTx);
        txMap.put("save*", requiredTx);
        txMap.put("insert*", requiredTx);
        txMap.put("update*", requiredTx);
        txMap.put("delete*", requiredTx);
        txMap.put("remove*", requiredTx);
        txMap.put("upload*", requiredTx);
        txMap.put("generate*", requiredTx);
        txMap.put("import*", requiredTx);
        txMap.put("bind*", requiredTx);
        txMap.put("unbind*", requiredTx);
        txMap.put("cancel*", requiredTx);
        txMap.put("send*", requiredTx);
        txMap.put("create*", requiredTx);
        txMap.put("compute*", requiredTx);
        txMap.put("recompute*", requiredTx);
        txMap.put("execute*", requiredTx);
        //txMap.put("submit*", requiredTx);
        txMap.put("get*", readOnlyTx);
        txMap.put("query*", readOnlyTx);
        txMap.put("list*", readOnlyTx);
        txMap.put("has*", readOnlyTx);
        txMap.put("exist*", readOnlyTx);
        txMap.put("download*", readOnlyTx);
        txMap.put("export*", readOnlyTx);
        txMap.put("search*", readOnlyTx);
        txMap.put("check*", readOnlyTx);
        txMap.put("load*", readOnlyTx);
        txMap.put("find*", readOnlyTx);
        source.setNameMap(txMap);
        return new TransactionInterceptor(transactionManager, source);
    }
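    /**
     * Defines the aspect: the pointcut plus the advice
     *
     * @param txAdvice the transaction advice to apply
     * @return the advisor that binds the advice to the service-layer pointcut
     */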
    @Bean
    public Advisor txAdviceAdvisor(@Qualifier("txAdvice") TransactionInterceptor txAdvice) {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        pointcut.setExpression(AOP_POINTCUT_EXPRESSION);
        return new DefaultPointcutAdvisor(pointcut, txAdvice);
    }
}
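
The name map above decides a method's transaction attribute purely from its name. The sketch below is a simplified model of that lookup, assuming (as Spring's NameMatchTransactionAttributeSource appears to do) that an exact name wins first and that otherwise the longest matching pattern wins. It only handles a trailing `*`, and `TxNameMatchDemo`, `resolve`, and the rule labels are hypothetical names used for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of name-based transaction attribute lookup.
class TxNameMatchDemo {
    // Returns the rule for an exact name, else for the longest matching "prefix*" pattern, else null.
    static String resolve(Map<String, String> rules, String methodName) {
        if (rules.containsKey(methodName)) {
            return rules.get(methodName);
        }
        String best = null;
        for (String pattern : rules.keySet()) {
            boolean matches = pattern.endsWith("*")
                    && methodName.startsWith(pattern.substring(0, pattern.length() - 1));
            if (matches && (best == null || best.length() <= pattern.length())) {
                best = pattern;
            }
        }
        return best == null ? null : rules.get(best);
    }

    // A few of the article's patterns, labelled with the attribute they map to.
    static Map<String, String> sampleRules() {
        Map<String, String> rules = new LinkedHashMap<>();
        rules.put("save*", "required");
        rules.put("update*", "required");
        rules.put("get*", "readOnly");
        rules.put("find*", "readOnly");
        return rules;
    }

    public static void main(String[] args) {
        Map<String, String> rules = sampleRules();
        System.out.println(resolve(rules, "getUser"));     // readOnly
        System.out.println(resolve(rules, "saveUser"));    // required
        System.out.println(resolve(rules, "submitOrder")); // null
    }
}
```

A method whose name matches no pattern gets no transaction attribute under this model, so a service method such as a hypothetical `submitOrder` would run without a transaction and its statements would be routed by the SQL-based fallback alone.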
