mqtt_sniffer/mqtt_sniffer.py

201 lines
7.1 KiB
Python

import argparse
import config
import getpass
import logging
import mqtt
import os
import report
import time
from textual.app import App, ComposeResult
from textual.containers import Vertical
from textual.widgets import Footer, Header, Input, RichLog, Button, Select, Checkbox
from mytui import MultiSelect
# Application-wide logger, configured by the project's `report` helper.
logger = report.app_logging_config()

# The message log file sits next to this script: same path, ".log" extension.
filename = f"{os.path.splitext(__file__)[0]}.log"

# Separate logger used only for writing received MQTT messages to `filename`.
# The returned handler is kept so that `filerotate` can trigger a rollover.
filelogger = logging.getLogger('filelogger')
filehandler = report.add_handler_file(
    filelogger,
    filename=filename,
    maxMbytes=5,
    backupCount=3,
    fmt="[%(asctime)s] %(message)s",
)
def filerotate():
    """Roll the message log file over if it already contains data.

    Called when file logging gets switched on, so that a new logging
    session does not append to the content of a previous one. An empty
    log file is left alone, and so is a log file that does not exist on
    disk (there is nothing to rotate in either case).
    """
    try:
        has_content = os.path.getsize(filename) > 0
    except FileNotFoundError:
        # The file may not have been created yet or was removed externally;
        # do not let that propagate into the caller.
        return
    if has_content:
        filehandler.doRollover()
class MqttHandler:
    """Glue between the project's MQTT client and the TUI application.

    Received messages are logged at debug level and handed to the
    application's ``add_log`` through ``call_from_thread``; outgoing
    messages are published with :meth:`send`.
    """

    def __init__(self, app):
        """Create the MQTT client and register the receive callback.

        :param app: Application object; its ``args`` (hostname, port,
                    username) and ``password`` are used for the connection,
                    and it receives every incoming message.
        """
        self.app = app
        settings = app.args
        secret = app.password
        self.mqtt_client = mqtt.mqtt_client(
            'mqtt_sniffer',
            settings.hostname,
            settings.port,
            username=settings.username,
            password=secret,
        )
        # "#" is the topic pattern the receive callback is registered for.
        self.mqtt_client.add_callback("#", self.__rx__)

    def __get_logger__(self, prefix, topic):
        """Return a child logger named ``<prefix>.<topic with '/' as '.'>``."""
        dotted_topic = topic.replace('/', '.')
        return logger.getChild(f"{prefix}.{dotted_topic}")

    def __rx__(self, client, userdata, message):
        """Receive callback: log the message and forward it to the app."""
        rx_logger = self.__get_logger__('rx', message.topic)
        rx_logger.debug(
            "Received message: topic=%s, payload=%s type=%s",
            message.topic,
            repr(message.payload),
            repr(type(message.payload)),
        )
        # Hand the message over via the app's call_from_thread helper.
        self.app.call_from_thread(self.app.add_log, message)

    def send(self, topic, payload):
        """Log and publish ``payload`` on ``topic`` via the MQTT client."""
        tx_logger = self.__get_logger__('tx', topic)
        tx_logger.debug(
            "Sending message: topic=%s, payload=%s type=%s",
            topic,
            repr(payload),
            repr(type(payload)),
        )
        self.mqtt_client.send(topic, payload)
class MqttSniffer(App):
    """A Textual application for viewing MQTT messages."""

    CSS_PATH = "style.tcss"
    BINDINGS = [
        ("q", "quit", "Quit"),
        ("c", "clear_screen", "Clear"),
        ("p", "pause", "Pause"),
    ]
    # Maximum number of received messages kept in ``all_logs``.
    MAX_LOGS = 5000

    def __init__(self, args, password):
        """Store the connection settings and create the long-lived widgets.

        :param args: Parsed command line namespace; ``MqttHandler`` reads
                     ``hostname``, ``port`` and ``username`` from it.
        :param password: Password handed on to ``MqttHandler`` (may be None).
        """
        super().__init__()
        self.args = args
        self.password = password
        # File logging is off until the "ToFile" checkbox enables it.
        self.__log_to_file__ = False
        # The MQTT handler is created in on_mount.
        self.mqtt = None
        # History of (asctime, message) tuples, capped at MAX_LOGS entries.
        self.all_logs = []
        # False while the display is paused.
        self.__logging_enabled__ = True
        self.__topic_selection__ = MultiSelect((), prompt="Full", id="topic_filter")
        # Mirrors of the "send" input fields, updated in on_input_changed.
        self.send_topic = ""
        self.send_payload = ""
        self.log_display = RichLog(highlight=True, markup=True)

    def compose(self) -> ComposeResult:
        """Create the widgets for the application."""
        yield Header(name="Python Log Viewer")
        # NOTE(review): the filter and send bars are assumed to be children of
        # "app-grid" -- confirm against style.tcss.
        with Vertical(id="app-grid"):
            yield self.log_display
            with Vertical(id="filter-bar"):
                yield self.__topic_selection__
                yield Input(placeholder="Filter", id="select_filter")
                yield Checkbox("ToFile", self.__log_to_file__, id="log_to_file")
            with Vertical(id="send-bar"):
                yield Input(placeholder="topic...", id="send_topic")
                yield Input(placeholder="payload...", id="send_payload")
                yield Button("Send", variant="success", id="send_button")
        yield Footer()

    def on_mount(self) -> None:
        """Start the MQTT receiver."""
        self.mqtt = MqttHandler(self)

    def add_log(self, record) -> None:
        """Add a new MQTT message and update the TUI.

        :param record: Received message; its ``topic`` and ``payload``
                       attributes are used.
        """
        asctime = time.asctime()
        self.__topic_selection__.AddEntry(record.topic)
        if self.__log_to_file__ and self.__topic_selection__.IsSelected(record.topic):
            filelogger.info("%s - %s", record.topic, record.payload)
        entry = (asctime, record)
        self.all_logs.append(entry)
        if len(self.all_logs) > self.MAX_LOGS:
            # Keep only the newest MAX_LOGS entries.
            self.all_logs = self.all_logs[-self.MAX_LOGS:]
        self._apply_filters_to_log(entry)

    def _apply_filters_to_log(self, data: tuple) -> None:
        """Write one history entry to the display if its topic is selected.

        :param data: ``(asctime, message)`` tuple as stored in ``all_logs``.
        """
        asctime, record = data
        if not self.__topic_selection__.IsSelected(record.topic):
            return
        if not self.__logging_enabled__:
            # Paused: nothing is written, so skip building the markup string.
            return
        # NOTE(review): topic and payload are interpolated unescaped while the
        # RichLog is created with markup=True; bracketed text in a message
        # would presumably be interpreted as markup -- consider escaping.
        message = (
            f"[[dim]{asctime}[/dim]] "
            f"[bold]{record.topic}[/bold] - "
            f"{repr(record.payload)}"
        )
        self.log_display.write(message)

    def action_clear_screen(self):
        """Drop the message history and clear the display."""
        self.all_logs = []
        self._update_display()

    def action_pause(self):
        """Toggle pause; on resume re-render the collected history."""
        self.__logging_enabled__ = not self.__logging_enabled__
        if self.__logging_enabled__:
            self._update_display()

    def _update_display(self):
        """Clear the display and render all MQTT messages based on the current filters."""
        self.log_display.clear()
        for entry in self.all_logs:
            self._apply_filters_to_log(entry)

    def on_input_changed(self, message: Input.Changed) -> None:
        """Update the TUI inputs and execute the related task, if required."""
        input_id = message.input.id
        if input_id == "select_filter":
            self.__topic_selection__.SetSelectionRegEx(message.value)
        elif input_id == "send_topic":
            self.send_topic = message.value
        elif input_id == "send_payload":
            self.send_payload = message.value

    def on_select_changed(self, message: Select.Changed) -> None:
        """Re-render the display when the topic selection changes."""
        if message.select.id == "topic_filter":
            self._update_display()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Event handler called when a button is pressed."""
        if event.button.id != "send_button" or self.mqtt is None:
            return
        if self.send_topic:
            self.mqtt.send(self.send_topic, self.send_payload)
        else:
            logger.warning("Can't send mqtt message with empty topic. topic=%s; payload=%s", repr(self.send_topic), repr(self.send_payload))

    def on_checkbox_changed(self, message: Checkbox.Changed) -> None:
        """Switch file logging on or off; rotate the log file when enabling."""
        if message.checkbox.id == "log_to_file":
            if message.value:
                filerotate()
            self.__log_to_file__ = message.value
if __name__ == "__main__":
    #
    # Logging
    #
    if config.DEBUG:
        report.add_handler_socket(logger)
    #
    # Parse Arguments
    #
    parser = argparse.ArgumentParser(description='This is a mqtt sniffer.')
    parser.add_argument('-f', dest='hostname', default='localhost', help='Hostname of the mqtt server')
    parser.add_argument('-p', dest='port', default=1883, type=int, help='Port of the mqtt server')
    parser.add_argument('-n', dest='no_credentials', action='store_true', help='Avoid asking for credentials')
    parser.add_argument('-u', dest='username', default=None, help='Set username for mqtt server')
    # NOTE(review): `topicfilter` (-t) and `logtofile` (-l) are parsed but not
    # read anywhere in the visible code -- confirm whether they should be
    # wired into the application.
    parser.add_argument('-t', dest='topicfilter', default="", help='Set topic filter')
    parser.add_argument('-l', dest='logtofile', action='store_true', help='Enable logging to file')
    args = parser.parse_args()
    #
    # Ask for credentials
    #
    if not args.no_credentials:
        # Prompt for the username only if it was not given with -u; the
        # password is always requested interactively.
        if args.username is None:
            args.username = input("Username: ")
        password = getpass.getpass(prompt='Password: ', stream=None)
    else:
        # -n: connect without credentials, ignoring a username given with -u.
        args.username = None
        password = None
    #
    # Start Application
    #
    app = MqttSniffer(args, password)
    app.run()