Artikel yang Hilang Tentang Qt Multithreading di C++

Diterbitkan: 2022-03-11

Pengembang C++ berusaha keras untuk membangun aplikasi Qt multithreaded yang kuat, tetapi multithreading tidak pernah mudah dengan semua kondisi balapan, sinkronisasi, dan deadlock dan livelock tersebut. Untuk kredit Anda, Anda tidak menyerah dan menemukan diri Anda menjelajahi StackOverflow. Namun demikian, memilih solusi yang tepat dan bekerja dari selusin jawaban yang berbeda cukup non-sepele, terutama mengingat bahwa setiap solusi datang dengan kekurangannya sendiri.

Multithreading adalah model pemrograman dan eksekusi yang tersebar luas yang memungkinkan banyak utas ada dalam konteks satu proses. Utas ini berbagi sumber daya proses tetapi dapat dieksekusi secara independen. Model pemrograman berulir menyediakan pengembang dengan abstraksi yang berguna dari eksekusi bersamaan. Multithreading juga dapat diterapkan pada satu proses untuk mengaktifkan eksekusi paralel pada sistem multiproses.

– Wikipedia

Tujuan artikel ini adalah untuk mengumpulkan pengetahuan penting tentang pemrograman bersamaan dengan kerangka kerja Qt, khususnya topik yang paling disalahpahami. Pembaca diharapkan memiliki latar belakang Qt dan C++ sebelumnya untuk memahami konten.

Memilih antara menggunakan QThreadPool dan QThread

Kerangka kerja Qt menawarkan banyak alat untuk multithreading. Memilih alat yang tepat dapat menjadi tantangan pada awalnya, tetapi pada kenyataannya, pohon keputusan hanya terdiri dari dua opsi: Anda ingin Qt mengelola utas untuk Anda, atau Anda ingin mengelola utas sendiri. Namun, ada kriteria penting lainnya:

  1. Tugas yang tidak memerlukan loop acara. Secara khusus, tugas-tugas yang tidak menggunakan mekanisme sinyal/slot selama pelaksanaan tugas.
    Gunakan: QtConcurrent dan QThreadPool + QRunnable.

  2. Tugas yang menggunakan sinyal/slot dan karena itu memerlukan loop peristiwa.
    Gunakan: Objek pekerja dipindahkan ke + QThread.

Fleksibilitas luar biasa dari kerangka kerja Qt memungkinkan Anda untuk mengatasi masalah "loop peristiwa yang hilang" dan menambahkannya ke QRunnable :

 class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }

Cobalah untuk menghindari "solusi" seperti itu, karena ini berbahaya dan tidak efisien: jika salah satu utas dari kumpulan utas (menjalankan MyTask) diblokir karena menunggu sinyal, maka itu tidak dapat menjalankan tugas lain dari kumpulan.

teks alternatif

Anda juga dapat menjalankan QThread tanpa loop peristiwa apa pun dengan mengganti metode QThread::run() dan ini baik-baik saja selama Anda tahu apa yang Anda lakukan. Misalnya, jangan berharap metode quit() berfungsi dalam kasus seperti itu.

Menjalankan satu contoh tugas pada satu waktu

Bayangkan Anda perlu memastikan bahwa hanya satu instance tugas pada satu waktu yang dapat dieksekusi dan semua permintaan tertunda untuk menjalankan tugas yang sama sedang menunggu di antrean tertentu. Ini sering diperlukan ketika tugas mengakses sumber daya eksklusif, seperti menulis ke file yang sama atau mengirim paket menggunakan soket TCP.

Mari sejenak lupakan ilmu komputer dan pola produsen-konsumen dan anggap sepele; sesuatu yang dapat dengan mudah ditemukan dalam proyek nyata.

Solusi naif untuk masalah ini bisa menggunakan QMutex . Di dalam fungsi tugas, Anda cukup mendapatkan mutex yang secara efektif membuat serial semua utas yang mencoba menjalankan tugas. Ini akan menjamin bahwa hanya satu utas pada satu waktu yang dapat menjalankan fungsi. Namun, solusi ini berdampak pada kinerja dengan memperkenalkan masalah pertengkaran tinggi karena semua utas tersebut akan diblokir (di mutex) sebelum dapat melanjutkan. Jika Anda memiliki banyak utas yang secara aktif menggunakan tugas seperti itu dan melakukan beberapa pekerjaan yang berguna di antaranya, maka semua utas ini hanya akan tidur sebagian besar waktu.

 void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }

Untuk menghindari pertengkaran, kita membutuhkan antrian dan pekerja yang tinggal di threadnya sendiri dan memproses antrian. Ini adalah pola produsen-konsumen klasik. Pekerja ( konsumen ) akan memilih permintaan dari antrian satu per satu, dan setiap produsen dapat dengan mudah menambahkan permintaannya ke dalam antrian. Kedengarannya sederhana pada awalnya dan Anda mungkin berpikir untuk menggunakan QQueue dan QWaitCondition , tetapi tunggu dan mari kita lihat apakah kita dapat mencapai tujuan tanpa primitif ini:

  • Kami dapat menggunakan QThreadPool karena memiliki antrian tugas yang tertunda

Atau

  • Kita dapat menggunakan QThread::run() default karena memiliki QEventLoop

Opsi pertama adalah menggunakan QThreadPool . Kita dapat membuat instance QThreadPool dan menggunakan QThreadPool::setMaxThreadCount(1) . Kemudian kita dapat menggunakan QtConcurrent::run() untuk menjadwalkan permintaan:

 class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };

Solusi ini memiliki satu keuntungan: QThreadPool::clear() memungkinkan Anda untuk langsung membatalkan semua permintaan yang tertunda, misalnya saat aplikasi Anda perlu dimatikan dengan cepat. Namun, ada juga kelemahan signifikan yang terhubung ke thread-affinity : fungsi logEventCore kemungkinan akan dieksekusi di thread yang berbeda dari panggilan ke panggilan. Dan kami tahu Qt memiliki beberapa kelas yang memerlukan afinitas utas : QTimer , QTcpSocket dan mungkin beberapa lainnya.

Apa yang dikatakan Qt spec tentang afinitas utas: pengatur waktu dimulai di satu utas, tidak dapat dihentikan dari utas lain. Dan hanya utas yang memiliki instance soket yang dapat menggunakan soket ini. Ini menyiratkan bahwa Anda harus menghentikan penghitung waktu yang berjalan di utas yang memulainya dan Anda harus memanggil QTcpSocket::close() di utas yang memiliki soket. Kedua contoh biasanya dieksekusi di destruktor.

Solusi yang lebih baik bergantung pada penggunaan QEventLoop disediakan oleh QThread . Idenya sederhana: kami menggunakan mekanisme sinyal/slot untuk mengeluarkan permintaan, dan loop peristiwa yang berjalan di dalam utas akan berfungsi sebagai antrian yang memungkinkan hanya satu slot pada satu waktu untuk dieksekusi.

 // the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };

Implementasi konstruktor LogWorker dan logEvent sangat mudah dan oleh karena itu tidak disediakan di sini. Sekarang kita membutuhkan layanan yang akan mengelola utas dan instance pekerja:

 // interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); } 

teks alternatif

Mari kita bahas cara kerja kode ini:

  • Di konstruktor, kami membuat contoh utas dan pekerja. Perhatikan bahwa pekerja tidak menerima induk, karena akan dipindahkan ke utas baru. Karena itu, Qt tidak akan dapat melepaskan memori pekerja secara otomatis, dan oleh karena itu, kita perlu melakukan ini dengan menghubungkan sinyal QThread::finished ke slot deleteLater . Kami juga menghubungkan metode proxy LogService::logEvent() ke LogWorker::logEvent() yang akan menggunakan mode Qt::QueuedConnection karena utas yang berbeda.
  • Di destruktor, kami menempatkan acara quit ke dalam antrian loop acara. Acara ini akan ditangani setelah semua acara lainnya ditangani. Misalnya, jika kita telah membuat ratusan panggilan logEvent() sesaat sebelum panggilan destruktor, logger akan menangani semuanya sebelum mengambil acara quit. Ini membutuhkan waktu, tentu saja, jadi kita harus wait() sampai event loop keluar. Perlu disebutkan bahwa semua permintaan pencatatan di masa mendatang yang diposting setelah acara berhenti tidak akan pernah diproses.
  • Logging itu sendiri ( LogWorker::logEvent ) akan selalu dilakukan di utas yang sama, oleh karena itu pendekatan ini berfungsi dengan baik untuk kelas yang membutuhkan afinitas utas . Pada saat yang sama, konstruktor dan destruktor LogWorker dieksekusi di utas utama (khususnya utas tempat LogService berjalan), dan oleh karena itu, Anda harus sangat berhati-hati dengan kode apa yang Anda jalankan di sana. Secara khusus, jangan hentikan penghitung waktu atau gunakan soket di destruktor pekerja kecuali Anda dapat menjalankan destruktor di utas yang sama!

Menjalankan destruktor pekerja di utas yang sama

Jika pekerja Anda berurusan dengan pengatur waktu atau soket, Anda perlu memastikan destruktor dijalankan di utas yang sama (utas yang Anda buat untuk pekerja dan tempat Anda memindahkan pekerja). Cara yang jelas untuk mendukung ini adalah dengan membuat subkelas QThread dan delete pekerja di dalam QThread::run() . Perhatikan templat berikut:

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };

Dengan menggunakan template ini, kami mendefinisikan ulang LogService dari contoh sebelumnya:

 // interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }

Mari kita bahas bagaimana ini seharusnya bekerja:

  • Kami menjadikan LogService sebagai objek QThread karena kami perlu mengimplementasikan fungsi run() kustom. Kami menggunakan subkelas pribadi untuk mencegah mengakses fungsi QThread karena kami ingin mengontrol siklus hidup utas secara internal.
  • Dalam fungsi Thread::run() kami menjalankan loop peristiwa dengan memanggil QThread::run() default, dan menghancurkan instance pekerja tepat setelah loop peristiwa berhenti. Perhatikan bahwa destruktor pekerja dieksekusi di utas yang sama.
  • LogService::logEvent() adalah fungsi proxy (sinyal) yang akan memposting peristiwa logging ke antrean peristiwa utas.

Menjeda dan melanjutkan utas

Peluang menarik lainnya adalah dapat menangguhkan dan melanjutkan utas khusus kami. Bayangkan aplikasi Anda sedang melakukan beberapa pemrosesan yang perlu ditangguhkan ketika aplikasi diminimalkan, dikunci, atau baru saja kehilangan koneksi jaringan. Ini dapat dicapai dengan membuat antrean asinkron khusus yang akan menampung semua permintaan yang tertunda hingga pekerja dilanjutkan. Namun, karena kami mencari solusi termudah, kami akan menggunakan (sekali lagi) antrian loop acara untuk tujuan yang sama.

Untuk menangguhkan utas, kami jelas membutuhkannya untuk menunggu pada kondisi menunggu tertentu. Jika utas diblokir dengan cara ini, loop peristiwanya tidak menangani peristiwa apa pun dan Qt harus dimasukkan ke dalam antrian. Setelah dilanjutkan, loop acara akan memproses semua permintaan yang terakumulasi. Untuk kondisi waiting, kita cukup menggunakan objek QWaitCondition yang juga membutuhkan QMutex . Untuk merancang solusi umum yang dapat digunakan kembali oleh pekerja mana pun, kita perlu menempatkan semua logika penangguhan/lanjutan ke dalam kelas dasar yang dapat digunakan kembali. Sebut saja SuspendableWorker . Kelas seperti itu harus mendukung dua metode:

  • suspend() akan menjadi panggilan pemblokiran yang menetapkan utas menunggu pada kondisi menunggu. Ini akan dilakukan dengan memposting permintaan penangguhan ke dalam antrian dan menunggu sampai ditangani. Hampir mirip dengan QThread::quit() + wait() .
  • resume() akan memberi sinyal kondisi tunggu untuk membangunkan utas tidur untuk melanjutkan eksekusinya.

Mari kita tinjau antarmuka dan implementasinya:

 // interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
 // implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }

Ingat bahwa utas yang ditangguhkan tidak akan pernah menerima acara quit . Untuk alasan ini, kami tidak dapat menggunakan ini dengan aman dengan vanilla QThread kecuali kami melanjutkan utas sebelum memposting berhenti. Mari kita integrasikan ini ke dalam template Thread<T> kustom kita untuk membuatnya antipeluru.

teks alternatif

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };

Dengan perubahan ini, kami akan melanjutkan utas sebelum memposting acara berhenti. Juga, Thread<TWorker> masih memungkinkan semua jenis pekerja untuk diteruskan terlepas dari apakah itu SuspendableWorker atau bukan.

Penggunaannya akan menjadi sebagai berikut:

 LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.

volatil vs atom

Ini adalah topik yang sering disalahpahami. Kebanyakan orang percaya bahwa variabel volatile dapat digunakan untuk melayani flag tertentu yang diakses oleh banyak utas dan ini dipertahankan dari kondisi balapan data. Itu salah, dan kelas QAtomic* (atau std::atomic ) harus digunakan untuk tujuan ini.

Mari kita pertimbangkan contoh realistis: kelas koneksi TcpConnection yang bekerja di utas khusus, dan kami ingin kelas ini mengekspor metode aman-utas: bool isConnected() . Secara internal, kelas akan mendengarkan peristiwa soket: connected dan disconnected untuk mempertahankan flag boolean internal:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }

Membuat _connected member volatile tidak akan menyelesaikan masalah dan tidak akan membuat thread-safe isConnected() . Solusi ini akan bekerja 99% dari waktu, tetapi 1% sisanya akan membuat hidup Anda menjadi mimpi buruk. Untuk memperbaikinya, kita perlu melindungi akses variabel dari beberapa utas. Mari kita gunakan QReadWriteLocker untuk tujuan ini:

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }

Ini bekerja dengan andal, tetapi tidak secepat menggunakan operasi atom "bebas kunci". Solusi ketiga cepat dan aman (contohnya menggunakan std::atomic alih-alih QAtomicInt , tetapi secara semantik ini identik):

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }

Kesimpulan

Dalam artikel ini, kami membahas beberapa masalah penting tentang pemrograman bersamaan dengan kerangka kerja Qt dan solusi yang dirancang untuk menangani kasus penggunaan tertentu. Kami belum mempertimbangkan banyak topik sederhana seperti penggunaan atom primitif, kunci baca-tulis, dan banyak lainnya, tetapi jika Anda tertarik dengan ini, tinggalkan komentar Anda di bawah dan mintalah tutorial semacam itu.

Jika Anda tertarik untuk menjelajahi Qmake, saya juga baru-baru ini menerbitkan The Vital Guide to Qmake. Ini adalah bacaan yang bagus!