diff --git a/gui/__init__.py b/gui/__init__.py index 6f0c267..6b634c7 100644 --- a/gui/__init__.py +++ b/gui/__init__.py @@ -38,36 +38,97 @@ class QueueWorker(QObject): job_finished = Signal(int) all_jobs_finished = Signal() - def __init__(self, start_val, end_val): + def __init__(self): super().__init__() self.queue = Queue() - self.start_val = start_val - self.end_val = end_val + self.__stop__ = False self.__error_msg__ = None + self.__job_data__ = {} + @Slot() + def stop(self): + self.__stop__ = True + + @Slot() + def stopped(self): + return self.__stop__ + + @Slot() def get_error(self): return self.__error_msg__ + @Slot(str) def set_error(self, error_msg): if self.__error_msg__ is None: self.__error_msg__ = error_msg + @Slot() + def empty(self): + return self.queue.empty() + @Slot(int) def add_job(self, job_index, job_data): + self.__job_data__[job_index] = job_data self.queue.put(job_index) @Slot() def run(self): - while True: - job_index = self.queue.get() - if job_index is None: - break - for i in range(self.start_val, self.end_val + 1): - time.sleep(0.02) - self.progress_updated.emit(i, job_index) - self.job_finished.emit(job_index) + while not self.__stop__: + if self.queue.empty() or self.__error_msg__ is not None: + # do not execute, if job_index is not available or error_msg is set + time.sleep(0.1) + else: + job_index = self.queue.get() + self.action(job_index) + if job_index in self.__job_data__: + del self.__job_data__[job_index] + self.job_finished.emit(job_index) self.all_jobs_finished.emit() + @Slot(int) + def action(self, job_index): + pass + + +class RipQueueWorker(QueueWorker): + def __init__(self): + super().__init__() + + @Slot(int) + def action(self, job_index): + ERROR_AT_TRACK = None # Give a number here + # + logger.info("action: Start ripping track %02d...", job_index + 1) + for i in range(0, 51): + time.sleep(0.1) + if ERROR_AT_TRACK is not None: + if job_index == ERROR_AT_TRACK and i > 10: + msg = "Fehler beim rippen von track %02d" % (job_index + 1) + logger.warning("action: " + msg) + self.set_error(msg) + return + self.progress_updated.emit(i, job_index) + + +class EncQueueWorker(QueueWorker): + def __init__(self): + super().__init__() + + @Slot(int) + def action(self, job_index): + ERROR_AT_TRACK = None # Give a number here + # + logger.info("action: Start encoding track %02d...", job_index + 1) + for i in range(50, 101): + time.sleep(0.05) + if ERROR_AT_TRACK is not None: + if job_index == ERROR_AT_TRACK and i > 60: + msg = "Fehler beim encoden von track %02d" % (job_index + 1) + logger.warning("action: " + msg) + self.set_error(msg) + return + self.progress_updated.emit(i, job_index) + class RowWidget(QWidget): """ @@ -203,48 +264,50 @@ class MainWindow(QMainWindow): self.button_rip.setEnabled(True) def get_rip_worker(self): - return QueueWorker(start_val=0, end_val=50) + return RipQueueWorker() def get_encode_worker(self): - return QueueWorker(start_val=50, end_val=100) + return EncQueueWorker() def rip_all(self, clicked_data: bool = None, single_job: int = None): self.button_read.setEnabled(False) self.button_rip.setEnabled(False) - self.thread1 = QThread() - self.worker1 = self.get_rip_worker() - self.worker1.moveToThread(self.thread1) + self.rip_thread = QThread() + self.rip_worker = self.get_rip_worker() + self.rip_worker.moveToThread(self.rip_thread) - self.thread2 = QThread() - self.worker2 = self.get_encode_worker() - self.worker2.moveToThread(self.thread2) + self.enc_thread = QThread() + self.enc_worker = self.get_encode_worker() + self.enc_worker.moveToThread(self.enc_thread) # Connect Signals and Slots - self.worker1.progress_updated.connect(self.set_title_progress) - self.worker2.progress_updated.connect(self.set_title_progress) + self.rip_worker.progress_updated.connect(self.set_title_progress) + self.enc_worker.progress_updated.connect(self.set_title_progress) # We route the signal through the main window. - self.worker1.job_finished.connect(self.schedule_phase_two_task) + self.rip_worker.job_finished.connect(self.rip_task_finished) + self.enc_worker.job_finished.connect(self.enc_task_finished) - self.thread1.started.connect(self.worker1.run) - self.thread2.started.connect(self.worker2.run) + self.rip_thread.started.connect(self.rip_worker.run) + self.enc_thread.started.connect(self.enc_worker.run) - self.worker1.all_jobs_finished.connect(self.phase_one_complete) - self.worker2.all_jobs_finished.connect(self.all_tasks_complete) + self.rip_worker.all_jobs_finished.connect(self.rip_tasks_complete) + self.enc_worker.all_jobs_finished.connect(self.enc_tasks_complete) - self.thread1.start() - self.thread2.start() + self.rip_thread.start() + self.enc_thread.start() - self.set_status("Start ripping...") + self.set_status("rip_all: Start ripping...") if single_job is None: + logger.info("rip_all: Starting rip process for whole disc...") for i in range(0, self.list_widget.count()): - self.worker1.add_job(i, self.get_job_data(i)) + self.rip_worker.add_job(i, self.get_job_data(i)) elif 0 <= single_job < self.list_widget.count(): - self.worker1.add_job(single_job, self.get_job_data(single_job)) - - self.worker1.add_job(None, None) + logger.info("rip_all: Starting rip process for track %02d", single_job + 1) + self.rip_worker.add_job(single_job, self.get_job_data(single_job)) + @Slot(int) def get_job_data(self, job_index): # placeholder for disc function return None @@ -254,7 +317,7 @@ class MainWindow(QMainWindow): if self.button_rip.isEnabled(): self.rip_all(single_job=list_index) else: - logger.warning("A rip process is running ignoring rip request of track %02d", list_index + 1) + logger.warning("single_track_selected: A rip process is running ignoring rip request of track %02d", list_index + 1) def on_disc_removed(self, device): self.clear() @@ -271,31 +334,59 @@ class MainWindow(QMainWindow): self.on_disc_new(device) @Slot(int) - def schedule_phase_two_task(self, job_index): + def rip_task_finished(self, job_index): """ This slot receives the 'job_finished' signal from worker1 and explicitly adds the job to worker2. """ - self.worker2.add_job(job_index, self.get_job_data(job_index)) + logger.info("rip_task_finished: Track %02d", job_index + 1) + if self.rip_worker.get_error() is not None or self.enc_worker.get_error() is not None: + # Error in one worker detected + if not self.rip_worker.stopped(): + logger.warning("rip_task_finished: Error detected. Stopping rip worker and setting progress of track %02d to 0%%.", job_index + 1) + self.rip_worker.stop() + self.set_title_progress(0, job_index) + else: + # Add next encode task only, if no errors are detected + logger.debug("rip_task_finished: Starting encode task for track %02d", job_index + 1) + self.enc_worker.add_job(job_index, self.get_job_data(job_index)) + if self.rip_worker.empty(): + logger.debug("rip_task_finished: Rip worker queue is empty. Stopping worker") + self.rip_worker.stop() + + @Slot(int) + def enc_task_finished(self, job_index): + logger.info("enc_task_finished: Track %02d", job_index + 1) + if self.rip_worker.get_error() is not None or self.enc_worker.get_error() is not None: + # Error in one worker detected + if not self.rip_worker.stopped(): + logger.warning("enc_task_finished: Error detected. Stopping rip worker.") + self.rip_worker.stop() + if self.enc_worker.get_error() is not None: + # Error in enc worker detected + logger.warning("enc_task_finished: Encoding error detected. Setting progress of track %02d to 0%%.", job_index + 1) + self.set_title_progress(0, job_index) @Slot() - def phase_one_complete(self): - self.worker2.add_job(None, None) + def rip_tasks_complete(self): + logger.info("rip_tasks_complete: All rip tasks completed. Stopping encode worker which is already encoding the last track.") + self.enc_worker.stop() @Slot() - def all_tasks_complete(self): - self.thread1.quit() - self.thread1.wait() - self.thread2.quit() - self.thread2.wait() + def enc_tasks_complete(self): + logger.info("enc_tasks_complete: All encode tasks completed. Removing threads and enabling buttons.") + self.rip_thread.quit() + self.rip_thread.wait() + self.enc_thread.quit() + self.enc_thread.wait() - self.worker1.deleteLater() - self.worker2.deleteLater() - self.thread1.deleteLater() - self.thread2.deleteLater() + self.rip_worker.deleteLater() + self.enc_worker.deleteLater() + self.rip_thread.deleteLater() + self.enc_thread.deleteLater() - rip_error = self.worker1.get_error() - enc_error = self.worker2.get_error() + rip_error = self.rip_worker.get_error() + enc_error = self.enc_worker.get_error() if rip_error is None and enc_error is None: self.set_status("Finished ripping...") else: diff --git a/pyripgui.py b/pyripgui.py index 4f1da6e..ebcfc83 100644 --- a/pyripgui.py +++ b/pyripgui.py @@ -1,6 +1,6 @@ import argparse import fstools -from gui import MainWindow, SelectionDialog +from gui import MainWindow, SelectionDialog, QueueWorker import media import os from PySide6.QtCore import QObject, Signal, Slot @@ -14,7 +14,6 @@ import sys logger = report.app_logging_config() # TODO: Stop / Kill Threads before close: QThread: Destroyed while thread '' is still running -# TODO: Stop Queues, on Error. Possibly better error handling. # TODO: Error handling on cancelling cddb choose Dialog @@ -26,107 +25,62 @@ MEDIA_GUI_DICT = { } -class RipQueueWorker(QObject): - progress_updated = Signal(int, int) - job_finished = Signal(int) - all_jobs_finished = Signal() - +class ThisQueueWorker(QueueWorker): def __init__(self, basepath): - self.__basepath__ = basepath - self.__job_data__ = {} super().__init__() - self.queue = Queue() - self.__error_msg__ = None + self.__basepath__ = basepath - def get_error(self): - return self.__error_msg__ - - def set_error(self, error_msg): - if self.__error_msg__ is None: - self.__error_msg__ = error_msg - - @Slot(int) - def add_job(self, job_index, job_data): - self.__job_data__[job_index] = job_data - self.queue.put(job_index) +class RipQueueWorker(ThisQueueWorker): def progress_adaption(self, value, track_num): self.progress_updated.emit(50 * value, track_num - 1) - @Slot() - def run(self): - while True: - job_index = self.queue.get() - track_info = self.__job_data__.get(job_index) - if job_index is None or track_info is None: - break - # Rip track job_index + def action(self, job_index): + logger.info("action: Start ripping track %02d...", job_index + 1) + # Rip track job_index + track_info = self.__job_data__.get(job_index) + if track_info is None: + msg = "No track information found for track %02d" % job_index + logger.error("action: " + msg) + self.set_error(msg) + else: wavfile = media.track_to_targetpath(self.__basepath__, track_info, 'wav') try: fstools.mkdir(os.path.dirname(wavfile)) except PermissionError: - msg = f"Unable to create ripping target path: {os.path.dirname(wavfile)}" - logger.exception(msg) + msg = f"Insufficient permissions to create ripping target path: {os.path.dirname(wavfile)}" + logger.error("action: " + msg) self.set_error(msg) - break - if media.disc_track_rip(job_index + 1, wavfile, self.progress_adaption) != 0: - msg = f"Unable to rip: {wavfile}" - logger.exception(msg) - self.set_error(msg) - break - if job_index in self.__job_data__: - del self.__job_data__[job_index] - self.job_finished.emit(job_index) - self.all_jobs_finished.emit() + else: + if media.disc_track_rip(job_index + 1, wavfile, self.progress_adaption) != 0: + msg = f"Unable to rip: {wavfile}" + logger.error("action: " + msg) + self.set_error(msg) -class EncodeQueueWorker(QObject): - progress_updated = Signal(int, int) - job_finished = Signal(int) - all_jobs_finished = Signal() - - def __init__(self, basepath): - self.__basepath__ = basepath - self.__job_data__ = {} - super().__init__() - self.queue = Queue() - self.__error_msg__ = None - - def get_error(self): - return self.__error_msg__ - - def set_error(self, error_msg): - if self.__error_msg__ is None: - self.__error_msg__ = error_msg - - @Slot(int) - def add_job(self, job_index, job_data): - self.__job_data__[job_index] = job_data - self.queue.put(job_index) - +class EncodeQueueWorker(ThisQueueWorker): def progress_adaption(self, value, track_num): self.progress_updated.emit(50 + 50 * value, track_num - 1) - @Slot() - def run(self): - while True: - job_index = self.queue.get() - track_info = self.__job_data__.get(job_index) - if job_index is None or track_info is None: - break - # Rip track job_index + def action(self, job_index): + logger.info("action: Start encoding track %02d...", job_index + 1) + # Rip track job_index + track_info = self.__job_data__.get(job_index) + if track_info is None: + msg = "No track information found for track %02d" % job_index + logger.error("action: " + msg) + self.set_error(msg) + else: wavfile = media.track_to_targetpath(self.__basepath__, track_info, 'wav') rv = media.wav_to_mp3(wavfile, self.__basepath__, track_info, self.progress_adaption) - os.remove(wavfile) if rv != 0: msg = f"Unable to encode: {wavfile}" - logger.exception(msg) + logger.exception("action: " + msg) self.set_error(msg) - break - if job_index in self.__job_data__: - del self.__job_data__[job_index] - self.job_finished.emit(job_index) - self.all_jobs_finished.emit() + try: + os.remove(wavfile) + except Exception as e: + logger.error("action: Unable to delete wavfile %s", wavfile) class RipMainWindow(MainWindow):