Apa yang saya coba capai:

Memparalelkan fungsi yang memunculkan sejumlah utas per panggilan, seperti ini:

 - PROCESS01 -> 16 Threads
 - PROCESS02 -> 16 Threads
 - ...
 - PROCESSn -> 16 Threads

Kode:

with multiprocessing.Pool(4) as process_pool:
    results = process_pool.map(do_stuff, [drain_queue()])

Di mana drain_queue() mengembalikan daftar item dan

do_stuff(item_list):
    print('> PID: ' + str(os.getpid()))
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        result_dict = {executor.submit(thread_function, item): item for item in item_list}
        for future in concurrent.futures.as_completed(result_dict):
            pass

Dan thread_function() memproses setiap item yang diteruskan ke sana.

Namun, ketika dieksekusi output kode seperti ini:

> PID: 1000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 2000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 3000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 3000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)

Berikut adalah screenshot dari Task Manager

Apa yang kulewatkan di sini? Saya tidak tahu mengapa tidak bekerja seperti yang diharapkan. Terima kasih!

0
Temperosa 6 Mei 2018, 20:43

1 menjawab

Jawaban Terbaik

Saya telah menemukan masalahnya. Argumen kedua map() diharapkan dapat diubah, di mana dalam kasus saya adalah daftar yang berisi objek tunggal.

Apa yang salah ? Ini: [drain_queue()] , yang menghasilkan daftar dengan satu objek di dalamnya.

Dalam hal ini, kode

with multiprocessing.Pool(4) as process_pool:
    results = process_pool.map(do_stuff, [drain_queue()])

Memaksa multiprocessing.Pool.map untuk "mendistribusikan" satu objek ke satu proses, meskipun menciptakan n jumlah proses, pekerjaan akan tetap dilakukan oleh satu proses. Untungnya tidak ada hubungannya dengan keterbatasan GIL.

3
Temperosa 7 Mei 2018, 14:24