任何代码一旦涉及线程,往往会变得十分复杂和不可控,但对于图形界面应用,通常必须要求使用线程,甚至多线程。Qt 开发人员 Bradley T. Hughes 曾发文「You’re doing it wrong…」指出很多对 Qt 中线程的使用方法都是不正确的。

本模块框架展示了如何正确使用 PyQt 中的 QThread 来创建和管理多线程,其核心逻辑是:由主线程创建多个 Worker 对象,并为每个对象绑定一个 QThread 线程,这些线程通过 Qt 的异步信号机制,与主线程(通常是 GUI 线程)进行通信。

from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtSlot

测试环境:macOS 10.13.3;Python 3.6.5;PyQt5。

信号与槽的核心代码原理

  • 使用 pyqtSignal() 方法来定义一个新的信号(Signal),其参数可以是 Python 的对象类型,或者是 C++ 类型。同时,它也可以是这些类型参数的一个序列,以此来实现信号函数的重载
class Foo(QObject):

    sig_open = pyqtSignal()
    sig_close = pyqtSignal(int, int)
    sig_create = pyqtSignal([int], ['QString'])

注意:新的信号必须定义在 QObject 的子类之中。对信号进行重载时,要格外小心,因为有些 Python 类型(如:listdict)并没有对应的 C++ 类型,而 PyQt5 会使用同一个内部 C++ 类来表示它们。

  • 使用 @pyqtSlot() 装饰器来显式标记一个被用作槽(Slot)函数的 Python 方法,并同时提供对应的 C++ 类型作为参数。
class Foo(QObject):

    @pyqtSlot()
    def foo(self):
        """ C++: void foo() """

    @pyqtSlot(int, str)
    def foo(self, arg1, arg2):
        """ C++: void foo(int, QString) """

    @pyqtSlot(int, QObject)
    def foo(self, arg1):
        """ C++: int foo(int, QObject *) """

尽管 PyQt5 可以使用任何函数作为连接信号的槽(slot),但连接信号到一个被 @pyqtSlot() 装饰的 Python 方法,不仅代码上更为清晰,性能上也会略有提升。

  • 信号通过 connect() 连接到对应的槽。对应地,可以使用 disconnect() 方法进行解耦。emit() 方法用于主动触发信号,来调用对应的槽函数。
class Foo(QObject):

    sig_output = pyqtSignal([int], [str])

    def connect_and_emit(self):
        self.sig_output[int].connect(self.output_int)
        self.sig_output[str].connect(self.output_str)
        self.sig_output[int].emit(5)
        self.sig_output[str].emit('hello')

    def output_int(self, index):
        print('signal received with index:', index)

    def outout_str(self, text):
        print('signal received with string:', text)

QThread 类的核心代码原理

  • 静态方法 currentThread()currentThreadId() 分别用于返回当前正在执行的线程和线程句柄。
QThread.currentThread().setObjectName('main thread')
thread_name = QThread.currentThread().objectName()
thread_id = int(QThread.currentThreadId())
  • moveToThread() 将一个对象绑定到特定的目标线程上,后续的事件处理会在目标线程上进行。如前所述,这里的对象要求继承自 QObject 而不是 QThread
worker = Worker()
thread = QThread()
worker.moveToThread(thread)

注意:该方法是线程安全的,意味着只能把一个对象从当前线程推送至(push)另一个线程,而不能把该对象从任意其他线程拉回至(pull)当前线程。

  • start() 方法用于启动执行一个线程,在调用该方法前,应绑定信号 started() 至某个槽函数上,当线程开始执行后会发出信号。
thread.started.connect(worker.work)
thread.start()
  • quit() 方法用于告知线程退出事件循环(event loop);而 wait() 方法则用于阻塞线程,直至其执行完成,即退出事件循环。两者应同时使用。
for thread, worker in self.__threads:
    thread.quit()
    thread.wait()

注意:另一个退出线程的方法是 terminate(),但它会直接退出整个代码路径,即使线程正在修改某个数据,因此,应当极力避免使用该方法。

  • 当一个线程执行一个较长时间的任务时,执行期间线程的事件循环是阻塞的,因此无法响应其他的事件。此时,可以通过 processEvents() 方法,使得线程(包括主线程)有机会来处理其他事件(信号)。若该信号来自主线程,如要求退出当前线程,则应设置一个标志位,并立即返回事件循环,在事件循环中根据该标志位判断是否退出,从而避免阻塞父线程。
for step in range(1000):
    # some time-consuming work
    # ...
    app.processEvents()
    if self.__abort:  # received the signal to abort
        break

该方法是线程安全的。所谓线程安全(thread-safe)的方法,指的是该方法可以同时被多个线程调用,而它们共享数据的引用都会被串行化。线程安全的类,指的是该类的成员函数都可以被多个线程调用,即使所有的线程都使用同一个类的实例。只有线程安全的类,才适用于使用多线程。

一个基于 PyQt5 和 QThread 的多线程模板

  • GUI 主线程创建并维护一个多线程组 __threads[],然后连接子线程的信号(如正在执行 sig_working、执行完成 sig_finished、输出结果 sig_output等)至主线程对应的槽函数上;同时,连接主线程要求退出所有子线程的信号 sig_abort_workers 至每一个子线程的退出槽函数上。

为了避免界面卡死,主线程发出退出信号后,便要求设置子线程的标志位 __abortTrue。此时,主线程可以立即返回,而不用阻塞等待子线程完成当前事件循环。

class MyWidget(QWidget):

    NUM_THREADS = 3
    sig_abort_workers = pyqtSignal()

    def start_workers(self):

        self.__workers_done = 0
        self.__threads = []
        for idx in range(self.NUM_THREADS):
            worker = Worker(idx)
            thread = QThread()
            self.__threads.append((thread, worker))
            worker.moveToThread(thread)
            worker.sig_working.connect(self.on_worker_working)
            worker.sig_finished.connect(self.on_worker_finished)
            self.sig_abort_workers.connect(worker.abort)
            thread.started.connect(worker.work)
            thread.start()

    @pyqtSlot()
    def abort_workers(self):

        self.txt_log.append('Asking each worker to abort')
        self.sig_abort_workers.emit()

    @pyqtSlot(int)
    def on_worker_finished(self, worker_id):

        thread, worker = self.__threads[worker_id]
        thread.quit()
        thread.wait()
        if self.__workers_done == self.NUM_THREADS:
            self.txt_log.append('No more workers active')
            self.txt_log.append('All threads exited')
  • 子线程在执行事件循环的过程中调用 processEvents() 便可以处理其他信号,比如这里主线程发出的 sig_abort_workers 信号,要求结束当前子线程。一旦子线程接收并响应了到该信号,子线程的退出标志位即为 True,当子线程返再次返回到事件循环中后,便会根据该标志位主动退出事件循环,并告知父线程,当前工作已执行完毕。

当父线程接收到子线程执行完毕的信号后,才能正常结束子线程,且不会因为阻塞等待而造成界面卡死。

class Worker(QObject):

    sig_working = pyqtSignal(int, str)
    sig_finished = pyqtSignal(int)

    @pyqtSlot()
    def work(self):

        thread_name = QThread.currentThread().objectName()
        thread_id = int(QThread.currentThreadId())
        for step in range(20):
            # some time-consuming work
            time.sleep(2)
            app.processEvents()
            if self.__abort:
                break
        self.sig_finished.emit(self.__id)

    def abort(self):
        self.__abort = True
  • 最后,给出一个完整的基于 PyQt5 和 QThread 的「Python 多线程示例代码」。用户创建并运行 3 个线程,每个线程都会迭代 10 步进行「计算」,每一次的迭代「计算」都耗时 5 秒。用户可以等待程序计算完毕,或者在执行过程中,主动要求停止。

一个正确的多线程程序,应当能正常退出当前正在运行的线程,并完成清理工作,且不造成界面卡死。

XJ4YVVyzgRLHUvm8.png

仅有 1 条评论


  1. Sidlly

    你好,看了你的这篇文章感觉发现了新大陆一样,之前对于多线程的使用都是继承QThread 类重写run方法。

    想请教你一下,如何控制启动的线程数量呢?

    有这样一个场景:
    一批数据range(1000)需要调用一个http接口去查询,但是接口并发量有限制,如果我直接下面这样写肯定是不行的。
    for i in range(100000):

    thread = QThread() apiWorker = API(i) # 查询类(QObject) apiWorker.moveToThread(thread) self.threadList.append((thread, apiWorker)) apiWorker.logSIG.connect(self.log) apiWorker.exceptionOccurredSIG.connect(self.log) apiWorker.updateProgressSIG.connect(self.updateProgress) thread.started.connect(apiWorker.work) thread.start()

    盼复

    Sidlly  2018-11-26 16:04回复

发表新评论

沪ICP备17018959号-3