Saya mencoba menerapkan satu kueri sql sebagai transformasi dalam aliran data. Saya memuat tabel dari bigquery sebagai PCollection. Saya ingin menggabungkan data saya seperti kueri di bawah ini.

PILIH nama, identitas pengguna, tempat, SUM(jumlah) sebagai some_amount , SUM(biaya) sebagai sum_cost DARI [proyek:test.day_0_test] KELOMPOK OLEH 1,2,3 Bagaimana saya bisa menerapkannya dengan mudah. Saya mendengar bahwa Aliran data dengan dukungan Java menjalankan kueri jenis sql pada Koleksi P, tetapi python benar tidak mendukung. Adakah yang bisa membantu saya menyelesaikan ini

Catatan:

Saya ingin menerapkan kueri ini pada Koleksi P .. Tidak membaca dari bigquery secara langsung

1
geek 2 Januari 2018, 21:48

1 menjawab

Jawaban Terbaik

(Saya mengedit jawaban saya saat Anda mengomentari tidak ingin menjalankan kueri SQL secara langsung di BigQuery)

Saya mensimulasikan file input.csv yang berisi:

#input.csv
name1,1,place1,2.,1.5
name1,1,place1,3.,0.5
name1,1,place2,1.,1
name1,2,place3,2.,1.5
name2,2,place3,3.,0.5

Ini adalah data yang tampaknya Anda ambil dari BQ. Kueri SQL Anda dapat diimplementasikan di Beam seperti:

def sum_l(l):                       
    s0, s1 = 0, 0                                         
    for i in range(len(l)):                                        
        s0 += l[i][0]                                                      
        s1 += l[i][1]                
    return [s0, s1] 

with beam.Pipeline(options=po) as p:
     (p | 'Read Input' >> beam.io.ReadFromText("input.csv")
        | 'Split Commas' >> beam.Map(lambda x: x.strip().split(','))
        | 'Prepare Keys' >> beam.Map(lambda x: (x[:-2], map(float, x[-2:])))
        | 'Group Each Key' >> beam.GroupByKey()
        | 'Make Summation' >> beam.Map(lambda x: [x[0], sum_l([e for e in x[1]])])
        | 'Write Results' >> beam.io.WriteToText('results.csv'))

Hasil adalah:

#results.csv-00000-of-00001
[[u'name1', u'1', u'place2'], [1.0, 1.0]]
[[u'name1', u'2', u'place3'], [2.0, 1.5]]
[[u'name1', u'1', u'place1'], [5.0, 2.0]]
[[u'name2', u'2', u'place3'], [3.0, 0.5]]

Ini pada dasarnya adalah implementasi MapReduce langsung dari kueri Anda: sebuah kunci dibuat untuk setiap baris, mereka dikelompokkan bersama dan penjumlahan akhir terjadi dalam operasi Map menggunakan fungsi sum_l.

Saya tidak yakin mengapa Anda ingin menjalankan operasi kueri di Beam, bukan di BigQuery. Saya sarankan untuk mencoba kedua pendekatan karena mungkin tidak mungkin seefisien di Beam seperti yang Anda bisa lakukan di BigQuery dalam kasus ini.

3
Willian Fuks 3 Januari 2018, 15:13