Saya sedang merancang lanskap Data baru, saat ini sedang mengembangkan Bukti konsep saya. Di sini saya menggunakan arsitektur berikut: Fungsi Azure --> hub acara Azure --> penyimpanan Azure Blob --> Pabrik Azure --> Azure databricks --> Server Azure SQL.

Apa yang saya perjuangkan saat ini adalah ide tentang bagaimana mengoptimalkan "pengambilan data" untuk memberi makan proses ETL saya di Azure Databricks.

Saya menangani data pabrik transaksional yang dijumlahkan per menit ke penyimpanan gumpalan Azure melalui saluran di depannya. Jadi saya berakhir dengan 86000 file setiap hari yang perlu ditangani. Memang, ini adalah sejumlah besar file terpisah untuk diproses. Saat ini saya menggunakan potongan kode berikut untuk membuat daftar nama file yang saat ini ada di penyimpanan gumpalan Azure. Selanjutnya, saya mengambilnya dengan membaca setiap file menggunakan loop.

Masalah yang saya hadapi adalah waktu yang dibutuhkan untuk proses ini. Tentu saja kita berbicara di sini tentang sejumlah besar file kecil yang perlu dibaca. Jadi saya tidak mengharapkan proses ini selesai dalam beberapa menit.

Saya sadar bahwa meningkatkan klaster databricks dapat menyelesaikan masalah, tetapi saya tidak yakin hanya itu yang akan menyelesaikannya, melihat jumlah file yang perlu saya transfer dalam kasus ini. Saya menjalankan kode berikut dengan databricks.

# Define function to list content of mounted folder
def get_dir_content(ls_path):
  dir_paths = ""
  dir_paths = dbutils.fs.ls(ls_path)
  subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
  flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
  return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths
filenames = []
paths = 0

mount_point = "PATH"

paths = get_dir_content(mount_point)
for p in paths:
#   print(p)
  filenames.append(p)

avroFile = pd.DataFrame(filenames)
avroFileList = avroFile[(avroFile[0].str.contains('.avro')) & (avroFile[0].str.contains('dbfs:/mnt/PATH'))]
avro_result = []
# avro_file = pd.DataFrame()
avro_complete = pd.DataFrame()
for i in avroFileList[0]:
  avro_file = spark.read.format("avro").load(i)
  avro_result.append(avro_file)

Akhirnya, saya melakukan penyatuan untuk semua file ini untuk membuat satu kerangka data dari mereka.

# Schema definiëren op basis van 
avro_df = avro_result[0]

# Union all dataframe
for i in avro_result:
  avro_df = avro_df.union(i)

display(avro_df)

Saya bertanya-tanya bagaimana cara mengoptimalkan proses ini. Alasan untuk keluaran per menit adalah bahwa kami berencana untuk membangun "wawasan hampir realtime" di kemudian hari, setelah kami memiliki arsitektur pelaporan analitis (yang kami hanya memerlukan proses harian) di tempat.

1
Richard_F96 10 Mei 2021, 15:54

2 jawaban

Jawaban Terbaik

Alih-alih membuat daftar file, dan kemudian membacanya secara terpisah, saya akan merekomendasikan untuk melihat ke Azure Databricks Autoloader sebagai gantinya. Ini mungkin menggunakan pemberitahuan untuk menemukan file baru apa yang diunggah ke penyimpanan gumpalan alih-alih membuat daftar file.

Itu juga akan bekerja dengan banyak file pada titik waktu, alih-alih membacanya satu per satu & melakukan penyatuan.

Jika Anda tidak memerlukan pemrosesan data yang berkelanjutan, Anda dapat menggunakan .trigger(once=True) untuk mengemulasi beban batch data.

0
Alex Ott 10 Mei 2021, 14:01

Ada beberapa cara untuk melakukannya, tetapi inilah yang akan saya lakukan:

Gunakan Fungsi Azure untuk memicu kode python Anda setiap kali gumpalan baru dibuat di akun penyimpanan Azure Anda. Ini akan menghapus bagian polling dari kode Anda dan akan mengirim data ke databricks segera setelah file tersedia di akun penyimpanan Anda

Untuk pelaporan hampir waktu nyata, Anda dapat menggunakan Azure Stream Analytics dan menjalankan kueri di Event Hub dan output ke Power Bi, misalnya.

0
Thiago Custodio 10 Mei 2021, 13:32