Der fehlende Artikel über Qt Multithreading in C++
Veröffentlicht: 2022-03-11C++-Entwickler streben danach, robuste Multithreading-Qt-Anwendungen zu erstellen, aber Multithreading war nie einfach mit all diesen Race Conditions, Synchronisationen und Deadlocks und Livelocks. Zu Ihrer Ehre geben Sie nicht auf und finden sich StackOverflow durchwühlend wieder. Dennoch ist es ziemlich nicht trivial, aus einem Dutzend verschiedener Antworten die richtige und funktionierende Lösung auszuwählen, insbesondere angesichts der Tatsache, dass jede Lösung ihre eigenen Nachteile hat.
Multithreading ist ein weit verbreitetes Programmier- und Ausführungsmodell, das es ermöglicht, dass mehrere Threads im Kontext eines Prozesses existieren. Diese Threads teilen sich die Ressourcen des Prozesses, können aber unabhängig voneinander ausgeführt werden. Das Threaded-Programmiermodell bietet Entwicklern eine nützliche Abstraktion der gleichzeitigen Ausführung. Multithreading kann auch auf einen Prozess angewendet werden, um eine parallele Ausführung auf einem Multiprozessorsystem zu ermöglichen.
– Wikipedia
Das Ziel dieses Artikels ist es, das grundlegende Wissen über die gleichzeitige Programmierung mit dem Qt-Framework zusammenzufassen, insbesondere die am häufigsten missverstandenen Themen. Es wird erwartet, dass ein Leser über Vorkenntnisse in Qt und C++ verfügt, um den Inhalt zu verstehen.
Auswahl zwischen der Verwendung von QThreadPool
und QThread
Das Qt-Framework bietet viele Tools für Multithreading. Die Auswahl des richtigen Tools kann zunächst schwierig sein, aber tatsächlich besteht der Entscheidungsbaum nur aus zwei Optionen: Sie möchten entweder, dass Qt die Threads für Sie verwaltet, oder Sie möchten die Threads selbst verwalten. Es gibt jedoch noch weitere wichtige Kriterien:
Aufgaben, die die Ereignisschleife nicht benötigen. Insbesondere die Tasks, die während der Taskausführung keinen Signal/Slot-Mechanismus verwenden.
Verwendung: QtConcurrent und QThreadPool + QRunnable.Tasks, die Signale/Slots verwenden und daher die Ereignisschleife benötigen.
Verwendung: Worker-Objekte nach + QThread verschoben.
Die große Flexibilität des Qt-Frameworks ermöglicht es Ihnen, das Problem der „fehlenden Ereignisschleife“ zu umgehen und eines zu QRunnable
:
class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }
Versuchen Sie jedoch, solche „Workarounds“ zu vermeiden, da diese gefährlich und nicht effizient sind: Wenn einer der Threads aus dem Thread-Pool (auf dem MyTask ausgeführt wird) blockiert wird, weil er auf ein Signal wartet, kann er keine anderen Aufgaben aus dem Pool ausführen.
Sie können einen QThread
auch ohne Ereignisschleife ausführen, indem Sie die Methode QThread::run()
überschreiben, und das ist völlig in Ordnung, solange Sie wissen, was Sie tun. Erwarten Sie beispielsweise nicht, dass die Methode quit()
in einem solchen Fall funktioniert.
Ausführen einer Aufgabeninstanz nach der anderen
Stellen Sie sich vor, Sie müssen sicherstellen, dass jeweils nur eine Aufgabeninstanz ausgeführt werden kann und alle ausstehenden Anforderungen zur Ausführung derselben Aufgabe in einer bestimmten Warteschlange warten. Dies ist häufig erforderlich, wenn eine Aufgabe auf eine exklusive Ressource zugreift, z. B. beim Schreiben in dieselbe Datei oder beim Senden von Paketen über TCP-Socket.
Vergessen wir für einen Moment die Informatik und das Producer-Consumer-Muster und betrachten etwas Triviales; etwas, das in realen Projekten leicht zu finden ist.
Eine naive Lösung für dieses Problem könnte die Verwendung eines QMutex
. Innerhalb der Aufgabenfunktion könnten Sie einfach den Mutex erwerben, der alle Threads serialisiert, die versuchen, die Aufgabe auszuführen. Dies würde garantieren, dass jeweils nur ein Thread die Funktion ausführen kann. Diese Lösung wirkt sich jedoch auf die Leistung aus, indem sie ein Problem mit hoher Konkurrenz einführt, da alle diese Threads (auf dem Mutex) blockiert würden, bevor sie fortfahren könnten. Wenn Sie viele Threads haben, die eine solche Aufgabe aktiv verwenden und zwischendurch einige nützliche Aufgaben erledigen, werden alle diese Threads die meiste Zeit nur schlafen.
void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }
Um Konflikte zu vermeiden, benötigen wir eine Warteschlange und einen Worker, der in einem eigenen Thread lebt und die Warteschlange verarbeitet. Das ist so ziemlich das klassische Producer-Consumer- Muster. Der Worker ( Verbraucher ) würde Anfragen nacheinander aus der Warteschlange auswählen, und jeder Erzeuger kann seine Anfragen einfach in die Warteschlange einfügen. Klingt zunächst einfach und Sie denken vielleicht an die Verwendung von QQueue
und QWaitCondition
, aber warten Sie und lassen Sie uns sehen, ob wir das Ziel ohne diese Primitive erreichen können:
- Wir können
QThreadPool
verwenden, da es eine Warteschlange mit ausstehenden Aufgaben hat
Oder
- Wir können standardmäßig
QThread::run()
verwenden, weil esQEventLoop
hat
Die erste Option ist die Verwendung von QThreadPool
. Wir können eine QThreadPool
Instanz erstellen und QThreadPool::setMaxThreadCount(1)
verwenden. Dann können wir QtConcurrent::run()
verwenden, um Anfragen zu planen:
class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };
Diese Lösung hat einen Vorteil: QThreadPool::clear()
können Sie alle ausstehenden Anfragen sofort abbrechen , beispielsweise wenn Ihre Anwendung schnell heruntergefahren werden muss. Es gibt jedoch auch einen erheblichen Nachteil, der mit der Thread-Affinität verbunden ist: logEventCore
-Funktion wird wahrscheinlich von Aufruf zu Aufruf in verschiedenen Threads ausgeführt. Und wir wissen, dass Qt einige Klassen hat, die Thread-Affinität erfordern: QTimer
, QTcpSocket
und möglicherweise einige andere.
Was die Qt-Spezifikation über die Thread-Affinität sagt: Timer, die in einem Thread gestartet wurden, können nicht von einem anderen Thread gestoppt werden. Und nur der Thread, der eine Socket-Instanz besitzt, kann diesen Socket verwenden. Dies impliziert, dass Sie alle laufenden Timer in dem Thread stoppen müssen, der sie gestartet hat, und dass Sie QTcpSocket::close() in dem Thread aufrufen müssen, der den Socket besitzt. Beide Beispiele werden normalerweise in Destruktoren ausgeführt.
Die bessere Lösung basiert auf der Verwendung von QThread
, das von QEventLoop
bereitgestellt wird. Die Idee ist einfach: Wir verwenden einen Signal-/Slot-Mechanismus, um Anfragen zu stellen, und die im Thread laufende Ereignisschleife dient als Warteschlange, in der jeweils nur ein Slot ausgeführt werden kann.
// the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };
Die Implementierung des LogWorker
Konstruktors und von logEvent
ist unkompliziert und wird daher hier nicht bereitgestellt. Jetzt brauchen wir einen Dienst, der den Thread und die Worker-Instanz verwaltet:
// interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); }
Lassen Sie uns diskutieren, wie dieser Code funktioniert:
- Im Konstruktor erstellen wir einen Thread und eine Worker-Instanz. Beachten Sie, dass der Worker kein übergeordnetes Element erhält, da er in den neuen Thread verschoben wird. Aus diesem Grund kann Qt den Arbeitsspeicher des Workers nicht automatisch freigeben, und deshalb müssen wir dies tun, indem wir das
QThread::finished
-Signal mit demdeleteLater
Slot verbinden. Wir verbinden auch die Proxy-MethodeLogService::logEvent()
mitLogWorker::logEvent()
, die aufgrund unterschiedlicher Threads denQt::QueuedConnection
Modus verwenden wird. - Im Destruktor stellen wir das
quit
-Ereignis in die Warteschlange der Ereignisschleife. Dieses Ereignis wird behandelt, nachdem alle anderen Ereignisse behandelt wurden. Wenn wir beispielsweise unmittelbar vor dem Destruktor-Aufruf Hunderte vonlogEvent()
Aufrufen durchgeführt haben, verarbeitet der Logger sie alle, bevor er das quit-Ereignis abruft. Das braucht natürlich Zeit, also müssen wirwait()
, bis die Ereignisschleife beendet wird. Es ist erwähnenswert, dass alle zukünftigen Logging-Anforderungen, die nach dem quit-Ereignis gesendet werden, niemals verarbeitet werden. - Die Protokollierung selbst (
LogWorker::logEvent
) erfolgt immer im selben Thread, daher funktioniert dieser Ansatz gut für Klassen, die Thread-Affinität erfordern. Gleichzeitig werden derLogWorker
Konstruktor und -Destruktor im Hauptthread ausgeführt (insbesondere im Thread, in demLogService
ausgeführt wird), und daher müssen Sie sehr vorsichtig sein, welchen Code Sie dort ausführen. Stoppen Sie insbesondere keine Timer und verwenden Sie keine Sockets im Destruktor des Workers, es sei denn, Sie könnten den Destruktor im selben Thread ausführen!
Ausführen des Destruktors des Workers im selben Thread
Wenn Ihr Worker mit Timern oder Sockets arbeitet, müssen Sie sicherstellen, dass der Destruktor im selben Thread ausgeführt wird (dem Thread, den Sie für den Worker erstellt haben und in den Sie den Worker verschoben haben). Der offensichtliche Weg, dies zu unterstützen, besteht darin, QThread
und den Worker in QThread::run()
Methode zu delete
. Betrachten Sie die folgende Vorlage:

template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };
Mit dieser Vorlage definieren wir LogService
aus dem vorherigen Beispiel neu:
// interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }
Lassen Sie uns besprechen, wie das funktionieren soll:
- Wir haben
LogService
zumQThread
Objekt gemacht, weil wir die benutzerdefinierterun()
Funktion implementieren mussten. Wir haben private Unterklassen verwendet, um den Zugriff auf die Funktionen vonQThread
zu verhindern, da wir den Lebenszyklus des Threads intern steuern möchten. - In der Funktion
Thread::run()
führen wir die Ereignisschleife aus, indem wir die standardmäßigeQThread::run()
-Implementierung aufrufen und die Worker-Instanz direkt nach dem Beenden der Ereignisschleife zerstören. Beachten Sie, dass der Destruktor des Workers im selben Thread ausgeführt wird. -
LogService::logEvent()
ist die Proxy-Funktion (Signal), die das Protokollierungsereignis an die Ereigniswarteschlange des Threads sendet.
Anhalten und Fortsetzen der Threads
Eine weitere interessante Möglichkeit besteht darin, unsere benutzerdefinierten Threads auszusetzen und fortzusetzen. Stellen Sie sich vor, Ihre Anwendung führt eine Verarbeitung aus, die angehalten werden muss, wenn die Anwendung minimiert oder gesperrt wird oder einfach die Netzwerkverbindung verloren hat. Dies kann erreicht werden, indem eine benutzerdefinierte asynchrone Warteschlange erstellt wird, die alle ausstehenden Anforderungen enthält, bis der Worker fortgesetzt wird. Da wir jedoch nach einfachsten Lösungen suchen, werden wir (erneut) die Warteschlange der Ereignisschleife für denselben Zweck verwenden.
Um einen Thread zu suspendieren, müssen wir ihn eindeutig auf eine bestimmte Wartebedingung warten lassen. Wenn der Thread auf diese Weise blockiert wird, verarbeitet seine Ereignisschleife keine Ereignisse und Qt muss keep in die Warteschlange stellen. Nach der Wiederaufnahme verarbeitet die Ereignisschleife alle angesammelten Anforderungen. Für die Wartebedingung verwenden wir einfach das QWaitCondition
Objekt, das auch ein QMutex
erfordert. Um eine generische Lösung zu entwerfen, die von jedem Arbeiter wiederverwendet werden kann, müssen wir die gesamte Suspend/Resume-Logik in eine wiederverwendbare Basisklasse einfügen. Nennen wir es SuspendableWorker
. Eine solche Klasse soll zwei Methoden unterstützen:
-
suspend()
wäre ein blockierender Aufruf, der den wartenden Thread in eine Wartebedingung versetzt. Dies würde dadurch erfolgen, dass eine Aussetzungsanforderung in die Warteschlange gestellt und gewartet wird, bis sie bearbeitet wird. Ziemlich ähnlich wieQThread::quit()
+wait()
. -
resume()
würde die Wartebedingung signalisieren, den schlafenden Thread aufzuwecken, um seine Ausführung fortzusetzen.
Lassen Sie uns die Schnittstelle und Implementierung überprüfen:
// interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
// implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }
Denken Sie daran, dass ein suspendierter Thread niemals ein quit
Ereignis erhält. Aus diesem Grund können wir dies nicht sicher mit Vanilla QThread
verwenden, es sei denn, wir setzen den Thread fort, bevor wir ihn beenden. Lassen Sie uns dies in unsere benutzerdefinierte Thread<T>
-Vorlage integrieren, um es kugelsicher zu machen.
template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };
Mit diesen Änderungen werden wir den Thread fortsetzen, bevor wir das Quit-Ereignis posten. Außerdem lässt Thread<TWorker>
weiterhin zu, dass jede Art von Worker übergeben wird, unabhängig davon, ob es sich um einen SuspendableWorker
oder nicht.
Die Verwendung wäre wie folgt:
LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.
flüchtig vs atomar
Dies ist ein häufig missverstandenes Thema. Die meisten Leute glauben, dass volatile
Variablen verwendet werden können, um bestimmte Flags zu bedienen, auf die von mehreren Threads zugegriffen wird, und dass dies vor Data Race-Bedingungen schützt. Das ist falsch, und für diesen Zweck müssen QAtomic*
-Klassen (oder std::atomic
) verwendet werden.
Betrachten wir ein realistisches Beispiel: eine TcpConnection
-Verbindungsklasse, die in einem dedizierten Thread arbeitet, und wir möchten, dass diese Klasse eine Thread-sichere Methode exportiert: bool isConnected()
. Intern überwacht die Klasse Socket-Ereignisse: connected
und disconnected
, um ein internes boolesches Flag beizubehalten:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }
_connected
Member volatile
zu machen, wird das Problem nicht lösen und isConnected()
nicht Thread-sicher machen. Diese Lösung wird 99 % der Zeit funktionieren, aber die restlichen 1 % werden Ihr Leben zu einem Alptraum machen. Um dies zu beheben, müssen wir den Variablenzugriff vor mehreren Threads schützen. Verwenden wir zu diesem Zweck QReadWriteLocker
:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }
Dies funktioniert zuverlässig, aber nicht so schnell wie die Verwendung von „lock-freien“ atomaren Operationen. Die dritte Lösung ist sowohl schnell als auch Thread-sicher (das Beispiel verwendet std::atomic
anstelle von QAtomicInt
, aber semantisch sind diese identisch):
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }
Fazit
In diesem Artikel haben wir mehrere wichtige Bedenken hinsichtlich der gleichzeitigen Programmierung mit dem Qt-Framework erörtert und Lösungen für bestimmte Anwendungsfälle entwickelt. Wir haben viele der einfachen Themen wie die Verwendung von atomaren Primitives, Lese-Schreib-Sperren und viele andere nicht berücksichtigt, aber wenn Sie daran interessiert sind, hinterlassen Sie unten Ihren Kommentar und fragen Sie nach einem solchen Tutorial.
Wenn Sie daran interessiert sind, Qmake zu erkunden, habe ich kürzlich auch The Vital Guide to Qmake veröffentlicht. Es ist eine großartige Lektüre!