Der fehlende Artikel über Qt Multithreading in C++

Veröffentlicht: 2022-03-11

C++-Entwickler streben danach, robuste Multithreading-Qt-Anwendungen zu erstellen, aber Multithreading war nie einfach mit all diesen Race Conditions, Synchronisationen und Deadlocks und Livelocks. Zu Ihrer Ehre geben Sie nicht auf und finden sich StackOverflow durchwühlend wieder. Dennoch ist es ziemlich nicht trivial, aus einem Dutzend verschiedener Antworten die richtige und funktionierende Lösung auszuwählen, insbesondere angesichts der Tatsache, dass jede Lösung ihre eigenen Nachteile hat.

Multithreading ist ein weit verbreitetes Programmier- und Ausführungsmodell, das es ermöglicht, dass mehrere Threads im Kontext eines Prozesses existieren. Diese Threads teilen sich die Ressourcen des Prozesses, können aber unabhängig voneinander ausgeführt werden. Das Threaded-Programmiermodell bietet Entwicklern eine nützliche Abstraktion der gleichzeitigen Ausführung. Multithreading kann auch auf einen Prozess angewendet werden, um eine parallele Ausführung auf einem Multiprozessorsystem zu ermöglichen.

– Wikipedia

Das Ziel dieses Artikels ist es, das grundlegende Wissen über die gleichzeitige Programmierung mit dem Qt-Framework zusammenzufassen, insbesondere die am häufigsten missverstandenen Themen. Es wird erwartet, dass ein Leser über Vorkenntnisse in Qt und C++ verfügt, um den Inhalt zu verstehen.

Auswahl zwischen der Verwendung von QThreadPool und QThread

Das Qt-Framework bietet viele Tools für Multithreading. Die Auswahl des richtigen Tools kann zunächst schwierig sein, aber tatsächlich besteht der Entscheidungsbaum nur aus zwei Optionen: Sie möchten entweder, dass Qt die Threads für Sie verwaltet, oder Sie möchten die Threads selbst verwalten. Es gibt jedoch noch weitere wichtige Kriterien:

  1. Aufgaben, die die Ereignisschleife nicht benötigen. Insbesondere die Tasks, die während der Taskausführung keinen Signal/Slot-Mechanismus verwenden.
    Verwendung: QtConcurrent und QThreadPool + QRunnable.

  2. Tasks, die Signale/Slots verwenden und daher die Ereignisschleife benötigen.
    Verwendung: Worker-Objekte nach + QThread verschoben.

Die große Flexibilität des Qt-Frameworks ermöglicht es Ihnen, das Problem der „fehlenden Ereignisschleife“ zu umgehen und eines zu QRunnable :

 class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }

Versuchen Sie jedoch, solche „Workarounds“ zu vermeiden, da diese gefährlich und nicht effizient sind: Wenn einer der Threads aus dem Thread-Pool (auf dem MyTask ausgeführt wird) blockiert wird, weil er auf ein Signal wartet, kann er keine anderen Aufgaben aus dem Pool ausführen.

Alt-Text

Sie können einen QThread auch ohne Ereignisschleife ausführen, indem Sie die Methode QThread::run() überschreiben, und das ist völlig in Ordnung, solange Sie wissen, was Sie tun. Erwarten Sie beispielsweise nicht, dass die Methode quit() in einem solchen Fall funktioniert.

Ausführen einer Aufgabeninstanz nach der anderen

Stellen Sie sich vor, Sie müssen sicherstellen, dass jeweils nur eine Aufgabeninstanz ausgeführt werden kann und alle ausstehenden Anforderungen zur Ausführung derselben Aufgabe in einer bestimmten Warteschlange warten. Dies ist häufig erforderlich, wenn eine Aufgabe auf eine exklusive Ressource zugreift, z. B. beim Schreiben in dieselbe Datei oder beim Senden von Paketen über TCP-Socket.

Vergessen wir für einen Moment die Informatik und das Producer-Consumer-Muster und betrachten etwas Triviales; etwas, das in realen Projekten leicht zu finden ist.

Eine naive Lösung für dieses Problem könnte die Verwendung eines QMutex . Innerhalb der Aufgabenfunktion könnten Sie einfach den Mutex erwerben, der alle Threads serialisiert, die versuchen, die Aufgabe auszuführen. Dies würde garantieren, dass jeweils nur ein Thread die Funktion ausführen kann. Diese Lösung wirkt sich jedoch auf die Leistung aus, indem sie ein Problem mit hoher Konkurrenz einführt, da alle diese Threads (auf dem Mutex) blockiert würden, bevor sie fortfahren könnten. Wenn Sie viele Threads haben, die eine solche Aufgabe aktiv verwenden und zwischendurch einige nützliche Aufgaben erledigen, werden alle diese Threads die meiste Zeit nur schlafen.

 void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }

Um Konflikte zu vermeiden, benötigen wir eine Warteschlange und einen Worker, der in einem eigenen Thread lebt und die Warteschlange verarbeitet. Das ist so ziemlich das klassische Producer-Consumer- Muster. Der Worker ( Verbraucher ) würde Anfragen nacheinander aus der Warteschlange auswählen, und jeder Erzeuger kann seine Anfragen einfach in die Warteschlange einfügen. Klingt zunächst einfach und Sie denken vielleicht an die Verwendung von QQueue und QWaitCondition , aber warten Sie und lassen Sie uns sehen, ob wir das Ziel ohne diese Primitive erreichen können:

  • Wir können QThreadPool verwenden, da es eine Warteschlange mit ausstehenden Aufgaben hat

Oder

  • Wir können standardmäßig QThread::run() verwenden, weil es QEventLoop hat

Die erste Option ist die Verwendung von QThreadPool . Wir können eine QThreadPool Instanz erstellen und QThreadPool::setMaxThreadCount(1) verwenden. Dann können wir QtConcurrent::run() verwenden, um Anfragen zu planen:

 class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };

Diese Lösung hat einen Vorteil: QThreadPool::clear() können Sie alle ausstehenden Anfragen sofort abbrechen , beispielsweise wenn Ihre Anwendung schnell heruntergefahren werden muss. Es gibt jedoch auch einen erheblichen Nachteil, der mit der Thread-Affinität verbunden ist: logEventCore -Funktion wird wahrscheinlich von Aufruf zu Aufruf in verschiedenen Threads ausgeführt. Und wir wissen, dass Qt einige Klassen hat, die Thread-Affinität erfordern: QTimer , QTcpSocket und möglicherweise einige andere.

Was die Qt-Spezifikation über die Thread-Affinität sagt: Timer, die in einem Thread gestartet wurden, können nicht von einem anderen Thread gestoppt werden. Und nur der Thread, der eine Socket-Instanz besitzt, kann diesen Socket verwenden. Dies impliziert, dass Sie alle laufenden Timer in dem Thread stoppen müssen, der sie gestartet hat, und dass Sie QTcpSocket::close() in dem Thread aufrufen müssen, der den Socket besitzt. Beide Beispiele werden normalerweise in Destruktoren ausgeführt.

Die bessere Lösung basiert auf der Verwendung von QThread , das von QEventLoop bereitgestellt wird. Die Idee ist einfach: Wir verwenden einen Signal-/Slot-Mechanismus, um Anfragen zu stellen, und die im Thread laufende Ereignisschleife dient als Warteschlange, in der jeweils nur ein Slot ausgeführt werden kann.

 // the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };

Die Implementierung des LogWorker Konstruktors und von logEvent ist unkompliziert und wird daher hier nicht bereitgestellt. Jetzt brauchen wir einen Dienst, der den Thread und die Worker-Instanz verwaltet:

 // interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); } 

Alt-Text

Lassen Sie uns diskutieren, wie dieser Code funktioniert:

  • Im Konstruktor erstellen wir einen Thread und eine Worker-Instanz. Beachten Sie, dass der Worker kein übergeordnetes Element erhält, da er in den neuen Thread verschoben wird. Aus diesem Grund kann Qt den Arbeitsspeicher des Workers nicht automatisch freigeben, und deshalb müssen wir dies tun, indem wir das QThread::finished -Signal mit dem deleteLater Slot verbinden. Wir verbinden auch die Proxy-Methode LogService::logEvent() mit LogWorker::logEvent() , die aufgrund unterschiedlicher Threads den Qt::QueuedConnection Modus verwenden wird.
  • Im Destruktor stellen wir das quit -Ereignis in die Warteschlange der Ereignisschleife. Dieses Ereignis wird behandelt, nachdem alle anderen Ereignisse behandelt wurden. Wenn wir beispielsweise unmittelbar vor dem Destruktor-Aufruf Hunderte von logEvent() Aufrufen durchgeführt haben, verarbeitet der Logger sie alle, bevor er das quit-Ereignis abruft. Das braucht natürlich Zeit, also müssen wir wait() , bis die Ereignisschleife beendet wird. Es ist erwähnenswert, dass alle zukünftigen Logging-Anforderungen, die nach dem quit-Ereignis gesendet werden, niemals verarbeitet werden.
  • Die Protokollierung selbst ( LogWorker::logEvent ) erfolgt immer im selben Thread, daher funktioniert dieser Ansatz gut für Klassen, die Thread-Affinität erfordern. Gleichzeitig werden der LogWorker Konstruktor und -Destruktor im Hauptthread ausgeführt (insbesondere im Thread, in dem LogService ausgeführt wird), und daher müssen Sie sehr vorsichtig sein, welchen Code Sie dort ausführen. Stoppen Sie insbesondere keine Timer und verwenden Sie keine Sockets im Destruktor des Workers, es sei denn, Sie könnten den Destruktor im selben Thread ausführen!

Ausführen des Destruktors des Workers im selben Thread

Wenn Ihr Worker mit Timern oder Sockets arbeitet, müssen Sie sicherstellen, dass der Destruktor im selben Thread ausgeführt wird (dem Thread, den Sie für den Worker erstellt haben und in den Sie den Worker verschoben haben). Der offensichtliche Weg, dies zu unterstützen, besteht darin, QThread und den Worker in QThread::run() Methode zu delete . Betrachten Sie die folgende Vorlage:

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };

Mit dieser Vorlage definieren wir LogService aus dem vorherigen Beispiel neu:

 // interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }

Lassen Sie uns besprechen, wie das funktionieren soll:

  • Wir haben LogService zum QThread Objekt gemacht, weil wir die benutzerdefinierte run() Funktion implementieren mussten. Wir haben private Unterklassen verwendet, um den Zugriff auf die Funktionen von QThread zu verhindern, da wir den Lebenszyklus des Threads intern steuern möchten.
  • In der Funktion Thread::run() führen wir die Ereignisschleife aus, indem wir die standardmäßige QThread::run() -Implementierung aufrufen und die Worker-Instanz direkt nach dem Beenden der Ereignisschleife zerstören. Beachten Sie, dass der Destruktor des Workers im selben Thread ausgeführt wird.
  • LogService::logEvent() ist die Proxy-Funktion (Signal), die das Protokollierungsereignis an die Ereigniswarteschlange des Threads sendet.

Anhalten und Fortsetzen der Threads

Eine weitere interessante Möglichkeit besteht darin, unsere benutzerdefinierten Threads auszusetzen und fortzusetzen. Stellen Sie sich vor, Ihre Anwendung führt eine Verarbeitung aus, die angehalten werden muss, wenn die Anwendung minimiert oder gesperrt wird oder einfach die Netzwerkverbindung verloren hat. Dies kann erreicht werden, indem eine benutzerdefinierte asynchrone Warteschlange erstellt wird, die alle ausstehenden Anforderungen enthält, bis der Worker fortgesetzt wird. Da wir jedoch nach einfachsten Lösungen suchen, werden wir (erneut) die Warteschlange der Ereignisschleife für denselben Zweck verwenden.

Um einen Thread zu suspendieren, müssen wir ihn eindeutig auf eine bestimmte Wartebedingung warten lassen. Wenn der Thread auf diese Weise blockiert wird, verarbeitet seine Ereignisschleife keine Ereignisse und Qt muss keep in die Warteschlange stellen. Nach der Wiederaufnahme verarbeitet die Ereignisschleife alle angesammelten Anforderungen. Für die Wartebedingung verwenden wir einfach das QWaitCondition Objekt, das auch ein QMutex erfordert. Um eine generische Lösung zu entwerfen, die von jedem Arbeiter wiederverwendet werden kann, müssen wir die gesamte Suspend/Resume-Logik in eine wiederverwendbare Basisklasse einfügen. Nennen wir es SuspendableWorker . Eine solche Klasse soll zwei Methoden unterstützen:

  • suspend() wäre ein blockierender Aufruf, der den wartenden Thread in eine Wartebedingung versetzt. Dies würde dadurch erfolgen, dass eine Aussetzungsanforderung in die Warteschlange gestellt und gewartet wird, bis sie bearbeitet wird. Ziemlich ähnlich wie QThread::quit() + wait() .
  • resume() würde die Wartebedingung signalisieren, den schlafenden Thread aufzuwecken, um seine Ausführung fortzusetzen.

Lassen Sie uns die Schnittstelle und Implementierung überprüfen:

 // interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
 // implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }

Denken Sie daran, dass ein suspendierter Thread niemals ein quit Ereignis erhält. Aus diesem Grund können wir dies nicht sicher mit Vanilla QThread verwenden, es sei denn, wir setzen den Thread fort, bevor wir ihn beenden. Lassen Sie uns dies in unsere benutzerdefinierte Thread<T> -Vorlage integrieren, um es kugelsicher zu machen.

Alt-Text

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };

Mit diesen Änderungen werden wir den Thread fortsetzen, bevor wir das Quit-Ereignis posten. Außerdem lässt Thread<TWorker> weiterhin zu, dass jede Art von Worker übergeben wird, unabhängig davon, ob es sich um einen SuspendableWorker oder nicht.

Die Verwendung wäre wie folgt:

 LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.

flüchtig vs atomar

Dies ist ein häufig missverstandenes Thema. Die meisten Leute glauben, dass volatile Variablen verwendet werden können, um bestimmte Flags zu bedienen, auf die von mehreren Threads zugegriffen wird, und dass dies vor Data Race-Bedingungen schützt. Das ist falsch, und für diesen Zweck müssen QAtomic* -Klassen (oder std::atomic ) verwendet werden.

Betrachten wir ein realistisches Beispiel: eine TcpConnection -Verbindungsklasse, die in einem dedizierten Thread arbeitet, und wir möchten, dass diese Klasse eine Thread-sichere Methode exportiert: bool isConnected() . Intern überwacht die Klasse Socket-Ereignisse: connected und disconnected , um ein internes boolesches Flag beizubehalten:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }

_connected Member volatile zu machen, wird das Problem nicht lösen und isConnected() nicht Thread-sicher machen. Diese Lösung wird 99 % der Zeit funktionieren, aber die restlichen 1 % werden Ihr Leben zu einem Alptraum machen. Um dies zu beheben, müssen wir den Variablenzugriff vor mehreren Threads schützen. Verwenden wir zu diesem Zweck QReadWriteLocker :

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }

Dies funktioniert zuverlässig, aber nicht so schnell wie die Verwendung von „lock-freien“ atomaren Operationen. Die dritte Lösung ist sowohl schnell als auch Thread-sicher (das Beispiel verwendet std::atomic anstelle von QAtomicInt , aber semantisch sind diese identisch):

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }

Fazit

In diesem Artikel haben wir mehrere wichtige Bedenken hinsichtlich der gleichzeitigen Programmierung mit dem Qt-Framework erörtert und Lösungen für bestimmte Anwendungsfälle entwickelt. Wir haben viele der einfachen Themen wie die Verwendung von atomaren Primitives, Lese-Schreib-Sperren und viele andere nicht berücksichtigt, aber wenn Sie daran interessiert sind, hinterlassen Sie unten Ihren Kommentar und fragen Sie nach einem solchen Tutorial.

Wenn Sie daran interessiert sind, Qmake zu erkunden, habe ich kürzlich auch The Vital Guide to Qmake veröffentlicht. Es ist eine großartige Lektüre!