分布式事务存在问题
单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源,业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务保证,但是全局的数据一致性无法保证
用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:
仓储服务:对给定的商品扣除仓储数量。
订单服务:根据采购需求创建订单。
帐户服务:从用户帐户中扣除余额。
Seata简介 Seata (Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架) 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。
典型的分布式事务过程
分布式事务处理过程的ID+三组件模型
ID
说明
Transaction ID XID
全局唯一的事务ID
术语
说明
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
处理过程
TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID
XID在微服务调用链路的上下文中传播
RM向TC注册分支事务,将其纳入XID对应全局事务的管辖
TM向TC发起针对XID的全局提交或回滚
TC调度XID下管辖的全部分支事务完成提交或回滚请求
快速入门 下载地址
seata-server-0.9.0解压到指定目录并修改conf目录下的file.com配置文件
新建数据量seata(表在文件里提供)
修改..\seata\conf里面的registry.conf(指定注册中心为nacos)
1 2 3 4 5 6 7 8 9 10 registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" nacos { serverAddr = "localhost:8848" namespace = "" cluster = "default" } ....
先启动Nacos(8848),再启动seata-server
订单/库存/账户业务数据库准备 使用教程
创建三个微服务(订单、库存、账户)
当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成
(该操作跨越三个数据库,两次远程调用,明显存在分布式事务问题 )
创建业务数据库
seata_order:存储订单的数据库
1 2 3 4 5 6 7 8 9 10 11 CREATE DATABASE seata_order;USE seata_order; CREATE TABLE t_order( id BIGINT (11 ) NOT NULL AUTO_INCREMENT PRIMARY KEY , user_id BIGINT (11 ) DEFAULT NULL COMMENT '用户id' , product_id BIGINT (11 ) DEFAULT NULL COMMENT '产品id' , count INT (11 ) DEFAULT NULL COMMENT '数量' , money DECIMAL (11 ,0 ) DEFAULT NULL COMMENT '金额' , status INT (1 ) DEFAULT NULL COMMENT '订单状态:0创建中,1已完结' )ENGINE= InnoDB AUTO_INCREMENT= 7 CHARSET= utf8;
seata_storage:存储库存的数据库
1 2 3 4 5 6 7 8 9 10 11 CREATE DATABASE seata_storage;USE seata_storage; CREATE TABLE t_storage( id BIGINT (11 ) NOT NULL AUTO_INCREMENT PRIMARY KEY , product_id BIGINT (11 ) DEFAULT NULL COMMENT '产品id' , total INT (11 ) DEFAULT NULL COMMENT '总库存' , used INT (11 ) DEFAULT NULL COMMENT '已用库存' , residue INT (11 ) DEFAULT NULL COMMENT '剩余库存' )ENGINE= InnoDB AUTO_INCREMENT= 7 CHARSET= utf8; INSERT INTO t_storage(id, product_id, total, used, residue) VALUES (1 ,1 ,100 ,0 ,100 );
seata_account:存储账户信息的数据库
1 2 3 4 5 6 7 8 9 10 11 CREATE DATABASE seata_account;USE seata_account; CREATE TABLE t_account( id BIGINT (11 ) NOT NULL AUTO_INCREMENT PRIMARY KEY , user_id BIGINT (11 ) DEFAULT NULL COMMENT '用户id' , total DECIMAL (10 ,0 ) DEFAULT NULL COMMENT '总额度' , used DECIMAL (10 ,0 ) DEFAULT NULL COMMENT '已用额度' , residue DECIMAL (10 ,0 ) DEFAULT 0 COMMENT '剩余可用额度' )ENGINE= InnoDB AUTO_INCREMENT= 7 CHARSET= utf8; INSERT INTO t_account(id, user_id, total, used, residue) VALUES (1 ,1 ,1000 ,0 ,1000 );
对上述数据库表创建对应的回滚日志表
订单-库存-账户三个库各建各自的回滚日志表
..\environment\seata\conf 目录下db_undo_log.sql
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 drop table `undo_log`;CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , `ext` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
订单/库存/账户业务微服务准备
业务需求:
下订单——减库存——扣余额——改(订单)状态
新建订单Order-Module 1.新建seata-order-service-2001
2.引入pom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 <dependencies > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > <exclusions > <exclusion > <artifactId > seata-all</artifactId > <groupId > io.seata</groupId > </exclusion > </exclusions > </dependency > <dependency > <groupId > io.seata</groupId > <artifactId > seata-all</artifactId > <version > 0.9.0</version > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.37</version > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > druid-spring-boot-starter</artifactId > <version > 1.1.10</version > </dependency > <dependency > <groupId > org.mybatis.spring.boot</groupId > <artifactId > mybatis-spring-boot-starter</artifactId > <version > 2.0.0</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <optional > true</optional > </dependency > </dependencies >
3.配置yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 server: port: 2001 spring: application: name: seata-order-service cloud: alibaba: seata: tx-service-group: fsp_tx_group nacos: discovery: server-addr: localhost:8848 datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_order username: ***** password: ***** feign: hystrix: enabled: false logging: level: io: seata: info mybatis: mapperLocations: classpath:mapper/*.xml
4.file.conf
5.registry.conf
6.domain
CommonResult
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Data @AllArgsConstructor @NoArgsConstructor public class CommonResult <T> { private Integer code; private String message; private T data; public CommonResult (Integer code, String message) { this (code, message, null ); } }
Order
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Data @AllArgsConstructor @NoArgsConstructor public class Order { private Long id; private Long userId; private Long productId; private Integer count; private BigDecimal money; private Integer status; }
7.dao
1 2 3 4 5 6 7 8 9 10 @Mapper public interface OrderDao { void create (Order order) ; void update (@Param("userId") Long userId, @Param("status") Integer status) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace ="com.cyan.springcloud.alibaba.dao.OrderDao" > <resultMap id ="BaseResultMap" type ="com.cyan.springcloud.alibaba.domain.Order" > <id column ="id" property ="id" jdbcType ="BIGINT" /> <result column ="user_id" property ="userId" jdbcType ="BIGINT" /> <result column ="product_id" property ="productId" jdbcType ="BIGINT" /> <result column ="count" property ="count" jdbcType ="INTEGER" /> <result column ="money" property ="money" jdbcType ="DECIMAL" /> <result column ="status" property ="status" jdbcType ="INTEGER" /> </resultMap > <insert id ="create" parameterType ="com.cyan.springcloud.alibaba.domain.Order" > INSERT INTO t_order(user_id, product_id, "count", money, status) VALUES (null, #{userId}, #{productId}, #{count}, #{money}, 0); </insert > <update id ="update" parameterType ="com.cyan.springcloud.alibaba.domain.Order" > UPDATE t_order SET status = 1 WHERE user_id = #{userId} AND status = #{status}; </update > </mapper >
8.service
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public interface OrderService { void create (Order order) ; } @FeignClient(value = "seata-storage-service") public interface StorageService { @PostMapping("/storage/decrease") CommonResult decrease (@RequestParam("productId") Long productId, @RequestParam("count") Integer count) ; } @FeignClient(value = "seata-account-service") public interface AccountService { @PostMapping("/account/decrease") CommonResult decrease (@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Slf4j @Service public class OrderServiceImpl implements OrderService { @Resource private OrderDao orderDao; @Resource private StorageService storageService; @Resource private AccountService accountService; @Override public void create (Order order) { log.info("*****************开始新建订单" ); orderDao.create(order); log.info("*****************订单微服务开始调用库存扣减" ); storageService.decrease(order.getProductId(), order.getCount()); log.info("*****************订单微服务调用库存扣减结束" ); log.info("*****************订单微服务开始调用账户扣减" ); accountService.decrease(order.getUserId(), order.getMoney()); log.info("*****************订单微服务开始调用账户结束" ); log.info("*****************开始修改订单状态" ); orderDao.update(order.getUserId(), 0 ); log.info("*****************修改订单状态结束" ); log.info("*****************成功下单!!!" ); } }
9.controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @RestController @RequestMapping("/order") public class OrderController { @Resource private OrderService orderService; @GetMapping("/create") public CommonResult create (Order order) { orderService.create(order); return new CommonResult (200 , "订单创建成功" ); } }
10.MyBatis和数据源代理配置
1 2 3 4 5 @Configuration @MapperScan({"com.cyan.springcloud.alibaba.dao"}) public class MyBatisConfig {}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Configuration public class DataSourceProxyConfig { @Value("${mybatis.mapperLocations}") private String mapperLocations; @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource () { return new DruidDataSource (); } @Bean public DataSourceProxy dataSourceProxy (DataSource dataSource) { return new DataSourceProxy (dataSource); } @Bean public SqlSessionFactory sqlSessionFactoryBean (DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean (); sqlSessionFactoryBean.setDataSource(dataSourceProxy); sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver ().getResources(mapperLocations)); sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory ()); return sqlSessionFactoryBean.getObject(); } }
新建库存Storage-Module 新建账户Account-Module 1.新建seata-storage-service-2002
….一模一样
验证
正常下单
http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
手动制造超时异常,当库存和账户金额扣减后,订单状态没有设置为已完成(没有从零变为一),由于feign的重试机制,账户余额还有可能被多次扣减
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Service public class OrderServiceImpl implements OrderService { @Resource private OrderDao orderDao; @Resource private StorageService storageService; @Resource private AccountService accountService; @Override @GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class) public void create (Order order) { log.info("*****************开始新建订单" ); orderDao.create(order); log.info("*****************订单微服务开始调用库存扣减" ); storageService.decrease(order.getProductId(), order.getCount()); log.info("*****************订单微服务调用库存扣减结束" ); log.info("*****************订单微服务开始调用账户扣减" ); accountService.decrease(order.getUserId(), order.getMoney()); log.info("*****************订单微服务开始调用账户结束" ); log.info("*****************开始修改订单状态" ); orderDao.update(order.getUserId(), 0 ); log.info("*****************修改订单状态结束" ); log.info("*****************成功下单!!!" ); } }
Seata原理
分布式事务执行流程
TM开启分布式事务(TM向TC注册全局事务记录)
按业务场景,编排数据库、服务等事务内资源(RM向TC汇报资源准备状态)
TM结束分布式事务,事务一阶段结束(TM通知TC提交回滚分布式事务)
TC汇总事务信息,决定分布式事务是提交还是回滚
TCC通知所有RM提交/回滚资源,事务二阶段结束
AT 模式 提供无侵入自动补偿的事务模式,目前已支持MySQL、Oracle、PostgreSQL、TiDB 和 MariaDB。H2、DB2、SQLServer、达梦开发中
前提
基于支持本地 ACID 事务的关系型数据库。
Java 应用,通过 JDBC 访问数据库。
整体机制
两阶段提交协议的演变:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:
提交异步化,非常快速地完成。
回滚通过一阶段的回滚日志进行反向补偿。
一阶段加载 在一阶段,seata会拦截 ”业务SQL“
解析SQL语义,找到”业务SQL“要更新的业务数据,在业务数据被更新前,将其保存成”before image“
执行”业务SQL” 更新业务数据,在业务数据更新之后
其保存成”after image“,最后生成行锁
以上操作全部在一个数据库事务内完成,这样保证一阶段操作的原子性
二阶段提交 二阶段如果顺利提交,由于”业务SQL “在一阶段已经提交至数据库,所以seata框架只需要将一阶段保存的快照数据和行锁删掉,完成数据清理即可
二阶段回滚 二阶段如果是回滚,seata就需要回滚一阶段已经执行的”业务SQL“,还原业务数据。回滚方式是用”before image“还原业务数据,但是还原前要先校验脏写,对比”数据库当前业务数据“和”after image“,
如果两个数据完全一致说明没有脏写,可以还原业务数据
如果不一致说明有脏写,出现脏写就需要转人工处理