Saya memiliki skrip ini untuk mengirim banyak (seperti 100.000++) catatan dari sebuah tabel. Terkadang saya mendapatkan kesalahan pengecualian 'java heap space'. Jadi saya ingin memiliki beberapa penanganan kesalahan, dan memecah muatan data menjadi potongan yang lebih kecil dan mengirim melalui permintaan POST. Skrip saya saat ini terkadang berfungsi, tetapi seringkali mengembalikan kesalahan ruang tumpukan itu.

import json
import requests
from multiprocessing import Pool

url = "some url"
header = {"Content-Type": "application/json", 'Accept':'application/json'}
api_key="some key"

def send_request(data):
    try:
      res = requests.post(url, headers=header, json={"api_key": api_key, "attributes": data})
      print(res.status_code)
      print(res.json())
    except requests.exceptions.RequestException as e:
      #catastrophic error. bail.
      print(e)

def chunks(data):
    for i in range(0, len(data), 50):
        yield data[i:i + 50]



p = Pool(8)
p.map(send_request, chunks(json.loads(data)))
3
Dametime 3 Juni 2020, 11:04

1 menjawab

Jawaban Terbaik

Dari multiprocessing dokumentasi :

peta (func, iterable[, chunksize])

...

Perhatikan bahwa ini dapat menyebabkan penggunaan memori yang tinggi untuk iterable yang sangat lama. Pertimbangkan untuk menggunakan imap() atau imap_unordered() dengan chunksize eksplisit pilihan untuk efisiensi yang lebih baik.

(Penekanan ditambahkan)

Lalu:

imap (func, iterable[, chunksize])

Versi peta yang lebih malas().

Argumen chunksize sama dengan yang digunakan oleh metode map() . Untuk iterable yang sangat lama menggunakan nilai chunksize yang besar dapat membuat pekerjaan selesai lebih cepat daripada menggunakan nilai default 1.

Seberapa hebat itu? imap melakukan chunking untuk Anda, jadi Anda tidak perlu khawatir membuat fungsi Anda sendiri.

Jadi, mari kita uji!

from multiprocessing import Pool
from datetime import datetime
import sys

testObj = [{ "data": list(range(5000)) }] * 250000

def doFunc(data):
    # arbitrary test function, take square root of all numbers
    for item in data:
        for num in item['data']:
            num ** 0.5

def chunks(data):
    for i in range(0, len(data), 50):
        yield data[i:i + 50]

p = Pool(8)

start =  datetime.now()
mapTest = p.map(doFunc, chunks(testObj))
print("map took ", datetime.now() - start)
print("sizeof map: ", sys.getsizeof(mapTest))

start = datetime.now()
imapTest = p.imap(doFunc, testObj, 50)
print("imap took ", datetime.now() - start)
print("sizeof imap: ", sys.getsizeof(imapTest))

start = datetime.now()
imapOTest = p.imap_unordered(doFunc, testObj, 50)
print("imap_unordered took ", datetime.now() - start)
print("sizeof imap_unordered: ", sys.getsizeof(imapOTest))

Hasil (rata-rata 50 iterasi):

map took  0:00:26.61296 
sizeof map:  40072 
imap took  0:00:00.00106
sizeof imap:  128 
imap_unordered took  0:00:00.00108
sizeof imap_unordered:  128

Itu akan menjadi 26,6 detik vs 0,001 detik. Tetapi yang lebih penting untuk apa yang Anda cari, lihat penghematan memori! Hasil yang cukup signifikan dalam tes yang sangat tidak ilmiah ini.

Anda tidak akan melihat penghematan waktu yang sama karena permintaan POST tidak dapat dipercepat seperti perhitungan akar kuadrat, tetapi mudah-mudahan beralih ke imap akan membantu Anda secara signifikan, terutama di bagian memori.

3
jdaz 6 Juni 2020, 04:35