Saya memiliki kode berikut yang ingin saya ubah dari menggunakan threadpool untuk menggunakan prosespoolExecutor karena itu semua perhitungan intensif CPU dan ketika saya mengamati monitor CPU saya perhatikan bahwa 8 prosesor inti saya hanya menggunakan satu utas.

import datetime
from multiprocessing.dummy import Pool as ThreadPool


def thread_run(q, clients_credit_array, clients_terr_array,
               freq_small_list, freq_large_list, clients, year, admin):
    claim_id = []
    claim_client_id = []
    claim_company_id = []
    claim_year = []
    claim_type = []
    claim_closed = []
    claim_cnt = []
    claim_amount = []

    print(datetime.datetime.utcnow())
    i = 0
    client_cnt = 1000
    loop_incr = 8
    while i < client_cnt:
        ind_rng = range(i, min((i + loop_incr), (client_cnt)), 1)
        call_var = []
        for q in ind_rng:
            call_var.append((q,
                             clients_credit_array,
                             clients_terr_array,
                             freq_small_list,
                             freq_large_list,
                             clients,
                             year,
                             admin))

        pool = ThreadPool(len(call_var))
        results = pool.map(call_claim, call_var)
        pool.close()
        pool.join()

        for result in results:
            if result[0] == []:
                pass
            else:
                r = 0
                if r < len(result[0]):
                    claim_index += 1
                    claim_id.append(claim_index)
                    claim_client_id.append(result[0][r])
                    claim_company_id.append(result[1][r])
                    claim_year.append(result[2][r])
                    claim_type.append(result[3][r])
                    claim_closed.append(result[4][r])
                    claim_cnt.append(result[5][r])
                    claim_amount.append(result[6][r])
                    r += 1
        i += loop_incr
    print(datetime.datetime.utcnow())

Namun, kesulitan yang saya alami adalah ketika saya memodifikasi kode sebagai berikut, saya mendapatkan pesan kesalahan:

from concurrent.futures import ProcessPoolExecutor as PThreadPool
        pool = PThreadPool(max_workers=len(call_var))
        #pool = ThreadPool(len(call_var))
        results = pool.map(call_claim, call_var)
        #pool.close()
        #pool.join()

Saya harus menghapus pool.close () dan pool.join () seperti yang dihasilkan kesalahan. Tetapi ketika saya menghapusnya, kode saya tidak memanfaatkan prosesor paralel dan berlari lebih lama dan lebih lambat daripada awalnya. Apa yang saya lewatkan?

0
C. Cooney 5 April 2021, 00:48

1 menjawab

Jawaban Terbaik

Seperti yang ditunjukkan dalam komentar, umum untuk melihat Executor digunakan sebagai bagian dari manajer konteks dan tanpa perlu untuk operasi join atau close. Di bawah ini adalah contoh yang disederhanakan untuk menggambarkan konsep.

Contoh:

import concurrent.futures
import random
import time
import os

values = [1, 2, 3, 4, 5]


def times_two(n):
    time.sleep(random.randrange(1, 5))
    print("pid:", os.getpid())
    return n * 2


def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(times_two, values)

        for one_result in results:
            print(one_result)


if __name__ == "__main__":
    main()

Keluaran:

pid: 396
pid: 8904
pid: 25440
pid: 20592
pid: 14636
2
4
6
8
10
1
rhurwitz 5 April 2021, 05:43