Nagios status module added

This commit is contained in:
Dirk Alders 2024-08-12 15:21:20 +02:00
parent 3d7f2eca05
commit fd556d92ad

200
status.py Normal file
View File

@ -0,0 +1,200 @@
import argparse
import json
import os
import requests
import time
DEBUG = False
class colors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def headline(color: bool) -> str:
rv = ""
if color:
rv += colors.BOLD + colors.UNDERLINE
rv += "NAGIOS Status:"
if color:
rv += colors.ENDC
rv += "\n\n"
return rv
class nagios_service(dict):
SID_OK = 2
SID_WARNING = 4
SID_ERROR = 16
#
SNAME_OK = "OKAY"
SNAME_FLAPPING = "FLAPPING"
SNAME_WARNING = "WARNING"
SNAME_ERROR = "ERROR"
SNAME_UNKNOWN = "UNKNOWN"
TIMEFORMAT = "%d.%m.%Y %H:%M"
def __init__(self, service_dict) -> None:
super().__init__(service_dict)
def is_problem(self):
return self["status"] != self.SID_OK or self.flapping()
def host_name(self):
return self['host_name']
def name(self):
return self['description']
def status(self):
status = self["status"]
default = self.SNAME_UNKNOWN
if status == self.SID_OK and self.flapping():
return self.SNAME_FLAPPING
return {
self.SID_OK: self.SNAME_OK,
self.SID_WARNING: self.SNAME_WARNING,
self.SID_ERROR: self.SNAME_ERROR
}.get(status, default)
def last_okay(self):
return time.strftime(self.TIMEFORMAT, time.localtime(self["last_time_ok"] / 1000))
def info(self):
return self.get("plugin_output") or self.get("long_plugin_output")
def flapping(self):
return self["is_flapping"]
def __color__(self):
return {
self.SNAME_OK: colors.OKGREEN,
self.SNAME_FLAPPING: colors.OKBLUE,
self.SNAME_WARNING: colors.WARNING,
self.SNAME_ERROR: colors.FAIL,
self.SNAME_UNKNOWN: colors.OKCYAN
}.get(self.status())
return colors.OKBLUE
def __head__(self, color=False):
rv = headline(color)
rv += "+-----------------------------------------------------------------------------------------\n"
rv += "| Host | Service Name | State | Last time Okay |\n"
rv += "+---------------------------+-----------------+----------+------------------+-------------\n"
return rv
def __foot__(self):
return "+---------------------------+-----------------+----------+------------------+-------------\n"
def __str__(self, color=False):
cols = []
cols.append((self.host_name, 25, False))
cols.append((self.name, 15, color))
cols.append((self.status, 8, color))
cols.append((self.last_okay, 16, color))
cols.append((self.info, 0, False))
#
rv = ""
for txt_method, l, c in cols:
txt = txt_method()
if len(rv) > 0:
rv += " | "
if c:
if txt_method == self.last_okay:
rv += colors.OKGREEN
else:
rv += self.__color__()
if l > 0:
rv += txt[:l]
rv += " " * (l-len(txt))
else:
rv += txt
if c:
rv += colors.ENDC
#
rv = "| " + rv + "\n"
if DEBUG:
rv += json.dumps(self, indent=4, sort_keys=True)
return rv
class __nagios_status__(dict):
def __init__(self) -> None:
super().__init__()
def all_problems(self):
rv = __nagios_status__()
for host in self:
for service in self[host]:
if service.is_problem():
rv.__add_service__(host, service)
return rv
def __add_service__(self, host, service):
if host not in self:
self[host] = []
self[host].append(nagios_service(service))
def __str__(self, color=False) -> str:
rv = ""
for host in self:
for service in self[host]:
if len(rv) == 0:
rv += service.__head__(color=color)
rv += service.__str__(color=color)
if len(rv) > 0:
rv += service.__foot__()
else:
rv += headline(color)
rv += "No monitorings to be reported."
return rv + "\n"
class nagios_status(__nagios_status__):
URL = "nagios4/cgi-bin/statusjson.cgi?query=servicelist&details=true" # &servicestatus=warning"
KEY_DATA = 'data'
KEY_SERVICELIST = 'servicelist'
def __init__(self, host, secure) -> None:
super().__init__()
#
if secure:
prefix = "https://"
else:
prefix = "http://"
r = requests.get(os.path.join(prefix, host, self.URL))
data = r.json()
hosts = data.get(self.KEY_DATA, {}).get(self.KEY_SERVICELIST, {})
for host in hosts:
for service in hosts.get(host, {}):
self.__add_service__(host, hosts.get(host).get(service))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Returns the status of all or partial nagios monitorings.")
parser.add_argument("hostname", help="The hostname and port of the nagios server (e.g. nagios:8080)")
parser.add_argument("-a", "--all", action="store_true", default=False,
help="print the status of all nagios monitorings. Default is all problems.")
parser.add_argument("-s", "--secure", action="store_true", default=False, help="Enables secure connection (https)")
parser.add_argument("-m", "--monochrome", action="store_true", default=False, help="No colored output")
args = parser.parse_args()
#
try:
s = nagios_status(args.hostname, args.secure)
except requests.ConnectionError:
print(headline(not args.monochrome) + "Can not connect to given host.")
else:
if args.all:
print(s.__str__(color=not args.monochrome))
else:
print(s.all_problems().__str__(color=not args.monochrome))