Spring 多线程事务控制的实践

 更新时间:2023年09月16日 15:26:29   作者:qq_35987023  
本文主要介绍了Spring 多线程事务控制的实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

在Java多线程事务控制中,有一些注意事项和实例可以帮助你更好地理解和应用。

注意事项

  • 确保线程安全:在多线程环境下,确保代码是线程安全的。这可以通过使用synchronized关键字、Lock接口或Atomic类来实现。
  • 事务的隔离级别:根据需要选择适当的事务隔离级别,以避免并发问题,例如脏读、不可重复读和幻读。
  • 事务的传播行为:了解事务的传播行为,例如事务的提交和回滚如何影响其他事务。
  • 异常处理:在多线程环境下处理异常时,需要特别小心。确保捕获和处理所有异常,并正确地处理事务回滚。

spring事务隔离级别

Java Spring框架提供了一种方便的方式来管理数据库事务,它支持多种事务隔离级别。事务隔离级别决定了事务在并发执行时的隔离程度,包括对其他事务的可见性和可能出现的并发问题。

以下是Spring框架支持的事务隔离级别及其详细说明:

  • ISOLATION_DEFAULT(默认):这是系统的默认隔离级别。根据具体数据库来定义,大多数数据库默认级别是可重复读。
  • ISOLATION_READ_UNCOMMITTED(读未提交):在这个级别,一个事务可以看到其他未提交事务的变动。这种级别可以导致脏读、不可重复读和幻读的问题。
  • ISOLATION_READ_COMMITTED(读提交):在这个级别,一个事务只能看到其他事务已经提交的变动。这种级别可以避免脏读问题,但可能会出现不可重复读和幻读的问题。
  • ISOLATION_REPEATABLE_READ(可重复读):在这个级别,同一事务中多次读取的数据是一致的。这种级别可以避免脏读和不可重复读的问题,但可能会出现幻读的问题。
  • ISOLATION_SERIALIZABLE(可串行化):这是最高的事务隔离级别。在这个级别,事务串行化顺序执行,可以避免脏读、不可重复读和幻读的问题。但是这种隔离级别效率低下,因为事务通常需要等待前一个事务完成,才能继续执行。

Spring事务的默认隔离级别与数据库一致

在Spring中,可以通过以下方式设置事务的隔离级别:

@Transactional(isolation = Isolation.READ_COMMITTED)  
public void someMethod() {  
    // some code  
}

在上述代码中,@Transactional注解指定了事务的隔离级别为READ_COMMITTED。注意,虽然可以使用其他隔离级别,但并不是所有数据库都支持所有的隔离级别。使用哪种隔离级别取决于你的具体需求和数据库的能力。

spring事务传播行为

Java Spring框架中的事务传播行为是指在一个事务方法被另一个事务方法调用时,如何处理事务的传播。事务传播行为定义了在一个方法中调用另一个方法时,事务应该如何启动、提交或回滚。

以下是Spring框架支持的事务传播行为及其详细说明:

  • PROPAGATION_REQUIRED(必需):如果当前存在一个事务,那么就加入这个事务,如果当前没有事务,就新建一个事务。这是最常见的选择。
  • PROPAGATION_SUPPORTS(支持):支持当前事务,如果当前没有事务,就以非事务方式执行。
  • PROPAGATION_MANDATORY(强制):使用当前的事务,如果当前没有事务,就抛出异常。
  • PROPAGATION_REQUIRES_NEW(新建):新建事务,如果当前存在事务,把当前事务挂起。
  • PROPAGATION_NOT_SUPPORTED(不支持):以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
  • PROPAGATION_NEVER(从不):以非事务方式执行,如果当前存在事务,抛出异常。
  • PROPAGATION_NESTED(嵌套):如果当前存在一个事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。

在Spring中,可以通过以下方式设置事务的传播行为:

@Transactional(propagation = Propagation.REQUIRES_NEW)  
public void someMethod() {  
    // some code  
}

在上述代码中,@Transactional注解指定了事务的传播行为为REQUIRES_NEW。根据具体情况选择不同的事务传播行为以确保应用程序的数据一致性和可靠性。

spring事务默认传播行为

Spring的事务传播行为默认是PROPAGATION_REQUIRED,也就是如果当前存在一个事务,就加入该事务;如果当前没有事务,就新建一个事务。

mysql事务默认隔离级别

MySQL的事务隔离级别默认是可重复读(REPEATABLE READ),这是大多数数据库系统的默认设置。这个隔离级别可以避免脏读和不可重复读的问题,但可能会出现幻读的问题。

多线程事务控制

Spring实现事务通过ThreadLocal把事务和当前线程进行了绑定。

ThreadLocal作为本地线程变量载体,保存了当前线程的变量,并确保所有变量是线程安全的。

这些封闭隔离的变量中就包含了数据库连接,Session管理的对象以及当前事务运行的其他必要信息,而开启的新线程是获取不到这些变量和对象的。

也就是说:主线程事务与子线程事务是相互独立的

怎么验证?

验证事务 以及 多线程事务控制编码

package cn.cjf.tt;
import cn.cjf.tt.dao.UserMapper;
import cn.cjf.tt.po.User;
import cn.cjf.tt.service.UserService;
import lombok.SneakyThrows;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
// 使用Spring整合Junit专用的类加载器
@RunWith(SpringJUnit4ClassRunner.class)
// 加载配置文件或者配置类
@ContextConfiguration(locations = {"classpath:spring.xml"})//加载配置文件
public class UserTest {
    @Autowired
    private UserService userService;
    @Autowired
    private DataSourceTransactionManager transactionManager;
    @Autowired
    private UserMapper userMapper;
    /**
     * 验证数据库连接是否正常
     */
    @Test
    public void selectAllUser() {
        List<User> users = userService.selectAllUser();
        for (User i : users) {
            System.out.println(i);
        }
    }
    /**
     * 验证:能否正常插入数据
     */
    public void test() {
        final User user = new User() {{
            this.setUsername("test_" + UUID.randomUUID().toString());
            this.setPassword("123456");
            this.setCreateTime(new Date());
        }};
        userService.addUser(user);
    }
    /**
     * 验证:线程池子线程中能否正常插入数据
     */
    @Test
    public void testForBatch() throws InterruptedException {
        final ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            service.submit(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    test1();
                }
            });
        }
        Thread.sleep(5000);
    }
    /**
     * 验证:正常插入数据,抛出异常后,注解事务是否回滚
     */
    @Transactional(rollbackFor = Exception.class)
    @Test
    public void test1() throws Exception {
        final User user = new User() {{
            this.setUsername("test_" + UUID.randomUUID().toString());
            this.setPassword("123456");
            this.setCreateTime(new Date());
        }};
        userService.addUser(user);
        if (true) {
            throw new Exception();
        }
    }
    /**
     * 验证:正常插入数据,抛出异常后,手动事务是否回滚
     */
    @Test
    public void test11() {
        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
        final TransactionStatus transaction = transactionManager.getTransaction(dd);
        try {
            final User user = new User() {{
                this.setUsername("test_" + UUID.randomUUID().toString());
                this.setPassword("123456");
                this.setCreateTime(new Date());
            }};
            userService.addUser(user);
            System.out.println(user);
            if (true) {
                throw new Exception();
            }
            // User(id=13, username=test1694675733277, password=123456, salt=null, token=null, isEnabled=null, createTime=Thu Sep 14 15:15:33 CST 2023, modifiedTime=null)
            transactionManager.commit(transaction);
            System.out.println("---------------------" + Thread.currentThread().getName() + "事务提交");
        } catch (Exception e) {
            e.printStackTrace();
            transactionManager.rollback(transaction);
            System.out.println("---------------------" + Thread.currentThread().getName() + "事务回滚");
        }
    }
    /**
     * 验证:主线程事务,是否能影响到子线程事务
     */
    @Transactional(rollbackFor = Exception.class)
    @Test
    public void test2() throws Exception {
        final User user = new User() {{
            this.setUsername("test_" + UUID.randomUUID().toString());
            this.setPassword("123456");
            this.setCreateTime(new Date());
        }};
        userService.addUser(user);
        System.out.println("---------------------" + Thread.currentThread().getName() + ":" + user);
        final ExecutorService service = Executors.newFixedThreadPool(5);
        List<Integer> idList = new ArrayList<>();
        idList.add(user.getId());
        for (int i = 0; i < 5; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    final User user = new User() {{
                        this.setUsername("test_" + UUID.randomUUID().toString());
                        this.setPassword("123456");
                        this.setCreateTime(new Date());
                    }};
                    userService.addUserForTransaction(user);
                    System.out.println("---------------------" + Thread.currentThread().getName() + ":" + user);
                    final Integer id = user.getId();
                    idList.add(id);
                }
            });
        }
        Thread.sleep(5000);
        try {
            throw new Exception();
        } finally {
            for (int i = 0; i < idList.size(); i++) {
                final Integer id = idList.get(i);
                final User po = userMapper.selectByPrimaryKey(id);
                if (po == null) {
                    System.out.println("---------------------id:" + id + "事务回滚");
                } else {
                    System.out.println("---------------------id:" + id + "事务提交");
                }
                // 主线程事务未结束
                // 实际主线程事务回滚了,但子线程事务未回滚
            }
        }
    }
    /**
     * 验证:主线程事务,未能影响到子线程事务,是因为子线程的事务传播行为影响
     */
    @Transactional(rollbackFor = Exception.class)
    @Test
    public void test21() throws Exception {
        final User user = new User() {{
            this.setUsername("test_" + UUID.randomUUID().toString());
            this.setPassword("123456");
            this.setCreateTime(new Date());
        }};
        userService.addUser(user);
        System.out.println("---------------------" + Thread.currentThread().getName() + ":" + user);
        final ExecutorService service = Executors.newFixedThreadPool(5);
        List<Integer> idList = new ArrayList<>();
        idList.add(user.getId());
        for (int i = 0; i < 5; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    final User user = new User() {{
                        this.setUsername("test_" + UUID.randomUUID().toString());
                        this.setPassword("123456");
                        this.setCreateTime(new Date());
                    }};
                    userService.addUserForNestedTransaction(user);
                    System.out.println("---------------------" + Thread.currentThread().getName() + ":" + user);
                    final Integer id = user.getId();
                    idList.add(id);
                }
            });
        }
        Thread.sleep(5000);
        try {
            throw new Exception();
        } finally {
            for (int i = 0; i < idList.size(); i++) {
                final Integer id = idList.get(i);
                final User po = userMapper.selectByPrimaryKey(id);
                if (po == null) {
                    System.out.println("---------------------id:" + id + "事务回滚");
                } else {
                    System.out.println("---------------------id:" + id + "事务提交");
                }
                // 主线程事务未结束
                // 实际主线程事务回滚了,但子线程事务未回滚
            }
        }
    }
    /**
     * 验证:主线程事务,未能影响到子线程事务
     * 主线程手动控制事务,与注解自动控制事务,结果是否依然是,主线程事务不能影响到子线程事务
     */
    @Test
    public void test22() throws Exception {
        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
        final TransactionStatus transaction = transactionManager.getTransaction(dd);
        List<Integer> idList = new ArrayList<>();
        try {
            final User user = new User() {{
                this.setUsername("test_" + UUID.randomUUID().toString());
                this.setPassword("123456");
                this.setCreateTime(new Date());
            }};
            userService.addUser(user);
            System.out.println("---------------------" + Thread.currentThread().getName() + ":" + user);
            final ExecutorService service = Executors.newFixedThreadPool(5);
            idList.add(user.getId());
            for (int i = 0; i < 5; i++) {
                service.submit(new Runnable() {
                    @Override
                    public void run() {
                        final User user = new User() {{
                            this.setUsername("test_" + UUID.randomUUID().toString());
                            this.setPassword("123456");
                            this.setCreateTime(new Date());
                        }};
                        userService.addUserForNestedTransaction(user);
                        System.out.println("---------------------" + Thread.currentThread().getName() + ":" + user);
                        final Integer id = user.getId();
                        idList.add(id);
                    }
                });
            }
            Thread.sleep(5000);
            if (true) {
                throw new Exception();
            }
            // User(id=13, username=test1694675733277, password=123456, salt=null, token=null, isEnabled=null, createTime=Thu Sep 14 15:15:33 CST 2023, modifiedTime=null)
            System.out.println("---------------------" + Thread.currentThread().getName() + "事务提交");
        } catch (Exception e) {
            e.printStackTrace();
            transactionManager.rollback(transaction);
            System.out.println("---------------------" + Thread.currentThread().getName() + "事务回滚");
        }
        for (int i = 0; i < idList.size(); i++) {
            final Integer id = idList.get(i);
            final User po = userMapper.selectByPrimaryKey(id);
            if (po == null) {
                System.out.println("---------------------id:" + id + "事务回滚");
            } else {
                System.out.println("---------------------id:" + id + "事务提交");
            }
            // 主线程事务已结束
            // 主线程事务回滚了,但子线程事务未回滚
        }
    }
    /**
     * 验证:主线程事务,未能影响到子线程事务
     * 主线程手动控制事务,子线程也手动控制事务,结果是否依然是,主线程事务不能影响到子线程事务
     */
    @Test
    public void test23() throws InterruptedException {
        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
        final TransactionStatus transaction = transactionManager.getTransaction(dd);
        List<Integer> idList = new ArrayList<>();
        try {
            final User user = new User() {{
                this.setUsername("test_" + UUID.randomUUID().toString());
                this.setPassword("123456");
                this.setCreateTime(new Date());
            }};
            userService.addUser(user);
            idList.add(user.getId());
            final ExecutorService service = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; i++) {
                service.submit(new Runnable() {
                    @Override
                    public void run() {
                        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
                        final TransactionStatus transaction = transactionManager.getTransaction(dd);
                        try {
                            final User user = new User() {{
                                this.setUsername("test_" + UUID.randomUUID().toString());
                                this.setPassword("123456");
                                this.setCreateTime(new Date());
                            }};
                            userService.addUser(user);
                            System.out.println(user);
                            final Integer id = user.getId();
                            idList.add(id);
                            // User(id=13, username=test1694675733277, password=123456, salt=null, token=null, isEnabled=null, createTime=Thu Sep 14 15:15:33 CST 2023, modifiedTime=null)
                            transactionManager.commit(transaction);
                            System.out.println("---------------------" + Thread.currentThread().getName() + "事务提交");
                        } catch (Exception e) {
                            e.printStackTrace();
                            transactionManager.rollback(transaction);
                            System.out.println("---------------------" + Thread.currentThread().getName() + "事务回滚");
                        }
                    }
                });
            }
            Thread.sleep(5000);
            if (true) {
                throw new Exception();
            }
            // User(id=13, username=test1694675733277, password=123456, salt=null, token=null, isEnabled=null, createTime=Thu Sep 14 15:15:33 CST 2023, modifiedTime=null)
            System.out.println("---------------------" + Thread.currentThread().getName() + "事务提交");
        } catch (Exception e) {
            e.printStackTrace();
            transactionManager.rollback(transaction);
            System.out.println("---------------------" + Thread.currentThread().getName() + "事务回滚");
        }
        for (int i = 0; i < idList.size(); i++) {
            final Integer id = idList.get(i);
            final User po = userMapper.selectByPrimaryKey(id);
            if (po == null) {
                System.out.println("---------------------id:" + id + "事务回滚");
            } else {
                System.out.println("---------------------id:" + id + "事务提交");
            }
            // 主线程事务已结束
            // 主线程事务回滚了,但子线程事务未回滚
        }
    }
    /**
     * 验证结果:主线程事务不能影响到子线程事务
     * <p>
     * 主线程,子线程控制各自事务,等待一起提交
     */
    @Test
    public void test3() throws InterruptedException {
        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
        final TransactionStatus transaction = transactionManager.getTransaction(dd);
        List<Integer> idList = new ArrayList<>();
        int time = 5;
        CountDownLatch cdl = new CountDownLatch(time);
        AtomicBoolean flag = new AtomicBoolean(true);
        try {
            final User user = new User() {{
                this.setUsername("test_" + UUID.randomUUID().toString());
                this.setPassword("123456");
                this.setCreateTime(new Date());
            }};
            userService.addUser(user);
            idList.add(user.getId());
            final ExecutorService service = Executors.newFixedThreadPool(time);
            for (int i = 0; i < time; i++) {
                service.submit(new Runnable() {
                    @SneakyThrows
                    @Override
                    public void run() {
                        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
                        final TransactionStatus transaction = transactionManager.getTransaction(dd);
                        try {
                            final User user = new User() {{
                                this.setUsername("test_" + UUID.randomUUID().toString());
                                this.setPassword("123456");
                                this.setCreateTime(new Date());
                            }};
                            userMapper.insertSelective(user);
                            idList.add(user.getId());
                            System.out.println("---------------" + Thread.currentThread().getName() + "--执行成功");
                        } catch (Exception e) {
                            e.printStackTrace();
                            System.out.println("---------------" + Thread.currentThread().getName() + "--执行失败,等待事务回滚");
                            flag.set(false);
                        } finally {
                            System.out.println("---------------" + Thread.currentThread().getName() + "--等待");
                            cdl.countDown();
                            cdl.await();
                            if (flag.get()) {
                                System.out.println("---------------" + Thread.currentThread().getName() + "--提交事务");
                                transactionManager.commit(transaction);
                            } else {
                                System.out.println("---------------" + Thread.currentThread().getName() + "--回滚事务");
                                transactionManager.rollback(transaction);
                            }
                            System.out.println("---------------" + Thread.currentThread().getName() + "--End");
                        }
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("---------------" + Thread.currentThread().getName() + "--执行失败,等待事务回滚");
            flag.set(false);
        } finally {
            System.out.println("---------------" + Thread.currentThread().getName() + "--等待");
            cdl.await();
            if (flag.get()) {
                System.out.println("---------------" + Thread.currentThread().getName() + "--提交事务");
                transactionManager.commit(transaction);
            } else {
                System.out.println("---------------" + Thread.currentThread().getName() + "--回滚事务");
                transactionManager.rollback(transaction);
            }
        }
        System.out.println("---------------" + Thread.currentThread().getName() + "--End");
        for (int i = 0; i < idList.size(); i++) {
            final Integer id = idList.get(i);
            final User po = userMapper.selectByPrimaryKey(id);
            if (po == null) {
                System.out.println("---------------------id:" + id + "事务回滚");
            } else {
                System.out.println("---------------------id:" + id + "事务提交");
            }
            // 主线程事务已结束
            // 主线程事务回滚了,子线程事务也回滚
        }
    }
    /**
     * 验证结果:主线程事务不能影响到子线程事务
     * <p>
     * 主线程,子线程控制各自事务,等待一起提交
     * 验证,主线程异常,子线程未异常,事务都回滚了
     */
    @Test
    public void test31() throws InterruptedException {
        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
        final TransactionStatus transaction = transactionManager.getTransaction(dd);
        List<Integer> idList = new ArrayList<>();
        int time = 5;
        CountDownLatch cdl = new CountDownLatch(time);
        AtomicBoolean flag = new AtomicBoolean(true);
        try {
            final User user = new User() {{
                this.setUsername("test_" + UUID.randomUUID().toString());
                this.setPassword("123456");
                this.setCreateTime(new Date());
            }};
            userService.addUser(user);
            idList.add(user.getId());
            final ExecutorService service = Executors.newFixedThreadPool(time);
            for (int i = 0; i < time; i++) {
//                int finalI = i;
                service.submit(new Runnable() {
                    @SneakyThrows
                    @Override
                    public void run() {
                        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
                        final TransactionStatus transaction = transactionManager.getTransaction(dd);
                        try {
                            final User user = new User() {{
                                this.setUsername("test_" + UUID.randomUUID().toString());
                                this.setPassword("123456");
                                this.setCreateTime(new Date());
                            }};
                            userMapper.insertSelective(user);
                            idList.add(user.getId());
                            // 最后一个提交的任务,抛出异常;注释掉会全部完成,否则全部回滚
//                            if (finalI == time - 1) {
//                                throw new RuntimeException();
//                            }
                            // User(id=13, username=test1694675733277, password=123456, salt=null, token=null, isEnabled=null, createTime=Thu Sep 14 15:15:33 CST 2023, modifiedTime=null)
                            System.out.println("---------------" + Thread.currentThread().getName() + "--执行成功");
                        } catch (Exception e) {
                            e.printStackTrace();
                            System.out.println("---------------" + Thread.currentThread().getName() + "--执行失败,等待事务回滚");
                            flag.set(false);
                        } finally {
                            System.out.println("---------------" + Thread.currentThread().getName() + "--等待");
                            cdl.countDown();
                            cdl.await();
                            if (flag.get()) {
                                System.out.println("---------------" + Thread.currentThread().getName() + "--提交事务");
                                transactionManager.commit(transaction);
                            } else {
                                System.out.println("---------------" + Thread.currentThread().getName() + "--回滚事务");
                                transactionManager.rollback(transaction);
                            }
                            System.out.println("---------------" + Thread.currentThread().getName() + "--End");
                        }
                    }
                });
            }
            if (true) {
                throw new Exception();
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("---------------" + Thread.currentThread().getName() + "--执行失败,等待事务回滚");
            flag.set(false);
        } finally {
            System.out.println("---------------" + Thread.currentThread().getName() + "--等待");
            cdl.await();
            if (flag.get()) {
                System.out.println("---------------" + Thread.currentThread().getName() + "--提交事务");
                transactionManager.commit(transaction);
            } else {
                System.out.println("---------------" + Thread.currentThread().getName() + "--回滚事务");
                transactionManager.rollback(transaction);
            }
        }
        System.out.println("---------------" + Thread.currentThread().getName() + "--End");
        for (int i = 0; i < idList.size(); i++) {
            final Integer id = idList.get(i);
            final User po = userMapper.selectByPrimaryKey(id);
            if (po == null) {
                System.out.println("---------------------id:" + id + "事务回滚");
            } else {
                System.out.println("---------------------id:" + id + "事务提交");
            }
            // 主线程事务已结束
            // 主线程事务回滚了,子线程事务也回滚
        }
    }
    /**
     * 验证结果:主线程事务不能影响到子线程事务
     * <p>
     * 主线程,子线程控制各自事务,等待一起提交
     * 验证,主线程未异常,子线程异常,事务都回滚了
     */
    @Test
    public void test32() throws InterruptedException {
        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
        final TransactionStatus transaction = transactionManager.getTransaction(dd);
        List<Integer> idList = new ArrayList<>();
        int time = 5;
        CountDownLatch cdl = new CountDownLatch(time);
        AtomicBoolean flag = new AtomicBoolean(true);
        try {
            final User user = new User() {{
                this.setUsername("test_" + UUID.randomUUID().toString());
                this.setPassword("123456");
                this.setCreateTime(new Date());
            }};
            userService.addUser(user);
            idList.add(user.getId());
            final ExecutorService service = Executors.newFixedThreadPool(time);
            for (int i = 0; i < time; i++) {
                int finalI = i;
                service.submit(new Runnable() {
                    @SneakyThrows
                    @Override
                    public void run() {
                        DefaultTransactionDefinition dd = new DefaultTransactionDefinition();
                        final TransactionStatus transaction = transactionManager.getTransaction(dd);
                        try {
                            final User user = new User() {{
                                this.setUsername("test_" + UUID.randomUUID().toString());
                                this.setPassword("123456");
                                this.setCreateTime(new Date());
                            }};
                            userMapper.insertSelective(user);
                            idList.add(user.getId());
                            // 最后一个提交的任务,抛出异常;注释掉会全部完成,否则全部回滚
                            if (finalI == time - 1) {
                                throw new RuntimeException();
                            }
                            // User(id=13, username=test1694675733277, password=123456, salt=null, token=null, isEnabled=null, createTime=Thu Sep 14 15:15:33 CST 2023, modifiedTime=null)
                            System.out.println("---------------" + Thread.currentThread().getName() + "--执行成功");
                        } catch (Exception e) {
                            e.printStackTrace();
                            System.out.println("---------------" + Thread.currentThread().getName() + "--执行失败,等待事务回滚");
                            flag.set(false);
                        } finally {
                            System.out.println("---------------" + Thread.currentThread().getName() + "--等待");
                            cdl.countDown();
                            cdl.await();
                            if (flag.get()) {
                                System.out.println("---------------" + Thread.currentThread().getName() + "--提交事务");
                                transactionManager.commit(transaction);
                            } else {
                                System.out.println("---------------" + Thread.currentThread().getName() + "--回滚事务");
                                transactionManager.rollback(transaction);
                            }
                            System.out.println("---------------" + Thread.currentThread().getName() + "--End");
                        }
                    }
                });
            }
//            if (true) {
//                throw new Exception();
//            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("---------------" + Thread.currentThread().getName() + "--执行失败,等待事务回滚");
            flag.set(false);
        } finally {
            System.out.println("---------------" + Thread.currentThread().getName() + "--等待");
            cdl.await();
            if (flag.get()) {
                System.out.println("---------------" + Thread.currentThread().getName() + "--提交事务");
                transactionManager.commit(transaction);
            } else {
                System.out.println("---------------" + Thread.currentThread().getName() + "--回滚事务");
                transactionManager.rollback(transaction);
            }
        }
        System.out.println("---------------" + Thread.currentThread().getName() + "--End");
        for (int i = 0; i < idList.size(); i++) {
            final Integer id = idList.get(i);
            final User po = userMapper.selectByPrimaryKey(id);
            if (po == null) {
                System.out.println("---------------------id:" + id + "事务回滚");
            } else {
                System.out.println("---------------------id:" + id + "事务提交");
            }
            // 主线程事务已结束
            // 主线程事务回滚了,子线程事务也回滚
        }
    }
}

到此这篇关于Spring 多线程事务控制的实践的文章就介绍到这了,更多相关Spring 多线程事务控制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

最新评论