Kamus dan kode di bawah ini berfungsi

  • Pertanyaannya terutama tentang multithreading, saya tahu bahwa kode di bawah ini dapat dengan mudah ditulis ulang dengan cara yang mudah. Tapi pertanyaannya ada di multithreading, saya baru saja membuat satu contoh yang berfungsi untuk diuji
  • total(todos) adalah fungsi utama
  • user_count , title_count , complete_count tidak tergantung satu sama lain
  • Saya perlu mengimplementasikan multithreading/multiprocessing harus mengimplementasikan
  • def total(todos): adalah tempat di mana perlu melakukan multithreading
todos = [{'userId': 1, 'id': 1, 'title': 'A', 'completed': False},
     {'userId': 1, 'id': 2, 'title': 'B ', 'completed': False},
     {'userId': 1, 'id': 1, 'title': 'C', 'completed': False},
     {'userId': 1, 'id': 2, 'title': 'A', 'completed': True},
     {'userId': 2, 'id': 1,'title': 'B', 'completed': False}]
def total(todos):
    ###### Multithreading need to implement ##########
    user_count = userid(todos)
    title_count = title(todos)
    complete_count = completed(todos)
    search_count_all = {**user_count, **title_count, **complete_count}
    return search_count_all
def userid(todos):    
    for d in todos:
        for l, m in d.items():  
            super_dict.setdefault(l, []).append(m)
    d = {k:len(set(v)) for k,v in super_dict.items()}
    return {"userid":d['userId']}
def title(todos):    
    for d in todos:
        for l, m in d.items():  
            super_dict.setdefault(l, []).append(m)
    d = {k:len(set(v)) for k,v in super_dict.items()}
    return {"title":d['title']}
def completed(todos):    
    for d in todos:
        for l, m in d.items():  
            super_dict.setdefault(l, []).append(m)
    d = {k:len(set(v)) for k,v in super_dict.items()}
    return {"completed":d['completed']}

total(todos)

Output saat ini dan output yang diharapkan

{'userid': 2, 'title': 4, 'completed': 2}

Dapatkah kita melakukan multiprocessing juga?

dari joblib import Paralel, tertunda

1
user14257643 6 Januari 2021, 07:21

3 jawaban

Jawaban Terbaik

Seperti yang ditentukan jawaban mandulaj, Anda dapat menggunakan perpustakaan sebagai berikut. Saya menyederhanakan contoh, jadi lebih mudah untuk bernalar.

Saya menulis 3 fungsi yang berbeda, di mana masing-masing hanya mengalikan nilai input dengan angka yang berbeda. Kemudian menggunakan ThreadPoolExecutor saya mengirimkan eksekusi mereka , secara bersamaan.

Anda kemudian perlu mengumpulkan hasil dari setiap eksekusi. Ini dilakukan dengan metode as_completed dari perpustakaan.

from concurrent.futures import ThreadPoolExecutor, as_completed


def func1(i):
    return i


def func2(i):
    return i*2


def func3(i):
    return i*3


funcs_to_execute = [func1, func2, func3]
arg = 1
res = []

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(f, arg) for f in funcs_to_execute}

    for fut in as_completed(futures):
        res.append(fut.result())

print(res)

>[2, 1, 3]

Untuk menerapkannya pada kasus penggunaan Anda, fungsi yang Anda kirimkan adalah userid, title, completed dengan argumen todo. Hasil dari semuanya berada di wadah daftar res.

Namun, ada sedikit masalah dengan kode Anda saat Anda mengaktifkan konkurensi. Saat ini bekerja untuk Anda, karena berjalan secara berurutan. superdict dibagikan, dan ketika diakses secara bersamaan, tidak dijamin. Anda dapat memperbaikinya dengan membuat daftar baru di setiap fungsi, mengembalikannya sebagai hasilnya, dan pada akhirnya, agregat. Jadi sesuatu seperti ini

def userid(todos):
    res = list()
    for d in todos:
        for l, m in d.items():
            if l == 'userid':
                res.append(m)

    return {"userid": res}

Dan menggabungkan hasil dari futures (yaitu, kamus) menjadi satu superdict.

1
Chen A. 9 Januari 2021, 08:06

Lihat pustaka concurrent.futures python. Anda dapat menjalankan fungsi Anda sebagai utas dengan

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    future1 = executor.submit(userid, todos)
    future2 = executor.submit(title, todos)
    # ...

    user_count = future1.result()
    title_count = future2.result()
0
mandulaj 6 Januari 2021, 04:51

Seperti yang saya sebutkan dalam komentar saya untuk pertanyaan Anda, Anda benar-benar harus menggunakan multiprocessing daripada multithreading karena tugas Anda intensif CPU. Masalahnya kemudian menjadi berbagi kamus super_dict global karena, tidak seperti multithreading, setiap subproses memiliki ruang memorinya sendiri dan super_dict sekarang harus ditempatkan di memori bersama. Cara untuk menginisialisasi ini adalah dengan menggunakan instance dict yang dikembalikan oleh multiprocessing.SyncManager dan kemudian menginisialisasi setiap subproses dengan proxy ke kamus terkelola ini saat kumpulan proses dibuat. Metode ini akan bekerja untuk platform berbasis Windows dan Unix. Versi kamus ini memiliki beberapa keterbatasan. Tampaknya tidak mendukung metode setdefault dan metode append seperti yang Anda inginkan karena proxy tampaknya tidak menyadari bahwa kamus sebenarnya telah diperbarui. Jadi, beberapa penyesuaian harus dilakukan.

Tetapi ada inefisiensi besar dalam kode Anda. Setiap fungsi Anda userId, title dan completed melakukan pemrosesan redundan yang sama dari daftar todos terhadap kamus super_dict menambahkan beberapa kemunculan id pengguna, judul , dll. Ini pada akhirnya tidak menjadi masalah karena Anda menghitung jumlah nilai unik. Tetapi pemborosan siklus CPU untuk melakukan ini secara berlebihan dan dalam skenario multiprosesor Anda sekarang memiliki masalah dalam menangani pembaruan kamus paralel yang tidak memberi Anda apa-apa. Oleh karena itu kode berikut hanya menginisialisasi super_dict dalam proses utama. Dan fungsi Anda userId, title dan completed melakukan lebih banyak penghitungan daripada yang diperlukan, yang dapat disederhanakan lebih lanjut (lihat di bawah).

Ketika seseorang selesai meningkatkan efisiensi kode, itu tidak membayar untuk benar-benar repot dengan multiprocessing untuk contoh khusus ini karena biaya pembuatan pool dan kamus proxy lebih besar daripada penghematan apa pun dari menjalankan tiga fungsi sepele ini di paralel. Tapi ini, setidaknya, menunjukkan teknik untuk contoh nyata.

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def init_pool(d):
    global super_dict
    super_dict = d

def total(todos):
    # Call to Manager() returns a SyncManger instance:
    with Manager() as manager:
        d = manager.dict()
        init_dict(d)
        with ProcessPoolExecutor(max_workers=3, initializer=init_pool, initargs=(d,)) as executor:
            f1 = executor.submit(userid)
            f2 = executor.submit(title)
            f3 = executor.submit(completed)
            user_count = f1.result()
            title_count = f2.result()
            complete_count = f3.result()
            search_count_all = {**user_count, **title_count, **complete_count}
            return search_count_all

def init_dict(the_dict):
    for d in todos:
        for l, m in d.items():
            # Cannot use setdefault:
            if l in the_dict:
                # Use this instead of append:
                the_dict[l] += [m]
            else:
                the_dict[l] = [m]

def userid():
    return {'userId': len({v for v in super_dict['userId']})}

def title():
    return {'title': len({v for v in super_dict['title']})}

def completed():
    return {'completed': len({v for v in super_dict['completed']})}

if __name__ == '__main__':
    todos = [{'userId': 1, 'id': 1, 'title': 'A', 'completed': False},
         {'userId': 1, 'id': 2, 'title': 'B ', 'completed': False},
         {'userId': 1, 'id': 1, 'title': 'C', 'completed': False},
         {'userId': 1, 'id': 2, 'title': 'A', 'completed': True},
         {'userId': 2, 'id': 1,'title': 'B', 'completed': False}]
    print(total(todos))
0
Booboo 12 Januari 2021, 14:39