"""Command line tool that prints the service status reported by a Nagios server.

The data is fetched from the ``statusjson.cgi`` endpoint of the server and
rendered as a plain text table, optionally colorized with ANSI escape codes.
"""

import argparse
import json
import os
import time

import requests

# When True, every table row is followed by a JSON dump of the raw service data.
DEBUG = False


class colors:
    """ANSI escape sequences used to colorize the terminal output."""

    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


def headline(color: bool) -> str:
    """Return the report title followed by an empty line.

    Args:
        color: If true, the title is wrapped in bold/underline escape codes.
    """
    title = "NAGIOS Status:"
    if color:
        title = colors.BOLD + colors.UNDERLINE + title + colors.ENDC
    return title + "\n\n"


class nagios_service(dict):
    """A single service entry of the Nagios service list.

    The instance is the raw service dictionary; the methods provide readable
    access to the fields this tool uses and render the entry as a table row.
    """

    # Numeric values of the "status" field that this tool distinguishes.
    SID_OK = 2
    SID_WARNING = 4
    SID_ERROR = 16
    # Readable state names returned by status().
    SNAME_OK = "OKAY"
    SNAME_WARNING_LAST = "WARNING_LAST"
    SNAME_ERROR_LAST = "ERROR_LAST"
    SNAME_FLAPPING = "FLAPPING"
    SNAME_WARNING = "WARNING"
    SNAME_ERROR = "ERROR"
    SNAME_UNKNOWN = "UNKNOWN"
    TIMEFORMAT = "%d.%m.%Y %H:%M"

    # Title and width of the fixed-width table columns. The trailing info
    # column has no title and is printed with unlimited width.
    _COLUMNS = (
        ("Host", 25),
        ("Service Name", 15),
        ("State", 12),
        ("Last time Okay", 16),
    )
    _RULE_TOP = "+---------------------------------------------------------------------------------------------\n"
    _RULE = "+---------------------------+-----------------+--------------+------------------+-------------\n"

    def __init__(self, service_dict, last) -> None:
        """Create a service entry.

        Args:
            service_dict: Raw service data as delivered by the server.
            last: Number of hours to look back for warnings/errors of a
                service that is okay by now (see status()).
        """
        super().__init__(service_dict)
        self.__last__ = last

    def is_problem(self) -> bool:
        """Return True if the state is anything other than SNAME_OK."""
        return self.status() != self.SNAME_OK

    def host_name(self):
        """Return the 'host_name' field."""
        return self['host_name']

    def name(self):
        """Return the 'description' field, which is used as service name."""
        return self['description']

    def status(self) -> str:
        """Return the readable state name (one of the SNAME_* constants).

        A service that is currently okay is still reported as FLAPPING,
        ERROR_LAST or WARNING_LAST if it is flapping or if its last
        critical/warning timestamp lies within the last ``last`` hours.
        Unknown status values map to SNAME_UNKNOWN.
        """
        status = self["status"]
        if status == self.SID_OK:
            if self.flapping():
                return self.SNAME_FLAPPING
            # Compare raw epoch values (the timestamps are handled as
            # milliseconds throughout this class). Comparing local
            # struct_time tuples is not chronological across DST changes.
            threshold_ms = (time.time() - 60 * 60 * self.__last__) * 1000
            if self["last_time_critical"] > threshold_ms:
                return self.SNAME_ERROR_LAST
            if self["last_time_warning"] > threshold_ms:
                return self.SNAME_WARNING_LAST
        return {
            self.SID_OK: self.SNAME_OK,
            self.SID_WARNING: self.SNAME_WARNING,
            self.SID_ERROR: self.SNAME_ERROR,
        }.get(status, self.SNAME_UNKNOWN)

    def _local_time(self, key) -> time.struct_time:
        """Return the timestamp stored under *key* as local struct_time."""
        # The stored value is divided by 1000 to get seconds since the epoch.
        return time.localtime(self[key] / 1000)

    def _formatted_time(self, key) -> str:
        """Return the timestamp stored under *key* formatted with TIMEFORMAT."""
        return time.strftime(self.TIMEFORMAT, self._local_time(key))

    def last_okay(self) -> str:
        """Return the formatted 'last_time_ok' timestamp."""
        return self._formatted_time("last_time_ok")

    def __last_warning__(self) -> time.struct_time:
        """Return 'last_time_warning' as local struct_time."""
        return self._local_time("last_time_warning")

    def last_warning(self) -> str:
        """Return the formatted 'last_time_warning' timestamp."""
        return self._formatted_time("last_time_warning")

    def __last_critical__(self) -> time.struct_time:
        """Return 'last_time_critical' as local struct_time."""
        return self._local_time("last_time_critical")

    def last_critical(self) -> str:
        """Return the formatted 'last_time_critical' timestamp."""
        return self._formatted_time("last_time_critical")

    def last_unknown(self) -> str:
        """Return the formatted 'last_time_unknown' timestamp."""
        return self._formatted_time("last_time_unknown")

    def info(self) -> str:
        """Return the plugin output, falling back to the long plugin output.

        An empty string is returned if neither field holds a value, so the
        result can always be concatenated into a table row.
        """
        return self.get("plugin_output") or self.get("long_plugin_output") or ""

    def flapping(self):
        """Return the 'is_flapping' field."""
        return self["is_flapping"]

    def __color__(self):
        """Return the escape sequence that belongs to the current state."""
        return {
            self.SNAME_OK: colors.OKGREEN,
            self.SNAME_WARNING_LAST: colors.OKBLUE,
            self.SNAME_ERROR_LAST: colors.OKBLUE,
            self.SNAME_FLAPPING: colors.OKBLUE,
            self.SNAME_WARNING: colors.WARNING,
            self.SNAME_ERROR: colors.FAIL,
            self.SNAME_UNKNOWN: colors.OKCYAN,
        }.get(self.status())

    def __head__(self, color=False) -> str:
        """Return the report headline followed by the table header."""
        # The titles are padded to the widths used for the data rows so that
        # the header lines up with the rules and the rows.
        titles = " | ".join(title.ljust(width) for title, width in self._COLUMNS)
        return headline(color) + self._RULE_TOP + "| " + titles + " |\n" + self._RULE

    def __foot__(self) -> str:
        """Return the rule that closes the table."""
        return self._RULE

    def __str__(self, color=False) -> str:
        """Return this service as one table row terminated by a newline.

        Args:
            color: If true, name and state are printed in the color of the
                state and the last-okay time is printed in green.
        """
        (_, w_host), (_, w_name), (_, w_state), (_, w_okay) = self._COLUMNS
        state_color = self.__color__() if color else None
        okay_color = colors.OKGREEN if color else None
        # (text, width, color); a width of 0 means "do not pad or truncate".
        cells = (
            (self.host_name(), w_host, None),
            (self.name(), w_name, state_color),
            (self.status(), w_state, state_color),
            (self.last_okay(), w_okay, okay_color),
            (self.info(), 0, None),
        )
        rendered = []
        for text, width, cell_color in cells:
            if width > 0:
                text = text[:width].ljust(width)
            if cell_color is not None:
                text = cell_color + text + colors.ENDC
            rendered.append(text)
        row = "| " + " | ".join(rendered) + "\n"
        if DEBUG:
            row += json.dumps(self, indent=4, sort_keys=True)
        return row


class __nagios_status__(dict):
    """Mapping of host name to the list of nagios_service objects of that host."""

    def __init__(self) -> None:
        super().__init__()

    def all_problems(self):
        """Return a new status object holding only the services with problems."""
        problems = __nagios_status__()
        for host, services in self.items():
            for service in services:
                if service.is_problem():
                    problems.__add_service__(host, service, service.__last__)
        return problems

    def __add_service__(self, host, service, last) -> None:
        """Append *service* (wrapped as nagios_service) to the list of *host*."""
        self.setdefault(host, []).append(nagios_service(service, last))

    def __str__(self, color=False) -> str:
        """Return the complete report: headline, table or a 'nothing' note."""
        services = [service for host in self for service in self[host]]
        if not services:
            return headline(color) + "No monitorings to be reported." + "\n"
        parts = [services[0].__head__(color=color)]
        parts.extend(service.__str__(color=color) for service in services)
        parts.append(services[-1].__foot__())
        return "".join(parts) + "\n"


class nagios_status(__nagios_status__):
    """Status object that is filled from the JSON service list of a server."""

    URL = "nagios4/cgi-bin/statusjson.cgi?query=servicelist&details=true"
    KEY_DATA = 'data'
    KEY_SERVICELIST = 'servicelist'
    # Seconds to wait for the server before giving up instead of hanging.
    TIMEOUT = 30

    def __init__(self, host, secure, last) -> None:
        """Fetch the service list from *host* and store all services.

        Args:
            host: Host name (and optional port) of the nagios server.
            secure: If true, https is used instead of http.
            last: Passed on to every nagios_service (hours to look back).

        Raises:
            requests.ConnectionError: The server can not be reached.
            requests.Timeout: The server did not answer within TIMEOUT seconds.
            requests.exceptions.JSONDecodeError: The response is not JSON.
        """
        super().__init__()
        scheme = "https" if secure else "http"
        # URLs always use forward slashes, so the address is assembled by
        # hand rather than with the platform dependent os.path.join().
        url = f"{scheme}://{host.rstrip('/')}/{self.URL}"
        response = requests.get(url, timeout=self.TIMEOUT)
        data = response.json()
        hosts = data.get(self.KEY_DATA, {}).get(self.KEY_SERVICELIST, {})
        for monitored_host, services in hosts.items():
            for service in services.values():
                self.__add_service__(monitored_host, service, last)


def main() -> None:
    """Parse the command line, query the server and print the report."""
    parser = argparse.ArgumentParser(description="Returns the status of all or partial nagios monitorings.")
    parser.add_argument("hostname", help="The hostname and port of the nagios server (e.g. nagios:8080)")
    parser.add_argument("-a", "--all", action="store_true", default=False, help="print the status of all nagios monitorings. Default is all problems.")
    parser.add_argument("-last", "--last", type=int, default=0, help="Report problems of the last l hours.")
    parser.add_argument("-s", "--secure", action="store_true", default=False, help="Enables secure connection (https)")
    parser.add_argument("-m", "--monochrome", action="store_true", default=False, help="No colored output")
    args = parser.parse_args()
    color = not args.monochrome

    try:
        s = nagios_status(args.hostname, args.secure, args.last)
    except (requests.ConnectionError, requests.Timeout):
        # A server that does not answer in time is reported like an
        # unreachable one instead of ending in a traceback.
        print(headline(color) + "Can not connect to given host.")
    except requests.exceptions.JSONDecodeError:
        print(headline(color) + "No data received. Nagios is possibly down.")
    else:
        report = s if args.all else s.all_problems()
        print(report.__str__(color=color))


if __name__ == "__main__":
    main()