El artículo que falta sobre Qt Multithreading en C++
Publicado: 2022-03-11Los desarrolladores de C++ se esfuerzan por crear aplicaciones Qt robustas de subprocesos múltiples, pero los subprocesos múltiples nunca fueron fáciles con todas esas condiciones de carrera, sincronización y puntos muertos y bloqueos en vivo. Para su crédito, no se da por vencido y se encuentra recorriendo StackOverflow. Sin embargo, elegir la solución correcta y que funcione de una docena de respuestas diferentes no es trivial, especialmente dado que cada solución tiene sus propios inconvenientes.
Multithreading es un modelo generalizado de programación y ejecución que permite que existan múltiples hilos dentro del contexto de un proceso. Estos subprocesos comparten los recursos del proceso pero pueden ejecutarse de forma independiente. El modelo de programación con subprocesos proporciona a los desarrolladores una abstracción útil de la ejecución concurrente. Los subprocesos múltiples también se pueden aplicar a un proceso para permitir la ejecución paralela en un sistema de procesamiento múltiple.
–Wikipedia
El objetivo de este artículo es agregar el conocimiento esencial sobre la programación concurrente con el marco Qt, específicamente los temas más incomprendidos. Se espera que un lector tenga experiencia previa en Qt y C++ para comprender el contenido.
Elegir entre usar QThreadPool
y QThread
El framework Qt ofrece muchas herramientas para subprocesos múltiples. Elegir la herramienta correcta puede ser un desafío al principio, pero de hecho, el árbol de decisiones consta de solo dos opciones: desea que Qt administre los subprocesos por usted o desea administrar los subprocesos usted mismo. Sin embargo, existen otros criterios importantes:
Tareas que no necesitan el bucle de eventos. Específicamente, las tareas que no usan el mecanismo de señal/ranura durante la ejecución de la tarea.
Uso: QtConcurrent y QThreadPool + QRunnable.Tareas que usan señal/slots y por lo tanto necesitan el bucle de eventos.
Uso: objetos de trabajo movidos a + QThread.
La gran flexibilidad del marco Qt le permite solucionar el problema del "bucle de eventos faltantes" y agregar uno a QRunnable
:
class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }
Sin embargo, intente evitar tales "soluciones alternativas", ya que son peligrosas y no eficientes: si uno de los subprocesos del grupo de subprocesos (ejecutando MyTask) está bloqueado debido a la espera de una señal, entonces no puede ejecutar otras tareas del grupo.
También puede ejecutar un QThread
sin ningún bucle de eventos anulando QThread::run()
y esto está perfectamente bien siempre que sepa lo que está haciendo. Por ejemplo, no espere que el método quit()
funcione en tal caso.
Ejecutar una instancia de tarea a la vez
Imagine que necesita asegurarse de que solo se pueda ejecutar una instancia de tarea a la vez y que todas las solicitudes pendientes para ejecutar la misma tarea estén esperando en una determinada cola. Esto suele ser necesario cuando una tarea accede a un recurso exclusivo, como escribir en el mismo archivo o enviar paquetes mediante un socket TCP.
Olvidémonos de la informática y el patrón productor-consumidor por un momento y consideremos algo trivial; algo que se puede encontrar fácilmente en proyectos reales.
Una solución ingenua a este problema podría ser usar un QMutex
. Dentro de la función de la tarea, simplemente puede adquirir el mutex serializando de manera efectiva todos los subprocesos que intentan ejecutar la tarea. Esto garantizaría que solo un subproceso a la vez podría ejecutar la función. Sin embargo, esta solución afecta el rendimiento al presentar un problema de alta contención porque todos esos subprocesos se bloquearían (en el mutex) antes de que puedan continuar. Si tiene muchos subprocesos que utilizan activamente una tarea de este tipo y realizan algún trabajo útil en el medio, entonces todos estos subprocesos estarán inactivos la mayor parte del tiempo.
void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }
Para evitar conflictos, necesitamos una cola y un trabajador que viva en su propio subproceso y procese la cola. Este es más o menos el patrón clásico productor-consumidor . El trabajador ( consumidor ) seleccionaría las solicitudes de la cola una por una, y cada productor puede simplemente agregar sus solicitudes a la cola. Suena simple al principio y puede pensar en usar QQueue
y QWaitCondition
, pero espere y veamos si podemos lograr el objetivo sin estas primitivas:
- Podemos usar
QThreadPool
ya que tiene una cola de tareas pendientes
O
- Podemos usar
QThread::run()
predeterminado porque tieneQEventLoop
La primera opción es usar QThreadPool
. Podemos crear una instancia de QThreadPool
y usar QThreadPool::setMaxThreadCount(1)
. Entonces podemos usar QtConcurrent::run()
para programar solicitudes:
class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };
Esta solución tiene un beneficio: QThreadPool::clear()
le permite cancelar instantáneamente todas las solicitudes pendientes, por ejemplo, cuando su aplicación necesita cerrarse rápidamente. Sin embargo, también hay un inconveniente importante relacionado con la afinidad de subprocesos : es probable que la función logEventCore
se ejecute en diferentes subprocesos de una llamada a otra. Y sabemos que Qt tiene algunas clases que requieren afinidad de subprocesos : QTimer
, QTcpSocket
y posiblemente algunas otras.
Lo que dice la especificación Qt sobre la afinidad de subprocesos: los temporizadores comenzaron en un subproceso, no se pueden detener desde otro subproceso. Y solo el subproceso que posee una instancia de socket puede usar este socket. Esto implica que debe detener los temporizadores en ejecución en el subproceso que los inició y debe llamar a QTcpSocket::close() en el subproceso que posee el zócalo. Ambos ejemplos suelen ejecutarse en destructores.
La mejor solución se basa en el uso de QEventLoop
proporcionado por QThread
. La idea es simple: usamos un mecanismo de señal/ranura para emitir solicitudes, y el bucle de eventos que se ejecuta dentro del subproceso servirá como una cola que permite ejecutar solo una ranura a la vez.
// the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };
La implementación del constructor LogWorker
y logEvent
es sencilla y, por lo tanto, no se proporciona aquí. Ahora necesitamos un servicio que administrará el hilo y la instancia del trabajador:
// interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); }
Analicemos cómo funciona este código:
- En el constructor, creamos una instancia de hilo y trabajador. Tenga en cuenta que el trabajador no recibe un padre porque se moverá al nuevo subproceso. Debido a esto, Qt no podrá liberar la memoria del trabajador automáticamente y, por lo tanto, debemos hacerlo conectando la señal
QThread::finished
a la ranuradeleteLater
. También conectamos el método proxyLogService::logEvent()
aLogWorker::logEvent()
que usará el modoQt::QueuedConnection
debido a diferentes subprocesos. - En el destructor, colocamos el evento de
quit
en la cola del bucle de eventos. Este evento se manejará después de que se manejen todos los demás eventos. Por ejemplo, si hemos realizado cientos de llamadas alogEvent()
justo antes de la llamada al destructor, el registrador las manejará todas antes de obtener el evento de salida. Esto lleva tiempo, por supuesto, por lo que debemoswait()
hasta que finalice el ciclo de eventos. Vale la pena mencionar que todas las futuras solicitudes de registro publicadas después del evento de salida nunca se procesarán. - El registro en sí mismo (
LogWorker::logEvent
) siempre se realizará en el mismo subproceso, por lo tanto, este enfoque funciona bien para las clases que requieren afinidad de subprocesos . Al mismo tiempo, el constructor y el destructor deLogWorker
se ejecutan en el subproceso principal (específicamente el subproceso en el que se ejecutaLogService
) y, por lo tanto, debe tener mucho cuidado con el código que está ejecutando allí. Específicamente, no detenga los temporizadores ni use sockets en el destructor del trabajador a menos que pueda estar ejecutando el destructor en el mismo hilo.
Ejecutando el destructor del trabajador en el mismo hilo
Si su trabajador está trabajando con temporizadores o enchufes, debe asegurarse de que el destructor se ejecute en el mismo subproceso (el subproceso que creó para el trabajador y al que movió al trabajador). La forma obvia de admitir esto es crear una subclase de QThread
y delete
el trabajador dentro QThread::run()
. Considere la siguiente plantilla:

template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };
Usando esta plantilla, redefinimos LogService
del ejemplo anterior:
// interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }
Analicemos cómo se supone que funciona esto:
- Hicimos que
LogService
sea el objetoQThread
porque necesitábamos implementar la funciónrun()
personalizada. Usamos subclases privadas para evitar el acceso a las funciones deQThread
, ya que queremos controlar el ciclo de vida del subproceso internamente. - En la función
Thread::run()
, ejecutamos el bucle de eventos llamando a la implementación predeterminadaQThread::run()
y destruimos la instancia del trabajador justo después de que el bucle de eventos se haya cerrado. Tenga en cuenta que el destructor del trabajador se ejecuta en el mismo hilo. -
LogService::logEvent()
es la función proxy (señal) que publicará el evento de registro en la cola de eventos del subproceso.
Pausar y reanudar los hilos
Otra oportunidad interesante es poder suspender y reanudar nuestros hilos personalizados. Imagine que su aplicación está realizando algún procesamiento que debe suspenderse cuando la aplicación se minimiza, se bloquea o simplemente pierde la conexión a la red. Esto se puede lograr mediante la creación de una cola asincrónica personalizada que retendrá todas las solicitudes pendientes hasta que se reanude el trabajador. Sin embargo, dado que estamos buscando soluciones más fáciles, usaremos (nuevamente) la cola del bucle de eventos para el mismo propósito.
Para suspender un hilo, claramente necesitamos que espere en una cierta condición de espera. Si el subproceso se bloquea de esta manera, su ciclo de eventos no está manejando ningún evento y Qt tiene que poner keep en la cola. Una vez reanudado, el bucle de eventos procesará todas las solicitudes acumuladas. Para la condición de espera, simplemente usamos el objeto QWaitCondition
que también requiere un QMutex
. Para diseñar una solución genérica que cualquier trabajador pueda reutilizar, debemos colocar toda la lógica de suspensión/reanudación en una clase base reutilizable. Llamémoslo SuspendableWorker
. Dicha clase admitirá dos métodos:
-
suspend()
sería una llamada de bloqueo que establece el hilo en espera en una condición de espera. Esto se haría publicando una solicitud de suspensión en la cola y esperando hasta que se maneje. Bastante similar aQThread::quit()
+wait()
. -
resume()
señalaría la condición de espera para despertar el subproceso inactivo para continuar su ejecución.
Repasemos la interfaz y la implementación:
// interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
// implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }
Recuerde que un hilo suspendido nunca recibirá un evento de quit
. Por esta razón, no podemos usar esto de manera segura con Vanilla QThread
a menos que reanudemos el hilo antes de dejar de publicar. Integrémoslo en nuestra plantilla Thread<T>
personalizada para que sea a prueba de balas.
template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };
Con estos cambios, retomaremos el hilo antes de publicar el evento de salida. Además, Thread<TWorker>
aún permite pasar cualquier tipo de trabajador, independientemente de si es un SuspendableWorker
o no.
El uso sería el siguiente:
LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.
volátil vs atómico
Este es un tema comúnmente mal entendido. La mayoría de la gente cree que las variables volatile
se pueden usar para servir ciertas banderas a las que acceden múltiples subprocesos y que esto preserva las condiciones de carrera de datos. Eso es falso, y las clases QAtomic*
(o std::atomic
) deben usarse para este propósito.
Consideremos un ejemplo realista: una clase de conexión TcpConnection
que funciona en un subproceso dedicado, y queremos que esta clase exporte un método seguro para subprocesos: bool isConnected()
. Internamente, la clase escuchará eventos de socket: connected
y disconnected
para mantener un indicador booleano interno:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }
Hacer que el miembro _connected
sea volatile
no resolverá el problema y no hará que isConnected()
sea seguro para subprocesos. Esta solución funcionará el 99 % de las veces, pero el 1 % restante hará de tu vida una pesadilla. Para arreglar esto, necesitamos proteger el acceso variable de múltiples hilos. QReadWriteLocker
para este propósito:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }
Esto funciona de manera confiable, pero no tan rápido como usar operaciones atómicas "sin bloqueo". La tercera solución es rápida y segura para subprocesos (el ejemplo usa std::atomic
en lugar de QAtomicInt
, pero semánticamente son idénticos):
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }
Conclusión
En este artículo, discutimos varias preocupaciones importantes sobre la programación concurrente con el marco Qt y diseñamos soluciones para abordar casos de uso específicos. No hemos considerado muchos de los temas simples, como el uso de primitivas atómicas, bloqueos de lectura y escritura y muchos otros, pero si está interesado en estos, deje su comentario a continuación y solicite un tutorial de este tipo.
Si está interesado en explorar Qmake, también publiqué recientemente The Vital Guide to Qmake. ¡Es una gran lectura!