關於 C++ 中 Qt 多線程的缺失文章

已發表: 2022-03-11

C++ 開發人員努力構建強大的多線程 Qt 應用程序,但在所有這些競爭條件、同步、死鎖和活鎖的情況下,多線程從來都不是一件容易的事。 值得稱讚的是,您沒有放棄並發現自己正在搜索 StackOverflow。 儘管如此,從十幾個不同的答案中選擇正確且有效的解決方案並非易事,尤其是考慮到每種解決方案都有其自身的缺點。

多線程是一種廣泛使用的編程和執行模型,它允許多個線程存在於一個進程的上下文中。 這些線程共享進程的資源,但能夠獨立執行。 線程編程模型為開發人員提供了有用的並發執行抽象。 多線程也可以應用於一個進程,以在多處理系統上實現並行執行。

– 維基百科

本文的目的是匯總有關使用 Qt 框架進行並發編程的基本知識,特別是最容易被誤解的主題。 讀者應具有 Qt 和 C++ 方面的背景才能理解內容。

在使用QThreadPoolQThread之間進行選擇

Qt 框架為多線程提供了許多工具。 選擇正確的工具起初可能具有挑戰性,但事實上,決策樹僅包含兩個選項:您要么希望 Qt 為您管理線程,要么您希望自己管理線程。 但是,還有其他重要標準:

  1. 不需要事件循環的任務。 具體來說,任務執行過程中不使用信號/槽機制的任務。
    使用:QtConcurrent 和 QThreadPool + QRunnable。

  2. 使用信號/槽並因此需要事件循環的任務。
    使用: Worker 對象移動到 + QThread。

Qt 框架的極大靈活性允許您解決“缺少事件循環”問題並將一個添加到QRunnable

 class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }

但是,盡量避免這種“變通方法”,因為這些方法既危險又不高效:如果線程池中的一個線程(運行 MyTask)由於等待信號而被阻塞,則它無法執行池中的其他任務。

替代文字

您還可以通過覆蓋QThread::run()方法來運行沒有任何事件循環的QThread ,只要您知道自己在做什麼,這非常好。 例如,不要期望方法quit()在這種情況下工作。

一次運行一個任務實例

想像一下,您需要確保一次只能執行一個任務實例,並且運行同一任務的所有待處理請求都在某個隊列上等待。 這通常在任務訪問獨占資源時需要,例如寫入同一個文件或使用 TCP 套接字發送數據包。

讓我們暫時忘記計算機科學和生產者-消費者模式,考慮一些微不足道的事情; 在實際項目中很容易找到的東西。

這個問題的一個簡單的解決方案可能是使用QMutex 。 在任務函數內部,您可以簡單地獲取互斥鎖,從而有效地序列化所有嘗試運行任務的線程。 這將保證一次只有一個線程可以運行該函數。 但是,此解決方案通過引入高爭用問題來影響性能,因為所有這些線程在它們可以繼續之前都會被阻塞(在互斥體上)。 如果您有許多線程在積極使用這樣的任務並在其間做一些有用的工作,那麼所有這些線程大部分時間都只是在休眠。

 void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }

為了避免爭用,我們需要一個隊列和一個工作在自己的線程中並處理隊列。 這幾乎是經典的生產者-消費者模式。 工作者(消費者)將從隊列中一一挑選請求,每個生產者可以簡單地將其請求添加到隊列中。 一開始聽起來很簡單,您可能會想到使用QQueueQWaitCondition ,但請稍等,看看我們是否可以在沒有這些原語的情況下實現目標:

  • 我們可以使用QThreadPool ,因為它有一個待處理的任務隊列

要么

  • 我們可以使用默認QThread::run()因為它有QEventLoop

第一個選項是使用QThreadPool 。 我們可以創建一個QThreadPool實例並使用QThreadPool::setMaxThreadCount(1) 。 然後我們可以使用QtConcurrent::run()來安排請求:

 class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };

這個解決方案有一個好處: QThreadPool::clear()允許您立即取消所有掛起的請求,例如當您的應用程序需要快速關閉時。 但是,還有一個與線程關聯性相關的重大缺點: logEventCore函數可能會在不同的線程中執行,從一個調用到另一個調用。 我們知道 Qt 有一些需要線程關聯的類: QTimerQTcpSocket和可能的其他一些。

Qt 規範對線程親和性的描述是:定時器在一個線程中啟動,不能從另一個線程停止。 並且只有擁有套接字實例的線程才能使用這個套接字。 這意味著您必須停止啟動它們的線程中的任何正在運行的計時器,並且您必須在擁有套接字的線程中調用 QTcpSocket::close()。 這兩個示例通常都在析構函數中執行。

更好的解決方案依賴於使用QThread提供的QEventLoop 。 這個想法很簡單:我們使用信號/槽機制來發出請求,並且在線程內運行的事件循環將作為一個隊列,一次只允許執行一個槽。

 // the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };

LogWorker構造函數和logEvent的實現很簡單,因此這裡不提供。 現在我們需要一個服務來管理線程和工作實例:

 // interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); } 

替代文字

讓我們討論一下這段代碼是如何工作的:

  • 在構造函數中,我們創建了一個線程和工作者實例。 請注意,worker 沒有收到父線程,因為它將被移動到新線程。 正因為如此,Qt 將無法自動釋放 worker 的內存,因此,我們需要通過將QThread::finished信號連接到deleteLater插槽來做到這一點。 我們還將代理方法LogService::logEvent()連接到LogWorker::logEvent() ,由於線程不同,它將使用Qt::QueuedConnection模式。
  • 在析構函數中,我們將quit事件放入事件循環的隊列中。 此事件將在處理完所有其他事件處理。 例如,如果我們在析構函數調用之前進行了數百次logEvent()調用,那麼記錄器將在獲取退出事件之前處理它們。 當然,這需要時間,所以我們必須wait()直到事件循環退出。 值得一提的是,在退出事件之後發布的所有未來日誌記錄請求將永遠不會被處理。
  • 日誌記錄本身( LogWorker::logEvent )將始終在同一個線程中完成,因此這種方法對於需要thread-affinity的類來說效果很好。 同時, LogWorker的構造函數和析構函數都在主線程(特別是LogService運行所在的線程)中執行,因此,您需要非常小心您在那裡運行的代碼。 具體來說,除非您可以在同一個線程中運行析構函數,否則不要在工作者的析構函數中停止計時器或使用套接字!

在同一個線程中執行worker的析構函數

如果您的工作人員正在處理計時器或套接字,您需要確保析構函數在同一個線程中執行(您為工作人員創建的線程以及您將工作人員移動到的位置)。 支持這一點的明顯方法是QThread並在QThread::run()方法中delete worker。 考慮以下模板:

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };

使用這個模板,我們從前面的例子中重新定義LogService

 // interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }

讓我們討論一下這應該如何工作:

  • 我們將LogService設為QThread對象,因為我們需要實現自定義的run()函數。 我們使用私有子類來防止訪問QThread的函數,因為我們想在內部控制線程的生命週期。
  • Thread::run()函數中,我們通過調用默認的QThread::run()實現來運行事件循環,並在事件循環退出後立即銷毀工作實例。 請注意,worker 的析構函數在同一個線程中執行。
  • LogService::logEvent()是將日誌事件發佈到線程的事件隊列的代理函數(信號)。

暫停和恢復線程

另一個有趣的機會是能夠暫停和恢復我們的自定義線程。 想像一下,當應用程序最小化、鎖定或剛剛失去網絡連接時,您的應用程序正在執行一些需要暫停的處理。 這可以通過構建一個自定義異步隊列來實現,該隊列將保存所有待處理的請求,直到工作人員恢復。 然而,由於我們正在尋找最簡單的解決方案,我們將(再次)使用事件循環的隊列來實現相同的目的。

要掛起一個線程,我們顯然需要它等待某個等待條件。 如果線程以這種方式被阻塞,它的事件循環不會處理任何事件,Qt 必須將 keep 放入隊列中。 一旦恢復,事件循環將處理所有累積的請求。 對於等待條件,我們只需使用同樣需要QMutexQWaitCondition對象。 為了設計一個可以被任何工作人員重用的通用解決方案,我們需要將所有掛起/恢復邏輯放入一個可重用的基類中。 我們稱之為SuspendableWorker 。 這樣的類應支持兩種方法:

  • suspend()將是一個阻塞調用,它將線程設置為等待條件。 這將通過將掛起請求發佈到隊列中並等待它被處理來完成。 非常類似於QThread::quit() + wait()
  • resume()將發出等待條件以喚醒睡眠線程以繼續執行。

讓我們回顧一下接口和實現:

 // interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
 // implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }

請記住,掛起的線程永遠不會收到quit事件。 出於這個原因,除非我們在發布退出之前恢復線程,否則我們不能安全地將它與 vanilla QThread一起使用。 讓我們將它集成到我們的自定義Thread<T>模板中以使其防彈。

替代文字

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };

通過這些更改,我們將在發布退出事件之前恢復線程。 此外, Thread<TWorker>仍然允許傳入任何類型的工作者,無論它是否為SuspendableWorker

用法如下:

 LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.

易失性與原子性

這是一個經常被誤解的話題。 大多數人認為volatile變量可用於為多個線程訪問的某些標誌提供服務,並且可以避免數據競爭條件。 那是錯誤的,為此必須使用QAtomic*類(或std::atomic )。

讓我們考慮一個現實的例子:一個在專用線程中工作的TcpConnection連接類,我們希望這個類導出一個線程安全的方法: bool isConnected() 。 在內部,該類將監聽套接字事件: connecteddisconnected以維護內部布爾標誌:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }

使_connected成員volatile不會解決問題,也不會使isConnected()線程安全。 此解決方案將在 99% 的時間內有效,但剩下的 1% 將使您的生活成為一場噩夢。 為了解決這個問題,我們需要保護來自多個線程的變量訪問。 讓我們為此目的使用QReadWriteLocker

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }

這工作可靠,但不如使用“無鎖”原子操作快。 第三種解決方案既快速又線程安全(該示例使用std::atomic而不是QAtomicInt ,但在語義上它們是相同的):

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }

結論

在本文中,我們討論了使用 Qt 框架進行並發編程的幾個重要問題,並設計了解決特定用例的解決方案。 我們沒有考慮許多簡單的主題,例如使用原子原語、讀寫鎖和許多其他主題,但如果您對這些感興趣,請在下面留下您的評論並索取此類教程。

如果您有興趣探索 Qmake,我最近還發布了 Qmake 重要指南。 這是一本很棒的書!