Spring Boot集成Seata实现基于AT模式的分布式事务的解决方案

 更新时间:2024年08月12日 10:56:06   作者:HBLOGA  
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务,这篇文章主要介绍了Spring Boot集成Seata实现基于AT模式的分布式事务,需要的朋友可以参考下

1.什么是Seata?

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

AT 模式

前提

  • 基于支持本地 ACID 事务的关系型数据库。
  • Java 应用,通过 JDBC 访问数据库。

整体机制

两阶段提交协议的演变:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
  • 二阶段:
    • 提交异步化,非常快速地完成。
    • 回滚通过一阶段的回滚日志进行反向补偿。

 写隔离

  • 一阶段本地事务提交前,需要确保先拿到 全局锁 。
  • 拿不到 全局锁 ,不能提交本地事务。
  • 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。

以一个示例来说明: 两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。 tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。

tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。 此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。 因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

读隔离

在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。 如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

Read Isolation: SELECT FOR UPDATE

SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

具体例子相见:What Is Seata? | Apache Seata

2.环境搭建

安装mysql

参见代码仓库里面的mysql模块里面的docker文件夹

 install seta-server

version: "3.1"
services:
  seata-server:
    image: seataio/seata-server:latest
    hostname: seata-server
    ports:
      - "7091:7091"
      - "8091:8091"
    environment:
      - SEATA_PORT=8091
      - STORE_MODE=file

http://localhost:7091/#/Overview

default username and password is admin/admin

3.代码工程

实验目标

订单服务调用库存服务和账户余额服务进行相应的扣减,并且最终生成订单

seata-order

订单服务

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>seata</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>seata-order</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-http</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.8</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

controller

package com.et.seata.order.controller;
import com.et.seata.order.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@RestController
public class HelloWorldController {
    @Autowired
    private OrderService orderService;
    @PostMapping("/create")
    public Map<String, Object> createOrder(@RequestParam("userId") Long userId,
                                              @RequestParam("productId") Long productId,
                                              @RequestParam("price") Integer price) throws IOException {
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "HelloWorld");
        map.put("reuslt", orderService.createOrder(userId,productId,price));
        return map;
    }
}

service

package com.et.seata.order.service;
import com.alibaba.fastjson.JSONObject;
import com.et.seata.order.dao.OrderDao;
import com.et.seata.order.dto.OrderDO;
import io.seata.core.context.RootContext;
import io.seata.integration.http.DefaultHttpExecutor;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName OrderServiceImpl
 * @Description todo
 * @date 2024/08/08/ 13:53
 */
@Slf4j
@Service
public class OrderServiceImpl implements OrderService{
   @Autowired
   OrderDao orderDao;
    @Override
    @GlobalTransactional // <1>
    public Integer createOrder(Long userId, Long productId, Integer price) throws IOException {
        Integer amount = 1; // 购买数量,暂时设置为 1。
        log.info("[createOrder] 当前 XID: {}", RootContext.getXID());
        // <2> 扣减库存
        this.reduceStock(productId, amount);
        // <3> 扣减余额
        this.reduceBalance(userId, price);
        // <4> 保存订单
        log.info("[createOrder] 保存订单");
        return this.saveOrder(userId,productId,price,amount);
    }
    private Integer saveOrder(Long userId, Long productId, Integer price,Integer amount){
      // <4> 保存订单
      OrderDO order = new OrderDO();
      order.setUserId(userId);
      order.setProductId(productId);
      order.setPayAmount(amount * price);
      orderDao.saveOrder(order);
      log.info("[createOrder] 保存订单: {}", order.getId());
      return order.getId();
   }
    private void reduceStock(Long productId, Integer amount) throws IOException {
        // 参数拼接
        JSONObject params = new JSONObject().fluentPut("productId", String.valueOf(productId))
                .fluentPut("amount", String.valueOf(amount));
        // 执行调用
        HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8082", "/stock",
                params, HttpResponse.class);
        // 解析结果
        Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity()));
        if (!success) {
            throw new RuntimeException("扣除库存失败");
        }
    }
    private void reduceBalance(Long userId, Integer price) throws IOException {
        // 参数拼接
        JSONObject params = new JSONObject().fluentPut("userId", String.valueOf(userId))
                .fluentPut("price", String.valueOf(price));
        // 执行调用
        HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8083", "/balance",
                params, HttpResponse.class);
        // 解析结果
        Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity()));
        if (!success) {
            throw new RuntimeException("扣除余额失败");
        }
    }
}

application.yaml

server:
  port: 8081 # 端口
spring:
  application:
    name: order-service
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=false&useUnicode=true&characterEncoding=UTF-8
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: 123456
# Seata 配置项,对应 SeataProperties 类
seata:
  application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}
  tx-service-group: ${spring.application.name}-group # Seata 事务组编号,用于 TC 集群名
  # 服务配置项,对应 ServiceProperties 类
  service:
    # 虚拟组和分组的映射
    vgroup-mapping:
      order-service-group: default
    # 分组和 Seata 服务的映射
    grouplist:
      default: 127.0.0.1:8091

seata-product

商品库存服务

controller

package com.et.seata.product.controller;
import com.et.seata.product.dto.ProductReduceStockDTO;
import com.et.seata.product.service.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ProductController {
   @Autowired
   ProductService productService;
   @PostMapping("/stock")
   public Boolean reduceStock(@RequestBody ProductReduceStockDTO productReduceStockDTO) {
      log.info("[reduceStock] 收到减少库存请求, 商品:{}, 价格:{}", productReduceStockDTO.getProductId(),
            productReduceStockDTO.getAmount());
      try {
         productService.reduceStock(productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount());
         // 正常扣除库存,返回 true
         return true;
      } catch (Exception e) {
         // 失败扣除库存,返回 false
         return false;
      }
   }
}

service

package com.et.seata.product.service;
import com.et.seata.product.dao.ProductDao;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class ProductServiceImpl implements ProductService {
    @Autowired
    private ProductDao productDao;
    @Override
    @Transactional // <1> 开启新事物
    public void reduceStock(Long productId, Integer amount) throws Exception {
        log.info("[reduceStock] 当前 XID: {}", RootContext.getXID());
        // <2> 检查库存
        checkStock(productId, amount);
        log.info("[reduceStock] 开始扣减 {} 库存", productId);
        // <3> 扣减库存
        int updateCount = productDao.reduceStock(productId, amount);
        // 扣除成功
        if (updateCount == 0) {
            log.warn("[reduceStock] 扣除 {} 库存失败", productId);
            throw new Exception("库存不足");
        }
        // 扣除失败
        log.info("[reduceStock] 扣除 {} 库存成功", productId);
    }
    private void checkStock(Long productId, Integer requiredAmount) throws Exception {
        log.info("[checkStock] 检查 {} 库存", productId);
        Integer stock = productDao.getStock(productId);
        if (stock < requiredAmount) {
            log.warn("[checkStock] {} 库存不足,当前库存: {}", productId, stock);
            throw new Exception("库存不足");
        }
    }
}

dao

package com.et.seata.product.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
@Mapper
@Repository
public interface ProductDao {
    /**
     * 获取库存
     *
     * @param productId 商品编号
     * @return 库存
     */
    @Select("SELECT stock FROM product WHERE id = #{productId}")
    Integer getStock(@Param("productId") Long productId);
    /**
     * 扣减库存
     *
     * @param productId 商品编号
     * @param amount    扣减数量
     * @return 影响记录行数
     */
    @Update("UPDATE product SET stock = stock - #{amount} WHERE id = #{productId} AND stock >= #{amount}")
    int reduceStock(@Param("productId") Long productId, @Param("amount") Integer amount);
}

seata-balance

用户余额服务

controller

package com.et.seata.balance.controller;
import com.et.seata.balance.dto.AccountReduceBalanceDTO;
import com.et.seata.balance.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@Slf4j
public class AccountController {
   @Autowired
   private AccountService accountService;
   @PostMapping("/balance")
   public Boolean reduceBalance(@RequestBody AccountReduceBalanceDTO accountReduceBalanceDTO) {
      log.info("[reduceBalance] 收到减少余额请求, 用户:{}, 金额:{}", accountReduceBalanceDTO.getUserId(),
            accountReduceBalanceDTO.getPrice());
      try {
         accountService.reduceBalance(accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice());
         // 正常扣除余额,返回 true
         return true;
      } catch (Exception e) {
         // 失败扣除余额,返回 false
         return false;
      }
   }
}

service

package com.et.seata.balance.service;
import com.et.seata.balance.dao.AccountDao;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountDao accountDao;
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW) // <1> 开启新事物
    public void reduceBalance(Long userId, Integer price) throws Exception {
        log.info("[reduceBalance] 当前 XID: {}", RootContext.getXID());
        // <2> 检查余额
        checkBalance(userId, price);
        log.info("[reduceBalance] 开始扣减用户 {} 余额", userId);
        // <3> 扣除余额
        int updateCount = accountDao.reduceBalance(price);
        // 扣除成功
        if (updateCount == 0) {
            log.warn("[reduceBalance] 扣除用户 {} 余额失败", userId);
            throw new Exception("余额不足");
        }
        log.info("[reduceBalance] 扣除用户 {} 余额成功", userId);
    }
    private void checkBalance(Long userId, Integer price) throws Exception {
        log.info("[checkBalance] 检查用户 {} 余额", userId);
        Integer balance = accountDao.getBalance(userId);
        if (balance < price) {
            log.warn("[checkBalance] 用户 {} 余额不足,当前余额:{}", userId, balance);
            throw new Exception("余额不足");
        }
    }
}

dao

package com.et.seata.balance.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
@Mapper
@Repository
public interface AccountDao {
    /**
     * 获取账户余额
     *
     * @param userId 用户 ID
     * @return 账户余额
     */
    @Select("SELECT balance FROM account WHERE id = #{userId}")
    Integer getBalance(@Param("userId") Long userId);
    /**
     * 扣减余额
     *
     * @param price 需要扣减的数目
     * @return 影响记录行数
     */
    @Update("UPDATE account SET balance = balance - #{price} WHERE id = 1 AND balance >= ${price}")
    int reduceBalance(@Param("price") Integer price);
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

https://github.com/Harries/springboot-demo

4.测试

  • 启动seata-order服务
  • 启动seata-product服务
  • 启动seata-balance服务

​编辑可以看到控制台输出回滚日志

2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=172.22.0.3:8091:27573281007513609,branchId=27573281007513610,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata_storage,applicationData=null
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.22.0.3:8091:27573281007513609 27573281007513610 jdbc:mysql://127.0.0.1:3306/seata_storage
2024-08-08 22:00:59.503 INFO 35051 --- [tch_RMROLE_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.22.0.3:8091:27573281007513609 branch 27573281007513610, undo_log deleted with GlobalFinished
2024-08-08 22:00:59.511 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked

5.引用

https://seata.apache.org

到此这篇关于Spring Boot集成Seata实现基于AT模式的分布式事务的文章就介绍到这了,更多相关Spring Boot集成Seata分布式事务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java如何实现上传文件到服务器指定目录

    Java如何实现上传文件到服务器指定目录

    这篇文章主要介绍了Java如何实现上传文件到服务器指定目录,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • springboot:接收date类型的参数方式

    springboot:接收date类型的参数方式

    这篇文章主要介绍了springboot:接收date类型的参数方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • springmvc组件中的HandlerMapping解析

    springmvc组件中的HandlerMapping解析

    这篇文章主要介绍了springmvc九大组件中的HandlerMapping解析,HandlerMapping表示的是一个URL与一个Handler(可以简单的理解为Controller中有@RequestMapping注解的方法)之间的映射关系,需要的朋友可以参考下
    2023-09-09
  • postman测试传入List<String>参数方式

    postman测试传入List<String>参数方式

    这篇文章主要介绍了postman测试传入List<String>参数方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • 解析Spring Cloud Bus消息总线

    解析Spring Cloud Bus消息总线

    这篇文章主要介绍了Spring Cloud Bus消息总线的介绍及使用,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-10-10
  • Spring AOP + 注解实现统一注解功能

    Spring AOP + 注解实现统一注解功能

    本文我们通过Spring AOP和Java的自定义注解来实现日志的插入功能,非常不错,具有一定的参考借鉴价值,需要的朋友一起看看吧
    2018-05-05
  • 老生常谈Java虚拟机垃圾回收机制(必看篇)

    老生常谈Java虚拟机垃圾回收机制(必看篇)

    下面小编就为大家带来一篇老生常谈Java虚拟机垃圾回收机制(必看篇)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • 基于Java文件输入输出流实现文件上传下载功能

    基于Java文件输入输出流实现文件上传下载功能

    这篇文章主要为大家详细介绍了基于Java文件输入输出流实现文件上传下载功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-04-04
  • Java设计模式之模板方法模式详解

    Java设计模式之模板方法模式详解

    这篇文章主要为大家详细介绍了Java设计模式之模板方法模式,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-03-03
  • 从零开始让你的Spring Boot项目跑在Linux服务器

    从零开始让你的Spring Boot项目跑在Linux服务器

    这篇文章主要给大家介绍了如何从零开始让你的Spring Boot项目跑在Linux服务器的相关资料,由于springboot是内嵌了tomcat,所以可以直接将项目打包上传至服务器上,需要的朋友可以参考下
    2021-11-11

最新评论