Brakujący artykuł o wielowątkowości Qt w C++

Opublikowany: 2022-03-11

Deweloperzy C++ starają się budować solidne, wielowątkowe aplikacje Qt, ale wielowątkowość nigdy nie była łatwa przy tych wszystkich warunkach wyścigu, synchronizacji i zakleszczeń i livelocks. Trzeba przyznać, że nie poddajesz się i przeszukujesz StackOverflow. Niemniej jednak wybór właściwego i działającego rozwiązania spośród kilkunastu różnych odpowiedzi jest dość nietrywialny, zwłaszcza biorąc pod uwagę, że każde rozwiązanie ma swoje wady.

Wielowątkowość to szeroko rozpowszechniony model programowania i wykonywania, który umożliwia istnienie wielu wątków w kontekście jednego procesu. Te wątki współdzielą zasoby procesu, ale mogą działać niezależnie. Wątkowy model programowania zapewnia programistom przydatną abstrakcję współbieżnego wykonywania. Wielowątkowość można również zastosować do jednego procesu, aby umożliwić równoległe wykonywanie w systemie wieloprocesorowym.

– Wikipedia

Celem tego artykułu jest zebranie podstawowej wiedzy na temat programowania współbieżnego z frameworkiem Qt, w szczególności najbardziej niezrozumiałych tematów. Oczekuje się, że czytelnik będzie miał wcześniejsze doświadczenie w Qt i C++, aby zrozumieć zawartość.

Wybór między używaniem QThreadPool i QThread

Framework Qt oferuje wiele narzędzi do wielowątkowości. Wybór odpowiedniego narzędzia może na początku być trudny, ale w rzeczywistości drzewo decyzyjne składa się tylko z dwóch opcji: albo chcesz, aby Qt zarządzał wątkami za ciebie, albo chcesz zarządzać wątkami samodzielnie. Istnieją jednak inne ważne kryteria:

  1. Zadania, które nie wymagają pętli zdarzeń. W szczególności zadania, które nie używają mechanizmu sygnału/szczeliny podczas wykonywania zadania.
    Użyj: QtConcurrent i QThreadPool + QRunnable.

  2. Zadania, które wykorzystują sygnał/sloty i dlatego wymagają pętli zdarzeń.
    Użycie: Obiekty typu Worker zostały przeniesione do + QThread.

Ogromna elastyczność frameworka Qt pozwala obejść problem „brakującej pętli zdarzeń” i dodać ją do QRunnable :

 class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }

Staraj się jednak unikać takich „obejścia”, ponieważ są one niebezpieczne i mało wydajne: jeśli jeden z wątków z puli wątków (z uruchomionym MyTask) jest zablokowany z powodu oczekiwania na sygnał, nie może wykonać innych zadań z puli.

tekst alternatywny

Możesz również uruchomić QThread bez żadnej pętli zdarzeń, nadpisując QThread::run() i jest to całkowicie w porządku, o ile wiesz, co robisz. Na przykład nie oczekuj, że w takim przypadku zadziała metoda quit() .

Uruchamianie jednej instancji zadania na raz

Wyobraź sobie, że musisz upewnić się, że tylko jedna instancja zadania może być wykonywana w danym momencie, a wszystkie oczekujące żądania uruchomienia tego samego zadania czekają w określonej kolejce. Jest to często potrzebne, gdy zadanie uzyskuje dostęp do wyłącznego zasobu, takiego jak zapis do tego samego pliku lub wysyłanie pakietów za pomocą gniazda TCP.

Zapomnijmy na chwilę o informatyce i wzorcu producent-konsument i zastanówmy się nad czymś trywialnym; coś, co można łatwo znaleźć w prawdziwych projektach.

Naiwnym rozwiązaniem tego problemu może być użycie QMutex . Wewnątrz funkcji zadania można po prostu uzyskać muteks skutecznie serializujący wszystkie wątki próbujące uruchomić zadanie. To gwarantowałoby, że tylko jeden wątek na raz może uruchamiać funkcję. Jednak to rozwiązanie wpływa na wydajność, wprowadzając problem z dużą rywalizacją , ponieważ wszystkie te wątki zostaną zablokowane (w muteksie), zanim będą mogły kontynuować. Jeśli masz wiele wątków aktywnie korzystających z takiego zadania i wykonujących jakąś użyteczną pracę w międzyczasie, wszystkie te wątki będą po prostu uśpione przez większość czasu.

 void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }

Aby uniknąć rywalizacji, potrzebujemy kolejki i pracownika, który żyje we własnym wątku i przetwarza kolejkę. Jest to w zasadzie klasyczny wzorzec producent-konsument . Pracownik ( konsument ) będzie pobierał żądania z kolejki jeden po drugim, a każdy producent może po prostu dodać swoje żądania do kolejki. Na początku brzmi prosto i możesz pomyśleć o użyciu QQueue i QWaitCondition , ale poczekaj i zobaczmy, czy możemy osiągnąć cel bez tych prymitywów:

  • Możemy użyć QThreadPool , ponieważ ma kolejkę oczekujących zadań

Lub

  • Możemy użyć domyślnej QThread::run() , ponieważ ma ona QEventLoop

Pierwsza opcja to użycie QThreadPool . Możemy stworzyć instancję QThreadPool i użyć QThreadPool::setMaxThreadCount(1) . Następnie możemy użyć QtConcurrent::run() do planowania żądań:

 class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };

To rozwiązanie ma jedną zaletę: QThreadPool::clear() umożliwia natychmiastowe anulowanie wszystkich oczekujących żądań, na przykład, gdy aplikacja musi zostać szybko zamknięta. Jednak istnieje również istotna wada związana z powinowactwem wątków: funkcja logEventCore będzie prawdopodobnie wykonywana w różnych wątkach od wywołania do wywołania. I wiemy, że Qt ma kilka klas, które wymagają powinowactwa wątkowego: QTimer , QTcpSocket i być może kilka innych.

Co specyfikacja Qt mówi o powinowactwie wątku: liczniki uruchomione w jednym wątku, nie mogą zostać zatrzymane z innego wątku. I tylko wątek posiadający instancję gniazda może używać tego gniazda. Oznacza to, że musisz zatrzymać wszystkie działające czasomierze w wątku, który je uruchomił, i wywołać QTcpSocket::close() w wątku będącym właścicielem gniazda. Oba przykłady są zwykle wykonywane w destruktorach.

Lepsze rozwiązanie opiera się na wykorzystaniu QEventLoop dostarczanego przez QThread . Pomysł jest prosty: używamy mechanizmu sygnału/slotu do wysyłania żądań, a pętla zdarzeń działająca wewnątrz wątku będzie służyć jako kolejka umożliwiająca wykonanie tylko jednego slotu na raz.

 // the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };

Implementacja konstruktora LogWorker i logEvent jest prosta i dlatego nie została tutaj przedstawiona. Teraz potrzebujemy usługi, która będzie zarządzać wątkami i instancją roboczą:

 // interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); } 

tekst alternatywny

Omówmy, jak działa ten kod:

  • W konstruktorze tworzymy wątek i instancję pracownika. Zauważ, że pracownik nie otrzyma rodzica, ponieważ zostanie przeniesiony do nowego wątku. Z tego powodu Qt nie będzie w stanie zwolnić pamięci pracownika automatycznie i dlatego musimy to zrobić łącząc sygnał QThread::finished ze slotem deleteLater . Łączymy również metodę proxy LogService::logEvent() z LogWorker::logEvent() , która będzie używać trybu Qt::QueuedConnection z powodu różnych wątków.
  • W destruktorze umieszczamy zdarzenie quit w kolejce pętli zdarzeń. To zdarzenie zostanie obsłużone po obsłużeniu wszystkich innych zdarzeń. Na przykład, jeśli wykonaliśmy setki logEvent() tuż przed wywołaniem destruktora, logger obsłuży je wszystkie, zanim pobierze zdarzenie quit. Oczywiście zajmuje to trochę czasu, więc musimy wait() , aż pętla zdarzeń się zakończy. Warto wspomnieć, że wszystkie przyszłe żądania logowania wysłane po zdarzeniu quit nigdy nie zostaną przetworzone.
  • Samo rejestrowanie ( LogWorker::logEvent ) zawsze będzie wykonywane w tym samym wątku, dlatego to podejście działa dobrze w przypadku klas wymagających powinowactwa wątku . W tym samym czasie konstruktor i destruktor LogWorker są wykonywane w głównym wątku (w szczególności w wątku, w którym działa LogService ), dlatego należy bardzo uważać na kod, który tam uruchamiamy. W szczególności nie zatrzymuj liczników czasu ani nie używaj gniazd w destruktorze pracownika, chyba że możesz uruchomić destruktor w tym samym wątku!

Wykonywanie destruktora pracownika w tym samym wątku

Jeśli twój pracownik ma do czynienia z licznikami czasu lub gniazdami, musisz upewnić się, że destruktor jest wykonywany w tym samym wątku (wątku, który utworzyłeś dla pracownika i do którego go przeniosłeś). Oczywistym sposobem obsługi tego jest podklasa QThread i delete pracownika wewnątrz QThread::run() . Rozważ następujący szablon:

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };

Używając tego szablonu, redefiniujemy LogService z poprzedniego przykładu:

 // interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }

Porozmawiajmy, jak to ma działać:

  • Zrobiliśmy LogService jako obiekt QThread , ponieważ potrzebowaliśmy zaimplementować niestandardową funkcję run() . Użyliśmy prywatnej podklasy, aby uniemożliwić dostęp do funkcji QThread , ponieważ chcemy wewnętrznie kontrolować cykl życia wątku.
  • W funkcji Thread::run() uruchamiamy pętlę zdarzeń wywołując domyślną QThread::run() i niszczymy instancję procesu roboczego zaraz po zakończeniu pętli zdarzeń. Zauważ, że destruktor pracownika jest wykonywany w tym samym wątku.
  • LogService::logEvent() to funkcja proxy (sygnał), która opublikuje zdarzenie rejestrowania w kolejce zdarzeń wątku.

Wstrzymywanie i wznawianie wątków

Kolejną interesującą okazją jest możliwość zawieszenia i wznowienia naszych wątków niestandardowych. Wyobraź sobie, że Twoja aplikacja wykonuje pewne przetwarzanie, które musi zostać zawieszone, gdy aplikacja zostanie zminimalizowana, zablokowana lub po prostu utraci połączenie sieciowe. Można to osiągnąć, budując niestandardową kolejkę asynchroniczną, która będzie przechowywać wszystkie oczekujące żądania do momentu wznowienia procesu roboczego. Ponieważ jednak szukamy najłatwiejszych rozwiązań, w tym samym celu użyjemy (ponownie) kolejki zdarzeń pętli.

Aby zawiesić wątek, wyraźnie potrzebujemy, aby poczekał na określony warunek oczekiwania. Jeśli wątek jest w ten sposób zablokowany, jego pętla zdarzeń nie obsługuje żadnych zdarzeń i Qt musi umieścić keep w kolejce. Po wznowieniu pętla zdarzeń będzie przetwarzać wszystkie skumulowane żądania. Dla warunku oczekiwania po prostu używamy obiektu QWaitCondition , który również wymaga QMutex . Aby zaprojektować ogólne rozwiązanie, które może być ponownie użyte przez dowolnego pracownika, musimy umieścić całą logikę wstrzymania/wznawiania w klasie bazowej wielokrotnego użytku. Nazwijmy to SuspendableWorker . Taka klasa będzie obsługiwać dwie metody:

  • suspend() byłoby wywołaniem blokującym, które ustawia wątek oczekujący na warunek oczekiwania. Można to zrobić, umieszczając żądanie wstrzymania w kolejce i czekając, aż zostanie obsłużone. Bardzo podobny do QThread::quit() + wait() .
  • resume() zasygnalizuje warunek oczekiwania, aby obudzić uśpiony wątek i kontynuować jego wykonywanie.

Przyjrzyjmy się interfejsowi i implementacji:

 // interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
 // implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }

Pamiętaj, że zawieszony wątek nigdy nie otrzyma zdarzenia quit . Z tego powodu nie możemy bezpiecznie używać tego z waniliowym QThread , chyba że wznowimy wątek przed wysłaniem quit. Zintegrujmy to z naszym niestandardowym szablonem Thread<T> , aby był kuloodporny.

tekst alternatywny

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };

Po wprowadzeniu tych zmian wznowimy wątek przed opublikowaniem zdarzenia zamknięcia. Ponadto Thread<TWorker> nadal umożliwia przekazywanie dowolnego rodzaju procesu roboczego, niezależnie od tego, czy jest to SuspendableWorker , czy nie.

Użycie byłoby następujące:

 LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.

lotny vs atomowy

To często źle rozumiany temat. Większość ludzi wierzy, że zmienne volatile mogą być używane do obsługi niektórych flag dostępnych przez wiele wątków i że chroni to przed warunkami wyścigu danych. To jest fałsz i do tego celu należy użyć klas QAtomic* (lub std::atomic ).

Rozważmy realistyczny przykład: klasę połączenia TcpConnection , która działa w dedykowanym wątku i chcemy, aby ta klasa eksportowała metodę bezpieczną dla wątków: bool isConnected() . Wewnętrznie klasa będzie nasłuchiwać zdarzeń gniazda: connected i disconnected w celu utrzymania wewnętrznej flagi logicznej:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }

volatile elementu _connected nie rozwiąże problemu i nie sprawi, że isConnected() będzie bezpieczna wątkowo. To rozwiązanie będzie działać w 99% przypadków, ale pozostałe 1% sprawi, że Twoje życie stanie się koszmarem. Aby to naprawić, musimy zabezpieczyć dostęp do zmiennych przed wieloma wątkami. Użyjmy w tym celu QReadWriteLocker :

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }

Działa to niezawodnie, ale nie tak szybko, jak przy użyciu operacji atomowych „bez blokad”. Trzecie rozwiązanie jest zarówno szybkie, jak i bezpieczne wątkowo (przykład używa std::atomic zamiast QAtomicInt , ale semantycznie są one identyczne):

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }

Wniosek

W tym artykule omówiliśmy kilka ważnych problemów związanych z programowaniem współbieżnym z frameworkiem Qt i zaprojektowanymi rozwiązaniami, aby rozwiązać określone przypadki użycia. Nie rozważaliśmy wielu prostych tematów, takich jak użycie atomowych prymitywów, blokady odczytu i zapisu i wiele innych, ale jeśli jesteś tym zainteresowany, zostaw swój komentarz poniżej i poproś o taki samouczek.

Jeśli jesteś zainteresowany odkrywaniem Qmake, niedawno opublikowałem także The Vital Guide to Qmake. To świetna lektura!