Недостающая статья о многопоточности Qt в C++
Опубликовано: 2022-03-11Разработчики C++ стремятся создавать надежные многопоточные приложения Qt, но многопоточность никогда не была легкой со всеми этими условиями гонки, синхронизацией, взаимоблокировками и активными блокировками. К вашей чести, вы не сдаетесь и рыщете в StackOverflow. Тем не менее, выбрать правильное и работающее решение из дюжины разных ответов довольно нетривиально, особенно с учетом того, что у каждого решения есть свои недостатки.
Многопоточность — это широко распространенная модель программирования и выполнения, позволяющая существовать нескольким потокам в контексте одного процесса. Эти потоки совместно используют ресурсы процесса, но могут выполняться независимо. Модель многопоточного программирования предоставляет разработчикам полезную абстракцию параллельного выполнения. Многопоточность также может быть применена к одному процессу, чтобы обеспечить параллельное выполнение в многопроцессорной системе.
- Википедия
Цель этой статьи — обобщить основные знания о параллельном программировании с помощью среды Qt, в частности, о самых непонятых темах. Ожидается, что читатель имеет опыт работы с Qt и C++, чтобы понять содержание.
Выбор между использованием QThreadPool
и QThread
Платформа Qt предлагает множество инструментов для многопоточности. Поначалу выбор правильного инструмента может быть сложным, но на самом деле дерево решений состоит всего из двух вариантов: либо вы хотите, чтобы Qt управляла потоками за вас, либо вы хотите управлять потоками самостоятельно. Однако есть и другие важные критерии:
Задачи, которым не нужен цикл событий. В частности, задачи, которые не используют механизм сигнала/слота во время выполнения задачи.
Использование: QtConcurrent и QThreadPool + QRunnable.Задачи, которые используют сигнал/слоты и поэтому нуждаются в цикле событий.
Использование: рабочие объекты перемещены в + QThread.
Большая гибкость фреймворка Qt позволяет обойти проблему «отсутствующего цикла событий» и добавить его в QRunnable
:
class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }
Однако старайтесь избегать таких «обходных путей», потому что они опасны и неэффективны: если один из потоков из пула потоков (работающий MyTask) заблокирован из-за ожидания сигнала, то он не может выполнять другие задачи из пула.
Вы также можете запустить QThread
без какого-либо цикла событий, переопределив QThread::run()
, и это прекрасно, если вы знаете, что делаете. Например, не ожидайте, что метод quit()
сработает в таком случае.
Запуск одного экземпляра задачи за раз
Представьте, что вам нужно гарантировать, что одновременно может выполняться только один экземпляр задачи, а все ожидающие запросы на выполнение одной и той же задачи ожидают в определенной очереди. Это часто необходимо, когда задача обращается к эксклюзивному ресурсу, например, записывает в тот же файл или отправляет пакеты с использованием сокета TCP.
Давайте на мгновение забудем о компьютерных науках и модели производитель-потребитель и рассмотрим что-то тривиальное; то, что можно легко найти в реальных проектах.
Наивным решением этой проблемы может быть использование QMutex
. Внутри функции задачи вы можете просто получить мьютекс, эффективно сериализующий все потоки, пытающиеся выполнить задачу. Это гарантировало бы, что только один поток одновременно может выполнять функцию. Однако это решение влияет на производительность, создавая проблему высокой конкуренции , поскольку все эти потоки будут заблокированы (в мьютексе), прежде чем они смогут продолжить работу. Если у вас есть много потоков, активно использующих такую задачу и выполняющих какую-то полезную работу между ними, то все эти потоки большую часть времени будут просто спать.
void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }
Чтобы избежать конфликтов, нам нужна очередь и рабочий процесс, который живет в своем собственном потоке и обрабатывает очередь. Это в значительной степени классический образец производитель-потребитель . Рабочий процесс ( потребитель ) будет выбирать запросы из очереди один за другим, и каждый производитель может просто добавлять свои запросы в очередь. На первый взгляд звучит просто, и вы можете подумать об использовании QQueue
и QWaitCondition
, но подождите и давайте посмотрим, сможем ли мы достичь цели без этих примитивов:
- Мы можем использовать
QThreadPool
, так как у него есть очередь отложенных задач.
Или
- Мы можем использовать
QThread::run()
по умолчанию, потому что он имеетQEventLoop
Первый вариант — использовать QThreadPool
. Мы можем создать экземпляр QThreadPool
и использовать QThreadPool::setMaxThreadCount(1)
. Затем мы можем использовать QtConcurrent::run()
для планирования запросов:
class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };
У этого решения есть одно преимущество: QThreadPool::clear()
позволяет вам мгновенно отменить все ожидающие запросы, например, когда ваше приложение должно быстро закрыться. Однако есть и существенный недостаток, связанный с привязкой к потоку : функция logEventCore
, скорее всего, будет выполняться в разных потоках от вызова к вызову. И мы знаем, что в Qt есть классы, требующие привязки к потоку : QTimer
, QTcpSocket
и, возможно, некоторые другие.
Что спецификация Qt говорит о сходстве потоков: таймеры, запущенные в одном потоке, не могут быть остановлены из другого потока. И только поток, владеющий экземпляром сокета, может использовать этот сокет. Это означает, что вы должны остановить все работающие таймеры в потоке, который их запустил, и вы должны вызвать QTcpSocket::close() в потоке, владеющем сокетом. Оба примера обычно выполняются в деструкторах.
Лучшее решение основано на использовании QEventLoop
, предоставляемого QThread
. Идея проста: мы используем механизм сигнал/слот для выдачи запросов, а цикл событий, работающий внутри потока, будет служить очередью, позволяющей выполнять только один слот за раз.
// the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };
Реализация конструктора LogWorker
и logEvent
проста и поэтому здесь не приводится. Теперь нам нужен сервис, который будет управлять потоком и рабочим экземпляром:
// interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); }
Давайте обсудим, как работает этот код:
- В конструкторе мы создаем поток и рабочий экземпляр. Обратите внимание, что рабочий процесс не получает родителя, потому что он будет перемещен в новый поток. Из-за этого Qt не сможет автоматически освобождать рабочую память, а значит, нам нужно сделать это, подключив сигнал
QThread::finished
кdeleteLater
. Мы также подключаем прокси-методLogService::logEvent()
кLogWorker::logEvent()
, который будет использовать режимQt::QueuedConnection
из-за разных потоков. - В деструкторе мы помещаем событие
quit
в очередь цикла событий. Это событие будет обработано после обработки всех остальных событий. Например, если мы сделали сотниlogEvent()
непосредственно перед вызовом деструктора, регистратор обработает их все до того, как получит событие quit. Конечно, это требует времени, поэтому мы должныwait()
, пока цикл обработки событий не завершится. Стоит отметить, что все будущие запросы на ведение журнала, отправленные после события выхода, никогда не будут обрабатываться. - Само ведение журнала (
LogWorker::logEvent
) всегда будет выполняться в одном и том же потоке, поэтому этот подход хорошо работает для классов, требующих привязки к потоку . В то же время конструктор и деструкторLogWorker
выполняются в основном потоке (конкретно в потоке, в котором выполняетсяLogService
), и поэтому вам нужно очень внимательно относиться к тому, какой код вы там запускаете. В частности, не останавливайте таймеры и не используйте сокеты в деструкторе рабочего процесса, если вы не можете запустить деструктор в том же потоке!
Выполнение деструктора рабочего в том же потоке
Если ваш воркер имеет дело с таймерами или сокетами, вам необходимо убедиться, что деструктор выполняется в том же потоке (потоке, который вы создали для воркера и куда вы его переместили). Очевидным способом поддержки этого является создание подкласса QThread
и delete
рабочего процесса внутри QThread::run()
. Рассмотрим следующий шаблон:

template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };
Используя этот шаблон, мы переопределяем LogService
из предыдущего примера:
// interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }
Давайте обсудим, как это должно работать:
- Мы сделали
LogService
объектомQThread
, потому что нам нужно было реализовать пользовательскую функциюrun()
. Мы использовали частные подклассы, чтобы предотвратить доступ кQThread
, поскольку мы хотим контролировать жизненный цикл потока внутри. - В функции
Thread::run()
мы запускаем цикл обработки событий, вызывая реализациюQThread::run()
по умолчанию, и уничтожаем рабочий экземпляр сразу после завершения цикла обработки событий. Обратите внимание, что деструктор рабочего процесса выполняется в том же потоке. -
LogService::logEvent()
— это прокси-функция (сигнал), которая отправит событие регистрации в очередь событий потока.
Приостановка и возобновление потоков
Еще одна интересная возможность — приостанавливать и возобновлять наши пользовательские темы. Представьте, что ваше приложение выполняет некоторую обработку, которую необходимо приостановить, когда приложение сворачивается, блокируется или просто теряет сетевое соединение. Этого можно добиться путем создания пользовательской асинхронной очереди, в которой будут храниться все ожидающие запросы до тех пор, пока рабочий процесс не будет возобновлен. Однако, поскольку мы ищем самые простые решения, мы будем использовать (снова) очередь цикла событий для той же цели.
Чтобы приостановить поток, нам явно нужно, чтобы он ждал определенного условия ожидания. Если поток заблокирован таким образом, его цикл обработки событий не обрабатывает никаких событий, и Qt должен поставить keep в очередь. После возобновления цикл событий будет обрабатывать все накопленные запросы. Для условия ожидания мы просто используем объект QWaitCondition
, для которого также требуется QMutex
. Чтобы спроектировать общее решение, которое может быть повторно использовано любым работником, нам нужно поместить всю логику приостановки/возобновления в повторно используемый базовый класс. Назовем его SuspendableWorker
. Такой класс должен поддерживать два метода:
-
suspend()
будет блокирующим вызовом, который устанавливает поток в состояние ожидания. Это можно сделать, отправив запрос на приостановку в очередь и дождавшись его обработки. Очень похоже наQThread::quit()
+wait()
. -
resume()
будет сигнализировать об условии ожидания, чтобы разбудить спящий поток, чтобы продолжить его выполнение.
Давайте рассмотрим интерфейс и реализацию:
// interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
// implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }
Помните, что приостановленный поток никогда не получит событие quit
. По этой причине мы не можем безопасно использовать это с vanilla QThread
, если мы не возобновим поток перед публикацией quit. Давайте интегрируем это в наш пользовательский шаблон Thread<T>
, чтобы сделать его пуленепробиваемым.
template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };
С этими изменениями мы возобновим поток перед публикацией события выхода. Кроме того, Thread<TWorker>
по-прежнему позволяет передавать любой рабочий процесс, независимо от того, является ли он SuspendableWorker
или нет.
Использование будет следующим:
LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.
летучий против атомарного
Это часто неправильно понимаемая тема. Большинство людей считают, что переменные volatile
можно использовать для обслуживания определенных флагов, к которым обращаются несколько потоков, и что это защищает от условий гонки данных. Это неверно, и для этой цели должны использоваться классы QAtomic*
(или std::atomic
).
Давайте рассмотрим реалистичный пример: класс соединения TcpConnection
, который работает в выделенном потоке, и мы хотим, чтобы этот класс экспортировал потокобезопасный метод: bool isConnected()
. Внутри класс будет прослушивать события сокета: connected
disconnected
для поддержания внутреннего логического флага:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }
Создание члена _connected
volatile
не решит проблему и не сделает isConnected()
потокобезопасным. Это решение будет работать в 99% случаев, но оставшийся 1% превратит вашу жизнь в кошмар. Чтобы исправить это, нам нужно защитить доступ к переменной от нескольких потоков. Воспользуемся для этого QReadWriteLocker
:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }
Это работает надежно, но не так быстро, как использование атомарных операций без блокировки. Третье решение быстрое и потокобезопасное (в примере используется std::atomic
вместо QAtomicInt
, но семантически они идентичны):
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }
Заключение
В этой статье мы обсудили несколько важных проблем, связанных с параллельным программированием с помощью среды Qt, и разработали решения для конкретных случаев использования. Мы не рассмотрели многие простые темы, такие как использование атомарных примитивов, блокировки чтения-записи и многие другие, но если вам это интересно, оставьте свой комментарий ниже и попросите такой учебник.
Если вы заинтересованы в изучении Qmake, я также недавно опубликовал The Vital Guide to Qmake. Это отличное чтение!