import logging import os from PySide6.QtWidgets import ( QMainWindow, QMessageBox, QDialog, QDialogButtonBox, QWidget, QVBoxLayout, QHBoxLayout, QListWidget, QListWidgetItem, QLabel, QPlainTextEdit, QProgressBar, QPushButton, QLineEdit ) from PySide6.QtCore import ( Qt, SIGNAL, QObject, Signal, Slot, QThread ) from PySide6.QtGui import QPixmap, QIcon from queue import Queue import time try: from config import APP_NAME as ROOT_LOGGER_NAME except ImportError: ROOT_LOGGER_NAME = 'root' logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) class QueueWorker(QObject): progress_updated = Signal(int, int) job_finished = Signal(int) all_jobs_finished = Signal() def __init__(self): super().__init__() self.queue = Queue() self.__stop__ = False self.__error_msg__ = None self.__job_data__ = {} @Slot() def stop(self): self.__stop__ = True @Slot() def stopped(self): return self.__stop__ @Slot() def get_error(self): return self.__error_msg__ @Slot(str) def set_error(self, error_msg): if self.__error_msg__ is None: self.__error_msg__ = error_msg @Slot() def empty(self): return self.queue.empty() @Slot(int) def add_job(self, job_index, job_data): self.__job_data__[job_index] = job_data self.queue.put(job_index) @Slot() def run(self): while not self.__stop__: if self.queue.empty() or self.__error_msg__ is not None: # do not execute, if job_index is not available or error_msg is set time.sleep(0.1) else: job_index = self.queue.get() self.action(job_index) if job_index in self.__job_data__: del self.__job_data__[job_index] self.job_finished.emit(job_index) self.all_jobs_finished.emit() @Slot(int) def action(self, job_index): pass class RipQueueWorker(QueueWorker): def __init__(self): super().__init__() @Slot(int) def action(self, job_index): ERROR_AT_TRACK = None # Give a number here # logger.info("action: Start ripping track %02d...", job_index + 1) for i in range(0, 51): time.sleep(0.1) if ERROR_AT_TRACK is not None: if job_index == ERROR_AT_TRACK and i > 10: msg = "Fehler beim rippen von track %02d" % (job_index + 1) logger.warning("action: " + msg) self.set_error(msg) return self.progress_updated.emit(i, job_index) class EncQueueWorker(QueueWorker): def __init__(self): super().__init__() @Slot(int) def action(self, job_index): ERROR_AT_TRACK = None # Give a number here # logger.info("action: Start encoding track %02d...", job_index + 1) for i in range(50, 101): time.sleep(0.05) if ERROR_AT_TRACK is not None: if job_index == ERROR_AT_TRACK and i > 60: msg = "Fehler beim encoden von track %02d" % (job_index + 1) logger.warning("action: " + msg) self.set_error(msg) return self.progress_updated.emit(i, job_index) class RowWidget(QWidget): """ Ein Widget, das eine einzelne Zeile in unserer Liste darstellt. Es enthält eine Nummer (QLabel), ein Textfeld (QPlainTextEdit) und einen Fortschrittsbalken (QProgressBar). """ def __init__(self, number: int, parent=None): super().__init__(parent) self.number_label = QLabel(f"{number}.") self.text_edit = QPlainTextEdit() self.text_edit.setTabChangesFocus(True) self.progress_bar = QProgressBar() self.progress_bar.setMaximumWidth(150) self.text_edit.setFixedHeight(31) self.number_label.setFixedWidth(30) self.number_label.setAlignment(Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignRight) layout = QHBoxLayout() layout.addWidget(self.number_label) layout.addWidget(self.text_edit) layout.addWidget(self.progress_bar) self.setLayout(layout) class MainWindow(QMainWindow): KEY_ALBUM_ARTIST = "Interpret" KEY_ALBUM_ALBUM = "Album" KEY_ALBUM_YEAR = "Jahr" KEY_ALBUM_GENRE = "Genre" FIELD_NAMES = [KEY_ALBUM_ARTIST, KEY_ALBUM_ALBUM, KEY_ALBUM_YEAR, KEY_ALBUM_GENRE] def __init__(self, basepath): super().__init__() self.__basepath__ = basepath # self.setWindowTitle("PyRip") self.setGeometry(100, 100, 900, 600) # Fensterbreite angepasst self.set_status("Ready...") app_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) pixmap = QPixmap(os.path.join(app_path, 'icon.xpm')) icon = QIcon(pixmap) self.setWindowIcon(icon) self.__album_lines__ = [] main_widget = QWidget() main_layout = QVBoxLayout(main_widget) self.setCentralWidget(main_widget) disc_layout = QHBoxLayout() album_layout = QVBoxLayout() # Create fields for Album Information for i in range(0, len(self.FIELD_NAMES)): # Add label label = QLabel(self.FIELD_NAMES[i]) # Add title line_edit = QLineEdit() line_edit.setMaximumWidth(300) line_edit.setPlaceholderText("Title...") # Add title to list for later use self.__album_lines__.append(line_edit) # Add both to album_layout album_layout.addWidget(label) album_layout.addWidget(line_edit) # Add stretch to place entries to top album_layout.addStretch() # Add album and disc_layout disc_layout.addLayout(album_layout) main_layout.addLayout(disc_layout) self.list_widget = QListWidget() disc_layout.addWidget(self.list_widget) button_layout = QHBoxLayout() main_layout.addLayout(button_layout) self.button_read = QPushButton("Read Disc") self.button_read.clicked.connect(self.read_it) button_layout.addWidget(self.button_read) self.button_rip = QPushButton("Rip Disc") self.button_rip.setDisabled(True) self.button_rip.clicked.connect(self.rip_all) button_layout.addWidget(self.button_rip) self.connect(self.list_widget, SIGNAL("itemDoubleClicked(QListWidgetItem *)"), self.single_track_selected) def clear(self): self.list_widget.clear() for key in self.FIELD_NAMES: self.set_album_field(key, "") def set_album_field(self, key, value): i = self.FIELD_NAMES.index(key) line = self.__album_lines__[i] line.setText(value) def get_album_field(self, key): i = self.FIELD_NAMES.index(key) if key == self.KEY_ALBUM_YEAR: return int(self.__album_lines__[i].text()) else: return self.__album_lines__[i].text() def append_title(self, value): num_tracks = self.list_widget.count() + 1 row_widget = RowWidget(num_tracks) row_widget.text_edit.setPlainText(value) row_widget.progress_bar.setValue(0) list_item = QListWidgetItem(self.list_widget) list_item.setSizeHint(row_widget.sizeHint()) self.list_widget.addItem(list_item) self.list_widget.setItemWidget(list_item, row_widget) def set_status(self, status): self.statusBar().showMessage(status) @Slot(int, int) def set_title_progress(self, value, bar_index): if 0 <= bar_index < self.list_widget.count(): list_item = self.list_widget.item(bar_index) row_widget = self.list_widget.itemWidget(list_item) row_widget.progress_bar.setValue(value) def read_it(self): print("read_it button action not yet implemented, adding example data...") for key in self.FIELD_NAMES: self.set_album_field(key, key) for track in range(0, 5): self.append_title("Track %02d" % (track + 1)) self.button_rip.setEnabled(True) def get_rip_worker(self): return RipQueueWorker() def get_encode_worker(self): return EncQueueWorker() def rip_all(self, clicked_data: bool = None, single_job: int = None): self.button_read.setEnabled(False) self.button_rip.setEnabled(False) self.rip_thread = QThread() self.rip_worker = self.get_rip_worker() self.rip_worker.moveToThread(self.rip_thread) self.enc_thread = QThread() self.enc_worker = self.get_encode_worker() self.enc_worker.moveToThread(self.enc_thread) # Connect Signals and Slots self.rip_worker.progress_updated.connect(self.set_title_progress) self.enc_worker.progress_updated.connect(self.set_title_progress) # We route the signal through the main window. self.rip_worker.job_finished.connect(self.rip_task_finished) self.enc_worker.job_finished.connect(self.enc_task_finished) self.rip_thread.started.connect(self.rip_worker.run) self.enc_thread.started.connect(self.enc_worker.run) self.rip_worker.all_jobs_finished.connect(self.rip_tasks_complete) self.enc_worker.all_jobs_finished.connect(self.enc_tasks_complete) self.rip_thread.start() self.enc_thread.start() self.set_status("rip_all: Start ripping...") if single_job is None: logger.info("rip_all: Starting rip process for whole disc...") for i in range(0, self.list_widget.count()): self.rip_worker.add_job(i, self.get_job_data(i)) elif 0 <= single_job < self.list_widget.count(): logger.info("rip_all: Starting rip process for track %02d", single_job + 1) self.rip_worker.add_job(single_job, self.get_job_data(single_job)) @Slot(int) def get_job_data(self, job_index): # placeholder for disc function return None def single_track_selected(self, list_item): list_index = self.list_widget.row(list_item) if self.button_rip.isEnabled(): self.rip_all(single_job=list_index) else: logger.warning("single_track_selected: A rip process is running ignoring rip request of track %02d", list_index + 1) def on_disc_removed(self, device): self.clear() def on_disc_new(self, device): self.read_it() def device_callback(self, device): action = device.action if 'ID_CDROM' in device and action == 'change': if device.properties.get('ID_CDROM_MEDIA') is None: self.on_disc_removed(device) if int(device.properties.get('ID_CDROM_MEDIA_TRACK_COUNT_AUDIO', 0)) > 0: self.on_disc_new(device) @Slot(int) def rip_task_finished(self, job_index): """ This slot receives the 'job_finished' signal from worker1 and explicitly adds the job to worker2. """ logger.info("rip_task_finished: Track %02d", job_index + 1) if self.rip_worker.get_error() is not None or self.enc_worker.get_error() is not None: # Error in one worker detected if not self.rip_worker.stopped(): logger.warning("rip_task_finished: Error detected. Stopping rip worker and setting progress of track %02d to 0%%.", job_index + 1) self.rip_worker.stop() self.set_title_progress(0, job_index) else: # Add next encode task only, if no errors are detected logger.debug("rip_task_finished: Starting encode task for track %02d", job_index + 1) self.enc_worker.add_job(job_index, self.get_job_data(job_index)) if self.rip_worker.empty(): logger.debug("rip_task_finished: Rip worker queue is empty. Stopping worker") self.rip_worker.stop() @Slot(int) def enc_task_finished(self, job_index): logger.info("enc_task_finished: Track %02d", job_index + 1) if self.rip_worker.get_error() is not None or self.enc_worker.get_error() is not None: # Error in one worker detected if not self.rip_worker.stopped(): logger.warning("enc_task_finished: Error detected. Stopping rip worker.") self.rip_worker.stop() if self.enc_worker.get_error() is not None: # Error in enc worker detected logger.warning("enc_task_finished: Encoding error detected. Setting progress of track %02d to 0%%.", job_index + 1) self.set_title_progress(0, job_index) @Slot() def rip_tasks_complete(self): logger.info("rip_tasks_complete: All rip tasks completed. Stopping encode worker which is already encoding the last track.") self.enc_worker.stop() @Slot() def enc_tasks_complete(self): logger.info("enc_tasks_complete: All encode tasks completed. Removing threads and enabling buttons.") self.rip_thread.quit() self.rip_thread.wait() self.enc_thread.quit() self.enc_thread.wait() self.rip_worker.deleteLater() self.enc_worker.deleteLater() self.rip_thread.deleteLater() self.enc_thread.deleteLater() rip_error = self.rip_worker.get_error() enc_error = self.enc_worker.get_error() if rip_error is None and enc_error is None: self.set_status("Finished ripping...") else: self.set_status("Ripping FAILED...") if rip_error is not None: msg = rip_error if enc_error is not None: msg += "\n" msg += enc_error else: msg = enc_error self.set_status(msg) QMessageBox.critical(self, "Error!", msg) self.button_read.setEnabled(True) self.button_rip.setEnabled(True) class SelectionDialog(QDialog): """ A custom dialog to select an item from a list. """ def __init__(self, parent, title, items): super().__init__(parent) self.setWindowTitle(title) self.setMinimumWidth(300) # Layout layout = QVBoxLayout(self) # List widget for options self.list_widget = QListWidget() self.list_widget.addItems(items) if items: self.list_widget.setCurrentRow(0) # Select first item by default # Add a double-click shortcut to accept the dialog self.list_widget.itemDoubleClicked.connect(self.accept) layout.addWidget(self.list_widget) # Standard OK and Cancel buttons button_box = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel) button_box.accepted.connect(self.accept) button_box.rejected.connect(self.reject) layout.addWidget(button_box) def get_selected_index(self): """Returns the index of the currently selected item.""" return self.list_widget.currentRow()