Artikel yang Hilang Tentang Qt Multithreading di C++
Diterbitkan: 2022-03-11Pengembang C++ berusaha keras untuk membangun aplikasi Qt multithreaded yang kuat, tetapi multithreading tidak pernah mudah dengan semua kondisi balapan, sinkronisasi, dan deadlock dan livelock tersebut. Untuk kredit Anda, Anda tidak menyerah dan menemukan diri Anda menjelajahi StackOverflow. Namun demikian, memilih solusi yang tepat dan bekerja dari selusin jawaban yang berbeda cukup non-sepele, terutama mengingat bahwa setiap solusi datang dengan kekurangannya sendiri.
Multithreading adalah model pemrograman dan eksekusi yang tersebar luas yang memungkinkan banyak utas ada dalam konteks satu proses. Utas ini berbagi sumber daya proses tetapi dapat dieksekusi secara independen. Model pemrograman berulir menyediakan pengembang dengan abstraksi yang berguna dari eksekusi bersamaan. Multithreading juga dapat diterapkan pada satu proses untuk mengaktifkan eksekusi paralel pada sistem multiproses.
– Wikipedia
Tujuan artikel ini adalah untuk mengumpulkan pengetahuan penting tentang pemrograman bersamaan dengan kerangka kerja Qt, khususnya topik yang paling disalahpahami. Pembaca diharapkan memiliki latar belakang Qt dan C++ sebelumnya untuk memahami konten.
Memilih antara menggunakan QThreadPool
dan QThread
Kerangka kerja Qt menawarkan banyak alat untuk multithreading. Memilih alat yang tepat dapat menjadi tantangan pada awalnya, tetapi pada kenyataannya, pohon keputusan hanya terdiri dari dua opsi: Anda ingin Qt mengelola utas untuk Anda, atau Anda ingin mengelola utas sendiri. Namun, ada kriteria penting lainnya:
Tugas yang tidak memerlukan loop acara. Secara khusus, tugas-tugas yang tidak menggunakan mekanisme sinyal/slot selama pelaksanaan tugas.
Gunakan: QtConcurrent dan QThreadPool + QRunnable.Tugas yang menggunakan sinyal/slot dan karena itu memerlukan loop peristiwa.
Gunakan: Objek pekerja dipindahkan ke + QThread.
Fleksibilitas luar biasa dari kerangka kerja Qt memungkinkan Anda untuk mengatasi masalah "loop peristiwa yang hilang" dan menambahkannya ke QRunnable
:
class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }
Cobalah untuk menghindari "solusi" seperti itu, karena ini berbahaya dan tidak efisien: jika salah satu utas dari kumpulan utas (menjalankan MyTask) diblokir karena menunggu sinyal, maka itu tidak dapat menjalankan tugas lain dari kumpulan.
Anda juga dapat menjalankan QThread
tanpa loop peristiwa apa pun dengan mengganti metode QThread::run()
dan ini baik-baik saja selama Anda tahu apa yang Anda lakukan. Misalnya, jangan berharap metode quit()
berfungsi dalam kasus seperti itu.
Menjalankan satu contoh tugas pada satu waktu
Bayangkan Anda perlu memastikan bahwa hanya satu instance tugas pada satu waktu yang dapat dieksekusi dan semua permintaan tertunda untuk menjalankan tugas yang sama sedang menunggu di antrean tertentu. Ini sering diperlukan ketika tugas mengakses sumber daya eksklusif, seperti menulis ke file yang sama atau mengirim paket menggunakan soket TCP.
Mari sejenak lupakan ilmu komputer dan pola produsen-konsumen dan anggap sepele; sesuatu yang dapat dengan mudah ditemukan dalam proyek nyata.
Solusi naif untuk masalah ini bisa menggunakan QMutex
. Di dalam fungsi tugas, Anda cukup mendapatkan mutex yang secara efektif membuat serial semua utas yang mencoba menjalankan tugas. Ini akan menjamin bahwa hanya satu utas pada satu waktu yang dapat menjalankan fungsi. Namun, solusi ini berdampak pada kinerja dengan memperkenalkan masalah pertengkaran tinggi karena semua utas tersebut akan diblokir (di mutex) sebelum dapat melanjutkan. Jika Anda memiliki banyak utas yang secara aktif menggunakan tugas seperti itu dan melakukan beberapa pekerjaan yang berguna di antaranya, maka semua utas ini hanya akan tidur sebagian besar waktu.
void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }
Untuk menghindari pertengkaran, kita membutuhkan antrian dan pekerja yang tinggal di threadnya sendiri dan memproses antrian. Ini adalah pola produsen-konsumen klasik. Pekerja ( konsumen ) akan memilih permintaan dari antrian satu per satu, dan setiap produsen dapat dengan mudah menambahkan permintaannya ke dalam antrian. Kedengarannya sederhana pada awalnya dan Anda mungkin berpikir untuk menggunakan QQueue
dan QWaitCondition
, tetapi tunggu dan mari kita lihat apakah kita dapat mencapai tujuan tanpa primitif ini:
- Kami dapat menggunakan
QThreadPool
karena memiliki antrian tugas yang tertunda
Atau
- Kita dapat menggunakan
QThread::run()
default karena memilikiQEventLoop
Opsi pertama adalah menggunakan QThreadPool
. Kita dapat membuat instance QThreadPool
dan menggunakan QThreadPool::setMaxThreadCount(1)
. Kemudian kita dapat menggunakan QtConcurrent::run()
untuk menjadwalkan permintaan:
class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };
Solusi ini memiliki satu keuntungan: QThreadPool::clear()
memungkinkan Anda untuk langsung membatalkan semua permintaan yang tertunda, misalnya saat aplikasi Anda perlu dimatikan dengan cepat. Namun, ada juga kelemahan signifikan yang terhubung ke thread-affinity : fungsi logEventCore
kemungkinan akan dieksekusi di thread yang berbeda dari panggilan ke panggilan. Dan kami tahu Qt memiliki beberapa kelas yang memerlukan afinitas utas : QTimer
, QTcpSocket
dan mungkin beberapa lainnya.
Apa yang dikatakan Qt spec tentang afinitas utas: pengatur waktu dimulai di satu utas, tidak dapat dihentikan dari utas lain. Dan hanya utas yang memiliki instance soket yang dapat menggunakan soket ini. Ini menyiratkan bahwa Anda harus menghentikan penghitung waktu yang berjalan di utas yang memulainya dan Anda harus memanggil QTcpSocket::close() di utas yang memiliki soket. Kedua contoh biasanya dieksekusi di destruktor.
Solusi yang lebih baik bergantung pada penggunaan QEventLoop
disediakan oleh QThread
. Idenya sederhana: kami menggunakan mekanisme sinyal/slot untuk mengeluarkan permintaan, dan loop peristiwa yang berjalan di dalam utas akan berfungsi sebagai antrian yang memungkinkan hanya satu slot pada satu waktu untuk dieksekusi.
// the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };
Implementasi konstruktor LogWorker
dan logEvent
sangat mudah dan oleh karena itu tidak disediakan di sini. Sekarang kita membutuhkan layanan yang akan mengelola utas dan instance pekerja:
// interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); }
Mari kita bahas cara kerja kode ini:
- Di konstruktor, kami membuat contoh utas dan pekerja. Perhatikan bahwa pekerja tidak menerima induk, karena akan dipindahkan ke utas baru. Karena itu, Qt tidak akan dapat melepaskan memori pekerja secara otomatis, dan oleh karena itu, kita perlu melakukan ini dengan menghubungkan sinyal
QThread::finished
ke slotdeleteLater
. Kami juga menghubungkan metode proxyLogService::logEvent()
keLogWorker::logEvent()
yang akan menggunakan modeQt::QueuedConnection
karena utas yang berbeda. - Di destruktor, kami menempatkan acara
quit
ke dalam antrian loop acara. Acara ini akan ditangani setelah semua acara lainnya ditangani. Misalnya, jika kita telah membuat ratusan panggilanlogEvent()
sesaat sebelum panggilan destruktor, logger akan menangani semuanya sebelum mengambil acara quit. Ini membutuhkan waktu, tentu saja, jadi kita haruswait()
sampai event loop keluar. Perlu disebutkan bahwa semua permintaan pencatatan di masa mendatang yang diposting setelah acara berhenti tidak akan pernah diproses. - Logging itu sendiri (
LogWorker::logEvent
) akan selalu dilakukan di utas yang sama, oleh karena itu pendekatan ini berfungsi dengan baik untuk kelas yang membutuhkan afinitas utas . Pada saat yang sama, konstruktor dan destruktorLogWorker
dieksekusi di utas utama (khususnya utas tempatLogService
berjalan), dan oleh karena itu, Anda harus sangat berhati-hati dengan kode apa yang Anda jalankan di sana. Secara khusus, jangan hentikan penghitung waktu atau gunakan soket di destruktor pekerja kecuali Anda dapat menjalankan destruktor di utas yang sama!
Menjalankan destruktor pekerja di utas yang sama
Jika pekerja Anda berurusan dengan pengatur waktu atau soket, Anda perlu memastikan destruktor dijalankan di utas yang sama (utas yang Anda buat untuk pekerja dan tempat Anda memindahkan pekerja). Cara yang jelas untuk mendukung ini adalah dengan membuat subkelas QThread
dan delete
pekerja di dalam QThread::run()
. Perhatikan templat berikut:

template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };
Dengan menggunakan template ini, kami mendefinisikan ulang LogService
dari contoh sebelumnya:
// interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }
Mari kita bahas bagaimana ini seharusnya bekerja:
- Kami menjadikan
LogService
sebagai objekQThread
karena kami perlu mengimplementasikan fungsirun()
kustom. Kami menggunakan subkelas pribadi untuk mencegah mengakses fungsiQThread
karena kami ingin mengontrol siklus hidup utas secara internal. - Dalam fungsi
Thread::run()
kami menjalankan loop peristiwa dengan memanggilQThread::run()
default, dan menghancurkan instance pekerja tepat setelah loop peristiwa berhenti. Perhatikan bahwa destruktor pekerja dieksekusi di utas yang sama. -
LogService::logEvent()
adalah fungsi proxy (sinyal) yang akan memposting peristiwa logging ke antrean peristiwa utas.
Menjeda dan melanjutkan utas
Peluang menarik lainnya adalah dapat menangguhkan dan melanjutkan utas khusus kami. Bayangkan aplikasi Anda sedang melakukan beberapa pemrosesan yang perlu ditangguhkan ketika aplikasi diminimalkan, dikunci, atau baru saja kehilangan koneksi jaringan. Ini dapat dicapai dengan membuat antrean asinkron khusus yang akan menampung semua permintaan yang tertunda hingga pekerja dilanjutkan. Namun, karena kami mencari solusi termudah, kami akan menggunakan (sekali lagi) antrian loop acara untuk tujuan yang sama.
Untuk menangguhkan utas, kami jelas membutuhkannya untuk menunggu pada kondisi menunggu tertentu. Jika utas diblokir dengan cara ini, loop peristiwanya tidak menangani peristiwa apa pun dan Qt harus dimasukkan ke dalam antrian. Setelah dilanjutkan, loop acara akan memproses semua permintaan yang terakumulasi. Untuk kondisi waiting, kita cukup menggunakan objek QWaitCondition
yang juga membutuhkan QMutex
. Untuk merancang solusi umum yang dapat digunakan kembali oleh pekerja mana pun, kita perlu menempatkan semua logika penangguhan/lanjutan ke dalam kelas dasar yang dapat digunakan kembali. Sebut saja SuspendableWorker
. Kelas seperti itu harus mendukung dua metode:
-
suspend()
akan menjadi panggilan pemblokiran yang menetapkan utas menunggu pada kondisi menunggu. Ini akan dilakukan dengan memposting permintaan penangguhan ke dalam antrian dan menunggu sampai ditangani. Hampir mirip denganQThread::quit()
+wait()
. -
resume()
akan memberi sinyal kondisi tunggu untuk membangunkan utas tidur untuk melanjutkan eksekusinya.
Mari kita tinjau antarmuka dan implementasinya:
// interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
// implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }
Ingat bahwa utas yang ditangguhkan tidak akan pernah menerima acara quit
. Untuk alasan ini, kami tidak dapat menggunakan ini dengan aman dengan vanilla QThread
kecuali kami melanjutkan utas sebelum memposting berhenti. Mari kita integrasikan ini ke dalam template Thread<T>
kustom kita untuk membuatnya antipeluru.
template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };
Dengan perubahan ini, kami akan melanjutkan utas sebelum memposting acara berhenti. Juga, Thread<TWorker>
masih memungkinkan semua jenis pekerja untuk diteruskan terlepas dari apakah itu SuspendableWorker
atau bukan.
Penggunaannya akan menjadi sebagai berikut:
LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.
volatil vs atom
Ini adalah topik yang sering disalahpahami. Kebanyakan orang percaya bahwa variabel volatile
dapat digunakan untuk melayani flag tertentu yang diakses oleh banyak utas dan ini dipertahankan dari kondisi balapan data. Itu salah, dan kelas QAtomic*
(atau std::atomic
) harus digunakan untuk tujuan ini.
Mari kita pertimbangkan contoh realistis: kelas koneksi TcpConnection
yang bekerja di utas khusus, dan kami ingin kelas ini mengekspor metode aman-utas: bool isConnected()
. Secara internal, kelas akan mendengarkan peristiwa soket: connected
dan disconnected
untuk mempertahankan flag boolean internal:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }
Membuat _connected
member volatile
tidak akan menyelesaikan masalah dan tidak akan membuat thread-safe isConnected()
. Solusi ini akan bekerja 99% dari waktu, tetapi 1% sisanya akan membuat hidup Anda menjadi mimpi buruk. Untuk memperbaikinya, kita perlu melindungi akses variabel dari beberapa utas. Mari kita gunakan QReadWriteLocker
untuk tujuan ini:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }
Ini bekerja dengan andal, tetapi tidak secepat menggunakan operasi atom "bebas kunci". Solusi ketiga cepat dan aman (contohnya menggunakan std::atomic
alih-alih QAtomicInt
, tetapi secara semantik ini identik):
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }
Kesimpulan
Dalam artikel ini, kami membahas beberapa masalah penting tentang pemrograman bersamaan dengan kerangka kerja Qt dan solusi yang dirancang untuk menangani kasus penggunaan tertentu. Kami belum mempertimbangkan banyak topik sederhana seperti penggunaan atom primitif, kunci baca-tulis, dan banyak lainnya, tetapi jika Anda tertarik dengan ini, tinggalkan komentar Anda di bawah dan mintalah tutorial semacam itu.
Jika Anda tertarik untuk menjelajahi Qmake, saya juga baru-baru ini menerbitkan The Vital Guide to Qmake. Ini adalah bacaan yang bagus!