Saya memiliki kelas DataClass yang memiliki beberapa data yang perlu generated dan cleaned. Saya tidak tahu kapan selama pelaksanaan program saya akan memiliki data baru, dan saya mencoba menggunakan beberapa utas untuk memungkinkan utas utama untuk melanjutkan saat data sedang diproses.

Ini adalah DataClass:

class DataClass
{
public:
    unsigned int state{0};

    
    void Generate()
    {
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(3s);
        state = 1;
    }

    void Clean()
    {
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(1s);
        state = 2;
    }
};

Saya memegang setiap objek DataClass menjadi dua std::deque, satu dengan yang perlu dibuat dan satu lagi dengan yang perlu dibersihkan.

std::deque<DataClass*> dataToGenerate;
std::deque<DataClass*> dataToClean;

Saya menggunakan dua fungsi CleanerFunction dan GeneratorFunction yang akan memproses konten dari dua daftar.

GeneratorFunction:

void GeneratorFunction()
{
    while (!dataToGenerate.empty())
    {
        auto* c = dataToGenerate.front();
        c->Generate();
        dataToGenerate.pop_front();
        dataToClean.push_back(c);
        std::cout << "Generated one Data Piece." << std::endl;
    }
}

(Yang lebih bersih serupa).

Dan di fungsi utama saya memulai dua utas untuk dua fungsi. Tetapi utas berhenti seketika karena dua daftar kosong dan jadi saya perlu membuat yang baru setiap kali. Saya telah melihat condition_variables tetapi saya tidak dapat membuatnya berfungsi dengan baik karena semua contoh yang saya temukan online berbeda dari skenario ini. Sejauh yang saya mengerti jika saya menggunakan while(!dataToGenerate.empty()) saya akan mengeksekusi baris itu selalu mengisi utas tanpa alasan yang sebenarnya, jadi saya ingin tidak menggunakan metode ini.

Apakah ada cara agar saya dapat menjeda setiap utas hingga daftar tidak kosong lagi dan kemudian memulai utas?

1
Fabrizio 11 Juli 2020, 23:44

1 menjawab

Jawaban Terbaik

Rasio XY dari pertanyaan ini tampaknya agak tinggi. Saya telah membaca tentang antrian tugas dan penguncian dan ulangi solusi Anda dalam hal itu.

Secara khusus, Anda akan menemukan ini dekat dengan pola produsen/konsumen.

Antre

Mari kita mulai dengan antrian penguncian generik minimal:

template <typename T>
struct Queue {
    Queue(size_t max = 50) : _max(max) {}

    size_t enqueue(T v) {
        std::unique_lock lk(_mx);
        _cond.wait(lk, [this] { return (_max == 0) || (_storage.size() < _max); });

        _storage.push_back(std::move(v));
        _cond.notify_one();
        return _storage.size(); // NOTE: very racy load indicator
    }

    template <typename Duration>
    std::optional<T> dequeue(Duration d) {
        std::unique_lock lk(_mx);

        if (_cond.wait_for(lk, d, [this] { return !_storage.empty(); })) {
            auto top = std::move(_storage.front());

            _storage.pop_front();
            _cond.notify_one();

            return top;
        }

        return std::nullopt;
    }

  private:
    size_t _max;
    mutable std::mutex _mx;
    mutable std::condition_variable _cond;
    std::deque<T> _storage;
};

Ini tidak pernah memblokir dequeue (sehingga Anda dapat mendeteksi dan menangani antrian kosong). Ini pada prinsipnya tidak akan memblokir enqueue kecuali batas tertentu telah tercapai. Buat batas 0 untuk memiliki antrian yang tidak terbatas.

Sekarang Anda dapat memiliki sejumlah antrian dengan sejumlah produsen/konsumen yang Anda inginkan. Misalnya.:

Logika Program

struct DataClass {
    int id;
    unsigned int state{ 0 };

    DataClass(int id) : id(id) {}

    void Generate() { sleep_for(3s); state = 1; }
    void Clean()    { sleep_for(1s); state = 2; }
};

Sekarang mari kita buat program dengan 4 thread generator dan 2 thread cleaner, memonitor 2 antrian (genTasks dan cleanTasks).

struct Program {
    Program() {
        auto worker_id = 1;
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
    }

    size_t createWork(DataClass task) {
        return genTasks.enqueue(std::move(task));
    }

    ~Program() {
        _shutdown = true;
        for (auto& th: _workers) 
            if (th.joinable()) th.join();
    }

  private:
    Queue<DataClass> genTasks, cleanTasks;
    std::atomic_bool _shutdown { false };
    std::list<std::thread> _workers;

    void generate_worker(int worker_id) {
        while (!_shutdown) {
            while (auto task = genTasks.dequeue(1s)) {
                std::cout << "Worker #" << worker_id << " Generate: " << task->id << std::endl;
                task->Generate();
                cleanTasks.enqueue(std::move(*task));
            }
        }
        std::cout << "Worker #" << worker_id << " Exit generate_worker" << std::endl;
    }

    void clean_worker(int worker_id) {
        while (!_shutdown) {
            while (auto task = cleanTasks.dequeue(1s)) {
                std::cout << "Worker #" << worker_id << " Clean: " << task->id << std::endl;
                task->Clean();
                std::cout << "Worker #" << worker_id << " Done: " << task->id << std::endl;
            }
        }
        std::cout << "Worker #" << worker_id << " Exit clean_worker" << std::endl;
    }
};

Saya menambahkan tanda _shutdown untuk ukuran yang baik, meskipun tidak terlalu kuat (menunggu sampai pekerja menganggur setidaknya selama satu detik (dequeue(1s)). Jika Anda ingin shutdown yang lebih mengganggu, taburkan if (_shutdown) break; pernyataan melalui loop pekerja.

Demo Lengkap

Mari kita kendarai dengan beberapa pekerjaan:

int main() {
    Program p; 

    for (auto i : {1,2,3,4,5,6,7,8,9,10}) {
        sleep_for((rand()%100) * 1ms);
        p.createWork(i);
    }

    sleep_for(2.5s);
    std::cout << "Load at createWork(42) is ~" << p.createWork(42) << std::endl;
    sleep_for(2.5s);
    std::cout << "Load at createWork(43) is ~" << p.createWork(43) << std::endl;

    sleep_for(4s);
    std::cout << "Initiating shutdown\n";
    // Program destructor performs shutdown
}

Cetakan

Live On Coliru

Worker #2 Generate: 1
Worker #1 Generate: 2
Worker #3 Generate: 3
Worker #4 Generate: 4
Worker #2 Generate: 5
Worker #5 Clean: 1
Load at createWork(42) is ~6
Worker #1 Generate: 6
Worker #6 Clean: 2
Worker #3 Generate: 7
Worker #4 Generate: 8
Worker #5 Done: 1
Worker #5 Clean: 3
Worker #6 Done: 2
Worker #6 Clean: 4
Worker #5 Done: 3
Worker #6 Done: 4
Load at createWork(43) is ~4
Worker #2 Generate: 9
Worker #5 Clean: 5
Worker #1 Generate: 10
Worker #6 Clean: 6
Worker #3 Generate: 42
Worker #4 Generate: 43
Worker #5 Done: 5
Worker #5 Clean: 7
Worker #6 Done: 6
Worker #6 Clean: 8
Worker #5 Done: 7
Worker #6 Done: 8
Worker #5 Clean: 9
Worker #6 Clean: 10
Initiating shutdown
Worker #2 Exit generate_worker
Worker #5 Done: 9
Worker #5 Clean: 42
Worker #1 Exit generate_worker
Worker #6 Done: 10
Worker #6 Clean: 43
Worker #3 Exit generate_worker
Worker #4 Exit generate_worker
Worker #5 Done: 42
Worker #6 Done: 43
Worker #5 Exit clean_worker
Worker #6 Exit clean_worker
Unfinished generate/clean tasks: 0/0

Daftar Lengkap

Live On Coliru

#include <mutex>
#include <condition_variable>
#include <deque>
#include <optional>

template <typename T>
struct Queue {
    Queue(size_t max = 50) : _max(max) {}

    size_t enqueue(T v) {
        std::unique_lock lk(_mx);
        _cond.wait(lk, [this] { return (_max == 0) || (_storage.size() < _max); });

        _storage.push_back(std::move(v));
        _cond.notify_one();
        return _storage.size(); // NOTE: very racy load indicator
    }

    template <typename Duration>
    std::optional<T> dequeue(Duration d) {
        std::unique_lock lk(_mx);

        if (_cond.wait_for(lk, d, [this] { return !_storage.empty(); })) {
            auto top = std::move(_storage.front());

            _storage.pop_front();
            _cond.notify_one();

            return top;
        }

        return std::nullopt;
    }

    size_t size() const { // racy in multi-thread situations
        std::unique_lock lk(_mx);
        return _storage.size();
    }

  private:
    size_t _max;
    mutable std::mutex _mx;
    mutable std::condition_variable _cond;
    std::deque<T> _storage;
};

#include <chrono>
#include <thread>
#include <iostream>
#include <list>
#include <atomic>
using namespace std::chrono_literals;
static inline auto sleep_for = [](auto d) { std::this_thread::sleep_for(d); };

struct DataClass {
    int id;
    unsigned int state{ 0 };

    DataClass(int id) : id(id) {}
    //DataClass(DataClass&&) = default;
    //DataClass& operator=(DataClass&&) = default;
    //DataClass(DataClass const&) = delete;

    void Generate() { sleep_for(3s); state = 1; }
    void Clean()    { sleep_for(1s); state = 2; }
};

struct Program {
    Program() {
        auto worker_id = 1;
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
    }

    size_t createWork(DataClass task) {
        return genTasks.enqueue(std::move(task));
    }

    ~Program() {
        _shutdown = true;
        for (auto& th: _workers) 
            if (th.joinable()) th.join();
        std::cout << "Unfinished generate/clean tasks: " << genTasks.size() << "/" << cleanTasks.size() << "\n";
    }

  private:
    Queue<DataClass> genTasks, cleanTasks;
    std::atomic_bool _shutdown { false };
    std::list<std::thread> _workers;

    void generate_worker(int worker_id) {
        while (!_shutdown) {
            while (auto task = genTasks.dequeue(1s)) {
                std::cout << "Worker #" << worker_id << " Generate: " << task->id << std::endl;
                task->Generate();
                cleanTasks.enqueue(std::move(*task));
            }
        }
        std::cout << "Worker #" << worker_id << " Exit generate_worker" << std::endl;
    }

    void clean_worker(int worker_id) {
        while (!_shutdown) {
            while (auto task = cleanTasks.dequeue(1s)) {
                std::cout << "Worker #" << worker_id << " Clean: " << task->id << std::endl;
                task->Clean();
                std::cout << "Worker #" << worker_id << " Done: " << task->id << std::endl;
            }
        }
        std::cout << "Worker #" << worker_id << " Exit clean_worker" << std::endl;
    }
};

int main() {
    Program p; 

    for (auto i : {1,2,3,4,5,6,7,8,9,10}) {
        sleep_for((rand()%100) * 1ms);
        p.createWork(i);
    }

    sleep_for(2.5s);
    std::cout << "Load at createWork(42) is ~" << p.createWork(42) << std::endl;
    sleep_for(2.5s);
    std::cout << "Load at createWork(43) is ~" << p.createWork(43) << std::endl;

    sleep_for(4s);
    std::cout << "Initiating shutdown\n";
    // Program destructor performs shutdown
}
0
sehe 11 Juli 2020, 22:12