"""Terminal viewer for MQTT traffic with live topic filtering (Textual UI)."""

import argparse
import getpass
import json
import logging
import time
from typing import Any, List, Tuple

import mqtt
from textual.app import App, ComposeResult
from textual.containers import Vertical
from textual.widgets import Checkbox, Footer, Header, Input, RichLog


class LogReceiver(object):
    """Forward incoming MQTT messages to the Textual app.

    Creates a client through the project's ``mqtt`` module, registers a
    callback for the ``"#"`` topic pattern and hands every received message
    to the app's ``add_log`` method.
    """

    def __init__(self, app):
        """Create the MQTT client and register the receive callback.

        Args:
            app: The running app. It must expose ``args`` (with ``hostname``,
                ``port`` and ``username``), ``password``, ``add_log`` and
                ``call_from_thread``.
        """
        self.app = app
        args = app.args
        self.mqtt_client = mqtt.mqtt_client(
            'mqttsniffer',
            args.hostname,
            args.port,
            username=args.username,
            password=app.password,
        )
        # "#" is the MQTT multi-level wildcard; presumably the wrapper
        # subscribes to every topic with it -- confirm against the mqtt module.
        self.mqtt_client.add_callback("#", self.__rx__)

    def __rx__(self, client, userdata, message):
        """Hand a received message over to the app's own thread.

        Args:
            client: Unused; supplied by the MQTT client.
            userdata: Unused; supplied by the MQTT client.
            message: The received message, passed on unchanged.
        """
        # NOTE(review): presumably invoked on the MQTT client's thread, which
        # is why the hand-over goes through call_from_thread -- confirm.
        self.app.call_from_thread(self.app.add_log, message)


class LogViewerApp(App):
    """Textual app that displays received messages and filters them by topic."""

    CSS_PATH = "style.tcss"
    BINDINGS = [("q", "quit", "Quit")]
    # Number of most recent messages kept for re-rendering after a filter change.
    MAX_LOGS = 50

    def __init__(self, args, password):
        """Store the connection settings and create the log widget.

        Args:
            args: Parsed command line arguments (see ``_build_arg_parser``).
            password: MQTT password, or ``None`` when no credentials are used.
        """
        super().__init__()
        self.args = args
        self.password = password
        # History of (timestamp, message) tuples, newest last.
        self.all_logs = []
        # Seed the filter from the -t option when present; "" matches all topics.
        self.topic_filter = getattr(args, "topicfilter", "") or ""
        self.log_display = RichLog(highlight=True, markup=True)

    def compose(self) -> ComposeResult:
        """Create the widgets of the app."""
        # NOTE(review): "bottom-bar" is laid out as a sibling of "app-grid";
        # confirm this nesting against style.tcss.
        yield Header(name="Python Log Viewer")
        with Vertical(id="app-grid"):
            yield self.log_display
        with Vertical(id="bottom-bar"):
            yield Input(
                value=self.topic_filter,
                placeholder="topic filter...",
                id="topic_filter",
            )
        yield Footer()

    def on_mount(self) -> None:
        """Start receiving messages once the app is mounted."""
        # Keep a reference on the instance instead of a throw-away local.
        self.log_receiver = LogReceiver(self)

    def add_log(self, record) -> None:
        """Store a received message and display it if it passes the filter.

        Args:
            record: Received message; ``topic`` and ``payload`` attributes
                are read when it is rendered.
        """
        entry = (time.asctime(), record)
        self.all_logs.append(entry)
        if len(self.all_logs) > self.MAX_LOGS:
            # Drop the oldest entries in place, keeping the newest MAX_LOGS.
            del self.all_logs[:-self.MAX_LOGS]
        self._apply_filters_to_log(entry)

    def _filter_terms(self) -> List[str]:
        """Return the lower-cased, non-empty terms of the comma separated filter."""
        pieces = (piece.strip().lower() for piece in self.topic_filter.split(","))
        return [piece for piece in pieces if piece]

    def _topic_matches(self, topic: str) -> bool:
        """Return True if ``topic`` contains any filter term (case-insensitive).

        An empty filter matches every topic. Empty terms, e.g. from a
        trailing comma, are ignored so they do not match everything.
        """
        terms = self._filter_terms()
        if not terms:
            return True
        lowered = topic.lower()
        return any(term in lowered for term in terms)

    @staticmethod
    def _decode_payload(raw: Any) -> Any:
        """Return the payload parsed as JSON, else as text, else unchanged."""
        try:
            return json.loads(raw)
        except (TypeError, ValueError, RecursionError):
            # Not JSON (or not decodable as such): fall back to plain text.
            pass
        if isinstance(raw, (bytes, bytearray)):
            # errors="replace" keeps binary payloads from raising in the UI.
            return raw.decode('utf-8', errors='replace')
        return raw

    def _apply_filters_to_log(self, data: Tuple[str, Any]) -> None:
        """Check a single entry against the filter and display it if it matches.

        Args:
            data: ``(asctime, record)`` tuple as stored in ``all_logs``.
        """
        asctime, record = data
        if not self._topic_matches(record.topic):
            return
        payload = self._decode_payload(record.payload)
        message = (
            f"[[dim]{asctime}[/dim]] "
            f"[bold]{record.topic}[/bold] - "
            f"{repr(payload)}"
        )
        self.log_display.write(message)

    def _update_display(self) -> None:
        """Clear the display and re-render the stored entries with the current filter."""
        self.log_display.clear()
        for entry in self.all_logs:
            self._apply_filters_to_log(entry)

    def on_input_changed(self, message: Input.Changed) -> None:
        """Update the topic filter and the display while the user types."""
        if message.input.id == "topic_filter":
            self.topic_filter = message.value
            self._update_display()


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command line parser of the sniffer."""
    parser = argparse.ArgumentParser(description='This is a mqtt sniffer.')
    parser.add_argument('-f', dest='hostname', default='localhost', help='Hostname of the mqtt server')
    parser.add_argument('-p', dest='port', default=1883, type=int, help='Port of the mqtt server')
    parser.add_argument('-n', dest='no_credentials', action='store_true', help='Avoid asking for credentials')
    parser.add_argument('-u', dest='username', default=None, help='Set username for mqtt server')
    parser.add_argument('-t', dest='topicfilter', default="", help='Set topic filter')
    parser.add_argument('-l', dest='logtofile', action='store_true', help='Enable logging to file')
    return parser


def main() -> None:
    """Parse the arguments, ask for credentials if required and run the app."""
    args = _build_arg_parser().parse_args()

    if args.no_credentials:
        # -n overrides any username given with -u.
        args.username = None
        password = None
    else:
        if args.username is None:
            args.username = input("Username: ")
        password = getpass.getpass(prompt='Password: ', stream=None)

    LogViewerApp(args, password).run()


if __name__ == "__main__":
    main()