import argparse import config import getpass import logging import mqtt import re import report import time from textual.app import App, ComposeResult from textual.containers import Vertical from textual.widgets import Footer, Header, Input, RichLog, Button, Select try: from config import APP_NAME as ROOT_LOGGER_NAME except ImportError: ROOT_LOGGER_NAME = 'root' logger = logging.getLogger(ROOT_LOGGER_NAME) class MqttHandler(object): def __init__(self, app): self.app = app args = app.args password = app.password self.mqtt_client = mqtt.mqtt_client('mqtt_sniffer', args.hostname, args.port, username=args.username, password=password) self.mqtt_client.add_callback("#", self.__rx__) def __get_logger__(self, prefix, topic): return logging.getLogger(ROOT_LOGGER_NAME).getChild(prefix + '.' + topic.replace('/', '.')) def __rx__(self, client, userdata, message): logger = self.__get_logger__('rx', message.topic) logger.debug("Received message: topic=%s, payload=%s type=%s", message.topic, repr(message.payload), repr(type(message.payload))) self.app.call_from_thread(self.app.add_log, message) def send(self, topic, payload): logger = self.__get_logger__('tx', topic) logger.debug("Sending message: topic=%s, payload=%s type=%s", topic, repr(payload), repr(type(payload))) self.mqtt_client.send(topic, payload) class OptionSelectList(dict): def __init__(self): super().__init__(self) # self.__default_value__ = True self.__selection_regex__ = "" def __sorted_keys__(self): rv = list(self.keys()) rv.sort() return rv def SetSelectionRegEx(self, regex: str) -> None: self.__selection_regex__ = regex def AddEntry(self, entry) -> None: if entry not in self: self[entry] = self.__default_value__ def Toggle(self, entry_num): if entry_num < 0: if not self.__selection_regex__: self.__default_value__ = entry_num == -1 for key in self: if self.__match__(key): self[key] = entry_num == -1 elif entry_num > len(self): raise ValueError(f"The Entry '{entry} is not in the list") else: entry = self.__sorted_keys__()[entry_num] self[entry] = not self[entry] def __match__(self, key): try: match = len(re.findall(self.__selection_regex__, key)) > 0 except re.error: match = True # No valid regular expression return match def GetSelectList(self): rv = [] if len(self) > 2: rv.append(('All', -1)) rv.append(('None', -2)) for index, key in enumerate(self.__sorted_keys__()): if self.__match__(key): prefix = "\\[X] " if self[key] else "\\[-] " rv.append((prefix + key, index)) return rv def IsSelected(self, entry): return self.get(entry, False) class MySelect(Select): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__new_options__ = [] def set_options(self, options): self.__new_options__ = options def _on_enter(self, event): super().set_options(self.__new_options__) return super()._on_enter(event) class MqttSniffer(App): """a textual application for viewing mqtt messages.""" CSS_PATH = "style.tcss" BINDINGS = [ ("q", "quit", "Quit"), ("c", "clear_screen", "Clear") ] MAX_LOGS = 1000 def __init__(self, args, password): super().__init__() self.args = args self.password = password # self.mqtt = None self.all_logs = [] self.__topic_select_list__ = OptionSelectList() self.__topic_selection__ = MySelect((), prompt="Full", id="topic_filter") self.send_topic = "" self.send_payload = "" self.log_display = RichLog(highlight=True, markup=True) def compose(self) -> ComposeResult: """Create the widgets for the application.""" yield Header(name="Python Log Viewer") with Vertical(id="app-grid"): yield self.log_display with Vertical(id="filter-bar"): yield self.__topic_selection__ yield Input(placeholder="Filter", id="select_filter") with Vertical(id="send-bar"): yield Input(placeholder="topic...", id="send_topic") yield Input(placeholder="payload...", id="send_payload") yield Button("Send", variant="success", id="send_button") yield Footer() def on_mount(self) -> None: """start the mqtt receiver.""" self.mqtt = MqttHandler(self) def add_log(self, record) -> None: """Add new mqt messages and update the tui.""" asctime = time.asctime() self.__topic_select_list__.AddEntry(record.topic) self.__topic_selection__.set_options(self.__topic_select_list__.GetSelectList()) self.all_logs.append((asctime, record)) if len(self.all_logs) > self.MAX_LOGS: self.all_logs = self.all_logs[-self.MAX_LOGS:] self._apply_filters_to_log((asctime, record)) def _apply_filters_to_log(self, data: logging.LogRecord): asctime, record = data if self.__topic_select_list__.IsSelected(record.topic): message = ( f"[[dim]{asctime}[/dim]] " f"[bold]{record.topic}[/bold] - " f"{repr(record.payload)}" ) self.log_display.write(message) def action_clear_screen(self): self.all_logs = [] self._update_display() def _update_display(self): """Clean the display and render all mqtt messages based on the current filters.""" self.log_display.clear() for record in self.all_logs: self._apply_filters_to_log(record) def on_input_changed(self, message: Input.Changed) -> None: """Update the tui inputs and execute task, if requireed.""" if message.input.id == "select_filter": self.__topic_select_list__.SetSelectionRegEx(message.value) self.__topic_selection__.set_options(self.__topic_select_list__.GetSelectList()) self.__topic_selection__.prompt = "Full" if not message.value else "Filtered" if message.input.id == "send_topic": self.send_topic = message.value elif message.input.id == "send_payload": self.send_payload = message.value def on_select_changed(self, message: Select.Changed) -> None: if message.select.id == 'topic_filter': if type(message.value) is int: self.__topic_select_list__.Toggle(message.value) self.__topic_selection__.clear() self.__topic_selection__.set_options(self.__topic_select_list__.GetSelectList()) self._update_display() def on_button_pressed(self, event: Button.Pressed) -> None: """Event handler called when a button is pressed.""" if event.button.id == "send_button": if self.mqtt is not None: if len(self.send_topic) > 0: self.mqtt.send(self.send_topic, self.send_payload) else: logger.warning("Can't send mqtt message with empty topic. topic=%s; payload=%s", repr(self.send_topic), repr(self.send_payload)) if __name__ == "__main__": # # Logging # if config.DEBUG: report.add_handler_socket(logger) # # Parse Arguments # parser = argparse.ArgumentParser(description='This is a mqtt sniffer.') parser.add_argument('-f', dest='hostname', default='localhost', help='Hostname of the mqtt server') parser.add_argument('-p', dest='port', default=1883, type=int, help='Port of the mqtt server') parser.add_argument('-n', dest='no_credentials', action='store_true', help='Avoid asking for credentials') parser.add_argument('-u', dest='username', default=None, help='Set username for mqtt server') parser.add_argument('-t', dest='topicfilter', default="", help='Set topic filter') parser.add_argument('-l', dest='logtofile', action='store_true', help='Enable logging to file') args = parser.parse_args() # # Ask for credentials # if not args.no_credentials: if args.username == None: args.username = input("Username: ") password = getpass.getpass(prompt='Password: ', stream=None) else: args.username = None password = None # # Start Application # app = MqttSniffer(args, password) app.run()