# loggy/loggy.py
#
# 135 lines
# 4.8 KiB
# Python

import logging
import pickle
import socket
import struct
import threading
from datetime import datetime # <--- HINZUGEFÜGT
from logging.handlers import SocketHandler
from textual.app import App, ComposeResult
from textual.containers import Vertical
from textual.widgets import Footer, Header, Input, RichLog
# Maps a log level name to the style string used when rendering that level.
# Level names missing from this table are handled by the caller's fallback.
LEVEL_STYLES = {
    "CRITICAL": "bold white on red",
    "ERROR": "bold red",
    "WARNING": "bold yellow",
    "INFO": "bold green",
    "DEBUG": "bold blue",
}
class LogReceiver(threading.Thread):
    """Background thread that receives pickled log records over TCP.

    Each record on the wire is a 4-byte big-endian length prefix followed by
    a pickled dict of ``LogRecord`` attributes (the framing used by
    ``logging.handlers.SocketHandler``). Every decoded record is handed to
    ``app.add_log`` via ``app.call_from_thread``.

    Clients are served one at a time: the next connection is accepted only
    after the current one has ended.

    Args:
        app: Object providing ``call_from_thread`` and ``add_log``.
        host: Address to bind to; the empty string binds all interfaces.
        port: TCP port to listen on.
    """

    # Pre-compiled length prefix: unsigned 32-bit integer, big-endian.
    _HEADER = struct.Struct(">L")

    def __init__(self, app, host="", port=19996):
        super().__init__()
        self.app = app
        self.host = host
        self.port = port
        # Daemon thread, so it does not keep the process alive on its own.
        self.daemon = True

    def run(self):
        """Accept clients in a loop and forward their log records to the app."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
                server.bind((self.host, self.port))
                server.listen()
                while True:
                    conn, _ = server.accept()
                    with conn:
                        try:
                            self._serve_client(conn)
                        except Exception as e:
                            # Connection boundary: unpickling and socket I/O
                            # can raise many exception types, and one broken
                            # client must not stop the receiver for everyone.
                            self._log_failure(e)
        except Exception as e:
            # Failure of the listening socket itself (e.g. bind error).
            self._log_failure(e)

    def _serve_client(self, conn):
        """Read length-prefixed records from *conn* until the peer disconnects."""
        while True:
            header = self._recv_exact(conn, self._HEADER.size)
            if header is None:
                break
            (length,) = self._HEADER.unpack(header)
            payload = self._recv_exact(conn, length)
            if payload is None:
                # Peer closed the connection mid-record; drop the fragment.
                break
            # SECURITY: pickle.loads can execute arbitrary code. Only expose
            # this port to trusted senders.
            attributes = pickle.loads(payload)
            record = logging.makeLogRecord(attributes)
            self.app.call_from_thread(self.app.add_log, record)

    @staticmethod
    def _recv_exact(conn, size):
        """Return exactly *size* bytes from *conn*, or ``None`` on early EOF.

        ``recv`` may return fewer bytes than requested, and returns ``b""``
        once the peer has closed the connection, so reading continues until
        the requested amount is collected or the stream ends.
        """
        parts = []
        remaining = size
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                return None
            parts.append(chunk)
            remaining -= len(chunk)
        return b"".join(parts)

    @staticmethod
    def _log_failure(error):
        """Write a receiver failure, with traceback, to ``server_error.log``.

        Must be called from inside an ``except`` block so that
        ``exc_info=True`` can pick up the active exception.
        """
        logging.basicConfig(filename="server_error.log", level=logging.DEBUG)
        logging.error(f"LogReceiver-Fehler: {error}", exc_info=True)
class LogViewerApp(App):
    """Textual app that displays received Python log records with live filters.

    Every record is kept in ``all_logs`` so the view can be rebuilt whenever
    one of the two filter inputs changes.
    """

    CSS_PATH = "style.tcss"
    BINDINGS = [("q", "quit", "Beenden")]

    def __init__(self):
        super().__init__()
        self.all_logs = []  # every record received so far, in arrival order
        self.module_filter = ""  # raw text of the module filter input
        self.level_filter = ""  # raw text of the level filter input
        self.log_display = RichLog(highlight=True, markup=True)

    def compose(self) -> ComposeResult:
        """Create the widgets of the app."""
        # NOTE(review): the original indentation was lost, so the nesting of
        # the two containers is reconstructed (bottom bar inside the app
        # grid, footer at top level) - confirm against style.tcss.
        yield Header(name="Python Log Viewer")
        with Vertical(id="app-grid"):
            yield self.log_display
            with Vertical(id="bottom-bar"):
                yield Input(placeholder="Modul filtern...", id="module_filter")
                yield Input(placeholder="Level filtern...", id="level_filter")
        yield Footer()

    def on_mount(self) -> None:
        """Start the log receiver thread."""
        log_receiver = LogReceiver(self)
        log_receiver.start()

    def add_log(self, record: logging.LogRecord) -> None:
        """Store a new record and display it if it passes the current filters."""
        self.all_logs.append(record)
        self._apply_filters_to_log(record)

    @staticmethod
    def _matches_filter(filter_text: str, value: str) -> bool:
        """Return True if *value* contains any comma-separated filter term.

        Matching is case-insensitive. Terms are stripped of surrounding
        whitespace and empty terms are ignored, so ``"db, net"`` matches
        ``net`` and a trailing comma (``"db,"``) does not disable the
        filter. A filter without any terms matches everything.
        """
        terms = [term.strip().lower() for term in filter_text.split(",")]
        terms = [term for term in terms if term]
        if not terms:
            return True
        haystack = value.lower()
        return any(term in haystack for term in terms)

    def _apply_filters_to_log(self, record: logging.LogRecord) -> None:
        """Write *record* to the display if it passes both filters."""
        passes = self._matches_filter(
            self.module_filter, record.name
        ) and self._matches_filter(self.level_filter, record.levelname)
        if not passes:
            return

        level_style = LEVEL_STYLES.get(record.levelname, "white")
        timestamp_str = datetime.fromtimestamp(record.created).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        # Same "seconds,milliseconds" layout as logging's default asctime.
        asctime = f"{timestamp_str},{int(record.msecs):03d}"
        # NOTE(review): the display is created with markup=True and the
        # logger name / message are inserted unescaped, so bracketed text in
        # a log message may be interpreted as markup - consider escaping.
        message = (
            f"[[dim]{asctime}[/dim]] "
            f"[{level_style}]{record.levelname:<8}[/{level_style}] "
            f"[bold]{record.name}[/bold] - "
            f"{record.getMessage()}"
        )
        self.log_display.write(message)

    def _update_display(self):
        """Clear the display and re-render all stored records with the current filters."""
        self.log_display.clear()
        for record in self.all_logs:
            self._apply_filters_to_log(record)

    def on_input_changed(self, message: Input.Changed) -> None:
        """Update the matching filter and refresh the display as the user types."""
        if message.input.id == "module_filter":
            self.module_filter = message.value
        elif message.input.id == "level_filter":
            self.level_filter = message.value
        self._update_display()
def main() -> None:
    """Create the log viewer application and run it."""
    viewer = LogViewerApp()
    viewer.run()


if __name__ == "__main__":
    main()