C++'da Qt Multithreading Hakkında Eksik Makale

Yayınlanan: 2022-03-11

C++ geliştiricileri, sağlam, çok iş parçacıklı Qt uygulamaları oluşturmaya çalışır, ancak tüm bu yarış koşulları, senkronizasyon, kilitlenmeler ve canlı kilitler ile çoklu iş parçacığı hiçbir zaman kolay olmadı. Kredinize göre, pes etmiyorsunuz ve kendinizi StackOverflow'u araştırırken buluyorsunuz. Yine de, bir düzine farklı yanıttan doğru ve işe yarayan çözümü seçmek, özellikle her çözümün kendi dezavantajları olduğu düşünüldüğünde, oldukça önemsiz değildir.

Çoklu iş parçacığı, bir işlem bağlamında birden çok iş parçacığının var olmasına izin veren yaygın bir programlama ve yürütme modelidir. Bu iş parçacıkları işlemin kaynaklarını paylaşır ancak bağımsız olarak yürütülebilir. İş parçacıklı programlama modeli, geliştiricilere eşzamanlı yürütmenin yararlı bir soyutlamasını sağlar. Çok işlemli bir sistemde paralel yürütmeyi sağlamak için bir işleme birden çok iş parçacığı da uygulanabilir.

- Vikipedi

Bu makalenin amacı, özellikle en yanlış anlaşılan konular olmak üzere, Qt çerçevesi ile eşzamanlı programlama hakkında temel bilgileri bir araya getirmektir. Bir okuyucunun içeriği anlamak için Qt ve C++'da daha önce geçmişe sahip olması beklenir.

QThreadPool ve QThread kullanma arasında seçim yapma

Qt çerçevesi, çoklu kullanım için birçok araç sunar. Doğru aracı seçmek ilk başta zor olabilir, ancak aslında karar ağacı sadece iki seçenekten oluşur: ya Qt'nin konuları sizin için yönetmesini istersiniz ya da konuları kendiniz yönetmesini istersiniz. Ancak, başka önemli kriterler de var:

  1. Olay döngüsüne ihtiyaç duymayan görevler. Spesifik olarak, görev yürütme sırasında sinyal/yuva mekanizmasını kullanmayan görevler.
    Kullanım: QtConcurrent ve QThreadPool + QRunnable.

  2. Sinyal/yuva kullanan ve bu nedenle olay döngüsüne ihtiyaç duyan görevler.
    Kullanım: Çalışan nesneler + QThread'e taşındı.

Qt çerçevesinin büyük esnekliği, "eksik olay döngüsü" sorunu üzerinde çalışmanıza ve QRunnable bir tane eklemenize olanak tanır:

 class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }

Bununla birlikte, bu tür "geçici çözümler"den kaçınmaya çalışın, çünkü bunlar tehlikelidir ve verimli değildir: iş parçacığı havuzundaki (MyTask çalıştıran) iş parçacıklarından biri bir sinyal beklemekten dolayı engellenirse, havuzdaki diğer görevleri yürütemez.

alternatif metin

Ayrıca QThread::run() yöntemini geçersiz kılarak herhangi bir olay döngüsü olmadan bir QThread çalıştırabilirsiniz ve ne yaptığınızı bildiğiniz sürece bu gayet iyi. Örneğin, bu durumda quit() yönteminin çalışmasını beklemeyin.

Bir seferde bir görev örneğini çalıştırma

Bir seferde yalnızca bir görev örneğinin yürütülebildiğinden ve aynı görevi çalıştırmak için bekleyen tüm isteklerin belirli bir kuyrukta beklediğinden emin olmanız gerektiğini hayal edin. Bu genellikle, bir görev aynı dosyaya yazmak veya TCP soketini kullanarak paket göndermek gibi özel bir kaynağa erişirken gereklidir.

Bir an için bilgisayar bilimini ve üretici-tüketici modelini unutalım ve önemsiz bir şey düşünelim; gerçek projelerde kolayca bulunabilecek bir şey.

Bu soruna naif bir çözüm, bir QMutex kullanmak olabilir. Görev işlevinin içinde, görevi çalıştırmaya çalışan tüm iş parçacıklarını etkin bir şekilde seri hale getiren muteks'i kolayca elde edebilirsiniz. Bu, bir seferde yalnızca bir iş parçacığının işlevi çalıştırabileceğini garanti eder. Bununla birlikte, bu çözüm, yüksek çekişme sorunu ortaya çıkararak performansı etkiler, çünkü tüm bu iş parçacıkları devam etmeden önce (muteks üzerinde) engellenir. Böyle bir görevi aktif olarak kullanan ve aralarında bazı yararlı işler yapan çok sayıda iş parçacığınız varsa, tüm bu iş parçacıkları çoğu zaman uykuda olacaktır.

 void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }

Çekişmeyi önlemek için, bir kuyruğa ve kendi iş parçacığında yaşayan ve kuyruğu işleyen bir işçiye ihtiyacımız var. Bu hemen hemen klasik üretici-tüketici modelidir. İşçi ( tüketici ) kuyruktan istekleri tek tek alıyor olacaktır ve her üretici kendi isteklerini kuyruğa basitçe ekleyebilir. İlk başta kulağa basit geliyor ve QQueue ve QWaitCondition kullanmayı düşünebilirsiniz, ancak bekleyin ve bu ilkeller olmadan hedefe ulaşıp ulaşamayacağımızı görelim:

  • Bekleyen görevler kuyruğu olduğu için QThreadPool kullanabiliriz

Veya

  • QEventLoop'a sahip olduğu için varsayılan QEventLoop QThread::run() kullanabiliriz

İlk seçenek QThreadPool kullanmaktır. Bir QThreadPool örneği oluşturabilir ve QThreadPool::setMaxThreadCount(1) kullanabiliriz. O zaman istekleri programlamak için QtConcurrent::run() kullanabiliriz:

 class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };

Bu çözümün bir avantajı vardır: QThreadPool::clear() , örneğin uygulamanızın hızlı bir şekilde kapatılması gerektiğinde, bekleyen tüm istekleri anında iptal etmenize olanak tanır. Bununla birlikte, iş parçacığı afinitesine bağlı önemli bir dezavantaj da vardır: logEventCore işlevi, büyük olasılıkla, çağrıdan çağrıya farklı iş parçacıklarında yürütülecektir. Ve Qt'nin iş parçacığı afinitesi gerektiren bazı sınıfları olduğunu biliyoruz: QTimer , QTcpSocket ve muhtemelen diğerleri.

Qt spesifikasyonunun iş parçacığı yakınlığı hakkında söyledikleri: bir iş parçacığında başlatılan zamanlayıcılar, başka bir iş parçacığından durdurulamaz. Ve sadece bir soket örneğine sahip olan iş parçacığı bu soketi kullanabilir. Bu, onları başlatan iş parçacığında çalışan tüm zamanlayıcıları durdurmanız ve soketin sahibi olan iş parçacığında QTcpSocket::close() öğesini çağırmanız gerektiği anlamına gelir. Her iki örnek de genellikle yıkıcılarda yürütülür.

Daha iyi çözüm, QThread QEventLoop dayanır. Fikir basit: istekleri göndermek için bir sinyal/yuva mekanizması kullanıyoruz ve iş parçacığının içinde çalışan olay döngüsü, bir seferde yalnızca bir yuvanın yürütülmesine izin veren bir kuyruk görevi görecek.

 // the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };

LogWorker yapıcısının ve logEvent uygulanması basittir ve bu nedenle burada sağlanmamıştır. Şimdi iş parçacığını ve çalışan örneğini yönetecek bir hizmete ihtiyacımız var:

 // interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent("event"); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); } 

alternatif metin

Bu kodun nasıl çalıştığını tartışalım:

  • Yapıcıda bir iş parçacığı ve işçi örneği oluşturuyoruz. Yeni iş parçacığına taşınacağından, çalışanın bir üst öğe almadığına dikkat edin. Bu nedenle, Qt, çalışanın belleğini otomatik olarak serbest bırakamayacak ve bu nedenle, bunu QThread::finished sinyalini deleteLater yuvasına bağlayarak yapmamız gerekiyor. Ayrıca LogService::logEvent() proxy yöntemini, farklı iş parçacıkları nedeniyle Qt::QueuedConnection modunu kullanacak olan LogWorker::logEvent() ()'e bağlarız.
  • quit , exit olayını olay döngüsünün kuyruğuna koyarız. Bu olay, diğer tüm olaylar işlendikten sonra işlenecektir. Örneğin, yok edici çağrısından hemen önce yüzlerce logEvent() çağrısı yaptıysak, günlükçü Quit olayını getirmeden önce hepsini halledecektir. Bu elbette zaman alır, bu yüzden olay döngüsü çıkana kadar wait() . Quit olayından sonra gönderilen gelecekteki tüm günlük isteklerinin hiçbir zaman işlenmeyeceğinden bahsetmeye değer.
  • Günlüğe kaydetmenin kendisi ( LogWorker::logEvent ) her zaman aynı iş parçacığında yapılacaktır, bu nedenle bu yaklaşım iş parçacığı afinitesi gerektiren sınıflar için iyi çalışır. Aynı zamanda, LogWorker yapıcısı ve yıkıcısı ana iş parçacığında (özellikle LogService iş parçacığının çalıştığı iş parçacığı) yürütülür ve bu nedenle, orada hangi kodu çalıştırdığınız konusunda çok dikkatli olmanız gerekir. Özellikle, yıkıcıyı aynı iş parçacığında çalıştıramıyorsanız, işçinin yıkıcısında zamanlayıcıları durdurmayın veya yuvaları kullanmayın!

Aynı iş parçacığında işçinin yıkıcısını yürütme

Çalışanınız zamanlayıcılar veya soketlerle ilgileniyorsa, yıkıcının aynı iş parçacığında (işçi için oluşturduğunuz iş parçacığı ve işçiyi nereye taşıdığınız) yürütüldüğünden emin olmanız gerekir. Bunu desteklemenin bariz yolu, QThread alt sınıflara ayırmak ve QThread::run() yöntemi içindeki işçiyi delete . Aşağıdaki şablonu göz önünde bulundurun:

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };

Bu şablonu kullanarak LogService önceki örnekten yeniden tanımlıyoruz:

 // interface class LogService : public Thread<LogWorker> { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread<LogWorker>(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }

Bunun nasıl çalışması gerektiğini tartışalım:

  • LogService QThread nesnesi yaptık çünkü özel run() işlevini uygulamamız gerekiyordu. İş parçacığının yaşam döngüsünü dahili olarak kontrol etmek istediğimizden, QThread işlevlerine erişimi engellemek için özel alt sınıflama kullandık.
  • Thread::run() işlevinde, varsayılan QThread::run() uygulamasını çağırarak olay döngüsünü çalıştırırız ve olay döngüsü çıktıktan hemen sonra işçi örneğini yok ederiz. İşçinin yıkıcısının aynı iş parçacığında yürütüldüğünü unutmayın.
  • LogService::logEvent() , günlük olayını iş parçacığının olay kuyruğuna gönderecek olan proxy işlevidir (sinyal).

Konuları duraklatma ve devam ettirme

Bir başka ilginç fırsat, özel ileti dizilerimizi askıya alıp devam ettirebilmektir. Uygulamanızın, simge durumuna küçültüldüğünde, kilitlendiğinde veya ağ bağlantısını kaybettiğinde askıya alınması gereken bazı işlemler yaptığını hayal edin. Bu, çalışan yeniden başlatılıncaya kadar bekleyen tüm istekleri tutacak özel bir zaman uyumsuz kuyruk oluşturularak gerçekleştirilebilir. Ancak, en kolay çözümleri aradığımız için (tekrar) olay döngüsünün kuyruğunu aynı amaç için kullanacağız.

Bir iş parçacığını askıya almak için, belirli bir bekleme koşulunda beklememiz gerektiği açıktır. Eğer iş parçacığı bu şekilde engellenirse, olay döngüsü herhangi bir olayı işlemez ve Qt, kuyruğa almak zorundadır. Yeniden başlatıldığında, olay döngüsü birikmiş tüm istekleri işleyecektir. Bekleme koşulu için, aynı zamanda bir QMutex gerektiren QWaitCondition nesnesini kullanırız. Herhangi bir çalışan tarafından yeniden kullanılabilecek genel bir çözüm tasarlamak için, tüm askıya alma/devam ettirme mantığını yeniden kullanılabilir bir temel sınıfa koymamız gerekir. Buna SuspendableWorker diyelim. Böyle bir sınıf iki yöntemi destekleyecektir:

  • suspend() , bekleyen iş parçacığını bekleme koşuluna ayarlayan bir engelleme çağrısı olacaktır. Bu, kuyruğa bir askıya alma isteği göndererek ve işlenene kadar bekleyerek yapılır. QThread::quit() + wait() ile oldukça benzer.
  • resume() , yürütmeye devam etmek için uyuyan iş parçacığını uyandırmak için bekleme koşuluna işaret eder.

Arayüzü ve uygulamayı gözden geçirelim:

 // interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
 // implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }

Askıya alınmış bir iş parçacığının asla bir quit olayı almayacağını unutmayın. Bu nedenle, çıkış göndermeden önce diziye devam etmedikçe, bunu Vanilla QThread ile güvenli bir şekilde kullanamayız. Kurşun geçirmez hale getirmek için bunu özel Thread<T> şablonumuza entegre edelim.

alternatif metin

 template <typename TWorker> class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast<SuspendableWorker*>(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };

Bu değişikliklerle, Quit olayını göndermeden önce diziye devam edeceğiz. Ayrıca Thread<TWorker> , SuspendableWorker olup olmadığına bakılmaksızın herhangi bir tür çalışanın geçirilmesine izin verir.

Kullanım aşağıdaki gibi olacaktır:

 LogService logService; logService.logEvent("processed event"); logService.suspend(); logService.logEvent("queued event"); logService.resume(); // "queued event" is now processed.

uçucu vs atomik

Bu genellikle yanlış anlaşılan bir konudur. Çoğu kişi, volatile değişkenlerin, birden çok iş parçacığı tarafından erişilen belirli bayraklara hizmet etmek için kullanılabileceğine ve bunun veri yarışı koşullarından korunduğuna inanır. Bu yanlıştır ve bu amaç için QAtomic* sınıfları (veya std::atomic ) kullanılmalıdır.

Gerçekçi bir örnek düşünelim: adanmış bir iş parçacığında çalışan bir TcpConnection bağlantı sınıfı ve bu sınıfın iş parçacığı için güvenli bir yöntem vermesini istiyoruz: bool isConnected() . Dahili olarak, sınıf soket olaylarını dinleyecektir: bir dahili boole bayrağını korumak için connected ve disconnected :

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }

_connected üyeyi volatile yapmak sorunu çözmeyecek ve isConnected() iş parçacığı için güvenli hale getirmeyecek. Bu çözüm zamanın %99'unda işe yarayacak, ancak kalan %1'lik kısım hayatınızı kabusa çevirecek. Bunu düzeltmek için, değişken erişimini birden çok iş parçacığından korumamız gerekiyor. Bu amaçla QReadWriteLocker :

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }

Bu güvenilir bir şekilde çalışır, ancak "kilitsiz" atomik işlemler kullanmak kadar hızlı değildir. Üçüncü çözüm hem hızlı hem de iş parçacığı için güvenlidir (örnek, QAtomicInt yerine std::atomic kullanıyor, ancak bunlar anlamsal olarak aynıdır):

 // pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic<bool> _connected; }

Çözüm

Bu makalede, Qt çerçevesiyle eşzamanlı programlama hakkında birkaç önemli endişeyi tartıştık ve belirli kullanım durumlarını ele almak için çözümler tasarladık. Atomik ilkellerin kullanımı, okuma-yazma kilitleri ve diğerleri gibi basit konuların birçoğunu düşünmedik, ancak bunlarla ilgileniyorsanız, aşağıya yorumunuzu bırakın ve böyle bir öğretici isteyin.

Qmake'i keşfetmekle ilgileniyorsanız, yakın zamanda The Vital Guide to Qmake'i de yayınladım. Bu harika bir okuma!