Articolul lipsă despre Qt Multithreading în C++

Publicat: 2022-03-11

Dezvoltatorii C++ se străduiesc să creeze aplicații Qt multithreaded robuste, dar multithreading nu a fost niciodată ușor cu toate acele condiții de cursă, sincronizare și blocaje și blocaje. Spre meritul tău, nu te dai bătut și te trezești cufundat în StackOverflow. Cu toate acestea, alegerea soluției corecte și funcționale dintr-o duzină de răspunsuri diferite este destul de netrivială, mai ales având în vedere că fiecare soluție are propriile dezavantaje.

Multithreading este un model de programare și execuție larg răspândit care permite existența mai multor fire în contextul unui proces. Aceste fire de execuție partajează resursele procesului, dar sunt capabile să se execute independent. Modelul de programare threaded oferă dezvoltatorilor o abstractizare utilă a execuției concurente. Multithreading poate fi, de asemenea, aplicat unui proces pentru a permite execuția paralelă pe un sistem multiprocesare.

– Wikipedia

Scopul acestui articol este de a agrega cunoștințele esențiale despre programarea concomitentă cu cadrul Qt, în special cele mai greșit înțelese subiecte. Se așteaptă ca un cititor să aibă cunoștințe anterioare în Qt și C++ pentru a înțelege conținutul.

Alegerea între utilizarea QThreadPool și QThread

Cadrul Qt oferă multe instrumente pentru multithreading. Alegerea instrumentului potrivit poate fi o provocare la început, dar, de fapt, arborele de decizie constă doar din două opțiuni: fie doriți ca Qt să gestioneze firele pentru dvs., fie doriți să gestionați firele singur. Cu toate acestea, există și alte criterii importante:

  1. Sarcini care nu au nevoie de bucla de evenimente. Mai exact, sarcinile care nu folosesc mecanismul semnal/slot în timpul execuției sarcinii.
    Utilizare: QtConcurrent și QThreadPool + QRunnable.

  2. Sarcini care folosesc semnal/slot și, prin urmare, au nevoie de bucla de evenimente.
    Utilizare: obiectele de lucru mutate în + QThread.

Marea flexibilitate a cadrului Qt vă permite să rezolvați problema „bucla de evenimente lipsă” și să adăugați una la QRunnable :

 class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }

Încercați totuși să evitați astfel de „soluții de soluționare”, deoarece acestea sunt periculoase și nu sunt eficiente: dacă unul dintre firele de execuție din pool-ul de fire (care rulează MyTask) este blocat din cauza așteptării unui semnal, atunci nu poate executa alte sarcini din pool.

text alternativ

De asemenea, puteți rula un QThread fără nicio buclă de evenimente, suprascriind QThread::run() și acest lucru este perfect atâta timp cât știți ce faceți. De exemplu, nu vă așteptați ca metoda quit() să funcționeze în acest caz.

Rularea câte o instanță de activitate la un moment dat

Imaginați-vă că trebuie să vă asigurați că poate fi executată o singură instanță de activitate la un moment dat și că toate solicitările în așteptare pentru a rula aceeași sarcină așteaptă într-o anumită coadă. Acest lucru este adesea necesar atunci când o sarcină accesează o resursă exclusivă, cum ar fi scrierea în același fișier sau trimiterea de pachete folosind soclul TCP.

Să uităm pentru o clipă de informatică și de modelul producător-consumator și să luăm în considerare ceva banal; ceva ce poate fi găsit cu ușurință în proiecte reale.

O soluție naivă la această problemă ar putea fi utilizarea unui QMutex . În interiorul funcției de sarcină, puteți pur și simplu să achiziționați mutex-ul care serializează efectiv toate firele care încearcă să ruleze sarcina. Acest lucru ar garanta că numai un fir la un moment dat ar putea rula funcția. Cu toate acestea, această soluție are un impact asupra performanței prin introducerea unei probleme de conflict ridicat , deoarece toate acele fire ar fi blocate (pe mutex) înainte de a putea continua. Dacă aveți multe fire care folosesc în mod activ o astfel de sarcină și fac o treabă utilă între ele, atunci toate aceste fire vor fi doar adormite de cele mai multe ori.

 void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }

Pentru a evita disputele, avem nevoie de o coadă și de un lucrător care trăiește în propriul fir și care procesează coada. Acesta este aproape modelul clasic producător-consumator . Lucrătorul ( consumatorul ) ar alege cererile din coadă una câte una, iar fiecare producător poate pur și simplu să-și adauge cererile în coadă. Sună simplu la început și s-ar putea să vă gândiți să utilizați QQueue și QWaitCondition , dar țineți-vă și să vedem dacă putem atinge obiectivul fără aceste primitive:

  • Putem folosi QThreadPool deoarece are o coadă de sarcini în așteptare

Sau

  • Putem folosi implicit QThread::run() deoarece are QEventLoop

Prima opțiune este să utilizați QThreadPool . Putem crea o instanță QThreadPool și putem folosi QThreadPool::setMaxThreadCount(1) . Apoi putem folosi QtConcurrent::run() pentru a programa cereri:

 class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };

Această soluție are un singur beneficiu: QThreadPool::clear() vă permite să anulați instantaneu toate solicitările în așteptare, de exemplu atunci când aplicația dvs. trebuie să se închidă rapid. Cu toate acestea, există și un dezavantaj semnificativ care este conectat la afinitatea firului de execuție : funcția logEventCore se va executa probabil în fire diferite de la apel la apel. Și știm că Qt are câteva clase care necesită afinitate fir : QTimer , QTcpSocket și, eventual, altele.

Ce spune Qt spec despre afinitatea firului de execuție: temporizatoarele au început într-un fir, nu pot fi oprite dintr-un alt fir. Și numai firul care deține o instanță de socket poate folosi acest socket. Acest lucru implică faptul că trebuie să opriți orice temporizatoare care rulează în firul care le-a pornit și trebuie să apelați QTcpSocket::close() în firul care deține socket-ul. Ambele exemple sunt de obicei executate în destructori.

Soluția mai bună se bazează pe utilizarea QEventLoop furnizată de QThread . Ideea este simplă: folosim un mecanism semnal/slot pentru a emite cereri, iar bucla de evenimente care rulează în firul de execuție va servi ca o coadă, permițând executarea unui singur slot la un moment dat.

 // the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };

Implementarea constructorului LogWorker și a logEvent este simplă și, prin urmare, nu este furnizată aici. Acum avem nevoie de un serviciu care va gestiona firul și instanța de lucru:

 // interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); } 

text alternativ

Să discutăm cum funcționează acest cod:

  • În constructor, creăm un fir și o instanță de lucru. Observați că lucrătorul nu primește un părinte, deoarece acesta va fi mutat în noul thread. Din această cauză, Qt nu va putea elibera automat memoria lucrătorului și, prin urmare, trebuie să facem acest lucru conectând semnalul QThread::finished la slotul deleteLater . De asemenea, conectăm metoda proxy LogService::logEvent() la LogWorker::logEvent() care va folosi modul Qt::QueuedConnection din cauza diferitelor fire.
  • În destructor, punem evenimentul quit în coada buclei de evenimente. Acest eveniment va fi gestionat după ce toate celelalte evenimente vor fi gestionate. De exemplu, dacă am făcut sute de logEvent() chiar înainte de apelul destructorului, loggerul le va gestiona pe toate înainte de a prelua evenimentul quit. Acest lucru necesită timp, desigur, așa că trebuie să wait() până la ieșirea buclei de evenimente. Merită menționat faptul că toate cererile viitoare de înregistrare postate după evenimentul de ieșire nu vor fi niciodată procesate.
  • Înregistrarea în sine ( LogWorker::logEvent ) se va face întotdeauna în același fir, prin urmare, această abordare funcționează bine pentru clasele care necesită afinitate pentru fire . În același timp, constructorul și destructorul LogWorker sunt executate în firul principal (în special firul în care rulează LogService ) și, prin urmare, trebuie să fiți foarte atenți la ce cod rulați acolo. Mai exact, nu opriți cronometrele și nu folosiți prize în destructorul lucrătorului decât dacă ați putea rula destructorul în același fir!

Executarea destructorului muncitorului în același thread

Dacă lucrătorul dvs. are de-a face cu cronometre sau prize, trebuie să vă asigurați că destructorul este executat în același fir (firul pe care l-ați creat pentru lucrător și unde l-ați mutat pe lucrător). Modul evident de a susține acest lucru este de a subclasa QThread și de a delete lucrătorul din cadrul QThread::run() . Luați în considerare următorul șablon:

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };

Folosind acest șablon, redefinim LogService din exemplul anterior:

 // interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }

Să discutăm cum ar trebui să funcționeze:

  • Am făcut ca LogService să fie obiectul QThread , deoarece trebuia să implementăm funcția personalizată run() . Am folosit subclasarea privată pentru a preveni accesarea funcțiilor lui QThread , deoarece dorim să controlăm ciclul de viață al firului de execuție în interior.
  • În funcția Thread::run() rulăm bucla de evenimente apelând implementarea implicită QThread::run() și distrugem instanța de lucru imediat după ce bucla de evenimente a ieșit. Rețineți că destructorul lucrătorului este executat în același fir.
  • LogService::logEvent() este funcția proxy (semnal) care va posta evenimentul de înregistrare în coada de evenimente a firului de execuție.

Întreruperea și reluarea firelor

O altă oportunitate interesantă este să putem suspenda și relua firele noastre personalizate. Imaginați-vă că aplicația dvs. efectuează o procesare care trebuie suspendată atunci când aplicația este minimizată, blocată sau pur și simplu a pierdut conexiunea la rețea. Acest lucru poate fi realizat prin construirea unei cozi asincrone personalizate care va reține toate cererile în așteptare până când lucrătorul este reluat. Cu toate acestea, deoarece căutăm cele mai simple soluții, vom folosi (din nou) coada buclei de evenimente în același scop.

Pentru a suspenda un fir, avem nevoie de el pentru a aștepta într-o anumită condiție de așteptare. Dacă firul de execuție este blocat în acest fel, bucla sa de evenimente nu gestionează niciun eveniment și Qt trebuie să pună keep în coadă. Odată reluată, bucla de evenimente va procesa toate cererile acumulate. Pentru condiția de așteptare, pur și simplu folosim obiectul QWaitCondition care necesită și un QMutex . Pentru a proiecta o soluție generică care ar putea fi reutilizată de orice lucrător, trebuie să punem toată logica de suspendare/reluare într-o clasă de bază reutilizabilă. Să-i spunem SuspendableWorker . O astfel de clasă va suporta două metode:

  • suspend() ar fi un apel de blocare care setează firul de execuție într-o condiție de așteptare. Acest lucru se va face prin postarea unei cereri de suspendare în coadă și așteptarea până când este tratată. Cam asemănător cu QThread::quit() + wait() .
  • resume() ar semnala condiția de așteptare pentru a trezi firul în stare de adormire pentru a-și continua execuția.

Să revizuim interfața și implementarea:

 // interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
 // implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }

Amintiți-vă că un thread suspendat nu va primi niciodată un eveniment de quit . Din acest motiv, nu putem folosi acest lucru în siguranță cu vanilla QThread decât dacă reluăm firul înainte de a posta închidere. Să integrăm acest lucru în șablonul nostru personalizat Thread<T> pentru a-l face antiglonț.

text alternativ

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };

Cu aceste modificări, vom relua firul înainte de a posta evenimentul de ieșire. De asemenea, Thread<TWorker> permite în continuare trecerea oricărui tip de lucrător, indiferent dacă este un SuspendableWorker sau nu.

Utilizarea ar fi după cum urmează:

 LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.

volatil vs atomic

Acesta este un subiect de obicei greșit înțeles. Majoritatea oamenilor cred că variabilele volatile pot fi folosite pentru a servi anumite steaguri accesate de mai multe fire și că acest lucru protejează condițiile de cursă a datelor. Acest lucru este fals, iar clasele QAtomic* (sau std::atomic ) trebuie folosite în acest scop.

Să luăm în considerare un exemplu realist: o clasă de conexiune TcpConnection care funcționează într-un fir dedicat și dorim ca această clasă să exporte o metodă sigură pentru fire: bool isConnected() . Intern, clasa va asculta evenimentele socket: connected și disconnected pentru a menține un semnal boolean intern:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }

A face volatile membru _connected nu va rezolva problema și nu va face isConnected() sigur pentru fire. Această soluție va funcționa în 99% din timp, dar restul de 1% va face din viața ta un coșmar. Pentru a remedia acest lucru, trebuie să protejăm accesul variabil de mai multe fire. Să folosim QReadWriteLocker în acest scop:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }

Acest lucru funcționează în mod fiabil, dar nu la fel de rapid ca utilizarea operațiunilor atomice „fără blocare”. A treia soluție este atât rapidă, cât și sigură pentru fire (exemplul folosește std::atomic în loc de QAtomicInt , dar din punct de vedere semantic acestea sunt identice):

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }

Concluzie

În acest articol, am discutat câteva preocupări importante legate de programarea concomitentă cu cadrul Qt și am conceput soluții pentru a aborda cazuri de utilizare specifice. Nu am luat în considerare multe dintre subiectele simple, cum ar fi utilizarea primitivelor atomice, încuietori de citire-scriere și multe altele, dar dacă sunteți interesat de acestea, lăsați comentariul dvs. mai jos și cereți un astfel de tutorial.

Dacă sunteți interesat să explorați Qmake, am publicat recent și The Vital Guide to Qmake. Este o lectura grozava!