前言
我说分布式事务是屎有没有懂得。就不该存在这种东西。
一个业务操作涉及多个独立系统/数据库,需要保证数据一致。
强一致
2PC(Two-Phase Commit):直译两阶段提交
阶段1:Prepare
- 协调者问所有参与者:能不能提交?
- 每个节点锁资源并返回 yes/no
阶段2:Commit
- 如果全部 yes → 提交
- 如果有一个 no → 回滚
这个可以保证强一致性,不过在没有commit之前的时候,事务会把所有的资源锁住,有着阻塞的性能差的问题。而且如果出现了单点故障,其它的服务也会白白地浪费一次。
3PC:多加了一个看参与者挂没挂地操作
阶段1:CanCommit(询问阶段)
阶段2:PreCommit(预提交)
- 执行事务
- 写 undo/redo 日志
- 锁资源
- 但不提交
阶段3:DoCommit 真正提交
最终一致
TCC(Try-Confirm-Cancel):主要体现在不依靠数据库的事务
三阶段:
- Try:预留资源(不真正提交)
- Confirm:真正执行
- Cancel:回滚
特点:
这个就时相当于完全地业务写回滚代码了。
本地消息表 + 异步补偿:
这个应该就是outbox模式,先落库再发消息,不过落库就已经影响性能了。
RocketMQ事务消息:
Producer → Broker:发送 Half Message
Producer:执行 DB 操作(下单)
Producer → Broker:commit / rollback
Consumer → 消费消息(扣库存)
Saga模式:把一个大事务拆成多个本地事务,如果失败就执行补偿
TCC示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
库存表(stock)
- sku_id
- total_stock
- available_stock
- reserved_stock
预留记录表(reservation)
- tx_id // 全局事务ID(幂等键)
- sku_id
- user_id
- quantity
- status // TRYING / CONFIRMED / CANCELED
- expire_at // 预留过期时间(兜底释放)
- created_at
- updated_at
|
1
2
3
4
5
|
interface SeckillTccService {
boolean tryReserve(String txId, Long userId, Long skuId, int qty);
boolean confirm(String txId);
boolean cancel(String txId);
}
|
try
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@Transactional
boolean tryReserve(String txId, Long userId, Long skuId, int qty) {
// 1. 幂等控制:如果已存在记录,直接返回(避免重复Try)
Reservation r = reservationDao.findByTxId(txId);
if (r != null) {
if (r.status == CONFIRMED) return true; // 已完成
if (r.status == CANCELED) return false; // 已取消(反悬挂保护)
return true; // TRYING 重入
}
// 2. 反悬挂:若已经被Cancel标记过(比如先到Cancel后到Try)
if (reservationDao.existsCanceled(txId)) {
return false;
}
// 3. 扣减可用库存、增加预留库存(原子更新,防超卖)
int updated = stockDao.tryReserve(skuId, qty);
// SQL 示意:
// UPDATE stock
// SET available_stock = available_stock - ?, reserved_stock = reserved_stock + ?
// WHERE sku_id = ? AND available_stock >= ?
if (updated == 0) {
return false; // 库存不足
}
// 4. 写预留记录(状态:TRYING)
reservationDao.insert(new Reservation(
txId, userId, skuId, qty, TRYING, now()+TTL
));
return true;
}
|
confirm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@Transactional
boolean confirm(String txId) {
Reservation r = reservationDao.findByTxId(txId);
if (r == null) {
// 允许空Confirm:可能Try未执行成功(幂等设计)
return true;
}
if (r.status == CONFIRMED) return true; // 幂等
if (r.status == CANCELED) return false; // 已取消
// 1. 从预留转已售(减少 reserved,不回加 available)
stockDao.confirm(r.skuId, r.quantity);
// SQL 示意:
// UPDATE stock
// SET reserved_stock = reserved_stock - ?
// WHERE sku_id = ? AND reserved_stock >= ?
// 2. 更新状态
reservationDao.updateStatus(txId, CONFIRMED);
return true;
}
|
cancel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@Transactional
boolean cancel(String txId) {
Reservation r = reservationDao.findByTxId(txId);
// 1. 空回滚(Try 还没执行或失败)
if (r == null) {
// 记录一个“已取消占位”,防止后续Try(反悬挂)
reservationDao.insertCanceledMarker(txId);
return true;
}
if (r.status == CANCELED) return true; // 幂等
if (r.status == CONFIRMED) return true; // 已确认,不再回滚
// 2. 释放预留库存
stockDao.release(r.skuId, r.quantity);
// SQL 示意:
// UPDATE stock
// SET available_stock = available_stock + ?, reserved_stock = reserved_stock - ?
// WHERE sku_id = ? AND reserved_stock >= ?
// 3. 更新状态
reservationDao.updateStatus(txId, CANCELED);
return true;
}
|