先简单介绍一下事件的背景,相信只要有产品发布在手机应用端,那全员推送这件 事情肯定是逃不掉的;需求便是在尽量短的时间内将推送消息发出去,并保证良好 的送达率。整个推送系统后面再总结一篇完整的回顾。
这里对于 redis 的使用情景,仅使用 list 数据结构当作队列使用。至于是否有 更合适的消息中间件,在文末论述。
单单单…全是单线程操作的困境
单机单个连接单线程发送(仅指与 redis 的交互),显然没有充分地利用硬件资源 ,那首先我们可以来了解一下在这样一个情景下性能可以到多少。
实际上笔者在阿里云可以买到的主机上都试了一下,鉴于主流 redis 版本还是只能 用到单核的,所以这个量其实相差并不大——单单单的情景,永远会先达到瓶颈,跟硬 件关系真不大。
以下测试数据均在本地开发与阿里云主机 VPC 环境中测试。
前者规格:
-
CPU Intel i7 4770 @ 3.4GHz
-
32GiB 内存
-
loopback 网卡
后者:
-
redis 服务端所在实例
-
规格 ecs.c5.xlarge IO优化实例
-
vCPU x 4 (Skylake Xeon Platinum 8163 @ 2.5GHz)
-
8GiB 内存
-
500k pps 包速率上限
-
1.5Gbps 千兆网 (这个拿 iperf3 测过,可以跑超过千兆)
-
-
redis-benchmark 与 测试程序所在实例
-
规格 ecs.c5.x2large IO优化实例
-
vCPU x 8 (Skylake Xeon Platinum 8163 @ 2.5GHz)
-
16GiB 内存
-
800k pps 包速率上限
-
1.5Gbps 千兆网 (这个拿 iperf3 测过,可以跑超过千兆)
-
redis 版本 4.0.10
由于阿里云环境用的是两个节点,所以会更实际地体现出网络的影响。消息载荷采 取了和实际发送大小接近的 250 字节。
先看一下 redis-benchmark
仅消费者
# 一个客户端连接,lpop 消息大小没意义
$ redis-benchmark -t lpop -c 1 -l
====== LPOP ======
100000 requests completed in 2.77 seconds
1 parallel clients
3 bytes payload
keep alive: 1
100.00% <= 1 milliseconds
36101.08 requests per second
仅生产者
# 一个客户端连接,默认 3 字节载荷
$ redis-benchmark -t rpush -c 1 -l
====== RPUSH ======
100000 requests completed in 2.91 seconds
1 parallel clients
3 bytes payload
keep alive: 1
100.00% <= 1 milliseconds
100.00% <= 1 milliseconds
34317.09 requests per second
# 一个客户端连接,250 字节载荷
$ redis-benchmark -t rpush -c 1 -l -d 250
====== RPUSH ======
100000 requests completed in 3.03 seconds
1 parallel clients
250 bytes payload
keep alive: 1
100.00% <= 1 milliseconds
100.00% <= 1 milliseconds
32959.79 requests per second
消费者与生产者一起
# 都是一个连接,载荷大小 250 字节
100000 requests completed in 3.14 seconds
1 parallel clients
250 bytes payload
keep alive: 1
100.00% <= 1 milliseconds
100.00% <= 1 milliseconds
31857.28 requests per second
这里可以得到两个 初步结论 :
-
载荷大小在本地回环地址,敏感程度不大 (实际测试要达到 2KiB 以上出现下 降趋势,10KiB只有原来一半左右)
-
单连接的 QPS 上限明显,无法突破 30k/s 这个级别,换更好的 CPU 也无法 显著提高
提高连接数
提高连接数,或者引入更多发送 CPU 都能提高吞吐,但随着工作资源的增加,提 升的衰减也很明显。
分析
测试的时候可以注意观察系统负载与 CPU 使用情况,会发现 user 比 system 甚 至还低,说明更多的在等待 IO 以及上下文切换上。
问题主要在于,网络操作过于频繁,在这种情景下,交互一次命令就需要从网络上 收发一次,考虑到 TCP/IP 网络多层包的封装过程,载荷的有效传输本身是一定有 极限的,特别小的载荷,在封装包中占比低于一定比例时(例如考虑 TCP 包头的 20 个字节),加上每次操作的交互都需要程序对 socket 进行一次查询(即使使 用 kqueue 或者 epoll 交给操作系统内核)还是避免不了大量的操作。打个比方, 就好像每次出门公办,回来立刻找财务报销,那么财务每时每刻都只能处理一个 人的一张单子。实践中对于规模较大,流程冗长的流程往往采取批量,定时统一 处理的方式来提高执行的效率。对于网络交互来说,这个思路也是可行的。
在 redis 中,我们可以利用 管道 (pipeline) 来实现交互命令的批量操作。
实际上在测试的时候根据调整消费者数量以及 pipe 的值会发现, 当一个 key 上 生产消费产生热点的时候,也会对流量产生影响 ,综合考虑 redis 自身是单线程 的,竞争越激烈的时候对吞吐影响也越大。
测试程序
源代码戳这里1。
为了便于解释,或者说模拟真实使用情景,这里引入使用 go 写的测试程序,也 能得到类似的结论。
总结一下测试得到的一些结论
-
连接或者客户端(不论是生产还是消费)的 CPU 对 QPS 提升效果不如批量管道
-
当消息载荷大小大到一定程度时,批量数据的增加对 QPS 提升不再敏感
-
实际在测试的时候,多开一对生产消费换另外一个键名,与原先那对加起来的
吞吐量总和不会提高, 说明测得的值,基本上是 redis 服务端单 CPU 的极限
下面再专门提一下批量操作的代码是如何发送 redis 命令的。
批量 push
如果情景比较简单,每次都预先已经确定至少有多少数据,可以直接一次打包发送:
pipe := client.Pipeline()
for i := 0; i < pipesize; i++ {
pipe.RPush(key, payload)
}
r, err := pipe.Exec()
如果想进一部提高发送效率,一般会在代码内部使用队列排队后,再分批批量发送, 而数据产生来源有时是无法控制速度与节奏的,故需要考虑两种情景
-
当累计数量达到阈值时,用于数据产生量集中的情景
-
当距离上次发送时间差达到一个值时,为了防止消息断流导致未满足第一条件 的剩余数据发不出去的情况
func groupSend(
client *redis.Client, key string,
size int, q chan int, rq chan int) {
// 用于传递给发送方法的数据
buf := make([]([]byte), 0, size)
var cnt int = 0
var timestamp = time.Now().UnixNano()
for {
select {
case b, ok := <-q:
// log.Printf("got: %d\n", ii)
if !ok {
// 有些情景 输入 chan 会关闭
// 这里 ok == false 说明不会再有输入
q = nil
break
}
cnt++
buf = append(buf, b)
if cnt >= size {
// 超过数量阈值后,批量发送一次
send(client, key, buf, rq)
buf = make([]([]byte), 0, size)
cnt = 0
}
timestamp = time.Now().UnixNano()
default:
now := time.Now().UnixNano()
// log.Printf("delta: %d\n", now-timestamp)
// log.Printf("len: %d\n", len(buf))
if (now - timestamp) > 1e9 {
// 若没有新输入消息,则检查距离上一次发送时间差
// 超过后也进行一次批量发送
send(client, key, buf, rq)
buf = make([]([]byte), 0, size)
cnt = 0
}
timestamp = time.Now().UnixNano()
time.Sleep(1 * time.Second)
}
if q == nil && len(buf) <= 0 {
break
}
}
}
批量 pop
这里有个技巧,可以根据官方文档说明2。
一般客户端取 list 数据时使用 blpop
命令,这样客户端保持连接不消耗 CPU
,只等待 IO 。但是如果一个管道中 N 个命令都传递 blpop
命令,那么当队列
只有 M (M < N)
个消息时,整个命令就需要等待 (N - M) * $sleep
时间,
显然不符合需求。按文档解释,我们需要改成,先传递 N - 1
个 lpop
命令,
最后一个再传递 blpop
,达到效果是前面的取不到立即返回 nil
,最后一个
才进行等待,最差情况下也只等待一次,与原来单个 blpop
行为几乎一致。
func consumer(option *Option, q chan int) {
opt := &redis.Options{
Addr: option.Addr,
Password: option.Pass,
DB: option.DB,
}
batch := 1
if option.Pipe > 1 {
batch = option.Pipe
}
client := redis.NewClient(opt)
defer client.Close()
for {
pipe := client.Pipeline()
// 先排入 N - 1 个 lpop
for i := 0; i < batch-1; i++ {
pipe.LPop(option.Key)
}
// 最后排入 blpop
pipe.BLPop(time.Second*1, option.Key)
r, err := pipe.Exec()
pipe.Close()
// 批量处理返回结果,取不到数据的时候会返回 nil
// 确定类型的语言这里处理起来相对麻烦,例如 go-redis 客户端
// 对 blpop 与 lpop 的返回结果封装不同,检查一下类型相应处理
for _, re := range r {
switch re.(type) {
case *redis.StringCmd:
// log.Println(re.Err() == redis.Nil)
if re.Err() != redis.Nil {
// log.Println(re.(*redis.StringCmd).Val())
q <- 1
}
case *redis.StringSliceCmd:
// log.Println(re.Err() == redis.Nil)
if re.Err() != redis.Nil {
// log.Println(re.(*redis.StringSliceCmd).Val()[1])
q <- 1
}
default:
}
}
if err != nil {
if err != redis.Nil {
log.Println("error not nil")
log.Println(err)
}
}
}
}
批量操作的风险
从 redis 具体例子来说,管道本身如果命令有相互依赖关系,有一句命令出错后, 后续命令将不会执行,可能会引起一定的风险,故在使用此类消息时应在业务上考 虑消息漏发送的情景,或者用于允许有误差的业务。
对于消息类情景,不仅限于 redis ,批量收取消息到客户端后,由于消息在业务 上都是一条一条处理地,出现异常时若未妥善处理,则可能造成批量收取的后续消 息丢失,尽量考虑使用消息设施例如 AMQP 的 ACK 机制, Kafka 的 offset commit 等。
总结
连接数 | 管道(批量) | 消息大小 | 吞吐量(QPS) |
---|---|---|---|
1 | 1 | 250bytes | 11k |
1 | 10 | 250bytes | 51k |
1 | 50 | 250bytes | 130k |
1 | 100 | 250bytes | 220k |
1 | 200 | 250bytes | 220k |
2 | 100 | 250bytes | 275k |
4 | 100 | 250bytes | 340k |
8 | 100 | 250bytes | 410k |
16 | 100 | 250bytes | 480k |
管道对使用 redis 的操作吞吐量提升显著,使用管道往往比增加硬件资源或者连 接数更有效,但也伴随着相应的风险。推荐在实际情景中挑选合适的管道大小。
\_\_END\_\_