L'articolo mancante sul multithreading Qt in C++

Pubblicato: 2022-03-11

Gli sviluppatori C++ si sforzano di creare robuste applicazioni Qt multithread, ma il multithreading non è mai stato facile con tutte quelle race condition, sincronizzazione, deadlock e livelock. A tuo merito, non ti arrendi e ti ritrovi a perlustrare StackOverflow. Tuttavia, scegliere la soluzione giusta e funzionante da una dozzina di risposte diverse non è abbastanza banale, soprattutto perché ogni soluzione ha i suoi svantaggi.

Il multithreading è un modello di programmazione ed esecuzione diffuso che consente l'esistenza di più thread nel contesto di un processo. Questi thread condividono le risorse del processo ma sono in grado di essere eseguiti in modo indipendente. Il modello di programmazione threaded fornisce agli sviluppatori un'utile astrazione dell'esecuzione simultanea. Il multithreading può essere applicato anche a un processo per consentire l'esecuzione parallela su un sistema multiprocessing.

– Wikipedia

L'obiettivo di questo articolo è aggregare le conoscenze essenziali sulla programmazione simultanea con il framework Qt, in particolare gli argomenti più fraintesi. Un lettore dovrebbe avere precedenti conoscenze in Qt e C++ per comprendere il contenuto.

Scelta tra l'utilizzo QThreadPool e QThread

Il framework Qt offre molti strumenti per il multithreading. Scegliere lo strumento giusto all'inizio può essere difficile, ma in realtà l'albero decisionale consiste in due sole opzioni: vuoi che Qt gestisca i thread per te o vuoi gestirli da solo. Tuttavia, ci sono altri criteri importanti:

  1. Attività che non richiedono il ciclo di eventi. In particolare, le attività che non utilizzano il meccanismo segnale/slot durante l'esecuzione dell'attività.
    Usa: QtConcurrent e QThreadPool + QRunnable.

  2. Attività che utilizzano segnali/slot e quindi necessitano del loop di eventi.
    Usa: oggetti di lavoro spostati su + QThread.

La grande flessibilità del framework Qt consente di aggirare il problema del "mancante evento loop" e di aggiungerne uno a QRunnable :

 class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }

Cerca di evitare tali "soluzioni alternative", tuttavia, perché sono pericolose e non efficienti: se uno dei thread del pool di thread (in esecuzione MyTask) è bloccato a causa dell'attesa di un segnale, non può eseguire altre attività dal pool.

testo alternativo

Puoi anche eseguire un QThread senza alcun ciclo di eventi sovrascrivendo QThread::run() e questo va benissimo fintanto che sai cosa stai facendo. Ad esempio, non aspettarti che il metodo quit() funzioni in questo caso.

Esecuzione di un'istanza di attività alla volta

Immagina di dover garantire che sia possibile eseguire solo un'istanza di attività alla volta e che tutte le richieste in sospeso per eseguire la stessa attività siano in attesa su una determinata coda. Questo è spesso necessario quando un'attività accede a una risorsa esclusiva, come scrivere sullo stesso file o inviare pacchetti utilizzando il socket TCP.

Dimentichiamoci per un momento dell'informatica e del modello produttore-consumatore e consideriamo qualcosa di banale; qualcosa che può essere facilmente trovato in progetti reali.

Una soluzione ingenua a questo problema potrebbe essere l'utilizzo di un QMutex . All'interno della funzione attività, puoi semplicemente acquisire il mutex serializzando efficacemente tutti i thread che tentano di eseguire l'attività. Ciò garantirebbe che solo un thread alla volta possa eseguire la funzione. Tuttavia, questa soluzione influisce sulle prestazioni introducendo un problema di contesa elevata perché tutti quei thread verrebbero bloccati (sul mutex) prima che possano procedere. Se hai molti thread che utilizzano attivamente un'attività del genere e svolgono un lavoro utile nel mezzo, tutti questi thread rimarranno inattivi per la maggior parte del tempo.

 void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }

Per evitare contese, abbiamo bisogno di una coda e di un lavoratore che viva nel proprio thread ed elabori la coda. Questo è praticamente il classico modello produttore-consumatore . Il lavoratore ( consumatore ) raccoglierebbe le richieste dalla coda una per una e ogni produttore può semplicemente aggiungere le proprie richieste alla coda. All'inizio sembra semplice e potresti pensare di usare QQueue e QWaitCondition , ma aspetta e vediamo se possiamo raggiungere l'obiettivo senza queste primitive:

  • Possiamo usare QThreadPool perché ha una coda di attività in sospeso

o

  • Possiamo usare QThread::run() predefinito perché ha QEventLoop

La prima opzione è usare QThreadPool . Possiamo creare un'istanza QThreadPool e utilizzare QThreadPool::setMaxThreadCount(1) . Quindi possiamo usare QtConcurrent::run() per pianificare le richieste:

 class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };

Questa soluzione ha un vantaggio: QThreadPool::clear() consente di annullare istantaneamente tutte le richieste in sospeso, ad esempio quando l'applicazione deve essere chiusa rapidamente. Tuttavia, esiste anche uno svantaggio significativo connesso a thread-affinity : la funzione logEventCore verrà probabilmente eseguita in thread diversi da una chiamata all'altra. E sappiamo che Qt ha alcune classi che richiedono l'affinità dei thread : QTimer , QTcpSocket e forse alcune altre.

Cosa dice la specifica Qt sull'affinità del thread: i timer avviati in un thread, non possono essere fermati da un altro thread. E solo il thread che possiede un'istanza socket può utilizzare questo socket. Ciò implica che devi interrompere tutti i timer in esecuzione nel thread che li ha avviati e devi chiamare QTcpSocket::close() nel thread che possiede il socket. Entrambi gli esempi vengono generalmente eseguiti nei distruttori.

La soluzione migliore si basa sull'utilizzo di QEventLoop fornito da QThread . L'idea è semplice: utilizziamo un meccanismo segnale/slot per inviare le richieste e il loop di eventi in esecuzione all'interno del thread fungerà da coda consentendo l'esecuzione di un solo slot alla volta.

 // the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };

L'implementazione del costruttore LogWorker e logEvent è semplice e quindi non viene fornita qui. Ora abbiamo bisogno di un servizio che gestirà il thread e l'istanza di lavoro:

 // interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); } 

testo alternativo

Discutiamo di come funziona questo codice:

  • Nel costruttore, creiamo un thread e un'istanza di lavoro. Si noti che il lavoratore non riceve un genitore, perché verrà spostato nel nuovo thread. Per questo motivo, Qt non sarà in grado di rilasciare automaticamente la memoria del lavoratore e, pertanto, è necessario farlo collegando il segnale QThread::finished allo slot deleteLater . Colleghiamo anche il metodo proxy LogService::logEvent() a LogWorker::logEvent() che utilizzerà la modalità Qt::QueuedConnection a causa di thread diversi.
  • Nel distruttore, inseriamo l'evento quit nella coda del ciclo degli eventi. Questo evento verrà gestito dopo che tutti gli altri eventi saranno stati gestiti. Ad esempio, se abbiamo effettuato centinaia di chiamate logEvent() appena prima della chiamata al distruttore, il logger le gestirà tutte prima di recuperare l'evento quit. Questo richiede tempo, ovviamente, quindi dobbiamo wait() fino a quando il ciclo di eventi esce. Vale la pena ricordare che tutte le future richieste di registrazione pubblicate dopo l'evento di chiusura non verranno mai elaborate.
  • La registrazione stessa ( LogWorker::logEvent ) verrà sempre eseguita nello stesso thread, quindi questo approccio funziona bene per le classi che richiedono thread-affinity . Allo stesso tempo, il costruttore e il distruttore LogWorker vengono eseguiti nel thread principale (in particolare il thread in cui è in esecuzione LogService ) e, pertanto, è necessario prestare molta attenzione al codice in esecuzione lì. In particolare, non fermare i timer o utilizzare i socket nel distruttore del lavoratore a meno che tu non possa eseguire il distruttore nello stesso thread!

Esecuzione del distruttore di worker nello stesso thread

Se il tuo lavoratore ha a che fare con timer o socket, devi assicurarti che il distruttore venga eseguito nello stesso thread (il thread che hai creato per il lavoratore e dove hai spostato il lavoratore). Il modo più ovvio per supportare questo è sottoclassare QThread ed delete worker all'interno QThread::run() . Considera il seguente modello:

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };

Utilizzando questo modello, ridefiniamo LogService dall'esempio precedente:

 // interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }

Discutiamo di come dovrebbe funzionare:

  • Abbiamo reso LogService l'oggetto QThread perché dovevamo implementare la funzione run() personalizzata. Abbiamo utilizzato la sottoclasse privata per impedire l'accesso alle funzioni di QThread poiché vogliamo controllare internamente il ciclo di vita del thread.
  • Nella funzione Thread::run() eseguiamo il ciclo di eventi chiamando l'implementazione predefinita QThread::run() e distruggiamo l'istanza di lavoro subito dopo la chiusura del ciclo di eventi. Nota che il distruttore del lavoratore viene eseguito nello stesso thread.
  • LogService::logEvent() è la funzione proxy (segnale) che invierà l'evento di registrazione alla coda degli eventi del thread.

Mettere in pausa e riprendere i thread

Un'altra interessante opportunità è quella di poter sospendere e riprendere i nostri thread personalizzati. Immagina che la tua applicazione stia eseguendo un'elaborazione che deve essere sospesa quando l'applicazione viene ridotta a icona, bloccata o semplicemente ha perso la connessione di rete. Ciò può essere ottenuto creando una coda asincrona personalizzata che conterrà tutte le richieste in sospeso fino al ripristino del ruolo di lavoro. Tuttavia, poiché stiamo cercando soluzioni più semplici, utilizzeremo (di nuovo) la coda del ciclo di eventi per lo stesso scopo.

Per sospendere un thread, abbiamo chiaramente bisogno che attenda una determinata condizione di attesa. Se il thread è bloccato in questo modo, il suo ciclo di eventi non gestisce alcun evento e Qt deve mettere keep in coda. Una volta ripreso, il ciclo di eventi elaborerà tutte le richieste accumulate. Per la condizione di attesa, utilizziamo semplicemente l'oggetto QWaitCondition che richiede anche un QMutex . Per progettare una soluzione generica che possa essere riutilizzata da qualsiasi lavoratore, è necessario inserire tutta la logica di sospensione/ripresa in una classe base riutilizzabile. Chiamiamolo SuspendableWorker . Tale classe deve supportare due metodi:

  • suspend() sarebbe una chiamata di blocco che imposta il thread in attesa di una condizione di attesa. Ciò avverrebbe pubblicando una richiesta di sospensione nella coda e aspettando che venga gestita. Praticamente simile a QThread::quit() + wait() .
  • resume() segnalerebbe la condizione di attesa per riattivare il thread inattivo per continuare la sua esecuzione.

Esaminiamo l'interfaccia e l'implementazione:

 // interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
 // implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }

Ricorda che un thread sospeso non riceverà mai un evento di quit . Per questo motivo, non possiamo usarlo in sicurezza con vanilla QThread a meno che non riprendiamo il thread prima di pubblicare quit. Integriamo questo nel nostro modello Thread<T> personalizzato per renderlo a prova di proiettile.

testo alternativo

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };

Con queste modifiche, riprenderemo il thread prima di pubblicare l'evento di chiusura. Inoltre, Thread<TWorker> consente comunque il passaggio di qualsiasi tipo di lavoratore indipendentemente dal fatto che sia un SuspendableWorker o meno.

L'utilizzo sarebbe il seguente:

 LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.

volatile vs atomico

Questo è un argomento comunemente frainteso. La maggior parte delle persone crede che le variabili volatile possano essere utilizzate per servire determinati flag a cui accedono più thread e che ciò preserva dalle condizioni della corsa dei dati. Questo è falso e le classi QAtomic* (o std::atomic ) devono essere utilizzate per questo scopo.

Consideriamo un esempio realistico: una classe di connessione TcpConnection che funziona in un thread dedicato e vogliamo che questa classe esporti un metodo thread-safe: bool isConnected() . Internamente, la classe ascolterà gli eventi socket: connected e disconnected per mantenere un flag booleano interno:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }

Rendere volatile il membro _connected non risolverà il problema e non renderà isConnected() thread-safe. Questa soluzione funzionerà il 99% delle volte, ma il restante 1% renderà la tua vita un incubo. Per risolvere questo problema, dobbiamo proteggere l'accesso alle variabili da più thread. Usiamo QReadWriteLocker per questo scopo:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }

Funziona in modo affidabile, ma non veloce come l'utilizzo di operazioni atomiche "senza blocco". La terza soluzione è sia veloce che thread-safe (l'esempio utilizza std::atomic invece di QAtomicInt , ma semanticamente questi sono identici):

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }

Conclusione

In questo articolo, abbiamo discusso diverse questioni importanti sulla programmazione simultanea con il framework Qt e abbiamo progettato soluzioni per affrontare casi d'uso specifici. Non abbiamo considerato molti degli argomenti semplici come l'uso di primitive atomiche, i blocchi di lettura-scrittura e molti altri, ma se sei interessato a questi, lascia il tuo commento qui sotto e chiedi un tutorial del genere.

Se sei interessato ad esplorare Qmake, ho anche pubblicato di recente The Vital Guide to Qmake. È un'ottima lettura!