QThreadPool Walk the Filesystem
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import os
import sys
from PySide6.QtCore import (QObject, QRunnable, QThread,
QThreadPool, Slot, Signal, Qt)
from PySide6.QtWidgets import (QApplication,
QPushButton, QLabel, QWidget, QVBoxLayout)
class Signals(QObject):
progress = Signal(str)
error = Signal(str)
# 1. Create a QRunnable subclass
# and implement its run() method
class Runnable(QRunnable):
signals = Signals()
def __init__(self, parent=None):
super().__init__(parent)
self.do_work = True
QThread.currentThread().setObjectName('Worker thread')
# Enumerate fs objects while self.do_work flag is True
def run(self):
path = os.path.abspath('.').split(os.path.sep)[0] + os.path.sep
for root, _, _ in os.walk(path):
if not self.do_work:
return
self.signals.progress.emit(os.path.basename(root))
@Slot()
def on_cancel_emitted(self):
self.do_work = False
class Window(QWidget):
# 2. add a custom signal to be emitted
# when we want to cancel the task.
cancel_runnable = Signal()
def __init__(self):
super().__init__()
layout = QVBoxLayout()
self.setLayout(layout)
self.start_button = QPushButton('Start background thread')
self.start_button.clicked.connect(self.on_start_button_clicked)
self.cancel_button = QPushButton('Cancel')
self.cancel_button.clicked.connect(self.on_cancel_button_clicked)
self.cancel_button.setDisabled(True)
self.label = QLabel()
self.label.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(self.start_button)
layout.addWidget(self.cancel_button)
layout.addWidget(self.label)
@Slot()
def on_start_button_clicked(self):
# 3. Create a Runnable object
# and connect the signals and the slots
runnable = Runnable()
runnable.signals.progress.connect(self.label.setText)
runnable.signals.error.connect(self.on_error)
self.cancel_runnable.connect(runnable.on_cancel_emitted)
# 4. Run the task.
QThreadPool.globalInstance().start(runnable)
self.start_button.setDisabled(True)
self.cancel_button.setEnabled(True)
@Slot()
def on_cancel_button_clicked(self):
self.cancel_runnable.emit()
self.start_button.setEnabled(True)
self.cancel_button.setDisabled(True)
@Slot()
def on_error(self, message):
print(message)
# Emit the cancel_runnable signal to interrupt
# the runnable on the main window close
def closeEvent(self, event):
try:
self.cancel_runnable.emit()
except Exception as e:
print(e)
if __name__ == '__main__':
app = QApplication(sys.argv)
main_window = Window()
main_window.show()
sys.exit(app.exec())
Let’s see what the “walk the filesystem” example looks like rewritten to use the QThreadPool
- QRunnable
pair:
-
Create a
QRunnable
subclass and implement itsrun()
method. Add a member variable that contains the signals to be emitted from the runnable. Also add a Boolean flag nameddo_work
to signal therun()
method interruption. Therun()
method itself uses the Pythonos.walk()
method to enumerate filesystem objects and reports each enumerated object by emitting theprogress
signal. -
In the main window class, add a custom signal when the background task needs to be interrupted named
cancel_unnable
. Connectcancel_runnable
with theRunnable.on_cancel_emitted
slot that sets theRunnable.do_work
flag toFalse
. -
In the main window
on_start_button_clicked
create aRunnable
object and connects the signals with the slots. When theprogress
signal is emitted we set the label’s text to the reported file or directory. When the cancel button is clicked and thecancel_runnable
signal emitted we change theRunnable.do_work
value toFalse
. -
Access the global instance of the thread pool and run the task using
QThreadPool.start()
method.