
| # -*- coding: utf-8 -*- import argparse import json import logging import time import traceback from xmlrpc import client
import psutil import requests from supervisor.xmlrpc import SupervisorTransport
# Root logger configuration: "[LEVEL][timestamp] message" at INFO and above.
logging.basicConfig(format='[%(levelname)s][%(asctime)s] %(message)s', level=logging.INFO)

# NOTE(review): DEBUG is not read anywhere in the visible part of this file.
DEBUG = False
# Default local address; the script entry point overwrites it with the
# result of get_local_address().
LOCAL_ADDRESS = '127.0.0.1'
# Supervisor state names that watchdog() counts as unhealthy.
INVALID_STATE_NAMES = ['BACKOFF', 'FATAL', 'EXITED', 'UNKNOWN']
# Bookkeeping for unhealthy processes, filled in by watchdog():
# process name -> {'info': <process info dict>, 'detect_times': int, 'retry_times': int}
INVALID_STATUS_PROCESS_INFO = {}
def get_local_address(nic_prefix='eth'):
    """Return an IPv4 address of this machine.

    Network interface names are visited in sorted (lexicographic) order and
    only those whose name starts with ``nic_prefix`` are considered. The
    first IPv4 address found that is not ``127.0.0.1`` is returned.

    Args:
        nic_prefix: Name prefix an interface must have to be considered.

    Returns:
        The matching IPv4 address as reported by ``psutil.net_if_addrs()``.

    Raises:
        OSError: If no matching interface carries a usable IPv4 address.
    """
    interfaces = psutil.net_if_addrs()
    candidates = [name for name in sorted(interfaces) if name.startswith(nic_prefix)]

    for name in candidates:
        for entry in interfaces[name]:
            is_ipv4 = entry.family.name == 'AF_INET'
            if is_ipv4 and entry.address != '127.0.0.1':
                return entry.address

    raise OSError('Local Address not found')
def send_message(content, level=5):
    """Emit an alert message.

    TODO: replace the body with a real alert delivery mechanism as needed;
    for now the message is only printed to standard output.

    Args:
        content: Text of the alert.
        level: Alert level. Defaults to 5. Currently unused by this
            placeholder implementation.
    """
    print(content)
def get_proxy(server_url):
    """Create an XML-RPC client for a supervisor instance.

    Args:
        server_url: Supervisor XML-RPC address, passed to
            ``SupervisorTransport`` as ``serverurl``.

    Returns:
        An ``xmlrpc.client.ServerProxy`` that sends its requests through a
        ``SupervisorTransport`` built for ``server_url``.
    """
    # The two leading None arguments are presumably the credential slots of
    # SupervisorTransport (no authentication) -- confirm against its signature.
    transport = SupervisorTransport(None, None, serverurl=server_url)
    # The URI is a fixed placeholder; the transport is built with server_url.
    return client.ServerProxy('http://127.0.0.1', transport=transport)
def _restart_process(proxy, process_name):
    """Stop (if needed) and start ``process_name`` through ``proxy``.

    Supervisor's ``stopProcess`` answers with a ``NOT_RUNNING`` fault when the
    process is already down (the FATAL / EXITED cases this watchdog handles).
    That fault is ignored so ``startProcess`` is still attempted; every other
    fault or error propagates to the caller.
    """
    # Function-scope import: only the fault code constant is needed here.
    from supervisor.xmlrpc import Faults

    try:
        proxy.supervisor.stopProcess(process_name)
    except client.Fault as fault:
        if fault.faultCode != Faults.NOT_RUNNING:
            raise
    proxy.supervisor.startProcess(process_name)


def watchdog(delay_times, max_retry_times, server_url):
    """Run one detection pass over all processes managed by supervisor.

    For every process whose state name is in ``INVALID_STATE_NAMES`` a record
    is kept in ``INVALID_STATUS_PROCESS_INFO``. During the first
    ``delay_times`` detections only an alert is sent. After that the process
    is restarted, at most ``max_retry_times`` times; once that budget is
    used up only a high-priority alert (``level=1``) is sent on each pass.
    When a previously unhealthy process is seen healthy again, a recovery
    message is sent and its record is dropped.

    Args:
        delay_times: Number of unhealthy detections to tolerate (alert only)
            before restart attempts begin.
        max_retry_times: Maximum number of restart attempts per process.
        server_url: Supervisor XML-RPC address, forwarded to ``get_proxy``.

    Returns:
        A ``(total, invalid)`` tuple: the number of processes inspected and
        how many of them were in an invalid state.
    """
    proxy = get_proxy(server_url)

    total = 0
    invalid = 0

    for process_info in proxy.supervisor.getAllProcessInfo():
        process_name = process_info['name']
        total += 1

        if process_info['statename'] not in INVALID_STATE_NAMES:
            # Healthy process: report the recovery once and forget the record.
            if process_name in INVALID_STATUS_PROCESS_INFO:
                record = INVALID_STATUS_PROCESS_INFO[process_name]
                error_msg = (f'程序恢复正常: {process_name}\n'
                             f'状态: {process_info["statename"]}\n'
                             f'检测到的异常次数: {record["detect_times"]}\n'
                             f'尝试重启次数: {record["retry_times"]}')
                send_message(error_msg)
                del INVALID_STATUS_PROCESS_INFO[process_name]
            continue

        invalid += 1
        if process_name not in INVALID_STATUS_PROCESS_INFO:
            INVALID_STATUS_PROCESS_INFO[process_name] = {
                'info': process_info,
                'detect_times': 0,
                'retry_times': 0,
            }
        record = INVALID_STATUS_PROCESS_INFO[process_name]

        # One more unhealthy detection for this process.
        record['detect_times'] += 1
        logging.info(f'Detected invalid status process: {process_name}')
        logging.info(json.dumps(process_info, indent=2, ensure_ascii=False))

        if record['detect_times'] <= delay_times:
            # Still inside the grace period: alert only, take no action.
            error_msg = (f'检测到程序状态异常: {process_name}\n'
                         f'状态: {process_info["statename"]}\n'
                         f'检测到的异常次数: {record["detect_times"]}\n'
                         f'尝试重启次数: {record["retry_times"]}')
            send_message(error_msg)
            continue

        if record['retry_times'] >= max_retry_times:
            # Restart budget exhausted: stop retrying, escalate the alert.
            error_msg = (f'检测到程序状态异常: {process_name}\n'
                         f'状态: {process_info["statename"]}\n'
                         f'检测到的异常次数: {record["detect_times"]}\n'
                         f'!尝试重启次数(已超过重试次数): {record["retry_times"]}')
            send_message(error_msg, level=1)
            continue

        # Try to restart the process; a failed attempt still counts as a retry.
        try:
            _restart_process(proxy, process_name)
        except Exception:
            error_msg = f'Failed to restart process: {process_name}\n{traceback.format_exc()}'
            logging.error(error_msg)
            send_message(error_msg)
        record['retry_times'] += 1

    return total, invalid
if __name__ == '__main__':
    # Command-line interface: supervisor URL plus loop timing / retry knobs.
    parser = argparse.ArgumentParser(prog='watchdog', description="### Watchdog ###")
    parser.add_argument('url', metavar='XML-RPC_URL', help='Supervisor XML-RPC API url.')
    parser.add_argument('-i', '--interval', metavar='SECONDS', dest='interval', type=int, default=60,
                        help='How often the check & handling loop takes '
                             'action (in seconds, default 60).')
    parser.add_argument('-r', '--retries', metavar='TIMES', dest='retries', type=int, default=3,
                        help='How many times that watchdog will allow when '
                             'attempting to restart the program before giving '
                             'up (default 3).')
    parser.add_argument('-d', '--delays', metavar='LOOPS', dest='delays', type=int, default=5,
                        help='How many loops that watchdog will wait before '
                             'taking the restart action (default 5).')
    args = parser.parse_args()

    # Replace the module default with the address of a matching NIC.
    LOCAL_ADDRESS = get_local_address()

    logging.info('Watchdog Started.')
    try:
        while True:
            logging.info('Start to detect...')
            try:
                count, invalid_count = watchdog(delay_times=args.delays,
                                                max_retry_times=args.retries,
                                                server_url=args.url)
            except Exception:
                # A failed pass (e.g. supervisor temporarily unreachable) must
                # not end the watchdog: log it and try again next interval.
                logging.error(traceback.format_exc())
            else:
                logging.info(f'End to detect...count={count}, invalid_count={invalid_count}')
            time.sleep(args.interval)
    except KeyboardInterrupt:
        logging.info('Watchdog Stopped.')
    except Exception:
        # Last-resort boundary for errors raised outside the detection pass.
        logging.error(traceback.format_exc())
|