123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- import argparse
- import json
- import os
- import requests
- import time
-
- DEBUG = False
-
-
class colors:
    """ANSI escape sequences used to style the terminal output."""

    # Foreground colours.
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"

    # Reset sequence; appended after every coloured span in this file.
    ENDC = "\033[0m"

    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
-
-
def headline(color: bool) -> str:
    """Return the report title followed by an empty line.

    Args:
        color: When true, the title is wrapped in the bold + underline
            escape sequences and terminated with the reset sequence.

    Returns:
        The title text ending in two newline characters.
    """
    title = "NAGIOS Status:"
    if color:
        # Styling surrounds the title only; the trailing newlines stay plain.
        title = colors.BOLD + colors.UNDERLINE + title + colors.ENDC
    return title + "\n\n"
-
-
class nagios_service(dict):
    """A single Nagios service record with helpers for status reporting.

    The instance *is* the raw service dictionary (it subclasses ``dict``); the
    methods only interpret the keys they read.  Every ``last_time_*`` value is
    divided by 1000 before being used as epoch seconds, i.e. it is treated as
    a millisecond timestamp.
    """

    # Values of the "status" key that have a dedicated name.
    SID_OK = 2
    SID_WARNING = 4
    SID_ERROR = 16
    #
    # Names returned by status().
    SNAME_OK = "OKAY"
    SNAME_WARNING_LAST = "WARNING_LAST"
    SNAME_ERROR_LAST = "ERROR_LAST"
    SNAME_FLAPPING = "FLAPPING"
    SNAME_WARNING = "WARNING"
    SNAME_ERROR = "ERROR"
    SNAME_UNKNOWN = "UNKNOWN"
    TIMEFORMAT = "%d.%m.%Y %H:%M"
    # Widths of the fixed-size table columns: host, service name, state,
    # last time okay.  Shared by the header and the rows so they stay aligned.
    COLUMN_WIDTHS = (25, 15, 12, 16)

    def __init__(self, service_dict, last) -> None:
        """Store the service data and the look-back window.

        Args:
            service_dict: Mapping with the service details; copied into self.
            last: Number of hours to look back for past warnings/criticals
                when the service is currently okay (see status()).
        """
        super().__init__(service_dict)
        self.__last__ = last

    def is_problem(self):
        """Return True unless status() reports SNAME_OK."""
        return self.status() != self.SNAME_OK

    def host_name(self):
        """Return the value of the 'host_name' key."""
        return self['host_name']

    def name(self):
        """Return the value of the 'description' key (the service name)."""
        return self['description']

    def status(self):
        """Return the status name of this service.

        A service whose numeric status is SID_OK is still reported as
        FLAPPING, ERROR_LAST or WARNING_LAST (in that priority) when it is
        flapping or had a critical/warning state within the last
        ``__last__`` hours.  Unmapped numeric states yield SNAME_UNKNOWN.
        """
        status = self["status"]
        if status == self.SID_OK:
            if self.flapping():
                return self.SNAME_FLAPPING
            # Compare epoch seconds rather than local struct_time tuples:
            # wall-clock fields are not monotonic across a DST change.
            window_start = time.time() - 60 * 60 * self.__last__
            if self["last_time_critical"] / 1000 > window_start:
                return self.SNAME_ERROR_LAST
            if self["last_time_warning"] / 1000 > window_start:
                return self.SNAME_WARNING_LAST
        return {
            self.SID_OK: self.SNAME_OK,
            self.SID_WARNING: self.SNAME_WARNING,
            self.SID_ERROR: self.SNAME_ERROR,
        }.get(status, self.SNAME_UNKNOWN)

    def last_okay(self):
        """Return 'last_time_ok' formatted with TIMEFORMAT in local time."""
        return time.strftime(self.TIMEFORMAT, time.localtime(self["last_time_ok"] / 1000))

    def __last_warning__(self):
        """Return 'last_time_warning' as a local struct_time."""
        return time.localtime(self["last_time_warning"] / 1000)

    def last_warning(self):
        """Return 'last_time_warning' formatted with TIMEFORMAT in local time."""
        return time.strftime(self.TIMEFORMAT, self.__last_warning__())

    def __last_critical__(self):
        """Return 'last_time_critical' as a local struct_time."""
        return time.localtime(self["last_time_critical"] / 1000)

    def last_critical(self):
        """Return 'last_time_critical' formatted with TIMEFORMAT in local time."""
        return time.strftime(self.TIMEFORMAT, self.__last_critical__())

    def last_unknown(self):
        """Return 'last_time_unknown' formatted with TIMEFORMAT in local time."""
        return time.strftime(self.TIMEFORMAT, time.localtime(self["last_time_unknown"] / 1000))

    def info(self):
        """Return 'plugin_output', else 'long_plugin_output', else ''.

        An empty string (never None) is returned when neither key holds a
        truthy value, so the result can always be concatenated into a row.
        """
        return self.get("plugin_output") or self.get("long_plugin_output") or ""

    def flapping(self):
        """Return the value of the 'is_flapping' key."""
        return self["is_flapping"]

    def __color__(self):
        """Return the escape sequence associated with the current status()."""
        return {
            self.SNAME_OK: colors.OKGREEN,
            self.SNAME_WARNING_LAST: colors.OKBLUE,
            self.SNAME_ERROR_LAST: colors.OKBLUE,
            self.SNAME_FLAPPING: colors.OKBLUE,
            self.SNAME_WARNING: colors.WARNING,
            self.SNAME_ERROR: colors.FAIL,
            self.SNAME_UNKNOWN: colors.OKCYAN,
        }.get(self.status())

    def __head__(self, color=False):
        """Return the report headline plus the table header lines."""
        titles = ("Host", "Service Name", "State", "Last time Okay")
        # Pad every title to its column width so the header lines up with the
        # border lines and with the rows produced by __str__.
        cells = [title.ljust(width) for title, width in zip(titles, self.COLUMN_WIDTHS)]
        rv = headline(color)
        rv += "+---------------------------------------------------------------------------------------------\n"
        rv += "| " + " | ".join(cells) + " |\n"
        rv += "+---------------------------+-----------------+--------------+------------------+-------------\n"
        return rv

    def __foot__(self):
        """Return the closing border line of the table."""
        return "+---------------------------+-----------------+--------------+------------------+-------------\n"

    def __str__(self, color=False):
        """Return this service as one table row.

        Args:
            color: When true, the name and state cells use the status colour
                and the "last okay" cell is green; host and info stay plain.

        Returns:
            The row terminated by a newline; when the module-level DEBUG flag
            is set, a JSON dump of the raw record is appended.
        """
        status_tint = self.__color__() if color else None
        okay_tint = colors.OKGREEN if color else None
        host_w, name_w, state_w, okay_w = self.COLUMN_WIDTHS
        # (text, width, colour); width 0 means "neither truncate nor pad".
        columns = (
            (self.host_name(), host_w, None),
            (self.name(), name_w, status_tint),
            (self.status(), state_w, status_tint),
            (self.last_okay(), okay_w, okay_tint),
            (self.info(), 0, None),
        )
        cells = []
        for text, width, tint in columns:
            # Tolerate missing / non-string values instead of raising.
            text = "" if text is None else str(text)
            if width > 0:
                text = text[:width].ljust(width)
            if tint:
                text = tint + text + colors.ENDC
            cells.append(text)
        #
        rv = "| " + " | ".join(cells) + "\n"
        if DEBUG:
            rv += json.dumps(self, indent=4, sort_keys=True)
        return rv
-
-
class __nagios_status__(dict):
    """Mapping of host name to the list of nagios_service objects of that host."""

    def __init__(self) -> None:
        """Create an empty status container."""
        super().__init__()

    def all_problems(self):
        """Return a new container with only the services that report a problem.

        A service is kept when its is_problem() method returns a true value;
        its look-back window (``__last__``) is carried over unchanged.
        """
        problems = __nagios_status__()
        for host, services in self.items():
            for service in services:
                if not service.is_problem():
                    continue
                problems.__add_service__(host, service, service.__last__)
        return problems

    def __add_service__(self, host, service, last):
        """Wrap *service* in a nagios_service and append it to the host's list."""
        self.setdefault(host, []).append(nagios_service(service, last))

    def __str__(self, color=False) -> str:
        """Render all stored services as a table.

        Args:
            color: Passed on to the header and to every row.

        Returns:
            Header, one row per service and footer, or the headline followed
            by a "nothing to report" message when no service is stored.  The
            result always ends with an extra newline.
        """
        services = [service for host in self for service in self[host]]
        if not services:
            return headline(color) + "No monitorings to be reported." + "\n"
        # Header and footer are produced by the service objects themselves.
        parts = [services[0].__head__(color=color)]
        parts.extend(service.__str__(color=color) for service in services)
        parts.append(services[-1].__foot__())
        return "".join(parts) + "\n"
-
-
class nagios_status(__nagios_status__):
    """Status container filled from the JSON status CGI of a Nagios server."""

    # Path and query string appended to the server address.
    URL = "nagios4/cgi-bin/statusjson.cgi?query=servicelist&details=true"
    # Keys used to reach the per-host service mapping in the JSON reply.
    KEY_DATA = 'data'
    KEY_SERVICELIST = 'servicelist'

    def __init__(self, host, secure, last, timeout=30) -> None:
        """Query the server and store every returned service.

        Args:
            host: Host name (optionally with port and/or path prefix) of the
                Nagios server, without scheme.
            secure: Use ``https`` when true, otherwise ``http``.
            last: Look-back window in hours handed to every stored service.
            timeout: Seconds to wait for the server before giving up.

        Raises:
            requests.ConnectionError: The server could not be reached or did
                not answer within *timeout* seconds.
            json.JSONDecodeError (or the requests subclass of it, depending on
                the installed requests version): The reply was not JSON.
        """
        super().__init__()
        #
        scheme = "https" if secure else "http"
        # Build the URL with plain string formatting: os.path.join uses the
        # platform path separator and discards the scheme when the host
        # starts with a slash, both of which yield an invalid URL.
        url = "{}://{}/{}".format(scheme, host.strip("/"), self.URL)
        try:
            response = requests.get(url, timeout=timeout)
        except requests.Timeout as err:
            # Callers handle ConnectionError only; report a server that does
            # not answer in time the same way as one that is unreachable.
            raise requests.ConnectionError(str(err)) from err
        payload = response.json()

        services_by_host = payload.get(self.KEY_DATA, {}).get(self.KEY_SERVICELIST, {})
        for host_name, services in services_by_host.items():
            for service in services.values():
                self.__add_service__(host_name, service, last)
-
-
if __name__ == "__main__":
    # Command line interface: one positional server address plus output options.
    parser = argparse.ArgumentParser(description="Returns the status of all or partial nagios monitorings.")
    parser.add_argument("hostname", help="The hostname and port of the nagios server (e.g. nagios:8080)")
    parser.add_argument(
        "-a", "--all",
        action="store_true",
        help="print the status of all nagios monitorings. Default is all problems.",
    )
    parser.add_argument("-last", "--last", type=int, default=0, help="Report problems of the last l hours.")
    parser.add_argument("-s", "--secure", action="store_true", help="Enables secure connection (https)")
    parser.add_argument("-m", "--monochrome", action="store_true", help="No colored output")
    args = parser.parse_args()
    #
    use_color = not args.monochrome
    try:
        s = nagios_status(args.hostname, args.secure, args.last)
    except requests.ConnectionError:
        print(headline(use_color) + "Can not connect to given host.")
    except json.decoder.JSONDecodeError:
        print(headline(use_color) + "No data received. Nagios is possibly down.")
    else:
        # Either everything that was fetched or only the problematic services.
        report = s if args.all else s.all_problems()
        print(report.__str__(color=use_color))
|