目录


先简单介绍一下事件的背景,相信只要有产品发布在手机应用端,那全员推送这件 事情肯定是逃不掉的;需求便是在尽量短的时间内将推送消息发出去,并保证良好 的送达率。整个推送系统后面再总结一篇完整的回顾。

这里对于 redis 的使用情景,仅使用 list 数据结构当作队列使用。至于是否有 更合适的消息中间件,在文末论述。

单单单…全是单线程操作的困境

单机单个连接单线程发送(仅指与 redis 的交互),显然没有充分地利用硬件资源 ,那首先我们可以来了解一下在这样一个情景下性能可以到多少。

实际上笔者在阿里云可以买到的主机上都试了一下,鉴于主流 redis 版本还是只能 用到单核的,所以这个量其实相差并不大——单单单的情景,永远会先达到瓶颈,跟硬 件关系真不大。

以下测试数据均在本地开发与阿里云主机 VPC 环境中测试。

前者规格:

  • CPU Intel i7 4770 @ 3.4GHz
  • 32GiB 内存
  • loopback 网卡

后者:

  • redis 服务端所在实例
    • 规格 ecs.c5.xlarge IO优化实例
    • vCPU x 4 (Skylake Xeon Platinum 8163 @ 2.5GHz)
    • 8GiB 内存
    • 500k pps 包速率上限
    • 1.5Gbps 千兆网 (这个拿 iperf3 测过,可以跑超过千兆)
  • redis-benchmark 与 测试程序所在实例
    • 规格 ecs.c5.x2large IO优化实例
    • vCPU x 8 (Skylake Xeon Platinum 8163 @ 2.5GHz)
    • 16GiB 内存
    • 800k pps 包速率上限
    • 1.5Gbps 千兆网 (这个拿 iperf3 测过,可以跑超过千兆)

*redis 版本 4.0.10*

由于阿里云环境用的是两个节点,所以会更实际地体现出网络的影响。消息载荷采 取了和实际发送大小接近的 250 字节。

先看一下 redis-benchmark

仅消费者

# 一个客户端连接,lpop 消息大小没意义
$ redis-benchmark -t lpop -c 1 -l
====== LPOP ======
  100000 requests completed in 2.77 seconds
  1 parallel clients
  3 bytes payload
  keep alive: 1

100.00% <= 1 milliseconds
36101.08 requests per second

仅生产者

# 一个客户端连接,默认 3 字节载荷
$ redis-benchmark -t rpush -c 1 -l
====== RPUSH ======
  100000 requests completed in 2.91 seconds
  1 parallel clients
  3 bytes payload
  keep alive: 1

100.00% <= 1 milliseconds
100.00% <= 1 milliseconds
34317.09 requests per second

# 一个客户端连接,250 字节载荷
$ redis-benchmark -t rpush -c 1 -l -d 250
====== RPUSH ======
  100000 requests completed in 3.03 seconds
  1 parallel clients
  250 bytes payload
  keep alive: 1

100.00% <= 1 milliseconds
100.00% <= 1 milliseconds
32959.79 requests per second

消费者与生产者一起

# 都是一个连接,载荷大小 250 字节
  100000 requests completed in 3.14 seconds
  1 parallel clients
  250 bytes payload
  keep alive: 1

100.00% <= 1 milliseconds
100.00% <= 1 milliseconds
31857.28 requests per second

这里可以得到两个 初步结论

  • 载荷大小在本地回环地址,敏感程度不大 (实际测试要达到 2KiB 以上出现下 降趋势,10KiB只有原来一半左右)
  • 单连接的 QPS 上限明显,无法突破 30k/s 这个级别,换更好的 CPU 也无法 显著提高

提高连接数

提高连接数,或者引入更多发送 CPU 都能提高吞吐,但随着工作资源的增加,提 升的衰减也很明显。

连接与吞吐量关系,消息大小 3 字节
连接与吞吐量关系,消息大小 3 字节
连接与吞吐量关系,消息大小 250 字节
连接与吞吐量关系,消息大小 250 字节

分析

测试的时候可以注意观察系统负载与 CPU 使用情况,会发现 user 比 system 甚 至还低,说明更多的在等待 IO 以及上下文切换上。

问题主要在于,网络操作过于频繁,在这种情景下,交互一次命令就需要从网络上 收发一次,考虑到 TCP/IP 网络多层包的封装过程,载荷的有效传输本身是一定有 极限的,特别小的载荷,在封装包中占比低于一定比例时(例如考虑 TCP 包头的 20 个字节),加上每次操作的交互都需要程序对 socket 进行一次查询(即使使 用 kqueue 或者 epoll 交给操作系统内核)还是避免不了大量的操作。打个比方, 就好像每次出门公办,回来立刻找财务报销,那么财务每时每刻都只能处理一个 人的一张单子。实践中对于规模较大,流程冗长的流程往往采取批量,定时统一 处理的方式来提高执行的效率。对于网络交互来说,这个思路也是可行的。

在 redis 中,我们可以利用 管道 (pipeline) 来实现交互命令的批量操作。

批量处理与吞吐量的关系,消息大小 3 字节
批量处理与吞吐量的关系,消息大小 3 字节
批量处理与吞吐量的关系,消息大小 250 字节
批量处理与吞吐量的关系,消息大小 250 字节

实际上在测试的时候根据调整消费者数量以及 pipe 的值会发现, 当一个 key 上 生产消费产生热点的时候,也会对流量产生影响 ,综合考虑 redis 自身是单线程 的,竞争越激烈的时候对吞吐影响也越大。

测试程序

源代码戳这里1

为了便于解释,或者说模拟真实使用情景,这里引入使用 go 写的测试程序,也 能得到类似的结论。

(单个连接)增加批量处理提高吞吐量的效果
(单个连接)增加批量处理提高吞吐量的效果
(已批量处理情况)增加连接提高吞吐量的效果
(已批量处理情况)增加连接提高吞吐量的效果

总结一下测试得到的一些结论

  • 连接或者客户端(不论是生产还是消费)的 CPU 对 QPS 提升效果不如批量管道
  • 当消息载荷大小大到一定程度时,批量数据的增加对 QPS 提升不再敏感
  • 实际在测试的时候,多开一对生产消费换另外一个键名,与原先那对加起来的

吞吐量总和不会提高, 说明测得的值,基本上是 redis 服务端单 CPU 的极限

下面再专门提一下批量操作的代码是如何发送 redis 命令的。

批量 push

如果情景比较简单,每次都预先已经确定至少有多少数据,可以直接一次打包发送:

pipe := client.Pipeline()
for i := 0; i < pipesize; i++ {
  pipe.RPush(key, payload)
}
r, err := pipe.Exec()

如果想进一部提高发送效率,一般会在代码内部使用队列排队后,再分批批量发送, 而数据产生来源有时是无法控制速度与节奏的,故需要考虑两种情景

  • 当累计数量达到阈值时,用于数据产生量集中的情景
  • 当距离上次发送时间差达到一个值时,为了防止消息断流导致未满足第一条件 的剩余数据发不出去的情况
func groupSend(
    client *redis.Client, key string,
    size int, q chan int, rq chan int) {

    // 用于传递给发送方法的数据
    buf := make([]([]byte), 0, size)
    var cnt int = 0
    var timestamp = time.Now().UnixNano()

    for {
        select {
        case b, ok := <-q:
            // log.Printf("got: %d\n", ii)
            if !ok {
                // 有些情景 输入 chan 会关闭
                // 这里 ok == false 说明不会再有输入
                q = nil
                break
            }
            cnt++
            buf = append(buf, b)
            if cnt >= size {
                // 超过数量阈值后,批量发送一次
                send(client, key, buf, rq)
                buf = make([]([]byte), 0, size)
                cnt = 0
            }
            timestamp = time.Now().UnixNano()
        default:
            now := time.Now().UnixNano()
            // log.Printf("delta: %d\n", now-timestamp)
            // log.Printf("len: %d\n", len(buf))
            if (now - timestamp) > 1e9 {
                // 若没有新输入消息,则检查距离上一次发送时间差
                // 超过后也进行一次批量发送
                send(client, key, buf, rq)
                buf = make([]([]byte), 0, size)
                cnt = 0
            }
            timestamp = time.Now().UnixNano()
            time.Sleep(1 * time.Second)
        }

        if q == nil && len(buf) <= 0 {
            break
        }
    }
}

批量 pop

这里有个技巧,可以根据官方文档说明2

一般客户端取 list 数据时使用 blpop 命令,这样客户端保持连接不消耗 CPU ,只等待 IO 。但是如果一个管道中 N 个命令都传递 blpop 命令,那么当队列 只有 M (M < N) 个消息时,整个命令就需要等待 (N - M) * $sleep 时间, 显然不符合需求。按文档解释,我们需要改成,先传递 N - 1lpop 命令, 最后一个再传递 blpop ,达到效果是前面的取不到立即返回 nil ,最后一个 才进行等待,最差情况下也只等待一次,与原来单个 blpop 行为几乎一致。

func consumer(option *Option, q chan int) {
    opt := &redis.Options{
        Addr:     option.Addr,
        Password: option.Pass,
        DB:       option.DB,
    }

    batch := 1
    if option.Pipe > 1 {
        batch = option.Pipe
    }

    client := redis.NewClient(opt)
    defer client.Close()

    for {
        pipe := client.Pipeline()

        // 先排入 N - 1 个 lpop
        for i := 0; i < batch-1; i++ {
            pipe.LPop(option.Key)
        }
        // 最后排入 blpop
        pipe.BLPop(time.Second*1, option.Key)

        r, err := pipe.Exec()
        pipe.Close()

        // 批量处理返回结果,取不到数据的时候会返回 nil
        // 确定类型的语言这里处理起来相对麻烦,例如 go-redis 客户端
        // 对 blpop 与 lpop 的返回结果封装不同,检查一下类型相应处理
        for _, re := range r {
            switch re.(type) {
            case *redis.StringCmd:
                // log.Println(re.Err() == redis.Nil)
                if re.Err() != redis.Nil {
                    // log.Println(re.(*redis.StringCmd).Val())
                    q <- 1
                }
            case *redis.StringSliceCmd:
                // log.Println(re.Err() == redis.Nil)
                if re.Err() != redis.Nil {
                    // log.Println(re.(*redis.StringSliceCmd).Val()[1])
                    q <- 1
                }
            default:
            }
        }

        if err != nil {
            if err != redis.Nil {
                log.Println("error not nil")
                log.Println(err)
            }
        }

    }
}

批量操作的风险

从 redis 具体例子来说,管道本身如果命令有相互依赖关系,有一句命令出错后, 后续命令将不会执行,可能会引起一定的风险,故在使用此类消息时应在业务上考 虑消息漏发送的情景,或者用于允许有误差的业务。

对于消息类情景,不仅限于 redis ,批量收取消息到客户端后,由于消息在业务 上都是一条一条处理地,出现异常时若未妥善处理,则可能造成批量收取的后续消 息丢失,尽量考虑使用消息设施例如 AMQP 的 ACK 机制, Kafka 的 offset commit 等。

总结

简要梳理一下 go 测试程序的结果数据
连接数 管道(批量) 消息大小 吞吐量(QPS)
1 1 250bytes 11k
1 10 250bytes 51k
1 50 250bytes 130k
1 100 250bytes 220k
1 200 250bytes 220k
2 100 250bytes 275k
4 100 250bytes 340k
8 100 250bytes 410k
16 100 250bytes 480k

管道对使用 redis 的操作吞吐量提升显著,使用管道往往比增加硬件资源或者连 接数更有效,但也伴随着相应的风险。推荐在实际情景中挑选合适的管道大小。

__END__


  1. https://github.com/aleiphoenix/redis-list-bench

  2. https://redis.io/commands/BLPOP#codeblpopcode-inside-a-codemulticode--codeexeccode-transaction