Select widget for topics

This commit is contained in:
Dirk Alders 2025-07-24 23:11:35 +02:00
parent 1db7cb2451
commit ae3dd84aed
3 changed files with 103 additions and 19 deletions

6
mqtt_sniffer Executable file
View File

@ -0,0 +1,6 @@
#!/bin/bash
#
BASEPATH=$(dirname $0)
$BASEPATH/venv/bin/python $BASEPATH/mqtt_sniffer.py $*

View File

@ -9,7 +9,7 @@ import time
from textual.app import App, ComposeResult
from textual.containers import Vertical
from textual.widgets import Footer, Header, Input, RichLog, Button
from textual.widgets import Footer, Header, Input, RichLog, Button, Select
try:
from config import APP_NAME as ROOT_LOGGER_NAME
@ -41,6 +41,68 @@ class MqttHandler(object):
self.mqtt_client.send(topic, payload)
class OptionSelectList(dict):
def __init__(self):
super().__init__(self)
#
self.__default_value__ = True
self.__selection_regex__ = ""
def __sorted_keys__(self):
rv = list(self.keys())
rv.sort()
return rv
def SetSelectionRegEx(self, regex: str) -> None:
self.__selection_regex__ = regex
def AddEntry(self, entry) -> None:
if entry not in self:
self[entry] = self.__default_value__
def Toggle(self, entry_num):
if entry_num < 0:
self.__default_value__ = entry_num == -1
for key in self:
self[key] = self.__default_value__
elif entry_num > len(self):
raise ValueError(f"The Entry '{entry} is not in the list")
else:
entry = self.__sorted_keys__()[entry_num]
self[entry] = not self[entry]
def GetSelectList(self):
rv = []
if len(self) > 2:
rv.append(('All', -1))
rv.append(('None', -2))
for index, key in enumerate(self.__sorted_keys__()):
try:
match = len(re.findall(self.__selection_regex__, key)) > 0
except re.error:
match = True # No valid regular expression
if match:
prefix = "\\[X] " if self[key] else "\\[-] "
rv.append((prefix + key, index))
return rv
def IsSelected(self, entry):
return self.get(entry, False)
class MySelect(Select):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__new_options__ = []
def set_options(self, options):
self.__new_options__ = options
def _on_enter(self, event):
super().set_options(self.__new_options__)
return super()._on_enter(event)
class MqttSniffer(App):
"""a textual application for viewing mqtt messages."""
@ -58,7 +120,8 @@ class MqttSniffer(App):
#
self.mqtt = None
self.all_logs = []
self.topic_filter = ""
self.__topic_select_list__ = OptionSelectList()
self.__topic_selection__ = MySelect((), prompt="Full", id="topic_filter")
self.send_topic = ""
self.send_payload = ""
self.log_display = RichLog(highlight=True, markup=True)
@ -68,8 +131,10 @@ class MqttSniffer(App):
yield Header(name="Python Log Viewer")
with Vertical(id="app-grid"):
yield self.log_display
with Vertical(id="bottom-bar"):
yield Input(placeholder="topic filter...", id="topic_filter")
with Vertical(id="filter-bar"):
yield self.__topic_selection__
yield Input(placeholder="Filter", id="select_filter")
with Vertical(id="send-bar"):
yield Input(placeholder="topic...", id="send_topic")
yield Input(placeholder="payload...", id="send_payload")
yield Button("Send", variant="success", id="send_button")
@ -82,6 +147,8 @@ class MqttSniffer(App):
def add_log(self, record) -> None:
"""Add new mqt messages and update the tui."""
asctime = time.asctime()
self.__topic_select_list__.AddEntry(record.topic)
self.__topic_selection__.set_options(self.__topic_select_list__.GetSelectList())
self.all_logs.append((asctime, record))
if len(self.all_logs) > self.MAX_LOGS:
self.all_logs = self.all_logs[-self.MAX_LOGS:]
@ -89,15 +156,8 @@ class MqttSniffer(App):
def _apply_filters_to_log(self, data: logging.LogRecord):
asctime, record = data
"""filter the mqtt messages."""
topic_match = False
for topic_filter in self.topic_filter.split(","):
try:
topic_match |= len(re.findall(topic_filter, record.topic)) > 0
except re.error:
pass # No valid regular expression
if topic_match:
if self.__topic_select_list__.IsSelected(record.topic):
message = (
f"[[dim]{asctime}[/dim]] "
f"[bold]{record.topic}[/bold] - "
@ -118,14 +178,24 @@ class MqttSniffer(App):
def on_input_changed(self, message: Input.Changed) -> None:
"""Update the tui inputs and execute task, if requireed."""
if message.input.id == "topic_filter":
self.topic_filter = message.value
self._update_display()
elif message.input.id == "send_topic":
if message.input.id == "select_filter":
self.__topic_select_list__.SetSelectionRegEx(message.value)
self.__topic_selection__.set_options(self.__topic_select_list__.GetSelectList())
self.__topic_selection__.prompt = "Full" if not message.value else "Filtered"
if message.input.id == "send_topic":
self.send_topic = message.value
elif message.input.id == "send_payload":
self.send_payload = message.value
def on_select_changed(self, message: Select.Changed) -> None:
if message.select.id == 'topic_filter':
if type(message.value) is int:
self.__topic_select_list__.Toggle(message.value)
self.__topic_selection__.clear()
self.__topic_selection__.set_options(self.__topic_select_list__.GetSelectList())
self._update_display()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Event handler called when a button is pressed."""
if event.button.id == "send_button":

View File

@ -5,10 +5,18 @@
height: 1fr;
}
#bottom-bar {
#filter-bar {
layout: grid;
grid-size: 4;
grid-columns: 3fr 2fr 2fr 1fr;
grid-size: 2;
grid-columns: 4fr 1fr;
height: 4;
border-top: solid $primary;
}
#send-bar {
layout: grid;
grid-size: 3;
grid-columns: 3fr 3fr 1fr;
height: 4;
border-top: solid $primary;
}