1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
| # https://doc.qt.io/qt-6/qthread.html
import sys
from PySide6.QtCore import QObject, QThread, Slot, Signal, Qt
from PySide6.QtWidgets import (QApplication, QPushButton,
QLabel, QWidget, QVBoxLayout)
class Worker(QObject):
auto_signal = Signal()
direct_signal = Signal()
queued_signal = Signal()
blocking_signal = Signal()
@Slot()
def auto_slot(self):
print('Auto connection')
print('In', QThread.currentThread().objectName(),
', Loop level', QThread.currentThread().loopLevel())
@Slot()
def direct_slot(self):
print('Direct connection')
print('In', QThread.currentThread().objectName(),
', Loop level', QThread.currentThread().loopLevel())
@Slot()
def queued_slot(self):
print('Queued connection')
print('In', QThread.currentThread().objectName(),
', Loop level', QThread.currentThread().loopLevel())
@Slot()
def blocking_slot(self):
print('Blocking Queued connection')
print('In', QThread.currentThread().objectName(),
', Loop level', QThread.currentThread().loopLevel())
QThread.sleep(10)
class Window(QWidget):
def __init__(self):
super().__init__()
QThread.currentThread().setObjectName('Main thread')
layout = QVBoxLayout()
self.setLayout(layout)
self.button_auto = QPushButton('Auto connection')
self.button_direct = QPushButton('Direct connection')
self.button_queued = QPushButton('Queued connection')
self.button_blocking = QPushButton('Blocking Queued connection')
self.label = QLabel()
self.label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.emitting_thread = QThread()
self.emitting_thread.setObjectName('Emitting Thread')
self.emitter = Worker()
self.emitter.moveToThread(self.emitting_thread)
self.receiving_thread = QThread()
self.receiving_thread.setObjectName('Receiving Thread')
self.receiver = Worker()
self.receiver.moveToThread(self.receiving_thread)
self.emitter.auto_signal.connect(self.receiver.auto_slot, Qt.ConnectionType.AutoConnection)
self.emitter.direct_signal.connect(self.receiver.direct_slot, Qt.ConnectionType.DirectConnection)
self.emitter.queued_signal.connect(self.receiver.queued_slot, Qt.ConnectionType.QueuedConnection)
self.emitter.blocking_signal.connect(self.receiver.blocking_slot, Qt.ConnectionType.BlockingQueuedConnection)
self.button_auto.clicked.connect(self.emitter.auto_signal)
self.button_direct.clicked.connect(self.emitter.direct_signal)
self.button_queued.clicked.connect(self.emitter.queued_signal)
self.button_blocking.clicked.connect(self.emitter.blocking_signal)
self.emitting_thread.start()
self.receiving_thread.start()
layout.addWidget(self.button_auto)
layout.addWidget(self.button_direct)
layout.addWidget(self.button_queued)
layout.addWidget(self.button_blocking)
layout.addWidget(self.label)
@Slot()
def on_result_ready(self, result):
self.label.setText(result)
def closeEvent(self, event):
try:
self.emitting_thread.quit()
self.emitting_thread.wait()
self.receiving_thread.quit()
self.receiving_thread.wait()
except Exception as e:
print(e)
event.accept()
if __name__ == '__main__':
app = QApplication(sys.argv)
main_window = Window()
main_window.show()
sys.exit(app.exec())
|