关于 C++ 中 Qt 多线程的缺失文章

已发表: 2022-03-11

C++ 开发人员努力构建强大的多线程 Qt 应用程序,但在所有这些竞争条件、同步、死锁和活锁的情况下,多线程从来都不是一件容易的事。 值得称赞的是,您没有放弃并发现自己正在搜索 StackOverflow。 尽管如此,从十几个不同的答案中选择正确且有效的解决方案并非易事,尤其是考虑到每种解决方案都有其自身的缺点。

多线程是一种广泛使用的编程和执行模型,它允许多个线程存在于一个进程的上下文中。 这些线程共享进程的资源,但能够独立执行。 线程编程模型为开发人员提供了有用的并发执行抽象。 多线程也可以应用于一个进程,以在多处理系统上实现并行执行。

– 维基百科

本文的目的是汇总有关使用 Qt 框架进行并发编程的基本知识,特别是最容易被误解的主题。 读者应具有 Qt 和 C++ 方面的背景才能理解内容。

在使用QThreadPoolQThread之间进行选择

Qt 框架为多线程提供了许多工具。 选择正确的工具一开始可能具有挑战性,但实际上,决策树仅包含两个选项:您要么希望 Qt 为您管理线程,要么您希望自己管理线程。 但是,还有其他重要标准:

  1. 不需要事件循环的任务。 具体来说,任务执行过程中不使用信号/槽机制的任务。
    使用:QtConcurrent 和 QThreadPool + QRunnable。

  2. 使用信号/槽并因此需要事件循环的任务。
    使用: Worker 对象移动到 + QThread。

Qt 框架的极大灵活性允许您解决“缺少事件循环”问题并将一个添加到QRunnable

 class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }

但是,请尽量避免这种“变通方法”,因为这些方法既危险又不高效:如果线程池中的一个线程(运行 MyTask)由于等待信号而被阻塞,则它无法执行池中的其他任务。

替代文字

您还可以通过覆盖QThread::run()方法来运行没有任何事件循环的QThread ,只要您知道自己在做什么,这非常好。 例如,不要期望方法quit()在这种情况下工作。

一次运行一个任务实例

想象一下,您需要确保一次只能执行一个任务实例,并且运行同一任务的所有待处理请求都在某个队列上等待。 这通常在任务访问独占资源时需要,例如写入同一个文件或使用 TCP 套接字发送数据包。

让我们暂时忘记计算机科学和生产者-消费者模式,考虑一些微不足道的事情; 在实际项目中很容易找到的东西。

这个问题的一个简单的解决方案可能是使用QMutex 。 在任务函数内部,您可以简单地获取互斥锁,从而有效地序列化所有尝试运行任务的线程。 这将保证一次只有一个线程可以运行该函数。 但是,此解决方案通过引入高争用问题来影响性能,因为所有这些线程在它们可以继续之前都会被阻塞(在互斥体上)。 如果您有许多线程在积极使用这样的任务并在其间做一些有用的工作,那么所有这些线程大部分时间都只是在休眠。

 void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }

为了避免争用,我们需要一个队列和一个工作在自己的线程中并处理队列。 这几乎是经典的生产者-消费者模式。 工作者(消费者)将从队列中一一挑选请求,每个生产者可以简单地将其请求添加到队列中。 一开始听起来很简单,您可能会想到使用QQueueQWaitCondition ,但请稍等,看看我们是否可以在没有这些原语的情况下实现目标:

  • 我们可以使用QThreadPool ,因为它有一个待处理的任务队列

要么

  • 我们可以使用默认QThread::run()因为它有QEventLoop

第一个选项是使用QThreadPool 。 我们可以创建一个QThreadPool实例并使用QThreadPool::setMaxThreadCount(1) 。 然后我们可以使用QtConcurrent::run()来安排请求:

 class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };

这个解决方案有一个好处: QThreadPool::clear()允许您立即取消所有挂起的请求,例如当您的应用程序需要快速关闭时。 但是,还有一个与线程关联性相关的重大缺点: logEventCore函数可能会在不同的线程中执行,从一个调用到另一个调用。 我们知道 Qt 有一些需要线程关联的类: QTimerQTcpSocket和可能的其他一些。

Qt 规范对线程亲和性的描述是:定时器在一个线程中启动,不能从另一个线程停止。 并且只有拥有套接字实例的线程才能使用这个套接字。 这意味着您必须停止启动它们的线程中的任何正在运行的计时器,并且您必须在拥有套接字的线程中调用 QTcpSocket::close()。 这两个示例通常都在析构函数中执行。

更好的解决方案依赖于使用QThread提供的QEventLoop 。 这个想法很简单:我们使用信号/槽机制来发出请求,并且在线程内运行的事件循环将作为一个队列,一次只允许执行一个槽。

 // the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };

LogWorker构造函数和logEvent的实现很简单,因此这里不提供。 现在我们需要一个服务来管理线程和工作实例:

 // interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); } 

替代文字

让我们讨论一下这段代码是如何工作的:

  • 在构造函数中,我们创建了一个线程和工作者实例。 请注意,worker 没有收到父线程,因为它将被移动到新线程。 正因为如此,Qt 将无法自动释放 worker 的内存,因此,我们需要通过将QThread::finished信号连接到deleteLater插槽来做到这一点。 我们还将代理方法LogService::logEvent()连接到LogWorker::logEvent() ,由于线程不同,它将使用Qt::QueuedConnection模式。
  • 在析构函数中,我们将quit事件放入事件循环的队列中。 此事件将在处理完所有其他事件处理。 例如,如果我们在析构函数调用之前进行了数百次logEvent()调用,那么记录器将在获取退出事件之前处理它们。 当然,这需要时间,所以我们必须wait()直到事件循环退出。 值得一提的是,在退出事件之后发布的所有未来日志记录请求将永远不会被处理。
  • 日志记录本身( LogWorker::logEvent )将始终在同一个线程中完成,因此这种方法对于需要thread-affinity的类来说效果很好。 同时, LogWorker的构造函数和析构函数都在主线程(特别是LogService运行所在的线程)中执行,因此您需要非常小心您在那里运行的代码。 具体来说,除非您可以在同一个线程中运行析构函数,否则不要在工作者的析构函数中停止计时器或使用套接字!

在同一个线程中执行worker的析构函数

如果您的工作人员正在处理计时器或套接字,您需要确保析构函数在同一个线程中执行(您为工作人员创建的线程以及您将工作人员移动到的位置)。 支持这一点的明显方法是QThread并在QThread::run()方法中delete worker。 考虑以下模板:

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };

使用这个模板,我们从前面的例子中重新定义LogService

 // interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }

让我们讨论一下这应该如何工作:

  • 我们将LogService设为QThread对象,因为我们需要实现自定义的run()函数。 我们使用私有子类来防止访问QThread的函数,因为我们想在内部控制线程的生命周期。
  • Thread::run()函数中,我们通过调用默认的QThread::run()实现来运行事件循环,并在事件循环退出后立即销毁工作实例。 请注意,worker 的析构函数在同一个线程中执行。
  • LogService::logEvent()是将日志事件发布到线程的事件队列的代理函数(信号)。

暂停和恢复线程

另一个有趣的机会是能够暂停和恢复我们的自定义线程。 想象一下,当应用程序最小化、锁定或刚刚失去网络连接时,您的应用程序正在执行一些需要暂停的处理。 这可以通过构建一个自定义异步队列来实现,该队列将保存所有待处理的请求,直到工作人员恢复。 然而,由于我们正在寻找最简单的解决方案,我们将(再次)使用事件循环的队列来实现相同的目的。

要挂起一个线程,我们显然需要它等待某个等待条件。 如果线程以这种方式被阻塞,它的事件循环不会处理任何事件,Qt 必须将 keep 放入队列中。 一旦恢复,事件循环将处理所有累积的请求。 对于等待条件,我们只需使用同样需要QMutexQWaitCondition对象。 为了设计一个可以被任何工作人员重用的通用解决方案,我们需要将所有挂起/恢复逻辑放入一个可重用的基类中。 我们称之为SuspendableWorker 。 这样的类应支持两种方法:

  • suspend()将是一个阻塞调用,它将线程设置为等待条件。 这将通过将挂起请求发布到队列中并等待它被处理来完成。 非常类似于QThread::quit() + wait()
  • resume()将发出等待条件以唤醒睡眠线程以继续执行。

让我们回顾一下接口和实现:

 // interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
 // implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }

请记住,挂起的线程永远不会收到quit事件。 出于这个原因,除非我们在发布退出之前恢复线程,否则我们不能安全地将它与 vanilla QThread一起使用。 让我们将它集成到我们的自定义Thread<T>模板中以使其防弹。

替代文字

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };

通过这些更改,我们将在发布退出事件之前恢复线程。 此外, Thread<TWorker>仍然允许传入任何类型的工作者,无论它是否为SuspendableWorker

用法如下:

 LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.

易失性与原子性

这是一个经常被误解的话题。 大多数人认为volatile变量可用于为多个线程访问的某些标志提供服务,并且可以避免数据竞争条件。 那是错误的,为此必须使用QAtomic*类(或std::atomic )。

让我们考虑一个现实的例子:一个在专用线程中工作的TcpConnection连接类,我们希望这个类导出一个线程安全的方法: bool isConnected() 。 在内部,该类将监听套接字事件: connecteddisconnected以维护内部布尔标志:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }

使_connected成员volatile不会解决问题,也不会使isConnected()线程安全。 此解决方案将在 99% 的时间内有效,但剩下的 1% 将使您的生活成为一场噩梦。 为了解决这个问题,我们需要保护来自多个线程的变量访问。 让我们为此目的使用QReadWriteLocker

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }

这工作可靠,但不如使用“无锁”原子操作快。 第三种解决方案既快速又线程安全(该示例使用std::atomic而不是QAtomicInt ,但在语义上它们是相同的):

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }

结论

在本文中,我们讨论了使用 Qt 框架进行并发编程的几个重要问题,并设计了解决特定用例的解决方案。 我们没有考虑许多简单的主题,例如使用原子原语、读写锁和许多其他主题,但如果您对这些感兴趣,请在下面留下您的评论并索取此类教程。

如果您有兴趣探索 Qmake,我最近还发布了 Qmake 重要指南。 这是一本很棒的书!