前言

我说分布式事务是屎有没有懂得。就不该存在这种东西。

一个业务操作涉及多个独立系统/数据库,需要保证数据一致。

强一致

2PC(Two-Phase Commit):直译两阶段提交

阶段1:Prepare

  • 协调者问所有参与者:能不能提交?
  • 每个节点锁资源并返回 yes/no

阶段2:Commit

  • 如果全部 yes → 提交
  • 如果有一个 no → 回滚

这个可以保证强一致性,不过在没有commit之前的时候,事务会把所有的资源锁住,有着阻塞的性能差的问题。而且如果出现了单点故障,其它的服务也会白白地浪费一次。

3PC:多加了一个看参与者挂没挂地操作

阶段1:CanCommit(询问阶段)

  • 参与者只做检查(不执行、不锁资源)

阶段2:PreCommit(预提交)

  • 执行事务
  • 写 undo/redo 日志
  • 锁资源
  • 但不提交

阶段3:DoCommit 真正提交

最终一致

TCC(Try-Confirm-Cancel):主要体现在不依靠数据库的事务

三阶段:

  1. Try:预留资源(不真正提交)
  2. Confirm:真正执行
  3. Cancel:回滚

特点:

  • 业务侵入强
  • 可控性高

这个就时相当于完全地业务写回滚代码了。

本地消息表 + 异步补偿

这个应该就是outbox模式,先落库再发消息,不过落库就已经影响性能了。

RocketMQ事务消息

Producer → Broker:发送 Half Message

Producer:执行 DB 操作(下单)

Producer → Broker:commit / rollback

Consumer → 消费消息(扣库存)

Saga模式:把一个大事务拆成多个本地事务,如果失败就执行补偿

TCC示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
库存表(stock)
- sku_id
- total_stock
- available_stock
- reserved_stock

预留记录表(reservation)
- tx_id            // 全局事务ID(幂等键)
- sku_id
- user_id
- quantity
- status           // TRYING / CONFIRMED / CANCELED
- expire_at        // 预留过期时间(兜底释放)
- created_at
- updated_at
1
2
3
4
5
interface SeckillTccService {
    boolean tryReserve(String txId, Long userId, Long skuId, int qty);
    boolean confirm(String txId);
    boolean cancel(String txId);
}

try

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Transactional
boolean tryReserve(String txId, Long userId, Long skuId, int qty) {
    // 1. 幂等控制:如果已存在记录,直接返回(避免重复Try)
    Reservation r = reservationDao.findByTxId(txId);
    if (r != null) {
        if (r.status == CONFIRMED) return true;     // 已完成
        if (r.status == CANCELED) return false;     // 已取消(反悬挂保护)
        return true;                                // TRYING 重入
    }

    // 2. 反悬挂:若已经被Cancel标记过(比如先到Cancel后到Try)
    if (reservationDao.existsCanceled(txId)) {
        return false;
    }

    // 3. 扣减可用库存、增加预留库存(原子更新,防超卖)
    int updated = stockDao.tryReserve(skuId, qty);
    // SQL 示意:
    // UPDATE stock
    // SET available_stock = available_stock - ?, reserved_stock = reserved_stock + ?
    // WHERE sku_id = ? AND available_stock >= ?

    if (updated == 0) {
        return false; // 库存不足
    }

    // 4. 写预留记录(状态:TRYING)
    reservationDao.insert(new Reservation(
        txId, userId, skuId, qty, TRYING, now()+TTL
    ));

    return true;
}

confirm

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Transactional
boolean confirm(String txId) {
    Reservation r = reservationDao.findByTxId(txId);
    if (r == null) {
        // 允许空Confirm:可能Try未执行成功(幂等设计)
        return true;
    }

    if (r.status == CONFIRMED) return true;   // 幂等
    if (r.status == CANCELED) return false;   // 已取消

    // 1. 从预留转已售(减少 reserved,不回加 available)
    stockDao.confirm(r.skuId, r.quantity);
    // SQL 示意:
    // UPDATE stock
    // SET reserved_stock = reserved_stock - ?
    // WHERE sku_id = ? AND reserved_stock >= ?

    // 2. 更新状态
    reservationDao.updateStatus(txId, CONFIRMED);

    return true;
}

cancel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Transactional
boolean cancel(String txId) {
    Reservation r = reservationDao.findByTxId(txId);

    // 1. 空回滚(Try 还没执行或失败)
    if (r == null) {
        // 记录一个“已取消占位”,防止后续Try(反悬挂)
        reservationDao.insertCanceledMarker(txId);
        return true;
    }

    if (r.status == CANCELED) return true;     // 幂等
    if (r.status == CONFIRMED) return true;    // 已确认,不再回滚

    // 2. 释放预留库存
    stockDao.release(r.skuId, r.quantity);
    // SQL 示意:
    // UPDATE stock
    // SET available_stock = available_stock + ?, reserved_stock = reserved_stock - ?
    // WHERE sku_id = ? AND reserved_stock >= ?

    // 3. 更新状态
    reservationDao.updateStatus(txId, CANCELED);

    return true;
}