diff --git a/mqtt_sniffer b/mqtt_sniffer new file mode 100755 index 0000000..e0f6eb7 --- /dev/null +++ b/mqtt_sniffer @@ -0,0 +1,6 @@ +#!/bin/bash +# +BASEPATH=$(dirname $0) + +$BASEPATH/venv/bin/python $BASEPATH/mqtt_sniffer.py $* + diff --git a/mqtt_sniffer.py b/mqtt_sniffer.py index a62ed7b..ff0511a 100644 --- a/mqtt_sniffer.py +++ b/mqtt_sniffer.py @@ -9,7 +9,7 @@ import time from textual.app import App, ComposeResult from textual.containers import Vertical -from textual.widgets import Footer, Header, Input, RichLog, Button +from textual.widgets import Footer, Header, Input, RichLog, Button, Select try: from config import APP_NAME as ROOT_LOGGER_NAME @@ -41,6 +41,68 @@ class MqttHandler(object): self.mqtt_client.send(topic, payload) +class OptionSelectList(dict): + def __init__(self): + super().__init__(self) + # + self.__default_value__ = True + self.__selection_regex__ = "" + + def __sorted_keys__(self): + rv = list(self.keys()) + rv.sort() + return rv + + def SetSelectionRegEx(self, regex: str) -> None: + self.__selection_regex__ = regex + + def AddEntry(self, entry) -> None: + if entry not in self: + self[entry] = self.__default_value__ + + def Toggle(self, entry_num): + if entry_num < 0: + self.__default_value__ = entry_num == -1 + for key in self: + self[key] = self.__default_value__ + elif entry_num > len(self): + raise ValueError(f"The Entry '{entry} is not in the list") + else: + entry = self.__sorted_keys__()[entry_num] + self[entry] = not self[entry] + + def GetSelectList(self): + rv = [] + if len(self) > 2: + rv.append(('All', -1)) + rv.append(('None', -2)) + for index, key in enumerate(self.__sorted_keys__()): + try: + match = len(re.findall(self.__selection_regex__, key)) > 0 + except re.error: + match = True # No valid regular expression + if match: + prefix = "\\[X] " if self[key] else "\\[-] " + rv.append((prefix + key, index)) + return rv + + def IsSelected(self, entry): + return self.get(entry, False) + + +class MySelect(Select): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__new_options__ = [] + + def set_options(self, options): + self.__new_options__ = options + + def _on_enter(self, event): + super().set_options(self.__new_options__) + return super()._on_enter(event) + + class MqttSniffer(App): """a textual application for viewing mqtt messages.""" @@ -58,7 +120,8 @@ class MqttSniffer(App): # self.mqtt = None self.all_logs = [] - self.topic_filter = "" + self.__topic_select_list__ = OptionSelectList() + self.__topic_selection__ = MySelect((), prompt="Full", id="topic_filter") self.send_topic = "" self.send_payload = "" self.log_display = RichLog(highlight=True, markup=True) @@ -68,8 +131,10 @@ class MqttSniffer(App): yield Header(name="Python Log Viewer") with Vertical(id="app-grid"): yield self.log_display - with Vertical(id="bottom-bar"): - yield Input(placeholder="topic filter...", id="topic_filter") + with Vertical(id="filter-bar"): + yield self.__topic_selection__ + yield Input(placeholder="Filter", id="select_filter") + with Vertical(id="send-bar"): yield Input(placeholder="topic...", id="send_topic") yield Input(placeholder="payload...", id="send_payload") yield Button("Send", variant="success", id="send_button") @@ -82,6 +147,8 @@ class MqttSniffer(App): def add_log(self, record) -> None: """Add new mqt messages and update the tui.""" asctime = time.asctime() + self.__topic_select_list__.AddEntry(record.topic) + self.__topic_selection__.set_options(self.__topic_select_list__.GetSelectList()) self.all_logs.append((asctime, record)) if len(self.all_logs) > self.MAX_LOGS: self.all_logs = self.all_logs[-self.MAX_LOGS:] @@ -89,15 +156,8 @@ class MqttSniffer(App): def _apply_filters_to_log(self, data: logging.LogRecord): asctime, record = data - """filter the mqtt messages.""" - topic_match = False - for topic_filter in self.topic_filter.split(","): - try: - topic_match |= len(re.findall(topic_filter, record.topic)) > 0 - except re.error: - pass # No valid regular expression - if topic_match: + if self.__topic_select_list__.IsSelected(record.topic): message = ( f"[[dim]{asctime}[/dim]] " f"[bold]{record.topic}[/bold] - " @@ -118,14 +178,24 @@ class MqttSniffer(App): def on_input_changed(self, message: Input.Changed) -> None: """Update the tui inputs and execute task, if requireed.""" - if message.input.id == "topic_filter": - self.topic_filter = message.value - self._update_display() - elif message.input.id == "send_topic": + if message.input.id == "select_filter": + self.__topic_select_list__.SetSelectionRegEx(message.value) + self.__topic_selection__.set_options(self.__topic_select_list__.GetSelectList()) + self.__topic_selection__.prompt = "Full" if not message.value else "Filtered" + if message.input.id == "send_topic": self.send_topic = message.value elif message.input.id == "send_payload": self.send_payload = message.value + def on_select_changed(self, message: Select.Changed) -> None: + if message.select.id == 'topic_filter': + if type(message.value) is int: + self.__topic_select_list__.Toggle(message.value) + self.__topic_selection__.clear() + self.__topic_selection__.set_options(self.__topic_select_list__.GetSelectList()) + + self._update_display() + def on_button_pressed(self, event: Button.Pressed) -> None: """Event handler called when a button is pressed.""" if event.button.id == "send_button": diff --git a/style.tcss b/style.tcss index 44441e6..51c5f48 100644 --- a/style.tcss +++ b/style.tcss @@ -5,10 +5,18 @@ height: 1fr; } -#bottom-bar { +#filter-bar { layout: grid; - grid-size: 4; - grid-columns: 3fr 2fr 2fr 1fr; + grid-size: 2; + grid-columns: 4fr 1fr; + height: 4; + border-top: solid $primary; +} + +#send-bar { + layout: grid; + grid-size: 3; + grid-columns: 3fr 3fr 1fr; height: 4; border-top: solid $primary; }