"""Qt (PySide6) user interface for reading and ripping a disc.

Contains a queue-driven worker object, the per-track row widget, the main
window, and a small list selection dialog.
"""
import logging
from PySide6.QtWidgets import (
    QMainWindow, QMessageBox, QDialog, QDialogButtonBox, QWidget, QVBoxLayout,
    QHBoxLayout, QListWidget, QListWidgetItem, QLabel, QPlainTextEdit,
    QProgressBar, QPushButton, QLineEdit
)
from PySide6.QtCore import (
    Qt, SIGNAL, QObject, Signal, Slot, QThread  # SIGNAL kept: other code may import it from here
)
from queue import Queue
import time

try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)


class QueueWorker(QObject):
    """Process queued job indices one after another and report progress.

    Jobs are pulled from an internal ``queue.Queue``. A job index of ``None``
    is the sentinel that ends :meth:`run`.

    Signals:
        progress_updated(int, int): progress value and job index.
        job_finished(int): job index of the job that just completed.
        all_jobs_finished(): emitted once after the sentinel was received.
    """

    progress_updated = Signal(int, int)
    job_finished = Signal(int)
    all_jobs_finished = Signal()

    def __init__(self, start_val, end_val):
        """Create a worker reporting progress values start_val..end_val (inclusive)."""
        super().__init__()
        self.queue = Queue()
        self.start_val = start_val
        self.end_val = end_val
        self.__error_msg__ = None

    def get_error(self):
        """Return the stored error message, or ``None`` if no error was set."""
        return self.__error_msg__

    def set_error(self, error_msg):
        """Store ``error_msg`` unless an error was already recorded (first one wins)."""
        if self.__error_msg__ is None:
            self.__error_msg__ = error_msg

    # The slot takes two arguments and the index may be ``None`` (sentinel),
    # so the signature is declared as two generic objects.
    @Slot(object, object)
    def add_job(self, job_index, job_data):
        """Queue ``job_index`` for processing.

        ``job_data`` is accepted but not used by this implementation.
        Passing ``None`` as ``job_index`` tells :meth:`run` to stop.
        """
        self.queue.put(job_index)

    @Slot()
    def run(self):
        """Process jobs until the ``None`` sentinel is read from the queue.

        For each job, emits ``progress_updated`` for every value from
        ``start_val`` to ``end_val`` (sleeping 0.02 s per step), then
        ``job_finished``. Emits ``all_jobs_finished`` after the sentinel.
        """
        while True:
            job_index = self.queue.get()  # blocks until a job is available
            if job_index is None:
                break
            for i in range(self.start_val, self.end_val + 1):
                time.sleep(0.02)
                self.progress_updated.emit(i, job_index)
            self.job_finished.emit(job_index)
        self.all_jobs_finished.emit()


class RowWidget(QWidget):
    """
    A widget representing a single row of the track list.
    It contains a number (QLabel), a text field (QPlainTextEdit)
    and a progress bar (QProgressBar).
    """

    def __init__(self, number: int, parent=None):
        """Build the row; ``number`` is shown as ``"<number>."`` in the label."""
        super().__init__(parent)
        self.number_label = QLabel(f"{number}.")
        self.text_edit = QPlainTextEdit()
        self.text_edit.setTabChangesFocus(True)
        self.progress_bar = QProgressBar()
        self.progress_bar.setMaximumWidth(150)
        self.text_edit.setFixedHeight(31)
        self.number_label.setFixedWidth(30)
        self.number_label.setAlignment(Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignRight)

        layout = QHBoxLayout()
        layout.addWidget(self.number_label)
        layout.addWidget(self.text_edit)
        layout.addWidget(self.progress_bar)
        self.setLayout(layout)


class MainWindow(QMainWindow):
    """Main application window with album fields, a track list and rip controls."""

    KEY_ALBUM_ARTIST = "Interpret"
    KEY_ALBUM_ALBUM = "Album"
    KEY_ALBUM_YEAR = "Jahr"
    KEY_ALBUM_GENRE = "Genre"
    # Order defines the order of the line edits stored in __album_lines__.
    FIELD_NAMES = [KEY_ALBUM_ARTIST, KEY_ALBUM_ALBUM, KEY_ALBUM_YEAR, KEY_ALBUM_GENRE]

    def __init__(self, basepath):
        """Build the window; ``basepath`` is stored for later use."""
        super().__init__()
        self.__basepath__ = basepath
        # NOTE(review): the collapsed source could also be read as this call
        # having been commented out - confirm the window title is wanted.
        self.setWindowTitle("PyRip")
        self.setGeometry(100, 100, 900, 600)  # window width adjusted
        self.set_status("Ready...")
        self.__album_lines__ = []

        main_widget = QWidget()
        main_layout = QVBoxLayout(main_widget)
        self.setCentralWidget(main_widget)

        disc_layout = QHBoxLayout()
        album_layout = QVBoxLayout()
        # Create fields for Album Information
        for field_name in self.FIELD_NAMES:
            # Add label
            label = QLabel(field_name)
            # Add title
            line_edit = QLineEdit()
            line_edit.setMaximumWidth(300)
            line_edit.setPlaceholderText("Title...")
            # Add title to list for later use
            self.__album_lines__.append(line_edit)
            # Add both to album_layout
            album_layout.addWidget(label)
            album_layout.addWidget(line_edit)
        # Add stretch to place entries to top
        album_layout.addStretch()
        # Add album and disc_layout
        disc_layout.addLayout(album_layout)
        main_layout.addLayout(disc_layout)

        self.list_widget = QListWidget()
        disc_layout.addWidget(self.list_widget)

        button_layout = QHBoxLayout()
        main_layout.addLayout(button_layout)

        self.button_read = QPushButton("Read Disc")
        self.button_read.clicked.connect(self.read_it)
        button_layout.addWidget(self.button_read)

        self.button_rip = QPushButton("Rip Disc")
        self.button_rip.setDisabled(True)
        self.button_rip.clicked.connect(self.rip_all)
        button_layout.addWidget(self.button_rip)

        # Signal-object connection, matching the style used in SelectionDialog.
        self.list_widget.itemDoubleClicked.connect(self.single_track_selected)

    def clear(self):
        """Remove all tracks from the list and empty every album field."""
        self.list_widget.clear()
        for key in self.FIELD_NAMES:
            self.set_album_field(key, "")

    def set_album_field(self, key, value):
        """Set the text of the album field named ``key``.

        Raises:
            ValueError: if ``key`` is not in ``FIELD_NAMES``.
        """
        i = self.FIELD_NAMES.index(key)
        line = self.__album_lines__[i]
        line.setText(value)

    def get_album_field(self, key):
        """Return the content of the album field named ``key``.

        The year field is returned as ``int``; all other fields as text.

        Raises:
            ValueError: if ``key`` is not in ``FIELD_NAMES`` or the year
                field does not contain an integer.
        """
        i = self.FIELD_NAMES.index(key)
        text = self.__album_lines__[i].text()
        if key == self.KEY_ALBUM_YEAR:
            return int(text)
        return text

    def append_title(self, value):
        """Append a new numbered track row showing ``value`` with progress 0."""
        num_tracks = self.list_widget.count() + 1
        row_widget = RowWidget(num_tracks)
        row_widget.text_edit.setPlainText(value)
        row_widget.progress_bar.setValue(0)

        # Constructing the item with the list widget as parent already inserts
        # it into the list, so it must not be added a second time via addItem().
        list_item = QListWidgetItem(self.list_widget)
        list_item.setSizeHint(row_widget.sizeHint())
        self.list_widget.setItemWidget(list_item, row_widget)

    def set_status(self, status):
        """Show ``status`` in the window's status bar."""
        self.statusBar().showMessage(status)

    @Slot(int, int)
    def set_title_progress(self, value, bar_index):
        """Set the progress bar of row ``bar_index`` to ``value``.

        Indices outside the current list are ignored.
        """
        if 0 <= bar_index < self.list_widget.count():
            list_item = self.list_widget.item(bar_index)
            row_widget = self.list_widget.itemWidget(list_item)
            row_widget.progress_bar.setValue(value)

    def read_it(self):
        """Placeholder for reading a disc: fills in example data and enables ripping."""
        print("read_it button action not yet implemented, adding example data...")
        for key in self.FIELD_NAMES:
            self.set_album_field(key, key)
        for track in range(0, 5):
            self.append_title("Track %02d" % (track + 1))
        self.button_rip.setEnabled(True)

    def get_rip_worker(self):
        """Return the worker for phase one (progress values 0..50)."""
        return QueueWorker(start_val=0, end_val=50)

    def get_encode_worker(self):
        """Return the worker for phase two (progress values 50..100)."""
        return QueueWorker(start_val=50, end_val=100)

    def rip_all(self, clicked_data: bool = None, single_job: int = None):
        """Start both worker threads and queue the rip jobs.

        Args:
            clicked_data: value delivered by the button's ``clicked`` signal; unused.
            single_job: row index of a single track to process. ``None`` queues
                every track in the list; an out-of-range index queues no job.
        """
        self.button_read.setEnabled(False)
        self.button_rip.setEnabled(False)

        self.thread1 = QThread()
        self.worker1 = self.get_rip_worker()
        self.worker1.moveToThread(self.thread1)

        self.thread2 = QThread()
        self.worker2 = self.get_encode_worker()
        self.worker2.moveToThread(self.thread2)

        # Connect Signals and Slots
        self.worker1.progress_updated.connect(self.set_title_progress)
        self.worker2.progress_updated.connect(self.set_title_progress)
        # We route the signal through the main window.
        self.worker1.job_finished.connect(self.schedule_phase_two_task)

        self.thread1.started.connect(self.worker1.run)
        self.thread2.started.connect(self.worker2.run)

        self.worker1.all_jobs_finished.connect(self.phase_one_complete)
        self.worker2.all_jobs_finished.connect(self.all_tasks_complete)

        self.thread1.start()
        self.thread2.start()

        self.set_status("Start ripping...")
        if single_job is None:
            for i in range(0, self.list_widget.count()):
                self.worker1.add_job(i, self.get_job_data(i))
        elif 0 <= single_job < self.list_widget.count():
            self.worker1.add_job(single_job, self.get_job_data(single_job))
        # Sentinel: lets worker1.run() terminate after the queued jobs.
        self.worker1.add_job(None, None)

    def get_job_data(self, job_index):
        """Return the data for job ``job_index`` (currently always ``None``)."""
        # placeholder for disc function
        return None

    def single_track_selected(self, list_item):
        """Rip only the double-clicked track, unless the rip button is disabled."""
        list_index = self.list_widget.row(list_item)
        if self.button_rip.isEnabled():
            self.rip_all(single_job=list_index)
        else:
            logger.warning("A rip process is running ignoring rip request of track %02d", list_index + 1)

    def on_disc_removed(self, device):
        """Clear the window content; ``device`` is unused."""
        self.clear()

    def on_disc_new(self, device):
        """Read the disc content; ``device`` is unused."""
        self.read_it()

    def device_callback(self, device):
        """Dispatch a device event to the disc-removed / disc-new handlers.

        Only events with action ``'change'`` on a device carrying ``ID_CDROM``
        are evaluated.
        NOTE(review): ``device`` presumably is a udev device object - confirm
        against the caller.
        """
        action = device.action
        if 'ID_CDROM' in device and action == 'change':
            if device.properties.get('ID_CDROM_MEDIA') is None:
                self.on_disc_removed(device)
            if int(device.properties.get('ID_CDROM_MEDIA_TRACK_COUNT_AUDIO', 0)) > 0:
                self.on_disc_new(device)

    @Slot(int)
    def schedule_phase_two_task(self, job_index):
        """
        This slot receives the 'job_finished' signal from worker1
        and explicitly adds the job to worker2.
        """
        self.worker2.add_job(job_index, self.get_job_data(job_index))

    @Slot()
    def phase_one_complete(self):
        """Queue the stop sentinel for worker2 once worker1 finished all jobs."""
        self.worker2.add_job(None, None)

    @Slot()
    def all_tasks_complete(self):
        """Shut down both threads, report the result and re-enable the buttons."""
        # Collect the results first so they do not depend on the lifetime of
        # the worker objects, whose deletion is scheduled below.
        rip_error = self.worker1.get_error()
        enc_error = self.worker2.get_error()

        self.thread1.quit()
        self.thread1.wait()
        self.thread2.quit()
        self.thread2.wait()

        self.worker1.deleteLater()
        self.worker2.deleteLater()
        self.thread1.deleteLater()
        self.thread2.deleteLater()

        if rip_error is None and enc_error is None:
            self.set_status("Finished ripping...")
        else:
            self.set_status("Ripping FAILED...")
            # Rip error first, then encode error, one per line.
            msg = "\n".join(err for err in (rip_error, enc_error) if err is not None)
            self.set_status(msg)
            QMessageBox.critical(self, "Error!", msg)

        self.button_read.setEnabled(True)
        self.button_rip.setEnabled(True)


class SelectionDialog(QDialog):
    """
    A custom dialog to select an item from a list.
    """

    def __init__(self, parent, title, items):
        """Create the dialog with window title ``title`` listing ``items``."""
        super().__init__(parent)
        self.setWindowTitle(title)
        self.setMinimumWidth(300)

        # Layout
        layout = QVBoxLayout(self)

        # List widget for options
        self.list_widget = QListWidget()
        self.list_widget.addItems(items)
        if items:
            self.list_widget.setCurrentRow(0)  # Select first item by default

        # Add a double-click shortcut to accept the dialog
        self.list_widget.itemDoubleClicked.connect(self.accept)

        layout.addWidget(self.list_widget)

        # Standard OK and Cancel buttons
        button_box = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel)
        button_box.accepted.connect(self.accept)
        button_box.rejected.connect(self.reject)
        layout.addWidget(button_box)

    def get_selected_index(self):
        """Returns the index of the currently selected item."""
        return self.list_widget.currentRow()