# -*- coding: utf-8 -*-
import argparse
import json
import logging
import time
import traceback
from xmlrpc import client

import psutil
import requests
from supervisor.xmlrpc import SupervisorTransport

logging.basicConfig(format='[%(levelname)s][%(asctime)s] %(message)s', level=logging.INFO)

DEBUG = False
LOCAL_ADDRESS = '127.0.0.1'
INVALID_STATE_NAMES = ['BACKOFF', 'FATAL', 'EXITED', 'UNKNOWN']
INVALID_STATUS_PROCESS_INFO = {}
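

def get_local_address(nic_prefix='eth'):
    """Get the IP address of the local machine.

    When there are multiple NICs, their names are sorted lexicographically and
    the IPv4 address of the first NIC whose name matches the prefix is returned.

    Args:
        nic_prefix: Name prefix a NIC must have to be considered.

    Returns:
        The first non-loopback IPv4 address found on a matching NIC.

    Raises:
        OSError: If no matching NIC has a usable IPv4 address.
    """
    nic_infos = psutil.net_if_addrs()
    nic_names = sorted(nic_infos.keys())

    for nic_name in nic_names:
        if not nic_name.startswith(nic_prefix):
            continue
        for addr_info in nic_infos.get(nic_name, []):
            if addr_info.family.name == 'AF_INET' and addr_info.address != '127.0.0.1':
                return addr_info.address

    raise OSError('Local Address not found')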


def send_message(content, level=5):
    """Send an alert message.

    TODO: Implement alert delivery here according to your own needs.

    Args:
        content: Text of the alert.
        level: Alert level. Defaults to 5.
    """
    print(content)
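

# --- Illustrative sketch, not part of the original script ---------------------
# One possible way to fill in the TODO in send_message(): post the alert as JSON
# to an HTTP webhook. This is a minimal sketch under stated assumptions: the
# payload field names and both helper names below are hypothetical, and the
# webhook URL must be supplied by the caller. It uses only the standard library.
import json
import logging
import urllib.error
import urllib.request


def build_alert_payload(content, level=5, host='127.0.0.1'):
    """Assemble the JSON-serializable alert body (hypothetical schema)."""
    return {'host': host, 'level': level, 'content': content}


def post_alert(webhook_url, content, level=5, host='127.0.0.1', timeout=5):
    """POST the alert to webhook_url; return True on a 2xx reply, else False."""
    body = json.dumps(build_alert_payload(content, level, host),
                      ensure_ascii=False).encode('utf-8')
    try:
        # Request() itself raises ValueError for a malformed URL, so it is
        # built inside the try block as well.
        request = urllib.request.Request(
            webhook_url,
            data=body,
            headers={'Content-Type': 'application/json; charset=utf-8'},
            method='POST',
        )
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError, ValueError):
        # A failed alert delivery should not crash the watchdog loop.
        logging.error('Failed to deliver alert to %s', webhook_url)
        return False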
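

def get_proxy(server_url):
    """Get a local XML-RPC client for supervisor.

    Args:
        server_url: Supervisor XML-RPC server URL, passed to SupervisorTransport.

    Returns:
        An xmlrpc.client.ServerProxy bound to the supervisor server.
    """
    # The URL given to ServerProxy is a placeholder; SupervisorTransport
    # connects to server_url.
    return client.ServerProxy('http://127.0.0.1', transport=SupervisorTransport(None, None, serverurl=server_url))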


def watchdog(delay_times, max_retry_times, server_url):
    """Run one detection pass over all processes managed by supervisor.

    Args:
        delay_times: Number of abnormal detections that only raise an alert
            before a restart is attempted.
        max_retry_times: Maximum number of restart attempts per process.
        server_url: Supervisor XML-RPC server URL.

    Returns:
        A (total, invalid) tuple: the number of processes inspected and the
        number found in an abnormal state.
    """
    proxy = get_proxy(server_url)

    total = 0
    invalid = 0

    for process_info in proxy.supervisor.getAllProcessInfo():
        process_name = process_info['name']
        total += 1
        if process_info['statename'] in INVALID_STATE_NAMES:
            invalid += 1
            if process_name not in INVALID_STATUS_PROCESS_INFO:
                INVALID_STATUS_PROCESS_INFO[process_name] = {
                    'info': process_info,
                    'detect_times': 0,
                    'retry_times': 0,
                }
            record = INVALID_STATUS_PROCESS_INFO[process_name]
            # Increment the detection count.
            record['detect_times'] += 1
            logging.info(f'Detected invalid status process: {process_name}')
            logging.info(json.dumps(process_info, indent=2, ensure_ascii=False))
            if record['detect_times'] <= delay_times:
                # Still within the grace period: only send an alert, take no action.
                error_msg = (f'Abnormal process state detected: {process_name}\n'
                             f'State: {process_info["statename"]}\n'
                             f'Abnormal detections: {record["detect_times"]}\n'
                             f'Restart attempts: {record["retry_times"]}')
                send_message(error_msg)
                continue

            if record['retry_times'] >= max_retry_times:
                # The retry limit has been reached: alert at level 1 and stop restarting.
                error_msg = (f'Abnormal process state detected: {process_name}\n'
                             f'State: {process_info["statename"]}\n'
                             f'Abnormal detections: {record["detect_times"]}\n'
                             f'! Restart attempts (retry limit reached): {record["retry_times"]}')
                send_message(error_msg, level=1)
                continue

            # Try to restart the process.
            try:
                try:
                    proxy.supervisor.stopProcess(process_name)
                except client.Fault as fault:
                    # supervisor answers with a fault when the process is not
                    # running (e.g. FATAL or EXITED); that must not block the start.
                    logging.info(f'stopProcess skipped for {process_name}: {fault.faultString}')
                proxy.supervisor.startProcess(process_name)
            except Exception:
                error_msg = f'Failed to restart process: {process_name}\n{traceback.format_exc()}'
                logging.error(error_msg)
                send_message(error_msg)
            # Count every restart attempt, whether or not it succeeded.
            record['retry_times'] += 1
        else:
            # The process is healthy again: report the recovery and drop its record.
            if process_name in INVALID_STATUS_PROCESS_INFO:
                record = INVALID_STATUS_PROCESS_INFO[process_name]
                error_msg = (f'Process recovered: {process_name}\n'
                             f'State: {process_info["statename"]}\n'
                             f'Abnormal detections: {record["detect_times"]}\n'
                             f'Restart attempts: {record["retry_times"]}')
                send_message(error_msg)
                del INVALID_STATUS_PROCESS_INFO[process_name]
    return total, invalid
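

# --- Illustrative sketch, not part of the original script ---------------------
# The policy that the --delays and --retries options describe can be isolated as
# a pure function, which makes the thresholds easy to unit-test without a
# running supervisor. decide_action and its return labels are hypothetical
# names; this is a sketch of the intended policy, not a drop-in replacement for
# the branching inside watchdog().
def decide_action(detect_times, retry_times, delay_times, max_retry_times):
    """Return 'alert', 'give_up' or 'restart' for one abnormal process."""
    if detect_times <= delay_times:
        # Grace period: report the problem but leave the process alone.
        return 'alert'
    if retry_times >= max_retry_times:
        # All allowed restart attempts have been used.
        return 'give_up'
    return 'restart'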


if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog='watchdog', description="### Watchdog ###")
    parser.add_argument('url', metavar='XML-RPC_URL', help='Supervisor XML-RPC API url.')
    parser.add_argument('-i', '--interval', metavar='SECONDS', dest='interval', type=int, default=60,
                        help='Interval between detection loops, in seconds (default 60).')
    parser.add_argument('-r', '--retries', metavar='TIMES', dest='retries', type=int, default=3,
                        help='Maximum number of restart attempts per program '
                             'before giving up (default 3).')
    parser.add_argument('-d', '--delays', metavar='LOOPS', dest='delays', type=int, default=5,
                        help='Number of detection loops to wait before '
                             'attempting a restart (default 5).')
    args = parser.parse_args()

    LOCAL_ADDRESS = get_local_address()

    logging.info('Watchdog Started.')
    try:
        while True:
            logging.info('Detection started...')
            count, invalid_count = watchdog(delay_times=args.delays,
                                            max_retry_times=args.retries,
                                            server_url=args.url)
            logging.info(f'Detection finished: count={count}, invalid_count={invalid_count}')
            time.sleep(args.interval)
    except KeyboardInterrupt:
        logging.info('Watchdog Stopped.')
    except Exception:
        logging.error(traceback.format_exc())