并发与MySQL的更新

情景是这样的,我们经常会在基础设施尚不完善的时候利用数据库设计一张表来当 作队列使用,并具有以下典型的字段设计:

column type comment
id int
data string
processed int 处理状态
update_time timestamp
create_time timestamp

这里处理状态字段 processed 用于表示数据记录是否已经被处理过了。 除了 尚未处理已处理错误 外,通常为了能让程序能够并发处理,还会增加一个状态 处理中

不过在实践中,常见的一个错误是更新状态时,未考虑并发的情景,而写成

UPDATE t_queue SET processed = 'processing' WHERE id = ?

这样做会有几个风险:

  • 如果同样id的两条数据排队开始处理,不能保证 至多处理一次
  • 实际上甚至如果并发的两个程序分别处理同一个id数据,成功的先完成,失败的后完成 processed 最后的结果会是 error ,与实际情况不一致。

解决办法

出现问题的根源是两个前提,只要打破其中一个就可以防止问题的产生

  • 有并发相同的id数据排队
  • 原来那句SQL更新无法保证只有一个成功

由于我们的需求就是让程序可以并发执行提升执行效率,所以禁止并发是不可行的。 那么还是回到原来那句 SQL 上,通常的一个方法是将 SQL 改成这样

UPDATE t_queue SET processed = 'processing'
WHERE id = ? AND processed = 'unproccessed'

在执行SQL后根据数据库返回的 影响行数 来判断更新是否成功,且这样 单句SQL 对使用 事务 是无关的。 当并发的两句相同的SQL同时提交到数据库时,为了保证原子性,数据库会对这条 数据进行加锁操作1。针对同id的SQL实际上顺序执行的,然而由于第二句执 行时,条件已不满足2

顺便一提,如果是 针对事务 且有 多句SQL 执行,尤其是还需要 在不同的 会话读取尚未提交的数据 时,需要进一步 了解数据库的隔离 3 ,换句 话说, 我们是借用数据库的特性来防并发,虽然很方便,但无异于将问题隐藏在数 据库层面,掩盖了设计与代码中的问题。

将上面的SQL推而广之

UPDATE t_queue SET processed = @target_status
WHERE id = ? AND processed = @source_status

便可应用到不同的阶段。实际上这是一种综合考虑功能与效率,原理采用加锁的方 式实现的原子性。

不加锁的原子性与CAS

比较后交换

那么针对不使用事务的情景,上面那个SQL是否可以不加锁也能实现原子性呢?答 案是可以的,我们再来回顾一下这个过程。为了情景简单,这里将情景设计为多个 线程在同时处理内存的一个变量。

  • 变量也就是内存中指向的地址的内容,假设我们想将变量A (对应地址 0x0100)增加1,在读取与更新之间有个时间差,故保证原子性 ,实际操作是在更新前再次比较之前读取到的一致

  • 多个线程操作时,也执行一样的操作,发现后来者的比较不成立,更新也就 失败了

这个过程就叫做 比较后交换(Compare-And-Swap) 4,如果使用伪代码来表 示,如下:

function cas(p : pointer to int, old : int, new : int) returns bool {
  if *p ≠ old {
    return false
  }
  *p ← new
  return true
}

Java中 java.util.concurrent.atomic 包中的实现5实际上使用 sun.misc.unsafe 包中对内存地址的操作。

原子更新

如果要保证更新操作的成功,比较失败的线程会再次进行读取,并循环这个过程直 到成功为止,特别需要注意可能会出现 死循环

function add(p : pointer to int, a : int) returns int {
  done ← false
  while not done {
    value ← *p  // Even this operation doesn't need to be atomic.
    done ← cas(p, value, value + a)
  }
  return value + a
}

CAS中的ABA问题

ABA问题是指如下图:

还是上面的例子,加入第三个线程减操作

  1. 线程1将变量A从20更新为21
  2. 线程2比较失败,进入下一次比较
  3. 线程3恰巧在1成功,2失败时执行,满足 内存内容 == 21 条件,更新为 20
  4. 线程2读取到的条件又满足,又将内存修改为 21

实际上这样的操作结果, 很可能是不符合需求的

简单来说是指并发的线程在操作变量时,前提条件刚好满足 程序的条件 ,而不是 业务需求的条件 ,特别是并发的情况下。这是一种典型的 测试结果 呈虚假反应 6

我们可以重新给问题定义一下前提条件: 更新比较的时候,需要是原来读取时的 那个数据 ,比较简单的一种办法是给数据加上版本号,变成下图:

在比较数据内容的时候,再带上版本号的比较,一致的时候才满足更新条件;同时 更新时更新版本号。

实际上这样的CAS功能实现非常底层,一般都需要直接使用汇编指令来完成7

最后我们再回到上面数据库的例子,如果我们没有使用事务,那么实际上就会产生 3个SQL提交出现ABA的问题。 这时如果业务需要对数据操作有要求,那么还是需 要额外加锁,或者增加版本号之类的机制,保证操作的数据是读取到的数据

CAS的优缺点

部分人认为在单处理器(单核单线程)系统中,可以通过禁用中断,顺序执行指令来 避免使用CAS或者其他原子操作来保证原子性(也就是前面提到两个前提里的另外一 个思路)。但实际上会引入其他代价,例如必须充分相信代码不会过分占用CPU资源 ,死循环或者内存页错误;在实际使用中禁用中断往往代价巨大。实际使用中发现 单处理器系统使用例如Linux使用的futex等机制,保证线程间的调度让程序可以并 行执行会有许多益处。

使用CAS的好处显而易见,可以避免前文举例的这种内存内容出现冲突的情景,用 来做线程同步,提高并发吞吐。

但是也必须看到,在失败的情景下,如果利用CAS做更新, 在线程调度冲突非常频 繁的系统中,失败的线程会一直重试直到成功,对CPU资源是巨大的消耗 ;并且没 有考虑调度的公平性, 很可能出现第二个线程一直被后续进来的操作堵掉的情景

还有一点值得一提,CAS只能保证一个变量(或者内存地址)的原子性,并不能保证一 段代码在并发时的原子性,当然这里不包括使用CAS当作锁来实现,因为实际上,程 序的并发往往会采用更加合适的机制来同步,例如各种锁机制。

小结

CAS是一种适用于线程冲突并不激烈,实现方式较为简单的原子操作。适用于对单个 变量进行操作的情景。

__END__


  1. 不同的数据库引擎有不同的加锁方式,但保证原子性一般都需要加锁

  2. 未使用事务时,第一句的值立即更新生效;比dirty read更严格的隔离级 别均无法满足条件

  3. https://en.wikipedia.org/wiki/Isolation_(database_systems)

  4. https://en.wikipedia.org/wiki/Compare-and-swap

  5. https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/util/concurrent/atomic/AtomicInteger.java#L134

  6. https://en.wikipedia.org/wiki/False_positives_and_false_negatives

  7. https://stackoverflow.com/questions/38984153/how-can-i-implement-aba-counter-with-c11-cas