mqtt_sniffer/mqtt_sniffer.py
2025-07-19 20:07:17 +02:00

131 lines
4.2 KiB
Python

import argparse
import getpass
import logging
import json
import mqtt
import time
from textual.app import App, ComposeResult
from textual.containers import Vertical
from textual.widgets import Footer, Header, Input, RichLog
# TODO: Usage of multiple regex for topic filter
# TODO: Integrate sending of message (topic + payload)
class MqttReceiver(object):
def __init__(self, app):
self.app = app
args = app.args
password = app.password
self.mqtt_client = mqtt.mqtt_client('mqtt_sniffer', args.hostname, args.port, username=args.username, password=password)
self.mqtt_client.add_callback("#", self.__rx__)
def __rx__(self, client, userdata, message):
self.app.call_from_thread(self.app.add_log, message)
class MqttSniffer(App):
"""a textual application for viewing mqtt messages."""
CSS_PATH = "style.tcss"
BINDINGS = [("q", "quit", "Quit")]
MAX_LOGS = 50
def __init__(self, args, password):
super().__init__()
self.args = args
self.password = password
#
self.all_logs = []
self.topic_filter = ""
self.log_display = RichLog(highlight=True, markup=True)
def compose(self) -> ComposeResult:
"""Create the widgets for the application."""
yield Header(name="Python Log Viewer")
with Vertical(id="app-grid"):
yield self.log_display
with Vertical(id="bottom-bar"):
yield Input(placeholder="topic filter...", id="topic_filter")
yield Footer()
def on_mount(self) -> None:
"""start the mqtt receiver."""
MqttReceiver(self)
def add_log(self, record) -> None:
"""Add new mqt messages and update the tui."""
asctime = time.asctime()
self.all_logs.append((asctime, record))
if len(self.all_logs) > self.MAX_LOGS:
self.all_logs = self.all_logs[-self.MAX_LOGS:]
self._apply_filters_to_log((asctime, record))
def _apply_filters_to_log(self, data: logging.LogRecord):
asctime, record = data
"""filter the mqtt messages."""
topic_match = False
for topic_filter in self.topic_filter.split(","):
topic_match |= topic_filter.lower() in record.topic.lower()
if topic_match:
try:
payload = json.loads(record.payload)
except:
payload = record.payload.decode('utf-8')
message = (
f"[[dim]{asctime}[/dim]] "
f"[bold]{record.topic}[/bold] - "
f"{repr(payload)}"
)
self.log_display.write(message)
def _update_display(self):
"""Clean the display and render all mqtt messages based on the current filters."""
self.log_display.clear()
for record in self.all_logs:
self._apply_filters_to_log(record)
def on_input_changed(self, message: Input.Changed) -> None:
"""Update the filter and filtered messages after the user did changes."""
if message.input.id == "topic_filter":
self.topic_filter = message.value
self._update_display()
if __name__ == "__main__":
#
# Parse Arguments
#
parser = argparse.ArgumentParser(description='This is a mqtt sniffer.')
parser.add_argument('-f', dest='hostname', default='localhost', help='Hostname of the mqtt server')
parser.add_argument('-p', dest='port', default=1883, type=int, help='Port of the mqtt server')
parser.add_argument('-n', dest='no_credentials', action='store_true', help='Avoid asking for credentials')
parser.add_argument('-u', dest='username', default=None, help='Set username for mqtt server')
parser.add_argument('-t', dest='topicfilter', default="", help='Set topic filter')
parser.add_argument('-l', dest='logtofile', action='store_true', help='Enable logging to file')
args = parser.parse_args()
#
# Ask for credentials
#
if not args.no_credentials:
if args.username == None:
args.username = input("Username: ")
password = getpass.getpass(prompt='Password: ', stream=None)
else:
args.username = None
password = None
#
# Start Application
#
app = MqttSniffer(args, password)
app.run()