上个月用python的gevent协程库写了一个tcp服务。日志库使用python标准日志库logging。一个月后,发现一个偶发的bug。这个bug发生时,用python的标准日志库自带的FileHandler写的日志会发往socket占用的文件描述符fd。结果就是,客户端收到了本要打印到磁盘上的日志。花了不少时间定位排查这个bug,仍然没有结果。我开始怀疑是gevent协程库和python的标准日志库logging有冲突。协程库会错误的把logging打开的文件描述符fd关闭并分配给新创建的socket,于是日志就打印到socket占用的fd了。
后来,写了一个检测脚本用来监控这个情况发生的概率。该脚本每分钟会检查进程占用的所有fd,一但发现用来打印本地日志的文件fd不见了,就重启服务进程。核心代码如下:
lsof -p $pid | grep -q ${log_file_name}
if [ $? != 0 ]
then
# 报警代码 ...
fi
发现bug发生的频率大概是一个星期一次。但这毕竟根本上解决不了问题啊,又找不到bug的原因,怎么办?
那就自定义一个File Handler吧!
决定自定义一个File Handler,这个File Handler工作在另外一个单独的进程,这样无论如何日志用的fd都不会跟主进程的各种socket用的fd冲突了吧。
代码如下,主要用到的技术进程通讯和用python的__getattribute__
魔法把截获类实例的方法调用。这样,只需要把旧的代码中的File Handler(我用了TimedRotatingFileHandler)换成自定义的Handler Class,所有其他旧代码都无需改动。
#!/usr/bin/env python3
'''
created by kamuszhou*AT*tencent.com, zausiu*AT*gmail.com http://ykyi.net
Nov 20, 2018
'''
import logging
import logging.handlers
from functools import partial
from multiprocessing import Process, Queue
class MySpecialHandler(logging.StreamHandler):
def __init__(self, *args, **kargs):
self._q = Queue()
self._p = Process(target=MySpecialHandler.__run, args=(self._q,))
self._p.start()
self._q.put(('__init__', args, kargs))
def join(self):
self._p.join()
def __run(q):
handler = None
while True:
# op, params = q.get()
method, args, kwargs = q.get()
if method == '__init__':
handler = logging.handlers.TimedRotatingFileHandler(*args, **kwargs)
else: # 主进程的日志调用实际上被转到这里
getattr(handler, method)(*args, **kwargs)
def __proxy(self, name, *args, **kwargs):
# 把调用的方法名和方法参数通过Queue传到专门的日志进程。
self._q.put((name, args, kwargs))
fun = logging.StreamHandler.__getattribute__(self, name)
# print('call method: ', name, args, kwargs)
# 如果是setLevel函数,再调用一次父类的方法
if name in {'setLevel'}:
return fun(*args, **kwargs)
def __getattribute__(self, name):
'''
Hook大法!截获所有方法
'''
attr = logging.StreamHandler.__getattribute__(self, name)
if hasattr(attr, '__call__') and name not in {'join', 'emit'}:
return partial(MySpecialHandler.__proxy, self, name)
else:
return attr
if __name__ == '__main__':
handler = MySpecialHandler('/data/tmp/ttt.log', when='D', interval=1, backupCount=90)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s: %(levelname)s %(message)s')
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.propagate = False # Don't propagate the logging to ROOT
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.debug('debug testtttttttt')
logger.info('info testtttttttttt')
logger.warn('warn testtttttttttt')
logger.error('error testtttttttt')
logger.critical('critical testttttt')
handler.join()
这里自定义的日志进程类只是一个很粗糙的实现,一但跑起来,只能手动杀进程。反正我的使用场景是一个服务。所以,我也懒得加‘优雅的退出代码’。
另外,这里创建自定义日志Handler的父类是StreamHandler,它还有一个重要的函数是emit。如果想定制这么一个Handler,把日志发给kafka而不需要起进程。则子类需要重写父类的emit方法。比如:
def emit(self, record):
msg = self.format(record) # 日志会以record的形式传入该函数,用format把它格式化
self.kafka_broker.send(msg, self.topic)