各位巨巨好,本次笔者想来简单谈一下集中收集多台主机上利用Python logging模块产生的日志的办法。

TL;DR

  • 利用logging自带的DatagramHandler发送UDP包
  • 自定义添加Filter来添加发送的主机名
  • 收集的机器上再转发至Kafka持久化
  • 利用kafkacat命令来查看

思路

对于生产环境来说,收集日志无非两种大方向。或者, 其中推还可以分为同步异步

  • 推:由产生日志的程序自身发送出来
  • 拉:由远端主机来目标机器拉取日志(文件或本地端口等)
  • 同步:由产生日志的程序自身发送出来
  • 异步:主机上安装agent类程序不影响业务程序进行日志推送

这些特点各有优缺点,所以笔者需要在这里先设置目标要求

  • 不影响业务程序的功能,以及尽量不性能
  • 不采用agent类程序发送,简化部署与维护
  • 存储(持久化)的日志是可以流式提供给使用者的
  • 使用者可以方便的对日志内容进行过滤,历史记录并不十分重要

笔者在这里也必须首先申明,日志的内容是程序运行时的日志,不作业务统计用,比如错误栈,调式追踪信息等。这些内容都是不那么重要,丢失一些无足轻重

最后一点是选择这样的方案是出于,第一业务主机并不多(不超过100台),第二产生日志内容并不算大(高峰流量每秒大约1000条以内,每条日志一般不足1KiB),第三管理成本大于应用成本(即尽量让部署和运用简单,配置安装集中化)。

现成方案

笔者曾经在其他场景下也做过类似的需求,一般都是以在目标主机上安装agent类 程序的方式进行收集,相比之下运用的主机可能不由运维部门控制,要安装程序无法推动。另外由于这些软件所使用的语言与运行环境多种多样对于运维的有门槛(例如Fluentd使用Ruby,Logstash使用JRuby,Scribe使用CPP,Flume等其他使用Java等),对组织的管理是一项挑战。并且因运用的不熟悉,一旦部署的软件在生产环境出现问题(例如程序崩溃、端口宕机、甚至性能问题)无从下手

所以笔者选定了使用UDP方式进行发送,这样的方案有以下优点:

  • Python logging模块自带DatagramHandler,不需要依赖任何第三方模块,理论上所有Python实现都可以用
  • UDP报文发送不面向连接,即使目标无法到达,对业务程序本身几乎没有影响
  • 仅需要对logging进行配置,无需其他繁琐的配置

选用Kafka作为日志持久化的优点:

  • 基于消费者指针偏移的消费,多消费者性能开销较小
  • 可以限制同一个日志话题(topic)的日志量,便于管理
  • 设计时即考虑了分布式,容灾、扩容等相对靠谱
  • 一般有处理数据的业务都会有Java与ZooKeeper(未来可能非必要)环境

我们会得到如下的拓扑结构:

+-------+
| hosts |+
+-------+|+
 +-------+|
  +---+---+
      |
      |
      v UDP
      |
      |
+-----+-----+
| collector |
+-----+-----+
      |
      |
      v Kafka
      |
      |
  +---+---+           +----------+
  | kafka +--->--->---+ consumer |
  +-------+           +----------+

方案的难点与实现

logging模块的配置

Python的日志模块设计和Java非常相似;根据Python Guide中的日志章节, 有三种方案可以配置。利用ini文件的fileConfig,利用Python字典类型的dictConfig,与原生的Python源代码。但文档也仅此而已,并没有给出一些情景的例子,最后还是得自己摸索。后文提到的添加Filter时无法实现或者非常不直观,在阅读了logging标准库源代码后发现某些属性前二者是无法实现的,所以笔者最后还是选择了第三种。

在Python代码中代入任意位置的源代码文件可以利用 flask.config.Config.from_pyfile。如果不想依赖Flask的话,直接把其中的实现抄过来就是。

配置文件的内容为

# -*- coding: utf-8 -*-
import logging
import logging.handlers
import socket
import sys

class HostnameFilter(logging.Filter):
    hostname = ''

    def __init__(self):
        self.hostname = socket.gethostname()


    def filter(self, record):
        record.hostname = self.hostname
        return True

hostname = HostnameFilter()

h = logging.Formatter(
    '[%(asctime)s %(levelname)-7s %(hostname)s(%(name)s) '
    '<%(process)d> %(filename)s:%(lineno)d] %(message)s')
f = logging.Formatter(
    '[%(asctime)s %(levelname)-7s (%(name)s) '
    '<%(process)d> %(filename)s:%(lineno)d] %(message)s')

console = logging.StreamHandler(sys.stderr)
console.setFormatter(f)
console.setLevel(logging.INFO)

udp = logging.handlers.DatagramHandler('yourhostname', 5152)
udp.setFormatter(h)
udp.setLevel(logging.INFO)
udp.addFilter(hostname)

logger = logging.getLogger()
logger.handlers.clear()
logger.addHandler(console)
logger.addHandler(udp)
logger.setLevel(logging.INFO)

logger = logging.getLogger('hera')
logger.handlers.clear()
logger.addHandler(console)
logger.addHandler(udp)
logger.setLevel(logging.INFO)
logger.propagate = False

logger = logging.getLogger('gunicorn.error')
logger.handlers.clear()
logger.addHandler(console)
logger.addHandler(udp)
logger.setLevel(logging.INFO)
logger.propagate = False

Python logging的记录中有诸多属性但是没有主机名,所以需要自己添加。

注意这里的Filter是添加在Handler上的。

收集器与转发

这个就相对简单了,并且可控。需要注意的是UDP发出来的内容并非纯文本而是Pickle编码过的内容,需要自己解码并重新格式化。

# -*- coding: utf-8 -*-
import argparse
from datetime import datetime
import logging
import pickle
import struct
import sys

from kafka import KafkaClient, SimpleProducer

from wichita import app, ready
from wichita import logger as _logger

logger = _logger.getChild('logging_collector')

def format(attr):
    attr['message'] = logging.makeLogRecord(attr).getMessage()
    attr['time'] = datetime.fromtimestamp(attr['created'])

    fmt = ('[%(time)s %(levelname)-7s %(hostname)s(%(name)s) '
           '<%(process)d> %(filename)s:%(lineno)d] %(message)s')

    return fmt % attr


def run(brokers, topic):

    client = KafkaClient(brokers)
    producer = SimpleProducer(client)

    while True:
        try:
            msg = process_line()
            producer.send_messages(topic, msg)
        except Exception as err:
            logger.exception('error')


def process_line():
        dgram_size = sys.stdin.buffer.read(4)
        if len(dgram_size) < 4:
            return
        slen = struct.unpack('>L', dgram_size)[0]
        data = sys.stdin.buffer.read(slen)

        while len(data) < slen:
            data = data + sys.stdin.buffer.read(slen - len(data))

        t = pickle.loads(data)

        msg = format(t).encode('utf-8')
        return msg


def main():
    ready()

    ap = argparse.ArgumentParser()
    ap.add_argument('-t', '--topic', required=True)
    ap.add_argument('-B', '--kafka-brokers',
                    required=True, action='append')

    options = ap.parse_args()
    logger.info(options)

    run(options.kafka_brokers, options.topic)


if __name__ == '__main__':
    sys.exit(main())

查看日志

Kafkacat

$ /path/to/kafkacat -B brokers -t topic ...

总结

使用自带UDP发送日志的效果既不影响务对性能影响也不大。但随着数据量变大或网络 质量的原因造成数据丢失的情况(毕竟不面向连接),根据实际需求可以采取更可靠的 传输方式。 最后笔者想说的是选用的方案因综合考虑是否适合运用的环境和管理运用的人,这样才不会引起解决问题的同时由引入了更大的问题。

参考资料