PySide6 Architecture & Best Practices Guide¶
Guide für Signal/Slot-Systeme, MVC-Pattern und Qt6-spezifische Architektur
Inhaltsverzeichnis¶
- Signal & Slot System
- Connection Types
- MVC/MVP Pattern in PySide6
- Threading Architecture
- Memory Management
- Performance Optimization
- Best Practices Zusammenfassung
Signal & Slot System¶
Grundprinzipien¶
Das Signal/Slot-System ist das Herzstück der Qt-Architektur. Es ermöglicht lose Kopplung zwischen Komponenten und ereignisgesteuerte Kommunikation.
from PySide6.QtCore import QObject, Signal, Slot
class DataModel(QObject):
# Signal-Deklaration auf Klassenebene
data_changed = Signal(str)
value_updated = Signal(int, str) # Mehrere Parameter
def __init__(self):
super().__init__()
self._data = ""
def update_data(self, new_data: str):
self._data = new_data
self.data_changed.emit(new_data) # Signal emittieren
Wann Signals & Slots verwenden?¶
✅ Verwende Signals & Slots für:¶
1. Kommunikation zwischen Komponenten
# Model → View Kommunikation
class DataModel(QObject):
data_changed = Signal(dict)
def process_data(self, data):
result = self._process(data)
self.data_changed.emit(result) # View wird automatisch informiert
# View lauscht auf Model
class DataView(QWidget):
def __init__(self, model: DataModel):
super().__init__()
model.data_changed.connect(self.update_display)
@Slot(dict)
def update_display(self, data):
# GUI aktualisieren
pass
2. Thread-übergreifende Kommunikation
class Worker(QObject):
progress = Signal(int)
finished = Signal(str)
def do_work(self):
for i in range(100):
# Schwere Berechnung
self.progress.emit(i) # Sicher von anderem Thread
self.finished.emit("Done")
# Im Main Thread
worker = Worker()
worker.progress.connect(self.update_progress_bar) # Thread-safe!
3. Event Propagation
class CustomWidget(QWidget):
# Eigene Events als Signals
item_selected = Signal(int)
validation_failed = Signal(str)
def handle_user_action(self, item_id):
if self.validate(item_id):
self.item_selected.emit(item_id)
else:
self.validation_failed.emit("Invalid item")
4. Entkopplung von Business Logic und UI
# Service Layer emittiert Signals
class AuthService(QObject):
login_successful = Signal(str) # username
login_failed = Signal(str) # error message
def login(self, username, password):
if self._authenticate(username, password):
self.login_successful.emit(username)
else:
self.login_failed.emit("Invalid credentials")
# UI reagiert darauf
class LoginView(QWidget):
def __init__(self, auth_service: AuthService):
super().__init__()
auth_service.login_successful.connect(self.on_login_success)
auth_service.login_failed.connect(self.on_login_failed)
❌ Verwende KEINE Signals & Slots für:¶
1. Synchrone Funktionsaufrufe im gleichen Thread
# ❌ Falsch: Unnötiger Overhead
class Manager(QObject):
calculate = Signal(int)
def process(self):
self.calculate.emit(42) # Unnötig kompliziert
# ✅ Richtig: Direkte Methode
class Manager:
def process(self):
result = self.calculate(42) # Einfacher Funktionsaufruf
return result
2. Getter/Setter Funktionalität
# ❌ Falsch: Signals für Properties
class DataModel(QObject):
get_value = Signal()
@Slot()
def value(self):
return self._value
# ✅ Richtig: Properties oder direkte Methoden
class DataModel(QObject):
@property
def value(self):
return self._value
3. Konstruktor-Callbacks
# ❌ Falsch: Signals in __init__ verbinden und sofort emittieren
class BadWidget(QWidget):
def __init__(self):
super().__init__()
self.ready = Signal()
self.ready.emit() # Event Loop läuft noch nicht!
# ✅ Richtig: Initialisierung nach Event Loop Start
class GoodWidget(QWidget):
initialized = Signal()
def showEvent(self, event):
super().showEvent(event)
QTimer.singleShot(0, self.initialized.emit)
Der @Slot Decorator¶
Warum @Slot verwenden?
from PySide6.QtCore import Slot
class MyWidget(QWidget):
# ✅ MIT @Slot: Empfohlen
@Slot(int, str)
def handle_data(self, value: int, text: str):
print(f"Value: {value}, Text: {text}")
# ✅ OHNE @Slot: Funktioniert auch
def handle_click(self):
print("Clicked")
Vorteile des @Slot Decorators:
- Performance: ~10-15% schneller bei häufigen Aufrufen
- Type Safety: Frühzeitige Fehlerkennung bei Type Mismatches
- Memory: Bessere Memory Management bei vielen Connections
- Dokumentation: Explizite Kennzeichnung von Slot-Methoden
- Metaobject System: Bessere Integration mit Qt's Reflection
Wann @Slot weglassen? - Prototyping und schnelle Tests - Lambdas (funktioniert nicht mit @Slot) - Einmalige, einfache Callbacks
# Lambda: Kein @Slot möglich
button.clicked.connect(lambda: print("Clicked"))
# Slot für Production Code
@Slot()
def on_button_clicked(self):
print("Clicked")
button.clicked.connect(self.on_button_clicked)
Custom Signals¶
Signal-Definitionen:
from PySide6.QtCore import QObject, Signal
class DataProcessor(QObject):
# Einfaches Signal
started = Signal()
# Signal mit Parametern
progress = Signal(int)
# Mehrere Parameter
data_ready = Signal(str, int, bool)
# Benanntes Signal (für QML)
status_changed = Signal(str, arguments=['status'])
# Mehrere Signaturen (LEGACY - nicht empfohlen)
value_changed = Signal([int], [str]) # int ODER str
Best Practice für Custom Signals:
class Worker(QObject):
"""
Worker mit sauber definierten Signals.
Signals:
progress(int): Fortschritt von 0-100
finished(dict): Ergebnis der Berechnung
error(str): Fehlermeldung wenn etwas schiefgeht
"""
progress = Signal(int)
finished = Signal(dict)
error = Signal(str)
def __init__(self):
super().__init__()
def do_work(self):
try:
for i in range(100):
# Arbeit...
self.progress.emit(i)
self.finished.emit({"status": "success"})
except Exception as e:
self.error.emit(str(e))
Signal-Verbindungen verwalten¶
Verbindungen herstellen:
# Standard-Verbindung
signal.connect(slot)
# Mit Connection Type
signal.connect(slot, Qt.ConnectionType.QueuedConnection)
# Connection Object speichern
connection = signal.connect(slot)
# Prüfen ob verbunden
if connection:
print("Connected successfully")
Verbindungen trennen:
class MyWidget(QWidget):
def __init__(self):
super().__init__()
self.timer = QTimer()
self._connection = self.timer.timeout.connect(self.update)
def pause(self):
# Spezifischen Slot trennen
self.timer.timeout.disconnect(self.update)
# ODER: Connection Object verwenden
# self._connection.disconnect()
def stop(self):
# Alle Verbindungen trennen
self.timer.timeout.disconnect()
Automatische Trennung:
# ✅ Gut: Parent-Child Beziehung nutzen
class ParentWidget(QWidget):
def __init__(self):
super().__init__()
# Child wird automatisch gelöscht
self.child = ChildWidget(parent=self)
# Connections werden automatisch getrennt
# ✅ Gut: Explizites Cleanup
class Worker(QObject):
def cleanup(self):
# Alle eigenen Signals trennen
self.finished.disconnect()
self.progress.disconnect()
self.deleteLater()
Fortgeschrittene Techniken¶
1. Lambda mit Zusatzdaten¶
class ButtonManager:
def __init__(self):
self.buttons = []
for i in range(10):
btn = QPushButton(f"Button {i}")
# Lambda um Index zu übergeben
btn.clicked.connect(lambda checked, idx=i: self.on_click(idx))
self.buttons.append(btn)
def on_click(self, button_id: int):
print(f"Button {button_id} clicked")
2. Signal Chaining¶
class DataPipeline(QObject):
input_received = Signal(bytes)
decoded = Signal(str)
processed = Signal(dict)
def __init__(self):
super().__init__()
# Signals verketten
self.input_received.connect(self._decode)
self.decoded.connect(self._process)
@Slot(bytes)
def _decode(self, data: bytes):
self.decoded.emit(data.decode('utf-8'))
@Slot(str)
def _process(self, text: str):
result = json.loads(text)
self.processed.emit(result)
3. Signal Multiplexing¶
class SignalRouter(QObject):
"""Einen Signal-Typ auf verschiedene Handler routen."""
message = Signal(str, str) # (type, content)
def __init__(self):
super().__init__()
self.handlers = {
'error': self.handle_error,
'warning': self.handle_warning,
'info': self.handle_info,
}
self.message.connect(self._route)
@Slot(str, str)
def _route(self, msg_type: str, content: str):
handler = self.handlers.get(msg_type)
if handler:
handler(content)
4. One-Shot Connections¶
# Connection, die sich nach erstem Signal selbst trennt
def one_shot_connect(signal, slot):
def wrapper(*args, **kwargs):
signal.disconnect(wrapper)
slot(*args, **kwargs)
signal.connect(wrapper)
# Verwendung
one_shot_connect(button.clicked, self.handle_first_click)
Connection Types¶
Überblick über Connection Types¶
Qt bietet verschiedene Connection Types, die bestimmen, wann und in welchem Thread ein Slot ausgeführt wird:
from PySide6.QtCore import Qt
# Connection Types:
Qt.ConnectionType.AutoConnection # Standard (automatische Wahl)
Qt.ConnectionType.DirectConnection # Sofortige Ausführung
Qt.ConnectionType.QueuedConnection # Über Event Queue
Qt.ConnectionType.BlockingQueuedConnection # Blockierend
Qt.ConnectionType.UniqueConnection # Verhindert Duplikate
1. AutoConnection (Standard)¶
Verhalten: - Gleicher Thread: DirectConnection (sofortiger Aufruf) - Verschiedene Threads: QueuedConnection (über Event Queue)
# Default - wird automatisch gewählt
signal.connect(slot)
# Gleichwertig mit:
signal.connect(slot, Qt.ConnectionType.AutoConnection)
Entscheidungslogik:
Wann verwenden: - Immer, wenn keine spezielle Thread-Semantik benötigt wird - Standard für 99% der Fälle - Qt trifft die richtige Entscheidung
2. DirectConnection¶
Verhalten: - Slot wird sofort im Thread des Senders ausgeführt - Funktioniert wie normaler Funktionsaufruf - Kein Event Loop nötig
Use Cases:
class PerformanceCritical(QObject):
data_ready = Signal(bytes)
def __init__(self):
super().__init__()
# DirectConnection für minimale Latenz
self.data_ready.connect(
self.process_immediately,
Qt.ConnectionType.DirectConnection
)
@Slot(bytes)
def process_immediately(self, data: bytes):
# Wird SOFORT im emittierenden Thread ausgeführt
# Keine Verzögerung durch Event Queue
pass
⚠️ Gefahren bei verschiedenen Threads:
# ❌ GEFÄHRLICH: GUI-Update aus Worker Thread
class DangerousWorker(QObject):
result = Signal(str)
def work(self):
# Läuft in Worker Thread
self.result.emit("Done") # DirectConnection!
# In Main Thread
worker = DangerousWorker()
# DirectConnection bedeutet: update_label läuft im Worker Thread!
worker.result.connect(
self.update_label, # GUI-Operation
Qt.ConnectionType.DirectConnection # CRASH!
)
✅ Sichere Verwendung:
# Nur wenn GARANTIERT im selben Thread
class SameThreadProcessor(QObject):
def __init__(self):
super().__init__()
self.internal_signal.connect(
self._internal_handler,
Qt.ConnectionType.DirectConnection
)
def _internal_handler(self):
# Sicher: beide im selben Thread
pass
3. QueuedConnection¶
Verhalten: - Signal wird in Event Queue eingereiht - Slot wird ausgeführt, wenn Event Loop den Event verarbeitet - Immer im Thread des Empfängers
Use Cases:
A) Thread-sichere GUI Updates:
class Worker(QObject):
update_ui = Signal(str)
def do_work(self):
# Läuft in Worker Thread
for i in range(100):
result = self.calculate(i)
# Sicher: wird in Main Thread ausgeführt
self.update_ui.emit(result)
# In Main Thread
worker = Worker()
worker.update_ui.connect(
self.label.setText, # GUI-Operation
Qt.ConnectionType.QueuedConnection # Thread-safe!
)
B) Rekursion vermeiden:
class RecursiveSignal(QObject):
trigger = Signal()
def __init__(self):
super().__init__()
# QueuedConnection verhindert Stack Overflow
self.trigger.connect(
self.on_trigger,
Qt.ConnectionType.QueuedConnection
)
@Slot()
def on_trigger(self):
# Ohne QueuedConnection: Endlos-Rekursion
self.trigger.emit()
C) State Änderungen batchen:
class DataModel(QObject):
items_changed = Signal()
def __init__(self):
super().__init__()
self._pending_update = False
self.items_changed.connect(
self._update_view,
Qt.ConnectionType.QueuedConnection
)
def add_item(self, item):
self._items.append(item)
if not self._pending_update:
self._pending_update = True
self.items_changed.emit() # Einmal pro Event Loop
@Slot()
def _update_view(self):
self._pending_update = False
# Alle Änderungen auf einmal verarbeiten
Wichtig: Parameter werden kopiert
class DataProcessor(QObject):
data_signal = Signal(list)
def send_data(self):
data = [1, 2, 3]
self.data_signal.emit(data)
# Qt kopiert die Liste für QueuedConnection
data.append(4) # Slot sieht nur [1, 2, 3]
4. BlockingQueuedConnection¶
Verhalten: - Wie QueuedConnection, aber Sender wartet auf Completion - Slot läuft in Empfänger Thread - Sender Thread wird blockiert
⚠️ GEFAHR: Deadlock!
# ❌ DEADLOCK: Gleicher Thread
class SameThreadDeadlock(QObject):
signal = Signal()
def __init__(self):
super().__init__()
# DEADLOCK: Kann nicht auf sich selbst warten!
self.signal.connect(
self.slot,
Qt.ConnectionType.BlockingQueuedConnection
)
def emit_signal(self):
self.signal.emit() # Blockiert für immer
# ❌ DEADLOCK: Circular Wait
# Thread A wartet auf Thread B
# Thread B wartet auf Thread A
✅ Legitime Verwendung:
class MainThread(QObject):
request = Signal(str)
def query_worker(self, query: str) -> str:
# Main Thread wartet auf Worker
result = []
def store_result(data):
result.append(data)
self.request.connect(
store_result,
Qt.ConnectionType.BlockingQueuedConnection
)
self.request.emit(query)
return result[0]
# Worker empfängt und antwortet synchron
Regel: Nur wenn absolut notwendig! - Worker muss nie zurück zum Main Thread signalisieren - Klare, einseitige Kommunikation - Kurze Ausführungszeit - Bevorzuge stattdessen: Callback-Signals oder Promises
5. UniqueConnection¶
Verhalten: - Verhindert duplizierte Connections - Kombinierbar mit anderen Types
# Einzeln
signal.connect(slot, Qt.ConnectionType.UniqueConnection)
# Kombiniert mit QueuedConnection
signal.connect(
slot,
Qt.ConnectionType.QueuedConnection | Qt.ConnectionType.UniqueConnection
)
Use Cases:
class DynamicConnector(QObject):
def add_listener(self, callback):
# Callback wird nur einmal verbunden
self.signal.connect(
callback,
Qt.ConnectionType.UniqueConnection
)
# Wiederholte Aufrufe ignoriert
# Verhindert Memory Leaks bei dynamischen Connections
class Plugin:
def activate(self):
# Sicher auch bei mehrmaligem Activate
app.plugin_event.connect(
self.handle,
Qt.ConnectionType.UniqueConnection
)
Connection Type Entscheidungsbaum¶
┌─ Brauche ich thread-safe Communication?
│
├─ NEIN → AutoConnection (Standard)
│
├─ JA, verschiedene Threads
│ │
│ ├─ Normaler Fall → QueuedConnection
│ │
│ ├─ Muss auf Antwort warten → BlockingQueuedConnection
│ │ (⚠️ Vorsicht Deadlock!)
│ │
│ └─ Minimale Latenz → DirectConnection
│ (⚠️ Nur wenn thread-safe!)
│
└─ JA, gleicher Thread
│
├─ Performance kritisch → DirectConnection
│
└─ Rekursion vermeiden → QueuedConnection
Performance Vergleich¶
# Benchmark (Näherungswerte)
# DirectConnection: ~0.01 µs (Funktionsaufruf)
# AutoConnection: ~0.01 µs (gleicher Thread)
# QueuedConnection: ~100 µs (Event Queue Overhead)
# BlockingQueued: ~100 µs + Wait Time
Best Practices¶
class BestPracticeWidget(QWidget):
def __init__(self):
super().__init__()
# ✅ Standard: AutoConnection (nichts angeben)
self.button.clicked.connect(self.on_click)
# ✅ Thread Communication: Explizit QueuedConnection
self.worker.result.connect(
self.update_ui,
Qt.ConnectionType.QueuedConnection
)
# ✅ Unique: Dynamische Connections
self.plugin_manager.event.connect(
self.handle_plugin_event,
Qt.ConnectionType.UniqueConnection
)
# ❌ NICHT: Blocking ohne guten Grund
# self.signal.connect(
# self.slot,
# Qt.ConnectionType.BlockingQueuedConnection
# )
Zusammenfassung: - AutoConnection: Standard, immer verwenden wenn möglich - QueuedConnection: Thread-Kommunikation, GUI-Updates - DirectConnection: Performance-kritische, gleicher Thread - BlockingQueuedConnection: Nur wenn unbedingt nötig - UniqueConnection: Dynamische Verbindungen
MVC/MVP Pattern in PySide6¶
Qt's Model/View Architektur¶
Qt verwendet eine Model/View Architektur (nicht klassisches MVC), bei der View und Controller verschmolzen sind:
┌─────────────────────────────────────────┐
│ Klassisches MVC │
│ ┌──────┐ ┌──────────┐ ┌──────┐ │
│ │Model │◄───│Controller│───│ View │ │
│ └──────┘ └──────────┘ └──────┘ │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ Qt Model/View │
│ ┌──────┐ ┌─────────────────┐ │
│ │Model │◄────────│ View/Controller│ │
│ └──────┘ └─────────────────┘ │
│ │ │ │
│ └──────Signals─────────┘ │
└─────────────────────────────────────────┘
Architektur-Patterns für PySide6¶
Option 1: Qt Model/View (für Listen/Tabellen/Trees)¶
Verwendung: QListView, QTableView, QTreeView mit Models
from PySide6.QtCore import QAbstractListModel, QModelIndex, Qt
class TodoModel(QAbstractListModel):
"""
Model für Todo-Liste.
Verwaltet Daten und notifiziert Views bei Änderungen.
"""
def __init__(self, todos: list[str] = None):
super().__init__()
self._todos = todos or []
# === Pflicht-Methoden für QAbstractListModel ===
def rowCount(self, parent=QModelIndex()):
"""Anzahl der Einträge"""
return len(self._todos)
def data(self, index: QModelIndex, role: int):
"""Daten für bestimmten Index und Role"""
if not index.isValid():
return None
if role == Qt.ItemDataRole.DisplayRole:
return self._todos[index.row()]
return None
# === Daten-Manipulation ===
def add_todo(self, text: str):
"""Fügt Todo hinzu und notifiziert Views"""
row = len(self._todos)
self.beginInsertRows(QModelIndex(), row, row)
self._todos.append(text)
self.endInsertRows() # Triggert View Update
def remove_todo(self, row: int):
"""Entfernt Todo und notifiziert Views"""
if 0 <= row < len(self._todos):
self.beginRemoveRows(QModelIndex(), row, row)
del self._todos[row]
self.endRemoveRows()
def get_todos(self) -> list[str]:
"""Direkt auf Daten zugreifen"""
return self._todos.copy()
class TodoView(QWidget):
"""
View zeigt Model-Daten an.
Reagiert automatisch auf Model-Änderungen.
"""
def __init__(self, model: TodoModel):
super().__init__()
layout = QVBoxLayout(self)
# QListView verbindet sich mit Model
self.list_view = QListView()
self.list_view.setModel(model) # Automatische Updates!
self.input = QLineEdit()
self.add_button = QPushButton("Add")
layout.addWidget(self.list_view)
layout.addWidget(self.input)
layout.addWidget(self.add_button)
# View emittiert User Actions als Signals
self.add_button.clicked.connect(self._on_add_clicked)
def _on_add_clicked(self):
"""View-Logic: User Input validieren"""
text = self.input.text().strip()
if text:
# Signal an Controller/Presenter
self.add_requested.emit(text)
self.input.clear()
add_requested = Signal(str) # View → Controller Communication
Vorteile: - Automatische Updates: Model-Änderungen propagieren automatisch zu allen Views - Mehrere Views: Ein Model kann mehrere Views haben - Optimiert: Qt's internes Caching und Change Tracking
Option 2: MVP (Model-View-Presenter)¶
Verwendung: Custom Widgets, komplexe Business Logic
# ===== MODEL =====
class DataModel(QObject):
"""
Pures Datenmodell ohne GUI-Wissen.
Emittiert Signals bei Änderungen.
"""
data_changed = Signal(dict)
error_occurred = Signal(str)
def __init__(self):
super().__init__()
self._data = {}
def load_data(self, source: str):
"""Lädt Daten aus Quelle"""
try:
self._data = self._fetch_from_source(source)
self.data_changed.emit(self._data)
except Exception as e:
self.error_occurred.emit(str(e))
def get_data(self) -> dict:
"""Direkter Datenzugriff für Presenter"""
return self._data.copy()
def update_field(self, key: str, value):
"""Daten updaten"""
self._data[key] = value
self.data_changed.emit(self._data)
# ===== VIEW =====
class DataView(QWidget):
"""
Passive View: Nur Darstellung, keine Logic.
Kommuniziert nur mit Presenter via Signals.
"""
# View → Presenter Signals
load_requested = Signal(str)
save_requested = Signal()
field_changed = Signal(str, str)
def __init__(self):
super().__init__()
self._setup_ui()
def _setup_ui(self):
"""UI Setup"""
layout = QVBoxLayout(self)
self.source_input = QLineEdit()
self.load_button = QPushButton("Load")
self.save_button = QPushButton("Save")
self.data_display = QTextEdit()
self.status_label = QLabel()
layout.addWidget(self.source_input)
layout.addWidget(self.load_button)
layout.addWidget(self.data_display)
layout.addWidget(self.save_button)
layout.addWidget(self.status_label)
# User Actions → Signals
self.load_button.clicked.connect(self._on_load)
self.save_button.clicked.connect(self.save_requested)
def _on_load(self):
source = self.source_input.text()
self.load_requested.emit(source)
# === Presenter ruft diese Methoden auf ===
def display_data(self, data: dict):
"""Zeigt Daten an"""
self.data_display.setPlainText(str(data))
def show_status(self, message: str):
"""Status anzeigen"""
self.status_label.setText(message)
def show_error(self, error: str):
"""Fehler anzeigen"""
QMessageBox.critical(self, "Error", error)
def enable_save(self, enabled: bool):
"""Save Button aktivieren/deaktivieren"""
self.save_button.setEnabled(enabled)
# ===== PRESENTER =====
class DataPresenter(QObject):
"""
Presenter: Verbindet Model und View.
Enthält Presentation Logic.
"""
def __init__(self, model: DataModel, view: DataView):
super().__init__()
self._model = model
self._view = view
# Model → Presenter → View
self._model.data_changed.connect(self._on_data_changed)
self._model.error_occurred.connect(self._view.show_error)
# View → Presenter → Model
self._view.load_requested.connect(self._on_load_requested)
self._view.save_requested.connect(self._on_save_requested)
@Slot(str)
def _on_load_requested(self, source: str):
"""User will Daten laden"""
if not source:
self._view.show_error("Please enter a source")
return
self._view.show_status("Loading...")
self._view.enable_save(False)
# Delegate zu Model
self._model.load_data(source)
@Slot(dict)
def _on_data_changed(self, data: dict):
"""Model hat neue Daten"""
# Transform für View
self._view.display_data(data)
self._view.show_status("Data loaded successfully")
self._view.enable_save(True)
@Slot()
def _on_save_requested(self):
"""User will speichern"""
data = self._model.get_data()
if self._validate_data(data):
self._save_data(data)
self._view.show_status("Saved successfully")
else:
self._view.show_error("Invalid data")
def _validate_data(self, data: dict) -> bool:
"""Presentation Logic: Validierung"""
return bool(data)
# ===== ZUSAMMENFÜHRUNG =====
class Application:
def __init__(self):
# Alle Komponenten erstellen
self.model = DataModel()
self.view = DataView()
self.presenter = DataPresenter(self.model, self.view)
self.view.show()
Vorteile: - Testbarkeit: Model und Presenter ohne GUI testbar - Separation: Klare Verantwortlichkeiten - Flexibility: Views austauschbar
Option 3: Hybrid Model/View + Services¶
Verwendung: Große Applikationen mit Backend-Kommunikation
# ===== SERVICE LAYER =====
class DataService(QObject):
"""
Service Layer: Backend Communication, API Calls.
Entkoppelt von GUI.
"""
data_fetched = Signal(dict)
fetch_failed = Signal(str)
def __init__(self, api_client):
super().__init__()
self._api_client = api_client
self._cache = {}
def fetch_data(self, resource_id: str):
"""Async Data Fetch"""
# Check Cache
if resource_id in self._cache:
QTimer.singleShot(0, lambda: self.data_fetched.emit(
self._cache[resource_id]
))
return
# Fetch from API (in background thread)
worker = self._create_fetch_worker(resource_id)
worker.finished.connect(lambda data: self._on_fetch_complete(
resource_id, data
))
worker.start()
def _on_fetch_complete(self, resource_id: str, data: dict):
self._cache[resource_id] = data
self.data_fetched.emit(data)
# ===== VIEW MODEL =====
class DataViewModel(QAbstractListModel):
"""
ViewModel: Transformiert Service Data für View.
"""
def __init__(self, data_service: DataService):
super().__init__()
self._items = []
self._service = data_service
# Service → ViewModel
self._service.data_fetched.connect(self._on_data_received)
@Slot(dict)
def _on_data_received(self, data: dict):
"""Service hat Daten geliefert"""
# Transform für View
items = self._transform_data(data)
# Update Model
self.beginResetModel()
self._items = items
self.endResetModel()
def _transform_data(self, data: dict) -> list:
"""Business Logic: Data Transformation"""
return [
{"title": item["name"], "subtitle": item["description"]}
for item in data.get("items", [])
]
# QAbstractListModel Interface
def rowCount(self, parent=QModelIndex()):
return len(self._items)
def data(self, index: QModelIndex, role: int):
if not index.isValid():
return None
item = self._items[index.row()]
if role == Qt.ItemDataRole.DisplayRole:
return item["title"]
elif role == Qt.ItemDataRole.ToolTipRole:
return item["subtitle"]
return None
# ===== ZUSAMMENFÜHRUNG =====
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
# Service Layer
api_client = ApiClient("https://api.example.com")
self.data_service = DataService(api_client)
# ViewModel
self.view_model = DataViewModel(self.data_service)
# View
self.list_view = QListView()
self.list_view.setModel(self.view_model)
self.setCentralWidget(self.list_view)
# Initial Load
self.data_service.fetch_data("user_data")
Best Practices für MVC/MVP in PySide6¶
1. ❌ Nicht: Tight Coupling¶
# ❌ FALSCH: Model kennt View
class BadModel:
def __init__(self, view):
self.view = view # BAD!
def update_data(self, data):
self._data = data
self.view.update_ui() # Model manipuliert View direkt
# ❌ FALSCH: View kennt Model-Interna
class BadView(QWidget):
def save(self):
# Direkt auf Model-Interna zugreifen
self.model._internal_data = {} # BAD!
2. ✅ Richtig: Loose Coupling über Signals¶
# ✅ GUT: Model emittiert Signals
class GoodModel(QObject):
data_changed = Signal(dict)
def update_data(self, data):
self._data = data
self.data_changed.emit(data) # View lauscht
# ✅ GUT: View emittiert User Actions
class GoodView(QWidget):
save_requested = Signal()
def _on_save_clicked(self):
self.save_requested.emit() # Presenter/Controller lauscht
3. Thread-sichere Model Updates¶
class ThreadSafeModel(QObject):
data_updated = Signal(list)
def __init__(self):
super().__init__()
self._data = []
self._lock = QMutex()
def add_item(self, item):
"""Thread-safe add"""
self._lock.lock()
try:
self._data.append(item)
# Signal im gleichen Thread wie Model
self.data_updated.emit(self._data.copy())
finally:
self._lock.unlock()
# View verbindet mit QueuedConnection für Thread-Safety
view.model.data_updated.connect(
view.update_list,
Qt.ConnectionType.QueuedConnection
)
4. Testbare Komponenten¶
# Model ist unabhängig testbar
def test_model():
model = DataModel()
# Signal Spy
spy = []
model.data_changed.connect(lambda d: spy.append(d))
# Test
model.load_data("test.json")
assert len(spy) == 1
assert "data" in spy[0]
# Presenter ist testbar mit Mock View
class MockView:
def __init__(self):
self.displayed_data = None
def display_data(self, data):
self.displayed_data = data
def test_presenter():
model = DataModel()
view = MockView()
presenter = DataPresenter(model, view)
# Test
model.load_data("test.json")
assert view.displayed_data is not None
Entscheidungsbaum: Welches Pattern?¶
┌─ Verwende ich QListView/QTableView/QTreeView?
│
├─ JA → Qt Model/View Pattern
│ (QAbstractItemModel / QAbstractListModel / QAbstractTableModel)
│
└─ NEIN → Custom Widgets?
│
├─ Einfache App → Direkte Signal/Slot Connections
│
├─ Mittlere Komplexität → MVP Pattern
│ (Model-View-Presenter)
│
└─ Große App mit Backend → Hybrid Pattern
(Services + ViewModels + Views)
Threading Architecture¶
Thread-Modell in PySide6¶
Grundprinzip: Qt ist nicht thread-safe! Nur der Main Thread darf GUI-Objekte manipulieren.
# ❌ FALSCH: GUI-Update aus Worker Thread
class BadWorker(QThread):
def run(self):
# CRASH! Läuft nicht im Main Thread
self.label.setText("Done")
# ✅ RICHTIG: Signal verwenden
class GoodWorker(QThread):
finished = Signal(str)
def run(self):
# Signal wird thread-safe im Main Thread verarbeitet
self.finished.emit("Done")
# In Main Thread
worker.finished.connect(self.label.setText) # Sicher!
Zwei Ansätze für Threading¶
Ansatz 1: QThread Subclassing¶
Verwendung: Langlebige Threads mit Event Loop
from PySide6.QtCore import QThread, Signal, Slot
class Worker(QThread):
"""
Worker Thread mit Event Loop.
Gut für: Polling, Monitoring, Background Services
"""
progress = Signal(int)
finished = Signal(dict)
error = Signal(str)
def __init__(self):
super().__init__()
self._running = False
def run(self):
"""Haupt-Thread-Funktion"""
self._running = True
try:
for i in range(100):
if not self._running:
break
# Schwere Arbeit
result = self._process_chunk(i)
# Thread-sicheres Signal
self.progress.emit(i)
# Kurz schlafen
self.msleep(100)
self.finished.emit({"status": "complete"})
except Exception as e:
self.error.emit(str(e))
def stop(self):
"""Thread sauber beenden"""
self._running = False
# Verwendung
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.worker = Worker()
# Signals verbinden
self.worker.progress.connect(self.update_progress)
self.worker.finished.connect(self.on_finished)
self.worker.error.connect(self.on_error)
# Thread starten
self.worker.start()
@Slot(int)
def update_progress(self, value: int):
self.progress_bar.setValue(value)
@Slot(dict)
def on_finished(self, result: dict):
print(f"Worker finished: {result}")
def closeEvent(self, event):
# Wichtig: Auf Thread-Ende warten
self.worker.stop()
self.worker.wait() # Blockiert bis Thread endet
event.accept()
QThread mit Worker Object Pattern:
class Worker(QObject):
"""
Worker als QObject (nicht QThread!).
Bevorzugtes Pattern für komplexe Workers.
"""
finished = Signal()
progress = Signal(int)
def __init__(self):
super().__init__()
@Slot()
def do_work(self):
"""Wird im Worker Thread ausgeführt"""
for i in range(100):
time.sleep(0.1)
self.progress.emit(i)
self.finished.emit()
# Setup
class Application:
def __init__(self):
# Worker und Thread erstellen
self.worker = Worker()
self.thread = QThread()
# Worker in Thread verschieben
self.worker.moveToThread(self.thread)
# Connections
self.thread.started.connect(self.worker.do_work)
self.worker.finished.connect(self.thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.thread.finished.connect(self.thread.deleteLater)
# Starten
self.thread.start()
Wann QThread Subclassing: - ✅ Einfache, dedizierte Background-Tasks - ✅ Ein Worker = Ein Thread - ✅ Thread braucht Event Loop (für Slots/Timers)
Wann Worker Object Pattern: - ✅ Mehrere Workers in einem Thread - ✅ Worker hat komplexe Lifetime - ✅ Worker braucht Parent/Child Relationships
Ansatz 2: QThreadPool + QRunnable¶
Verwendung: Kurze, viele parallele Tasks
from PySide6.QtCore import QRunnable, QThreadPool, Signal, Slot, QObject
class WorkerSignals(QObject):
"""
Signals für QRunnable (kann keine eigenen Signals haben).
"""
finished = Signal()
error = Signal(str)
result = Signal(object)
progress = Signal(int)
class Worker(QRunnable):
"""
Einzelne Task für Thread Pool.
Gut für: Kurze, unabhängige Tasks
"""
def __init__(self, fn, *args, **kwargs):
super().__init__()
self.fn = fn
self.args = args
self.kwargs = kwargs
self.signals = WorkerSignals()
# Auto-Delete nach Ausführung
self.setAutoDelete(True)
@Slot()
def run(self):
"""Wird im Thread Pool Thread ausgeführt"""
try:
result = self.fn(*self.args, **self.kwargs)
self.signals.result.emit(result)
except Exception as e:
self.signals.error.emit(str(e))
finally:
self.signals.finished.emit()
# Verwendung
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
# Thread Pool (global oder lokal)
self.threadpool = QThreadPool.globalInstance()
print(f"Max threads: {self.threadpool.maxThreadCount()}")
def execute_task(self, data):
"""Task im Thread Pool ausführen"""
def task_function(data):
# Schwere Berechnung
result = self.process(data)
return result
# Worker erstellen
worker = Worker(task_function, data)
# Signals verbinden
worker.signals.result.connect(self.on_result)
worker.signals.error.connect(self.on_error)
worker.signals.finished.connect(self.on_finished)
# In Pool einreihen
self.threadpool.start(worker)
@Slot(object)
def on_result(self, result):
print(f"Result: {result}")
Thread Pool Management:
class TaskManager:
def __init__(self):
self.threadpool = QThreadPool()
# Pool konfigurieren
self.threadpool.setMaxThreadCount(4)
self.threadpool.setExpiryTimeout(30000) # ms
def submit_task(self, task: QRunnable):
"""Task einreihen"""
self.threadpool.start(task)
def submit_high_priority(self, task: QRunnable):
"""Task mit Priorität"""
self.threadpool.start(task, priority=10)
def wait_for_done(self, timeout_ms: int = -1):
"""Auf alle Tasks warten"""
self.threadpool.waitForDone(timeout_ms)
def active_thread_count(self) -> int:
"""Aktive Threads"""
return self.threadpool.activeThreadCount()
Wann QThreadPool: - ✅ Viele kurze, unabhängige Tasks - ✅ Tasks brauchen keinen Event Loop - ✅ Automatisches Thread Management - ✅ Resource Pooling
Wann nicht QThreadPool: - ❌ Task braucht Event Loop (Timers, Signals) - ❌ Langlebige Background Services - ❌ Sehr spezifische Thread-Konfiguration
Thread-Kommunikation¶
1. Signals & Slots (Empfohlen)¶
class DataProcessor(QObject):
# Signals für Inter-Thread Communication
data_ready = Signal(bytes)
def process_in_thread(self, data: bytes):
# Läuft in Worker Thread
result = self.heavy_processing(data)
# Signal → Main Thread (thread-safe!)
self.data_ready.emit(result)
# Main Thread
processor = DataProcessor()
processor.data_ready.connect(self.update_ui)
2. Thread-sichere Datenstrukturen¶
from PySide6.QtCore import QMutex, QMutexLocker
class SharedData:
"""Thread-sichere Daten mit Mutex"""
def __init__(self):
self._data = {}
self._mutex = QMutex()
def set_value(self, key: str, value):
"""Thread-safe Write"""
locker = QMutexLocker(self._mutex)
self._data[key] = value
# Mutex wird automatisch freigegeben
def get_value(self, key: str):
"""Thread-safe Read"""
locker = QMutexLocker(self._mutex)
return self._data.get(key)
3. QWaitCondition für Synchronisation¶
from PySide6.QtCore import QWaitCondition, QMutex
class Producer(QThread):
def __init__(self, buffer, mutex, condition):
super().__init__()
self.buffer = buffer
self.mutex = mutex
self.condition = condition
def run(self):
for i in range(100):
self.mutex.lock()
# Warten wenn Buffer voll
while len(self.buffer) >= 10:
self.condition.wait(self.mutex)
# Daten hinzufügen
self.buffer.append(i)
# Consumer wecken
self.condition.wakeOne()
self.mutex.unlock()
class Consumer(QThread):
def __init__(self, buffer, mutex, condition):
super().__init__()
self.buffer = buffer
self.mutex = mutex
self.condition = condition
def run(self):
while True:
self.mutex.lock()
# Warten wenn Buffer leer
while len(self.buffer) == 0:
self.condition.wait(self.mutex)
# Daten konsumieren
data = self.buffer.pop(0)
# Producer wecken
self.condition.wakeOne()
self.mutex.unlock()
self.process(data)
Thread Best Practices¶
1. ✅ DOs:¶
# ✅ Signals für Thread Communication
worker.finished.connect(self.on_finished, Qt.ConnectionType.QueuedConnection)
# ✅ Auf Thread-Ende warten im Cleanup
def closeEvent(self, event):
self.worker.stop()
self.worker.wait(5000) # Max 5 Sekunden
if self.worker.isRunning():
self.worker.terminate() # Force kill
event.accept()
# ✅ Worker ohne Parent erstellen
worker = Worker() # Kein parent=self!
worker.moveToThread(thread)
# ✅ Errors im Thread handlen
class SafeWorker(QThread):
error = Signal(str)
def run(self):
try:
self.do_work()
except Exception as e:
self.error.emit(str(e))
2. ❌ DON'Ts:¶
# ❌ GUI-Objekte in Worker Thread
class BadWorker(QThread):
def run(self):
self.label.setText("Bad") # CRASH!
# ❌ Worker mit Parent in anderen Thread
worker = Worker(parent=self) # BAD!
worker.moveToThread(thread) # Kann nicht moved werden
# ❌ terminate() ohne wait()
self.worker.terminate() # Kann zu Resource Leaks führen
# ❌ Vergessen auf Thread zu warten
def closeEvent(self, event):
event.accept() # Thread läuft weiter!
# ❌ Direkter Zugriff auf shared Data ohne Mutex
# Thread 1
self.shared_list.append(item)
# Thread 2
self.shared_list.pop() # RACE CONDITION!
Thread Debugging¶
from PySide6.QtCore import QThread
class DebugWorker(QThread):
def run(self):
print(f"Worker Thread ID: {int(QThread.currentThreadId())}")
print(f"Worker Thread: {QThread.currentThread()}")
# Check ob im Main Thread
if QThread.currentThread() == QApplication.instance().thread():
print("Running in Main Thread!")
else:
print("Running in Worker Thread")
# Thread Names für Debugging
worker = Worker()
worker.setObjectName("MyWorkerThread")
# Priority
worker.setPriority(QThread.Priority.HighPriority)
Performance Considerations¶
# Python GIL: Threads sind nicht immer schneller!
# Gut für:
# - I/O Operations (Network, File)
# - C/C++ Extensions (ohne GIL)
# - Qt Operations (größtenteils außerhalb GIL)
# Nicht gut für:
# - Pure Python Berechnungen (GIL-bound)
# → Verwende multiprocessing stattdessen
# Anzahl Threads
optimal_threads = QThread.idealThreadCount() # CPU Cores
threadpool.setMaxThreadCount(optimal_threads)
Memory Management¶
Qt Object Ownership¶
Grundregel: Qt managed Memory über Parent-Child Relationships
# ✅ Parent-Child: Automatisches Cleanup
parent = QWidget()
child = QPushButton("Click", parent=parent) # parent besitzt child
# Wenn parent gelöscht wird:
parent.deleteLater() # child wird auch gelöscht
Python vs Qt Memory Management¶
Problem: Python's Garbage Collector vs Qt's Parent-Child System
# Scenario 1: Python hat Referenz
class Container:
def __init__(self):
self.widget = QWidget() # Python Referenz
self.widget.deleteLater() # Qt delete
# Python hält noch Referenz!
# → Dangling Pointer möglich
# Scenario 2: Qt hat Ownership
parent = QWidget()
child = QPushButton(parent=parent)
child = None # Python Referenz weg
# Kind existiert noch! Parent hält Ownership
# Scenario 3: Wer ist Owner?
widget = QWidget() # Python Owner
widget.setParent(parent) # Jetzt Qt Owner!
Signal/Slot Memory Management¶
Connections und Lifetime:
class Emitter(QObject):
signal = Signal()
class Receiver(QObject):
@Slot()
def slot(self):
print("Received")
# Connection Lifetime
emitter = Emitter()
receiver = Receiver()
connection = emitter.signal.connect(receiver.slot)
# Was passiert wenn receiver gelöscht wird?
receiver.deleteLater()
# Qt trennt automatisch die Connection!
# Was passiert wenn emitter gelöscht wird?
emitter.deleteLater()
# Qt trennt automatisch alle Connections!
⚠️ Problematisch: Lambdas und Closures:
# ❌ Memory Leak mit Lambda
class Widget(QWidget):
def __init__(self):
super().__init__()
self.data = [1, 2, 3] # Große Daten
# Lambda hält Referenz auf self
self.button.clicked.connect(
lambda: print(self.data) # self wird nie freigegeben!
)
# ✅ Lösung 1: WeakRef
from weakref import ref
class Widget(QWidget):
def __init__(self):
super().__init__()
self.data = [1, 2, 3]
weak_self = ref(self)
def callback():
strong_self = weak_self()
if strong_self:
print(strong_self.data)
self.button.clicked.connect(callback)
# ✅ Lösung 2: Explizite Slot-Methode
class Widget(QWidget):
def __init__(self):
super().__init__()
self.data = [1, 2, 3]
self.button.clicked.connect(self.on_clicked)
@Slot()
def on_clicked(self):
print(self.data)
deleteLater() vs delete¶
# deleteLater(): Sicherer, aber verzögert
widget = QWidget()
widget.deleteLater() # Wird im nächsten Event Loop Cycle gelöscht
# Widget existiert noch!
# Python del: Sofort (wenn keine Referenzen mehr)
widget = QWidget()
del widget # Python GC entscheidet
# C++ delete: Sofort (nicht in Python verfügbar)
# In C++: delete widget; // Sofort freigegeben
Wann welche Methode:
# ✅ deleteLater(): Standard, immer sicher
def close_window(self):
self.window.deleteLater()
# ✅ Python del: Explizit Python-Referenz freigeben
def cleanup(self):
self.temp_widget = None # Python GC übernimmt
# ❌ Direktes delete: Nicht verfügbar in Python
Memory Leaks vermeiden¶
Häufige Leak-Quellen:
1. Zirkuläre Referenzen:
# ❌ Memory Leak
class Parent(QWidget):
def __init__(self):
super().__init__()
self.child = Child(self)
self.child.parent_ref = self # Zirkel!
# ✅ Lösung: WeakRef
from weakref import ref
class Parent(QWidget):
def __init__(self):
super().__init__()
self.child = Child(self)
self.child.parent_ref = ref(self) # Schwache Referenz
2. Nicht getrennte Signals:
# ❌ Leak: Signal bleibt verbunden
class Controller:
def __init__(self):
self.model = Model()
self.view = None
def set_view(self, view):
if self.view:
# Vergessen alte View zu trennen!
pass
self.view = view
self.model.changed.connect(view.update)
# ✅ Lösung: Alte Connections trennen
class Controller:
def __init__(self):
self.model = Model()
self.view = None
self._connection = None
def set_view(self, view):
if self._connection:
self._connection.disconnect()
self.view = view
self._connection = self.model.changed.connect(view.update)
3. Thread-Workers ohne Cleanup:
# ❌ Leak: Worker wird nie gelöscht
class Manager:
def start_work(self):
worker = Worker()
thread = QThread()
worker.moveToThread(thread)
thread.start()
# Worker und Thread laufen ewig!
# ✅ Lösung: Proper Cleanup
class Manager:
def start_work(self):
worker = Worker()
thread = QThread()
worker.moveToThread(thread)
# Auto-Cleanup
worker.finished.connect(thread.quit)
worker.finished.connect(worker.deleteLater)
thread.finished.connect(thread.deleteLater)
thread.start()
4. C++ Objects mit Python References:
# ⚠️ Vorsicht: Qt löscht, Python hat noch Referenz
layout = QVBoxLayout()
widget = QPushButton("Click")
layout.addWidget(widget) # Layout übernimmt Ownership
# Widget wird von Layout gelöscht
layout.deleteLater()
# Python-Referenz ist jetzt dangling!
widget.setText("New") # CRASH: Widget wurde von Qt gelöscht
Memory Profiling¶
import gc
from PySide6.QtCore import QObject
def count_qobjects():
"""Zähle aktive QObjects"""
count = 0
for obj in gc.get_objects():
if isinstance(obj, QObject):
count += 1
return count
# Vor Operation
before = count_qobjects()
# Operation
create_and_destroy_widgets()
# Nach Operation
gc.collect() # Force GC
after = count_qobjects()
print(f"Leaked QObjects: {after - before}")
Memory Leak Detection:
class MemoryTracker:
"""Track QObject Creation/Deletion"""
def __init__(self):
self.created = []
self.destroyed = []
def track(self, obj: QObject):
"""Track ein Object"""
self.created.append(ref(obj))
obj.destroyed.connect(lambda: self.destroyed.append(obj))
def check_leaks(self):
"""Check für Leaks"""
gc.collect()
alive = [r for r in self.created if r() is not None]
if alive:
print(f"WARNING: {len(alive)} objects still alive!")
for weak_ref in alive:
obj = weak_ref()
print(f" - {obj.__class__.__name__}")
Best Practices¶
# ✅ Parent-Child für Widgets
parent = QWidget()
child1 = QPushButton(parent=parent)
child2 = QLabel(parent=parent)
# parent.deleteLater() löscht alle Children
# ✅ deleteLater() in Event Handlers
@Slot()
def on_close_clicked(self):
self.dialog.deleteLater() # Sicher
# ✅ Explicit Disconnect bei dynamischen Connections
connection = signal.connect(slot)
# Später:
connection.disconnect()
# ✅ Workers mit Auto-Cleanup
worker.finished.connect(worker.deleteLater)
thread.finished.connect(thread.deleteLater)
# ✅ WeakRef für Callbacks
weak_self = ref(self)
signal.connect(lambda: weak_self() and weak_self().method())
# ❌ Vermeide: Zirkuläre Referenzen
# ❌ Vermeide: Ungetrennte Signals
# ❌ Vermeide: Zugriff auf gelöschte Qt Objects
Performance Optimization¶
Signal/Slot Performance¶
Signal Emission ist nicht kostenlos:
# Benchmark: Signal vs Direct Call
# Direct Call: ~0.01 µs
# Signal (same thread): ~0.1 µs (10x langsamer)
# Signal (QueuedConnection): ~100 µs (10000x langsamer!)
# ❌ Schlecht: Signal in Tight Loop
class BadModel(QObject):
changed = Signal()
def update_data(self, items):
for item in items: # 10000 items
self._process(item)
self.changed.emit() # 10000 Signals!
# ✅ Besser: Signal batchen
class GoodModel(QObject):
changed = Signal()
def update_data(self, items):
for item in items:
self._process(item)
self.changed.emit() # 1 Signal
blockSignals() für temporäres Disabling:
class Widget(QWidget):
def set_values(self, values):
# Temporär Signals deaktivieren
self.slider.blockSignals(True)
for i, value in enumerate(values):
self.sliders[i].setValue(value)
# Re-enable und manuell emittieren
self.slider.blockSignals(False)
self.values_changed.emit()
Qt Model/View Optimization¶
1. Batch Updates:
class OptimizedModel(QAbstractListModel):
def add_items(self, items):
# ❌ Schlecht: Einzeln hinzufügen
# for item in items:
# self.add_single_item(item)
# ✅ Gut: Batch Insert
first = len(self._items)
last = first + len(items) - 1
self.beginInsertRows(QModelIndex(), first, last)
self._items.extend(items)
self.endInsertRows() # Ein Update für alle!
2. Lazy Loading:
class LazyModel(QAbstractListModel):
def __init__(self):
super().__init__()
self._items = []
self._loaded_count = 0
def rowCount(self, parent=QModelIndex()):
return self._loaded_count
def canFetchMore(self, parent=QModelIndex()):
return self._loaded_count < len(self._items)
def fetchMore(self, parent=QModelIndex()):
remainder = len(self._items) - self._loaded_count
items_to_fetch = min(100, remainder) # 100 auf einmal
first = self._loaded_count
last = self._loaded_count + items_to_fetch - 1
self.beginInsertRows(QModelIndex(), first, last)
self._loaded_count += items_to_fetch
self.endInsertRows()
3. dataChanged() minimieren:
class SmartModel(QAbstractListModel):
def update_items(self, changed_indices):
# ❌ Schlecht: Einzelne dataChanged
# for idx in changed_indices:
# self.dataChanged.emit(self.index(idx), self.index(idx))
# ✅ Gut: Bereiche zusammenfassen
ranges = self._consolidate_ranges(changed_indices)
for start, end in ranges:
self.dataChanged.emit(
self.index(start),
self.index(end)
)
def _consolidate_ranges(self, indices):
"""[1, 2, 3, 7, 8, 9] → [(1, 3), (7, 9)]"""
if not indices:
return []
indices = sorted(indices)
ranges = []
start = indices[0]
end = indices[0]
for idx in indices[1:]:
if idx == end + 1:
end = idx
else:
ranges.append((start, end))
start = end = idx
ranges.append((start, end))
return ranges
Threading für Performance¶
CPU-bound vs I/O-bound:
# I/O-bound: Threads sind gut
class IOWorker(QRunnable):
def run(self):
# Network Request, File I/O
data = self.fetch_from_network() # Threads helfen!
return data
# CPU-bound: Threads helfen NICHT (Python GIL)
class CPUWorker(QRunnable):
def run(self):
# Pure Python Berechnung
result = sum(range(10000000)) # GIL-bound!
return result
# ✅ Für CPU-bound: multiprocessing verwenden
from multiprocessing import Pool
def cpu_task(data):
return sum(data)
with Pool(4) as p:
results = p.map(cpu_task, chunks)
Event Loop Optimization¶
Schwere Arbeit aufteilen:
class ProgressiveLoader:
def __init__(self):
self.queue = []
self.timer = QTimer()
self.timer.timeout.connect(self.process_chunk)
def load_data(self, items):
# ❌ Schlecht: Alles auf einmal
# for item in items:
# self.process(item) # Blockiert Event Loop!
# ✅ Gut: In Chunks aufteilen
self.queue = items[:]
self.timer.start(0) # Process im Event Loop
def process_chunk(self):
# Verarbeite 100 Items
chunk = self.queue[:100]
self.queue = self.queue[100:]
for item in chunk:
self.process(item)
if not self.queue:
self.timer.stop()
# Event Loop kann andere Events verarbeiten!
processEvents() vermeiden:
# ❌ SCHLECHT: Manuell Event Loop pumpen
def long_operation(self):
for i in range(10000):
self.process(i)
QApplication.processEvents() # BAD! Unkontrolliert
# ✅ BESSER: Threading
def long_operation(self):
worker = Worker(self.process, range(10000))
worker.signals.finished.connect(self.on_finished)
self.threadpool.start(worker)
Caching & Memoization¶
from functools import lru_cache
class DataProcessor:
@lru_cache(maxsize=128)
def expensive_computation(self, data: str) -> dict:
"""Cache für teure Berechnungen"""
# Schwere Berechnung
result = self._compute(data)
return result
def clear_cache(self):
"""Cache leeren wenn Daten sich ändern"""
self.expensive_computation.cache_clear()
QPixmap & QImage Optimization¶
# ✅ Pixmap Cache verwenden
from PySide6.QtGui import QPixmapCache
class ImageLoader:
def load_image(self, path: str) -> QPixmap:
# Check Cache
pixmap = QPixmapCache.find(path)
if pixmap:
return pixmap
# Load und Cache
pixmap = QPixmap(path)
QPixmapCache.insert(path, pixmap)
return pixmap
# Cache Size setzen
QPixmapCache.setCacheLimit(100 * 1024) # 100 MB
Profiling¶
import cProfile
import pstats
# Profile Qt Application
def profile_app():
profiler = cProfile.Profile()
profiler.enable()
# App Code
app.exec()
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)
# QElapsedTimer für Timing
from PySide6.QtCore import QElapsedTimer
timer = QElapsedTimer()
timer.start()
# Operation
heavy_operation()
print(f"Elapsed: {timer.elapsed()}ms")
Best Practices Zusammenfassung¶
Signal & Slots¶
# ✅ DOs
- @Slot decorator für Production Code
- Signals für Thread-Kommunikation
- Explizit disconnect bei dynamischen Connections
- Connection Type explizit bei Threading
- Signal-Namen beschreibend (data_changed, not update)
# ❌ DON'Ts
- Signals in Tight Loops
- Lambdas ohne WeakRef für Widgets
- Mehrfache Connections ohne UniqueConnection
- BlockingQueuedConnection ohne Deadlock-Check
Threading¶
# ✅ DOs
- QThreadPool für kurze Tasks
- QThread + Worker für langlebige Threads
- Immer auf Thread.wait() beim Cleanup
- Mutex für shared Data
- Signals für GUI Updates
# ❌ DON'Ts
- GUI-Zugriff aus Worker Thread
- Worker mit Parent vor moveToThread()
- terminate() ohne wait()
- Shared Data ohne Synchronisation
MVC/MVP¶
# ✅ DOs
- Model emittiert Signals bei Änderungen
- View ist passiv, kennt kein Model
- Presenter/Controller verbindet Model ↔ View
- Business Logic im Model/Service
- Presentation Logic im Presenter
# ❌ DON'Ts
- Model kennt View
- View greift direkt auf Model zu
- Business Logic in View
- Tight Coupling zwischen Komponenten
Memory Management¶
# ✅ DOs
- Parent-Child für Widgets nutzen
- deleteLater() in Event Handlers
- WeakRef für Closures
- Explicit disconnect bei Cleanup
- Track Object Lifetime in Debugging
# ❌ DON'Ts
- Zirkuläre Referenzen
- Ungetrennte Signals
- Zugriff auf gelöschte Qt Objects
- Vergessene Thread Cleanup
Performance¶
# ✅ DOs
- Batch Signal Emissions
- Lazy Loading für große Daten
- Threading für I/O Operations
- Cache für teure Berechnungen
- Profile vor Optimierung!
# ❌ DON'Ts
- processEvents() in Loops
- Signals in Tight Loops
- Unnötige dataChanged()
- Threads für Pure Python (GIL!)
Referenzen & Weiterführende Links¶
Offizielle Dokumentation¶
- PySide6 Docs: https://doc.qt.io/qtforpython-6/
- Qt for Python Tutorials: https://doc.qt.io/qtforpython-6/tutorials/index.html
- Signals and Slots: https://doc.qt.io/qtforpython-6/tutorials/basictutorial/signals_and_slots.html
- Model/View Programming: https://doc.qt.io/qtforpython-6/overviews/model-view-programming.html
- QThread Documentation: https://doc.qt.io/qtforpython-6/PySide6/QtCore/QThread.html
Wichtige Konzepte¶
- Qt Object Model: https://doc.qt.io/qt-6/object.html
- The Meta-Object System: https://doc.qt.io/qt-6/metaobjects.html
- Thread Basics: https://doc.qt.io/qt-6/threads.html
- Threading Technologies: https://doc.qt.io/qt-6/threads-technologies.html
Community Ressourcen¶
- Python GUI Programming: https://www.pythonguis.com/
- Qt Forum: https://forum.qt.io/category/15/qt-for-python
- Stack Overflow: Tag
pyside6/pyqt6
Schlusswort¶
Dieser Guide deckt die architektonischen Kernkonzepte von PySide6 ab:
✅ Signal/Slot System - Wann und wie verwenden
✅ Connection Types - Thread-sichere Kommunikation
✅ MVC/MVP Pattern - Saubere Architektur
✅ Threading - Performance ohne GUI-Freezes
✅ Memory Management - Leaks vermeiden
✅ Performance - Optimization Best Practices
Key Takeaways: 1. Signals & Slots für lose Kopplung 2. QueuedConnection für Thread-Kommunikation 3. MVP Pattern für testbare Architektur 4. QThreadPool für parallele Tasks 5. Parent-Child für automatisches Memory Management 6. Profiling vor Optimization
Viel Erfolg bei der Entwicklung professioneller PySide6-Applikationen! 🚀