172 lines
5.8 KiB
Python
172 lines
5.8 KiB
Python
import argparse
|
|
import config
|
|
import getpass
|
|
import logging
|
|
import mqtt
|
|
import re
|
|
import report
|
|
import time
|
|
|
|
from textual.app import App, ComposeResult
|
|
from textual.containers import Vertical
|
|
from textual.widgets import Footer, Header, Input, RichLog, Button
|
|
|
|
# Pick the logger name used throughout this module: the application name
# exported by the project configuration when it is importable, otherwise the
# literal name 'root'.
try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'

# Module-wide logger; MqttHandler derives its per-topic child loggers from the
# same name.
logger = logging.getLogger(name=ROOT_LOGGER_NAME)
|
|
|
|
|
|
class MqttHandler(object):
    """Connects the sniffer application to the MQTT client.

    Incoming messages are logged and handed to the application; outgoing
    messages are logged and passed on to the client.
    """

    def __init__(self, app):
        """Create the MQTT client from the application's connection settings.

        Args:
            app: The owning application. Its ``args`` (``hostname``, ``port``,
                ``username``) and ``password`` attributes are read here; its
                ``call_from_thread`` and ``add_log`` are used for every
                received message.
        """
        self.app = app
        settings = app.args
        secret = app.password

        self.mqtt_client = mqtt.mqtt_client(
            'mqtt_sniffer',
            settings.hostname,
            settings.port,
            username=settings.username,
            password=secret,
        )
        # "#" is the MQTT multi-level wildcard; presumably this registers the
        # receive callback for every topic - confirm against the mqtt module.
        self.mqtt_client.add_callback("#", self.__rx__)

    def __get_logger__(self, prefix, topic):
        """Return the child logger for *topic* below *prefix*.

        Slashes in the topic become dots, so each topic level becomes one
        level of the logger hierarchy.
        """
        child_name = '.'.join((prefix, topic.replace('/', '.')))
        return logging.getLogger(ROOT_LOGGER_NAME).getChild(child_name)

    def __rx__(self, client, userdata, message):
        """Log a received message and forward it to the application.

        The message is passed to ``app.add_log`` through
        ``app.call_from_thread``; presumably this callback runs outside the
        application's own thread - verify against the mqtt module.
        """
        topic = message.topic
        payload = message.payload
        rx_logger = self.__get_logger__('rx', topic)
        rx_logger.debug(
            "Received message: topic=%s, payload=%s type=%s",
            topic,
            repr(payload),
            repr(type(payload)),
        )
        self.app.call_from_thread(self.app.add_log, message)

    def send(self, topic, payload):
        """Log an outgoing message and hand it to the MQTT client."""
        tx_logger = self.__get_logger__('tx', topic)
        tx_logger.debug(
            "Sending message: topic=%s, payload=%s type=%s",
            topic,
            repr(payload),
            repr(type(payload)),
        )
        self.mqtt_client.send(topic, payload)
|
|
|
|
class MqttSniffer(App):
    """A textual application for viewing mqtt messages.

    Received messages are written to a ``RichLog`` when their topic passes the
    topic filter. Three inputs and a button let the user change the filter and
    send a message.
    """

    CSS_PATH = "style.tcss"
    BINDINGS = [
        ("q", "quit", "Quit"),
        ("c", "clear_screen", "Clear"),
    ]
    # Number of most recent messages kept in ``all_logs``; these are the ones
    # re-rendered when the filter changes.
    MAX_LOGS = 50

    def __init__(self, args, password):
        """Store the connection settings and create the initial state.

        Args:
            args: Parsed command line arguments. ``hostname``, ``port`` and
                ``username`` are read by ``MqttHandler``; an optional
                ``topicfilter`` attribute seeds the topic filter.
            password: Password handed to the MQTT client, or ``None``.
        """
        super().__init__()
        self.args = args
        self.password = password
        #
        self.mqtt = None
        self.all_logs = []
        # Honour the command line filter; previously the option was parsed but
        # the filter always started empty. getattr keeps callers working that
        # pass an args object without this attribute.
        self.topic_filter = getattr(args, "topicfilter", None) or ""
        self.send_topic = ""
        self.send_payload = ""
        self.log_display = RichLog(highlight=True, markup=True)

    def compose(self) -> ComposeResult:
        """Create the widgets for the application."""
        # NOTE(review): the nesting of the two Vertical containers and the
        # position of the Footer were reconstructed from a copy of this file
        # that had lost its indentation - confirm against style.tcss.
        yield Header(name="Python Log Viewer")
        with Vertical(id="app-grid"):
            yield self.log_display
            with Vertical(id="bottom-bar"):
                # Show the initial filter in the input so that the visible
                # value and the active filter agree.
                yield Input(value=self.topic_filter, placeholder="topic filter...", id="topic_filter")
                yield Input(placeholder="topic...", id="send_topic")
                yield Input(placeholder="payload...", id="send_payload")
                yield Button("Send", variant="success", id="send_button")
        yield Footer()

    def on_mount(self) -> None:
        """Start the mqtt receiver."""
        self.mqtt = MqttHandler(self)

    def add_log(self, record) -> None:
        """Add a new mqtt message and update the tui.

        Args:
            record: The received message; its ``topic`` and ``payload``
                attributes are used for filtering and rendering.
        """
        entry = (time.asctime(), record)
        self.all_logs.append(entry)
        if len(self.all_logs) > self.MAX_LOGS:
            # Drop the oldest entries, keeping the newest MAX_LOGS.
            del self.all_logs[:-self.MAX_LOGS]
        self._apply_filters_to_log(entry)

    def _topic_matches(self, topic) -> bool:
        """Return True when *topic* passes the current topic filter.

        The filter is a comma separated list of regular expressions; a topic
        passes when any of them is found in it. An empty filter passes every
        topic. Empty alternatives (for example from a trailing comma while
        typing) are ignored, because an empty expression would match every
        topic.
        """
        patterns = [part for part in self.topic_filter.split(",") if part]
        if not patterns:
            return True
        for pattern in patterns:
            try:
                if re.search(pattern, topic):
                    return True
            except re.error:
                # Not a valid regular expression (typically a half-typed
                # one): this alternative simply matches nothing.
                continue
        return False

    def _apply_filters_to_log(self, data: tuple):
        """Write one stored message to the display if it passes the filter.

        Args:
            data: An ``(asctime, record)`` pair as stored in ``all_logs``.
        """
        asctime, record = data
        if not self._topic_matches(record.topic):
            return

        # NOTE(review): topic and payload are interpolated into markup text
        # without escaping; content that looks like a markup tag may not be
        # rendered literally - consider escaping it.
        message = (
            f"[[dim]{asctime}[/dim]] "
            f"[bold]{record.topic}[/bold] - "
            f"{repr(record.payload)}"
        )
        self.log_display.write(message)

    def action_clear_screen(self):
        """Forget all stored messages and clear the display."""
        self.all_logs = []
        self._update_display()

    def _update_display(self):
        """Clean the display and render all mqtt messages based on the current filters."""
        self.log_display.clear()
        for entry in self.all_logs:
            self._apply_filters_to_log(entry)

    def on_input_changed(self, message: Input.Changed) -> None:
        """Update the tui inputs and execute the related task, if required."""
        input_id = message.input.id
        if input_id == "topic_filter":
            self.topic_filter = message.value
            # A new filter changes which stored messages are visible.
            self._update_display()
        elif input_id == "send_topic":
            self.send_topic = message.value
        elif input_id == "send_payload":
            self.send_payload = message.value

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Event handler called when a button is pressed."""
        if event.button.id != "send_button":
            return
        if self.mqtt is None:
            return
        if not self.send_topic:
            # MQTT topic names must be at least one character long, so there
            # is nothing valid to publish yet.
            return
        self.mqtt.send(self.send_topic, self.send_payload)
|
|
|
|
|
|
if __name__ == "__main__":
    #
    # Logging
    #
    if config.DEBUG:
        report.add_handler_socket(logger)

    #
    # Parse Arguments
    #
    # NOTE(review): this block does not read `topicfilter` or `logtofile`
    # itself; they are only handed to the application as part of `args`.
    # Nothing in the visible code consumes `logtofile`.
    parser = argparse.ArgumentParser(description='This is a mqtt sniffer.')
    parser.add_argument('-f', dest='hostname', default='localhost', help='Hostname of the mqtt server')
    parser.add_argument('-p', dest='port', default=1883, type=int, help='Port of the mqtt server')
    parser.add_argument('-n', dest='no_credentials', action='store_true', help='Avoid asking for credentials')
    parser.add_argument('-u', dest='username', default=None, help='Set username for mqtt server')
    parser.add_argument('-t', dest='topicfilter', default="", help='Set topic filter')
    parser.add_argument('-l', dest='logtofile', action='store_true', help='Enable logging to file')

    args = parser.parse_args()

    #
    # Ask for credentials
    #
    if not args.no_credentials:
        # Only prompt for the username when it was not given with -u; the
        # password is always prompted for so it never appears on the command
        # line.
        if args.username is None:
            args.username = input("Username: ")
        password = getpass.getpass(prompt='Password: ', stream=None)
    else:
        # -n wins over -u: connect without any credentials.
        args.username = None
        password = None

    #
    # Start Application
    #
    app = MqttSniffer(args, password)
    app.run()
|