Error Handling of threads improved

This commit is contained in:
Dirk Alders 2025-08-04 18:02:29 +02:00
parent 03c0e799b4
commit 542b997284
2 changed files with 175 additions and 130 deletions

View File

@ -38,36 +38,97 @@ class QueueWorker(QObject):
job_finished = Signal(int) job_finished = Signal(int)
all_jobs_finished = Signal() all_jobs_finished = Signal()
def __init__(self, start_val, end_val): def __init__(self):
super().__init__() super().__init__()
self.queue = Queue() self.queue = Queue()
self.start_val = start_val self.__stop__ = False
self.end_val = end_val
self.__error_msg__ = None self.__error_msg__ = None
self.__job_data__ = {}
@Slot()
def stop(self):
self.__stop__ = True
@Slot()
def stopped(self):
return self.__stop__
@Slot()
def get_error(self): def get_error(self):
return self.__error_msg__ return self.__error_msg__
@Slot(str)
def set_error(self, error_msg): def set_error(self, error_msg):
if self.__error_msg__ is None: if self.__error_msg__ is None:
self.__error_msg__ = error_msg self.__error_msg__ = error_msg
@Slot()
def empty(self):
return self.queue.empty()
@Slot(int) @Slot(int)
def add_job(self, job_index, job_data): def add_job(self, job_index, job_data):
self.__job_data__[job_index] = job_data
self.queue.put(job_index) self.queue.put(job_index)
@Slot() @Slot()
def run(self): def run(self):
while True: while not self.__stop__:
job_index = self.queue.get() if self.queue.empty() or self.__error_msg__ is not None:
if job_index is None: # do not execute, if job_index is not available or error_msg is set
break time.sleep(0.1)
for i in range(self.start_val, self.end_val + 1): else:
time.sleep(0.02) job_index = self.queue.get()
self.progress_updated.emit(i, job_index) self.action(job_index)
self.job_finished.emit(job_index) if job_index in self.__job_data__:
del self.__job_data__[job_index]
self.job_finished.emit(job_index)
self.all_jobs_finished.emit() self.all_jobs_finished.emit()
@Slot(int)
def action(self, job_index):
pass
class RipQueueWorker(QueueWorker):
def __init__(self):
super().__init__()
@Slot(int)
def action(self, job_index):
ERROR_AT_TRACK = None # Give a number here
#
logger.info("action: Start ripping track %02d...", job_index + 1)
for i in range(0, 51):
time.sleep(0.1)
if ERROR_AT_TRACK is not None:
if job_index == ERROR_AT_TRACK and i > 10:
msg = "Fehler beim rippen von track %02d" % (job_index + 1)
logger.warning("action: " + msg)
self.set_error(msg)
return
self.progress_updated.emit(i, job_index)
class EncQueueWorker(QueueWorker):
def __init__(self):
super().__init__()
@Slot(int)
def action(self, job_index):
ERROR_AT_TRACK = None # Give a number here
#
logger.info("action: Start encoding track %02d...", job_index + 1)
for i in range(50, 101):
time.sleep(0.05)
if ERROR_AT_TRACK is not None:
if job_index == ERROR_AT_TRACK and i > 60:
msg = "Fehler beim encoden von track %02d" % (job_index + 1)
logger.warning("action: " + msg)
self.set_error(msg)
return
self.progress_updated.emit(i, job_index)
class RowWidget(QWidget): class RowWidget(QWidget):
""" """
@ -203,48 +264,50 @@ class MainWindow(QMainWindow):
self.button_rip.setEnabled(True) self.button_rip.setEnabled(True)
def get_rip_worker(self): def get_rip_worker(self):
return QueueWorker(start_val=0, end_val=50) return RipQueueWorker()
def get_encode_worker(self): def get_encode_worker(self):
return QueueWorker(start_val=50, end_val=100) return EncQueueWorker()
def rip_all(self, clicked_data: bool = None, single_job: int = None): def rip_all(self, clicked_data: bool = None, single_job: int = None):
self.button_read.setEnabled(False) self.button_read.setEnabled(False)
self.button_rip.setEnabled(False) self.button_rip.setEnabled(False)
self.thread1 = QThread() self.rip_thread = QThread()
self.worker1 = self.get_rip_worker() self.rip_worker = self.get_rip_worker()
self.worker1.moveToThread(self.thread1) self.rip_worker.moveToThread(self.rip_thread)
self.thread2 = QThread() self.enc_thread = QThread()
self.worker2 = self.get_encode_worker() self.enc_worker = self.get_encode_worker()
self.worker2.moveToThread(self.thread2) self.enc_worker.moveToThread(self.enc_thread)
# Connect Signals and Slots # Connect Signals and Slots
self.worker1.progress_updated.connect(self.set_title_progress) self.rip_worker.progress_updated.connect(self.set_title_progress)
self.worker2.progress_updated.connect(self.set_title_progress) self.enc_worker.progress_updated.connect(self.set_title_progress)
# We route the signal through the main window. # We route the signal through the main window.
self.worker1.job_finished.connect(self.schedule_phase_two_task) self.rip_worker.job_finished.connect(self.rip_task_finished)
self.enc_worker.job_finished.connect(self.enc_task_finished)
self.thread1.started.connect(self.worker1.run) self.rip_thread.started.connect(self.rip_worker.run)
self.thread2.started.connect(self.worker2.run) self.enc_thread.started.connect(self.enc_worker.run)
self.worker1.all_jobs_finished.connect(self.phase_one_complete) self.rip_worker.all_jobs_finished.connect(self.rip_tasks_complete)
self.worker2.all_jobs_finished.connect(self.all_tasks_complete) self.enc_worker.all_jobs_finished.connect(self.enc_tasks_complete)
self.thread1.start() self.rip_thread.start()
self.thread2.start() self.enc_thread.start()
self.set_status("Start ripping...") self.set_status("rip_all: Start ripping...")
if single_job is None: if single_job is None:
logger.info("rip_all: Starting rip process for whole disc...")
for i in range(0, self.list_widget.count()): for i in range(0, self.list_widget.count()):
self.worker1.add_job(i, self.get_job_data(i)) self.rip_worker.add_job(i, self.get_job_data(i))
elif 0 <= single_job < self.list_widget.count(): elif 0 <= single_job < self.list_widget.count():
self.worker1.add_job(single_job, self.get_job_data(single_job)) logger.info("rip_all: Starting rip process for track %02d", single_job + 1)
self.rip_worker.add_job(single_job, self.get_job_data(single_job))
self.worker1.add_job(None, None)
@Slot(int)
def get_job_data(self, job_index): def get_job_data(self, job_index):
# placeholder for disc function # placeholder for disc function
return None return None
@ -254,7 +317,7 @@ class MainWindow(QMainWindow):
if self.button_rip.isEnabled(): if self.button_rip.isEnabled():
self.rip_all(single_job=list_index) self.rip_all(single_job=list_index)
else: else:
logger.warning("A rip process is running ignoring rip request of track %02d", list_index + 1) logger.warning("single_track_selected: A rip process is running ignoring rip request of track %02d", list_index + 1)
def on_disc_removed(self, device): def on_disc_removed(self, device):
self.clear() self.clear()
@ -271,31 +334,59 @@ class MainWindow(QMainWindow):
self.on_disc_new(device) self.on_disc_new(device)
@Slot(int) @Slot(int)
def schedule_phase_two_task(self, job_index): def rip_task_finished(self, job_index):
""" """
This slot receives the 'job_finished' signal from worker1 This slot receives the 'job_finished' signal from worker1
and explicitly adds the job to worker2. and explicitly adds the job to worker2.
""" """
self.worker2.add_job(job_index, self.get_job_data(job_index)) logger.info("rip_task_finished: Track %02d", job_index + 1)
if self.rip_worker.get_error() is not None or self.enc_worker.get_error() is not None:
# Error in one worker detected
if not self.rip_worker.stopped():
logger.warning("rip_task_finished: Error detected. Stopping rip worker and setting progress of track %02d to 0%%.", job_index + 1)
self.rip_worker.stop()
self.set_title_progress(0, job_index)
else:
# Add next encode task only, if no errors are detected
logger.debug("rip_task_finished: Starting encode task for track %02d", job_index + 1)
self.enc_worker.add_job(job_index, self.get_job_data(job_index))
if self.rip_worker.empty():
logger.debug("rip_task_finished: Rip worker queue is empty. Stopping worker")
self.rip_worker.stop()
@Slot(int)
def enc_task_finished(self, job_index):
logger.info("enc_task_finished: Track %02d", job_index + 1)
if self.rip_worker.get_error() is not None or self.enc_worker.get_error() is not None:
# Error in one worker detected
if not self.rip_worker.stopped():
logger.warning("enc_task_finished: Error detected. Stopping rip worker.")
self.rip_worker.stop()
if self.enc_worker.get_error() is not None:
# Error in enc worker detected
logger.warning("enc_task_finished: Encoding error detected. Setting progress of track %02d to 0%%.", job_index + 1)
self.set_title_progress(0, job_index)
@Slot() @Slot()
def phase_one_complete(self): def rip_tasks_complete(self):
self.worker2.add_job(None, None) logger.info("rip_tasks_complete: All rip tasks completed. Stopping encode worker which is already encoding the last track.")
self.enc_worker.stop()
@Slot() @Slot()
def all_tasks_complete(self): def enc_tasks_complete(self):
self.thread1.quit() logger.info("enc_tasks_complete: All encode tasks completed. Removing threads and enabling buttons.")
self.thread1.wait() self.rip_thread.quit()
self.thread2.quit() self.rip_thread.wait()
self.thread2.wait() self.enc_thread.quit()
self.enc_thread.wait()
self.worker1.deleteLater() self.rip_worker.deleteLater()
self.worker2.deleteLater() self.enc_worker.deleteLater()
self.thread1.deleteLater() self.rip_thread.deleteLater()
self.thread2.deleteLater() self.enc_thread.deleteLater()
rip_error = self.worker1.get_error() rip_error = self.rip_worker.get_error()
enc_error = self.worker2.get_error() enc_error = self.enc_worker.get_error()
if rip_error is None and enc_error is None: if rip_error is None and enc_error is None:
self.set_status("Finished ripping...") self.set_status("Finished ripping...")
else: else:

View File

@ -1,6 +1,6 @@
import argparse import argparse
import fstools import fstools
from gui import MainWindow, SelectionDialog from gui import MainWindow, SelectionDialog, QueueWorker
import media import media
import os import os
from PySide6.QtCore import QObject, Signal, Slot from PySide6.QtCore import QObject, Signal, Slot
@ -14,7 +14,6 @@ import sys
logger = report.app_logging_config() logger = report.app_logging_config()
# TODO: Stop / Kill Threads before close: QThread: Destroyed while thread '' is still running # TODO: Stop / Kill Threads before close: QThread: Destroyed while thread '' is still running
# TODO: Stop Queues, on Error. Possibly better error handling.
# TODO: Error handling on cancelling cddb choose Dialog # TODO: Error handling on cancelling cddb choose Dialog
@ -26,107 +25,62 @@ MEDIA_GUI_DICT = {
} }
class RipQueueWorker(QObject): class ThisQueueWorker(QueueWorker):
progress_updated = Signal(int, int)
job_finished = Signal(int)
all_jobs_finished = Signal()
def __init__(self, basepath): def __init__(self, basepath):
self.__basepath__ = basepath
self.__job_data__ = {}
super().__init__() super().__init__()
self.queue = Queue() self.__basepath__ = basepath
self.__error_msg__ = None
def get_error(self):
return self.__error_msg__
def set_error(self, error_msg):
if self.__error_msg__ is None:
self.__error_msg__ = error_msg
@Slot(int)
def add_job(self, job_index, job_data):
self.__job_data__[job_index] = job_data
self.queue.put(job_index)
class RipQueueWorker(ThisQueueWorker):
def progress_adaption(self, value, track_num): def progress_adaption(self, value, track_num):
self.progress_updated.emit(50 * value, track_num - 1) self.progress_updated.emit(50 * value, track_num - 1)
@Slot() def action(self, job_index):
def run(self): logger.info("action: Start ripping track %02d...", job_index + 1)
while True: # Rip track job_index
job_index = self.queue.get() track_info = self.__job_data__.get(job_index)
track_info = self.__job_data__.get(job_index) if track_info is None:
if job_index is None or track_info is None: msg = "No track information found for track %02d" % job_index
break logger.error("action: " + msg)
# Rip track job_index self.set_error(msg)
else:
wavfile = media.track_to_targetpath(self.__basepath__, track_info, 'wav') wavfile = media.track_to_targetpath(self.__basepath__, track_info, 'wav')
try: try:
fstools.mkdir(os.path.dirname(wavfile)) fstools.mkdir(os.path.dirname(wavfile))
except PermissionError: except PermissionError:
msg = f"Unable to create ripping target path: {os.path.dirname(wavfile)}" msg = f"Insufficient permissions to create ripping target path: {os.path.dirname(wavfile)}"
logger.exception(msg) logger.error("action: " + msg)
self.set_error(msg) self.set_error(msg)
break else:
if media.disc_track_rip(job_index + 1, wavfile, self.progress_adaption) != 0: if media.disc_track_rip(job_index + 1, wavfile, self.progress_adaption) != 0:
msg = f"Unable to rip: {wavfile}" msg = f"Unable to rip: {wavfile}"
logger.exception(msg) logger.error("action: " + msg)
self.set_error(msg) self.set_error(msg)
break
if job_index in self.__job_data__:
del self.__job_data__[job_index]
self.job_finished.emit(job_index)
self.all_jobs_finished.emit()
class EncodeQueueWorker(QObject): class EncodeQueueWorker(ThisQueueWorker):
progress_updated = Signal(int, int)
job_finished = Signal(int)
all_jobs_finished = Signal()
def __init__(self, basepath):
self.__basepath__ = basepath
self.__job_data__ = {}
super().__init__()
self.queue = Queue()
self.__error_msg__ = None
def get_error(self):
return self.__error_msg__
def set_error(self, error_msg):
if self.__error_msg__ is None:
self.__error_msg__ = error_msg
@Slot(int)
def add_job(self, job_index, job_data):
self.__job_data__[job_index] = job_data
self.queue.put(job_index)
def progress_adaption(self, value, track_num): def progress_adaption(self, value, track_num):
self.progress_updated.emit(50 + 50 * value, track_num - 1) self.progress_updated.emit(50 + 50 * value, track_num - 1)
@Slot() def action(self, job_index):
def run(self): logger.info("action: Start encoding track %02d...", job_index + 1)
while True: # Rip track job_index
job_index = self.queue.get() track_info = self.__job_data__.get(job_index)
track_info = self.__job_data__.get(job_index) if track_info is None:
if job_index is None or track_info is None: msg = "No track information found for track %02d" % job_index
break logger.error("action: " + msg)
# Rip track job_index self.set_error(msg)
else:
wavfile = media.track_to_targetpath(self.__basepath__, track_info, 'wav') wavfile = media.track_to_targetpath(self.__basepath__, track_info, 'wav')
rv = media.wav_to_mp3(wavfile, self.__basepath__, track_info, self.progress_adaption) rv = media.wav_to_mp3(wavfile, self.__basepath__, track_info, self.progress_adaption)
os.remove(wavfile)
if rv != 0: if rv != 0:
msg = f"Unable to encode: {wavfile}" msg = f"Unable to encode: {wavfile}"
logger.exception(msg) logger.exception("action: " + msg)
self.set_error(msg) self.set_error(msg)
break try:
if job_index in self.__job_data__: os.remove(wavfile)
del self.__job_data__[job_index] except Exception as e:
self.job_finished.emit(job_index) logger.error("action: Unable to delete wavfile %s", wavfile)
self.all_jobs_finished.emit()
class RipMainWindow(MainWindow): class RipMainWindow(MainWindow):