带你用Python实现Saga 分布式事务的方法

 更新时间:2021年09月07日 10:06:03   作者:叶东富  
在这篇文章里,我们介绍了 SAGA 的理论知识,也通过一个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况,需要的朋友参考下吧

银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决。

分布式事务

分布式事务在分布式环境下,为了满足可用性、性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论:

  • 基本业务可用性( Basic Availability )
  • 柔性状态( Soft state )
  • 最终一致性( Eventual consistency )
  • 另一方面,分布式事务也部分遵循 ACID 规范:
  • 原子性:严格遵循
  • 一致性:事务完成后的一致性严格遵循;事务中的一致性可适当放宽
  • 隔离性:并行事务间不可影响;事务中间结果可见性允许安全放宽
  • 持久性:严格遵循

SAGA

Saga 是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

目前可用于 SAGA 的开源框架,主要为 Java 语言,其中以 seata 为代表。我们的例子采用 go 语言,使用的分布式事务框架为https://github.com/yedf/dtm,它对分布式事务的支持非常优雅。下面来详细讲解 SAGA 的组成:

DTM 事务框架里,有 3 个角色,与经典的 XA 分布式事务一样:

  • AP/应用程序,发起全局事务,定义全局事务包含哪些事务分支
  • RM/资源管理器,负责分支事务各项资源的管理
  • TM/事务管理器,负责协调全局事务的正确执行,包括 SAGA 正向 /逆向操作的执行

下面看一个成功完成的 SAGA 时序图,就很容易理解 SAGA 分布式事务:

SAGA实践

对于我们要进行的银行转账的例子,我们将在正向操作中,进行转入转出,在补偿操作中,做相反的调整。

首先我们创建账户余额表:

CREATE TABLE dtm_busi.`user_account` ( 
  `id` int(11) AUTO_INCREMENT PRIMARY KEY, 
  `user_id` int(11) not NULL UNIQUE , 
  `balance` decimal(10,2) NOT NULL DEFAULT '0.00', 
  `create_time` datetime DEFAULT now(), 
  `update_time` datetime DEFAULT now() 
); 

我们先编写核心业务代码,调整用户的账户余额

def saga_adjust_balance(cursor, uid, amount): 
  affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount)) 
  if affected == 0: 
    raise Exception("update error, balance not enough") 

下面我们来编写具体的正向操作 /补偿操作的处理函数

@app.post("/api/TransOutSaga") 
def trans_out_saga(): 
  saga_adjust_balance(c, out_uid, -30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransOutCompensate") 
def trans_out_compensate(): 
  saga_adjust_balance(c, out_uid, 30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  saga_adjust_balance(c, in_uid, 30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransInCompensate") 
def trans_in_compensate(): 
  saga_adjust_balance(c, in_uid, -30) 
  return {"dtm_result": "SUCCESS"} 

到此各个子事务的处理函数已经 OK 了,然后是开启 SAGA 事务,进行分支调用

# 这是 dtm 服务地址 
dtm = "http://localhost:8080/api/dtmsvr" 
# 这是业务微服务地址 
svc = "http://localhost:5000/api" 
 
    req = {"amount": 30} 
    s = saga.Saga(dtm, utils.gen_gid(dtm)) 
    s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate") 
    s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate") 
    s.submit() 

至此,一个完整的 SAGA 分布式事务编写完成。

如果您想要完整运行一个成功的示例,那么参考这个例子yedf/dtmcli-py-sample,将它运行起来非常简单

# 部署启动 dtm 
# 需要 docker 版本 18 以上 
git clone https://github.com/yedf/dtm 
cd dtm 
docker-compose up 
 
# 另起一个命令行 
git clone https://github.com/yedf/dtmcli-py-sample 
cd dtmcli-py-sample 
pip3 install flask dtmcli requests 
flask run 
 
# 另起一个命令行 
curl localhost:5000/api/fireSaga 

处理网络异常

假设提交给 dtm 的事务中,调用转入操作时,出现短暂的故障怎么办?按照 SAGA 事务的协议,dtm 会重试未完成的操作,这时我们要如何处理?故障有可能是转入操作完成后出网络故障,也有可能是转入操作完成中出现机器宕机。如何处理才能够保障账户余额的调整是正确无问题的?

这类网络异常的妥当处理,是分布式事务中的大难题,异常情况包括三类:重复请求、空补偿、悬挂,都需要正确处理

DTM 提供了子事务屏障功能,保证上述异常情况下的业务逻辑,只会有一次正确顺序下的成功提交。(子事务屏障详情参考分布式事务最经典的七种解决方案的子事务屏障环节)

我们把处理函数调整为:

@app.post("/api/TransOutSaga") 
def trans_out_saga(): 
  with barrier.AutoCursor(conn_new()) as cursor: 
    def busi_callback(c): 
      saga_adjust_balance(c, out_uid, -30) 
    barrier_from_req(request).call(cursor, busi_callback) 
  return {"dtm_result": "SUCCESS"} 

这里的 barrier_from_req(request).call(cursor, busi_callback)调用会使用子事务屏障技术,保证 busi_callback 回调函数仅被提交一次

您可以尝试多次调用这个 TransIn 服务,仅有一次余额调整。

处理回滚

假如银行将金额准备转入用户 2 时,发现用户 2 的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败

@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  return {"dtm_result": "FAILURE"} 

我们给出事务失败交互的时序图

这里有一点,TransIn 的正向操作什么都没有做,就返回了失败,此时调用 TransIn 的补偿操作,会不会导致反向调整出错了呢?

不用担心,前面的子事务屏障技术,能够保证 TransIn 的错误如果发生在提交之前,则补偿为空操作;TransIn 的错误如果发生在提交之后,则补偿操作会将数据提交一次。

您可以将返回错误的 TransIn 改成:

@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  with barrier.AutoCursor(conn_new()) as cursor: 
    def busi_callback(c): 
      saga_adjust_balance(c, in_uid, 30) 
    barrier_from_req(request).call(cursor, busi_callback) 
  return {"dtm_result": "FAILURE"} 

最后的结果余额依旧会是对的,原理可以参考:分布式事务最经典的七种解决方案的子事务屏障环节

小结

在这篇文章里,我们介绍了 SAGA 的理论知识,也通过一个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。相信读者通过这边文章,对 SAGA 已经有了深入的理解。

文中使用的 dtm 是新开源的 Golang 分布式事务管理框架,功能强大,支持 TCC 、SAGA 、XA 、事务消息等事务模式,支持 Go 、python 、PHP 、node 、csharp 等语言的。同时提供了非常简单易用的接口。

阅读完此篇干货,欢迎大家访问项目https://github.com/yedf/dtm,给颗星星支持!

到此这篇关于带你用Python实现Saga 分布式事务的问题的文章就介绍到这了,更多相关Python Saga 分布式事务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 一文带你轻松搞定Python正则匹配

    一文带你轻松搞定Python正则匹配

    在python 中,正则匹配用到的还是挺多的,下面这篇文章主要给大家介绍了关于Python正则匹配的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-08-08
  • python银行卡号码校验Luhn模10算法

    python银行卡号码校验Luhn模10算法

    这篇文章主要为大家介绍了python银行卡号码校验Luhn模10算法,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • Python munch包 /Munch() 的用法详解

    Python munch包 /Munch() 的用法详解

    这篇文章主要介绍了Python munch包 /Munch() 的用法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-09-09
  • python每隔N秒运行指定函数的方法

    python每隔N秒运行指定函数的方法

    这篇文章主要介绍了python每隔N秒运行指定函数的方法,涉及Python的线程与时间操作技巧,非常具有实用价值,需要的朋友可以参考下
    2015-03-03
  • 如何在windows下安装Pycham2020软件(方法步骤详解)

    如何在windows下安装Pycham2020软件(方法步骤详解)

    这篇文章主要介绍了在windows下安装Pycham2020软件方法,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-05-05
  • 利用Python实现生成并识别图片验证码

    利用Python实现生成并识别图片验证码

    这篇文章主要为大家的详细介绍了如何利用Python实现生成并识别图片验证码,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2024-02-02
  • Python利用Flask动态生成汉字头像

    Python利用Flask动态生成汉字头像

    这篇文章主要为大家详细介绍了Python如何利用Flask动态生成一个汉字头像,文中的示例代码讲解详细,对我们学习Python有一定的帮助,需要的可以参考一下
    2023-01-01
  • K-近邻算法的python实现代码分享

    K-近邻算法的python实现代码分享

    这篇文章主要介绍了K-近邻算法的python实现代码分享,具有一定借鉴价值,需要的朋友可以参考下。
    2017-12-12
  • python处理大数字的方法

    python处理大数字的方法

    这篇文章主要介绍了python处理大数字的方法,涉及Python递归操作的相关技巧,需要的朋友可以参考下
    2015-05-05
  • Python matplotlib实现条形统计图

    Python matplotlib实现条形统计图

    这篇文章主要为大家详细介绍了Python matplotlib实现条形统计图,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-04-04

最新评论