C++でのQtマルチスレッドに関する欠落している記事
公開: 2022-03-11C ++開発者は、堅牢なマルチスレッドQtアプリケーションの構築に努めていますが、これらすべての競合状態、同期、デッドロックとライブロックでは、マルチスレッドは決して簡単ではありませんでした。 あなたの名誉のために、あなたはあきらめず、StackOverflowを精査していることに気づきません。 それにもかかわらず、特に各ソリューションには独自の欠点があることを考えると、12の異なる答えから適切で実用的なソリューションを選択することはかなり簡単ではありません。
マルチスレッドは、1つのプロセスのコンテキスト内に複数のスレッドが存在できるようにする、広く普及しているプログラミングおよび実行モデルです。 これらのスレッドはプロセスのリソースを共有しますが、独立して実行できます。 スレッド化されたプログラミングモデルは、開発者に並行実行の有用な抽象化を提供します。 マルチスレッドを1つのプロセスに適用して、マルチプロセッシングシステムでの並列実行を可能にすることもできます。
–ウィキペディア
この記事の目的は、Qtフレームワークとの並行プログラミングに関する基本的な知識、特に最も誤解されているトピックを集約することです。 読者は、コンテンツを理解するために、QtおよびC++の以前のバックグラウンドを持っていることが期待されます。
QThreadPool
とQThread
のどちらを使用するかを選択する
Qtフレームワークは、マルチスレッド用の多くのツールを提供します。 最初は適切なツールを選択するのが難しい場合がありますが、実際には、決定木は2つのオプションで構成されています。Qtにスレッドを管理させるか、自分でスレッドを管理するかです。 ただし、他にも重要な基準があります。
イベントループを必要としないタスク。 具体的には、タスクの実行中にシグナル/スロットメカニズムを使用していないタスク。
使用:QtConcurrentおよびQThreadPool+QRunnable。シグナル/スロットを使用するため、イベントループが必要なタスク。
使用:ワーカーオブジェクトは+QThreadに移動されました。
Qtフレームワークの優れた柔軟性により、「イベントループの欠落」の問題を回避し、 QRunnable
に追加することができます。
class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }
ただし、このような「回避策」は危険で効率的ではないため、回避してください。シグナルを待機しているためにスレッドプール(MyTaskを実行している)のスレッドの1つがブロックされている場合、プールから他のタスクを実行できません。
QThread::run()
メソッドをオーバーライドすることで、イベントループなしでQThread
を実行することもできます。これは、実行していることを理解している限り、まったく問題ありません。 たとえば、このような場合にquit()
メソッドが機能することを期待しないでください。
一度に1つのタスクインスタンスを実行する
一度に1つのタスクインスタンスのみを実行でき、同じタスクを実行するためのすべての保留中の要求が特定のキューで待機していることを確認する必要があるとします。 これは、同じファイルへの書き込みやTCPソケットを使用したパケットの送信など、タスクが排他的リソースにアクセスしている場合によく必要になります。
コンピュータサイエンスと生産者/消費者パターンを少し忘れて、些細なことを考えてみましょう。 実際のプロジェクトで簡単に見つけることができるもの。
この問題の素朴な解決策は、 QMutex
を使用することです。 タスク関数内では、タスクを実行しようとするすべてのスレッドを効果的にシリアル化するミューテックスを取得するだけで済みます。 これにより、一度に1つのスレッドのみが関数を実行できることが保証されます。 ただし、このソリューションは、続行する前にすべてのスレッドが(ミューテックスで)ブロックされるため、競合の多い問題が発生するため、パフォーマンスに影響を与えます。 多くのスレッドがそのようなタスクを積極的に使用していて、その間にいくつかの有用な仕事をしている場合、これらのスレッドはすべてほとんどの時間スリープしているだけです。
void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }
競合を回避するには、キューと、独自のスレッドに存在し、キューを処理するワーカーが必要です。 これは、ほとんど古典的な生産者/消費者パターンです。 ワーカー(コンシューマー)はキューからリクエストを1つずつ選択し、各プロデューサーはそのリクエストをキューに追加するだけです。 最初は単純に聞こえますが、 QQueue
とQWaitCondition
を使用することを考えるかもしれませんが、これらのプリミティブなしで目標を達成できるかどうかを確認してみましょう。
- 保留中のタスクのキューがあるため、
QThreadPool
を使用できます
または
QEventLoop
があるため、デフォルトQThread::run()
を使用できます
最初のオプションは、 QThreadPool
を使用することです。 QThreadPool
インスタンスを作成し、 QThreadPool::setMaxThreadCount(1)
を使用できます。 次に、 QtConcurrent::run()
を使用してリクエストをスケジュールできます。
class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };
このソリューションには1つの利点がありますQThreadPool::clear()
を使用すると、たとえば、アプリケーションをすばやくシャットダウンする必要がある場合に、保留中のすべての要求を即座にキャンセルできます。 ただし、スレッドアフィニティに関連する重大な欠点もありますlogEventCore
関数は、呼び出しごとに異なるスレッドで実行される可能性があります。 また、Qtには、スレッドアフィニティを必要とするクラスがいくつかあることがわかっていますQTimer
、 QTcpSocket
、および場合によっては他のクラスです。
Qt仕様がスレッドアフィニティについて述べていること:タイマーは1つのスレッドで開始され、別のスレッドから停止することはできません。 また、ソケットインスタンスを所有するスレッドのみがこのソケットを使用できます。 これは、タイマーを開始したスレッドで実行中のタイマーをすべて停止し、ソケットを所有するスレッドでQTcpSocket :: close()を呼び出す必要があることを意味します。 どちらの例も通常、デストラクタで実行されます。
より良い解決策は、 QThread
が提供するQEventLoop
の使用に依存しています。 考え方は単純です。シグナル/スロットメカニズムを使用してリクエストを発行し、スレッド内で実行されているイベントループがキューとして機能し、一度に1つのスロットのみを実行できるようにします。
// the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };
LogWorker
コンストラクターとlogEvent
の実装は簡単であるため、ここでは提供しません。 ここで、スレッドとワーカーインスタンスを管理するサービスが必要です。
// interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); }
このコードがどのように機能するかを説明しましょう。
- コンストラクターでは、スレッドとワーカーのインスタンスを作成します。 ワーカーは新しいスレッドに移動されるため、親を受け取らないことに注意してください。 このため、Qtはワーカーのメモリを自動的に解放できません。したがって、
QThread::finished
シグナルをdeleteLater
スロットに接続してこれを行う必要があります。 また、プロキシメソッドLogService::logEvent()
をLogWorker::logEvent()
)に接続します。LogWorker:: logEvent()は、スレッドが異なるため、Qt::QueuedConnection
モードを使用します。 - デストラクタでは、
quit
イベントをイベントループのキューに入れます。 このイベントは、他のすべてのイベントが処理された後に処理されます。 たとえば、デストラクタ呼び出しの直前に数百のlogEvent()
呼び出しを行った場合、ロガーはquitイベントをフェッチする前にそれらすべてを処理します。 もちろん、これには時間がかかるため、イベントループが終了するまでwait()
する必要があります。 quitイベントの後に投稿された今後のすべてのロギング要求は決して処理されないことに注意してください。 - ロギング自体(
LogWorker::logEvent
)は常に同じスレッドで実行されるため、このアプローチはスレッドアフィニティを必要とするクラスでうまく機能します。 同時に、LogWorker
コンストラクタとデストラクタはメインスレッド(具体的にはLogService
が実行されているスレッド)で実行されるため、そこで実行しているコードに十分注意する必要があります。 具体的には、同じスレッドでデストラクタを実行できる場合を除いて、タイマーを停止したり、ワーカーのデストラクタでソケットを使用したりしないでください。
同じスレッドでワーカーのデストラクタを実行する
ワーカーがタイマーまたはソケットを処理している場合は、デストラクタが同じスレッド(ワーカー用に作成したスレッドとワーカーの移動先)で実行されるようにする必要があります。 これをサポートする明白な方法は、 QThread
をサブクラス化し、 QThread::run()
メソッド内のワーカーをdelete
ことです。 次のテンプレートを検討してください。

template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };
このテンプレートを使用して、前の例からLogService
を再定義します。
// interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }
これがどのように機能するかについて説明しましょう。
- カスタム
run()
関数を実装する必要があるため、LogService
をQThread
オブジェクトにしました。 スレッドのライフサイクルを内部で制御したいので、プライベートサブクラス化を使用してQThread
の関数にアクセスできないようにしました。 -
Thread::run()
関数では、デフォルトのQThread::run()
実装を呼び出してイベントループを実行し、イベントループが終了した直後にワーカーインスタンスを破棄します。 ワーカーのデストラクタは同じスレッドで実行されることに注意してください。 -
LogService::logEvent()
は、ロギングイベントをスレッドのイベントキューに送信するプロキシ関数(シグナル)です。
スレッドの一時停止と再開
もう1つの興味深い機会は、カスタムスレッドを一時停止および再開できることです。 アプリケーションが最小化されたとき、ロックされたとき、またはネットワーク接続が失われたときに一時停止する必要のある処理をアプリケーションが実行していると想像してください。 これは、ワーカーが再開されるまですべての保留中の要求を保持するカスタム非同期キューを構築することで実現できます。 ただし、最も簡単な解決策を探しているので、同じ目的でイベントループのキューを(再び)使用します。
スレッドを一時停止するには、特定の待機条件で待機する必要があることは明らかです。 スレッドがこのようにブロックされている場合、そのイベントループはイベントを処理しておらず、Qtはkeepをキューに入れる必要があります。 再開されると、イベントループは蓄積されたすべてのリクエストを処理します。 待機条件には、 QMutex
も必要とするQWaitCondition
オブジェクトを使用するだけです。 すべてのワーカーが再利用できる汎用ソリューションを設計するには、すべてのサスペンド/レジュームロジックを再利用可能な基本クラスに配置する必要があります。 それをSuspendableWorker
と呼びましょう。 このようなクラスは、次の2つのメソッドをサポートする必要があります。
-
suspend()
は、待機状態で待機しているスレッドを設定するブロッキング呼び出しになります。 これは、一時停止要求をキューに投稿し、それが処理されるまで待機することによって行われます。QThread::quit()
+wait()
とほとんど同じです。 -
resume()
は、スリープ状態のスレッドをウェイクアップして実行を継続するための待機条件を通知します。
インターフェイスと実装を確認しましょう。
// interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
// implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }
中断されたスレッドがquit
イベントを受け取ることは決してないことに注意してください。 このため、終了を投稿する前にスレッドを再開しない限り、これをバニラQThread
で安全に使用することはできません。 これをカスタムThread<T>
テンプレートに統合して、防弾にします。
template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };
これらの変更により、quitイベントを投稿する前にスレッドを再開します。 また、 Thread<TWorker>
を使用すると、 SuspendableWorker
であるかどうかに関係なく、あらゆる種類のワーカーを渡すことができます。
使用法は次のようになります。
LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.
揮発性vsアトミック
これはよく誤解されているトピックです。 ほとんどの人は、 volatile
変数を使用して複数のスレッドからアクセスされる特定のフラグを処理できると考えており、これによりデータの競合状態から保護されます。 これは誤りであり、この目的にはQAtomic*
クラス(またはstd::atomic
)を使用する必要があります。
現実的な例を考えてみましょう。専用スレッドで動作するTcpConnection
接続クラスであり、このクラスにスレッドセーフメソッドbool isConnected()
をエクスポートさせます。 内部的には、クラスはソケットイベントをリッスンします。内部ブールフラグを維持するためにconnected
およびdisconnected
されます。
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }
_connected
メンバーをvolatile
にすることは問題を解決せず、 isConnected()
をスレッドセーフにすることもありません。 このソリューションは99%の確率で機能しますが、残りの1%はあなたの人生を悪夢にします。 これを修正するには、複数のスレッドから変数アクセスを保護する必要があります。 この目的でQReadWriteLocker
を使用してみましょう。
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }
これは確実に機能しますが、「ロックフリー」アトミック操作を使用するほど速くはありません。 3番目の解決策は高速でスレッドセーフです(例ではQAtomicInt
の代わりにstd::atomic
を使用していますが、意味的にはこれらは同じです):
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }
結論
この記事では、Qtフレームワークとの並行プログラミングに関するいくつかの重要な懸念事項について説明し、特定のユースケースに対応するソリューションを設計しました。 アトミックプリミティブの使用、読み取り/書き込みロックなどの単純なトピックの多くは考慮していませんが、これらに興味がある場合は、以下にコメントを残して、そのようなチュートリアルを依頼してください。
Qmakeの探索に興味がある場合は、最近、Qmakeの重要なガイドも公開しました。 素晴らしい読み物です!