Saya mendapatkan kesalahan yang sama dengan Tidak ada lokasi keluaran untuk shuffle saat bergabung dengan kerangka data besar di Spark SQL. Rekomendasinya adalah untuk menyetel MEMORY_AND_DISK dan/atau spark.shuffle.memoryFraction 0. Namun, spark.shuffle.memoryFraction tidak digunakan lagi di Spark >= 1.6.0 dan pengaturan MEMORY_AND_DISK seharusnya tidak membantu jika saya tidak melakukan caching RDD atau Dataframe, bukan? Saya juga mendapatkan banyak log PERINGATAN dan percobaan ulang tugas yang membuat saya berpikir bahwa pekerjaan itu tidak stabil.

Oleh karena itu, pertanyaan saya adalah:

  • Apa praktik terbaik untuk bergabung dengan kerangka data besar di Spark SQL >= 1.6.0?

Pertanyaan yang lebih spesifik adalah:

  • Bagaimana cara menyetel jumlah pelaksana dan spark.sql.shuffle.partitions untuk mencapai stabilitas/kinerja yang lebih baik?
  • Bagaimana menemukan keseimbangan yang tepat antara tingkat paralelisme (jumlah pelaksana/inti) dan jumlah partisi? Saya telah menemukan bahwa meningkatkan jumlah pelaksana tidak selalu merupakan solusi karena dapat menghasilkan pengecualian waktu membaca I/O karena lalu lintas jaringan.
  • Apakah ada parameter lain yang relevan untuk disetel untuk tujuan ini?
  • Pemahaman saya adalah bahwa menggabungkan data yang disimpan sebagai ORC atau Parket menawarkan kinerja yang lebih baik daripada teks atau Avro untuk operasi gabungan. Apakah ada perbedaan yang signifikan antara Parket dan ORC?
  • Apakah ada keuntungan dari SQLContext vs HiveContext mengenai stabilitas/kinerja untuk operasi gabungan?
  • Apakah ada perbedaan mengenai kinerja/stabilitas ketika kerangka data yang terlibat dalam penggabungan sebelumnya registerTempTable() atau saveAsTable()?

Sejauh ini saya menggunakan ini jawabannya dan bab ini sebagai titik awal. Dan ada beberapa halaman stackoverflow lainnya yang terkait dengan subjek ini. Namun saya belum menemukan jawaban yang komprehensif untuk masalah populer ini.

Terima kasih sebelumnya.

11
leo9r 23 Juni 2016, 12:34
1
Jawaban ini merekomendasikan untuk menyetel spark.sql.shuffle.partitions di atas 2000 ketika ada masalah memori pengacakan, karena Spark menggunakan struktur data yang berbeda untuk pembukuan acak ketika jumlah partisi lebih besar dari ambang batas itu: stackoverflow.com/a/36459198/2482894
 – 
leo9r
24 Juni 2016, 00:50
1
Menyetel spark.yarn.executor.memoryOverhead=1024 disarankan dalam jawaban ini: stackoverflow.com/a/33118489/2482894
 – 
leo9r
24 Juni 2016, 00:52

1 menjawab

Jawaban Terbaik

Itu banyak sekali pertanyaannya. Izinkan saya untuk menjawab ini satu per satu:

Jumlah pelaksana Anda adalah sebagian besar variabel waktu dalam lingkungan produksi. Hal ini tergantung pada sumber daya yang tersedia. Jumlah partisi penting ketika Anda melakukan shuffle. Dengan asumsi bahwa data Anda sekarang miring, Anda dapat menurunkan beban per tugas dengan meningkatkan jumlah partisi. Sebuah tugas idealnya harus mengambil beberapa minus. Jika tugas memakan waktu terlalu lama, ada kemungkinan wadah Anda dikosongkan dan pekerjaan hilang. Jika tugas hanya membutuhkan beberapa milidetik, overhead untuk memulai tugas menjadi dominan.

Tingkat paralelisme dan penyetelan ukuran pelaksana Anda, saya ingin merujuk ke panduan luar biasa dari Cloudera: https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part- 2/

ORC dan Parket hanya mengkodekan data saat istirahat. Saat melakukan penggabungan yang sebenarnya, data dalam format Spark dalam memori. Parket semakin populer sejak Netflix dan Facebook mengadopsinya dan berusaha keras di dalamnya. Parket memungkinkan Anda untuk menyimpan data lebih efisien dan memiliki beberapa pengoptimalan (predikat pushdown) yang digunakan Spark.

Anda harus menggunakan SQLContext daripada HiveContext, karena HiveContext sudah usang. SQLContext lebih umum dan tidak hanya berfungsi dengan Hive.

Saat melakukan registerTempTable, data disimpan dalam SparkSession. Ini tidak mempengaruhi eksekusi join. Apa yang disimpannya hanyalah rencana eksekusi yang dipanggil ketika suatu tindakan dilakukan (misalnya saveAsTable). Saat melakukan saveAsTable, data disimpan di sistem file terdistribusi.

Semoga ini membantu. Saya juga menyarankan menonton pembicaraan kami di Spark Summit tentang melakukan bergabung: https://www. youtube.com/watch?v=6zg7NTw-kTQ. Ini mungkin memberi Anda beberapa wawasan.

Semangat, Fokko

7
Fokko Driesprong 14 November 2017, 12:29
Saya telah mencari Anda sejak saya melihat presentasi Anda di Iterative Broadcast bergabung. Itu sangat bagus. Saya dapat melakukan sort merge join dengan dua dataframe besar dalam batch dan tampaknya berfungsi dengan baik. link di bawah stackoverflow.com/questions/53524062/efisien-pyspark-join /…..
 – 
vikrant rana
13 Mei 2019, 14:07
Saya ingin menggunakan teknik yang sama dengan gabungan siaran berulang tetapi tidak bisa mendapatkan cara menghapus partisi siaran dari memori sebelum kami menyiarkan kumpulan berikutnya dari kerangka data kecil. Bisakah Anda memberikan masukan berharga Anda untuk pertanyaan di bawah ini. lihat tautan di bawah stackoverflow.com/questions/53784272/… .. Bantuan apa pun akan sangat dihargai. Terima kasih
 – 
vikrant rana
13 Mei 2019, 14:07
Bantuan apapun! Saya berjuang untuk mengkodekan hash siaran iteratif bergabung
 – 
vikrant rana
16 Mei 2019, 10:59