Saya belajar dari Blog ini dan < a href="https://kafka-tutorials.confluent.io/window-final-result/kstreams.html" rel="nofollow noreferrer">tutorial ini bahwa untuk menguji penekanan dengan semantik waktu peristiwa, seseorang harus mengirim catatan dummy untuk memajukan waktu streaming. Saya telah mencoba untuk memajukan waktu dengan melakukan hal itu. Tapi ini sepertinya tidak berhasil kecuali waktu dimajukan untuk kunci tertentu.

Saya memiliki TimestampExtractor khusus yang mengaitkan "waktu streaming" pilihan saya dengan catatan. Pseudocode topologi stream saya adalah sebagai berikut (saya menggunakan Kafka Streams DSL API):

    source.mapValues(someProcessingLambda)
          .flatMap(flattenRecordsLambda)
          .groupByKey(Grouped.with(Serdes.ByteArray(), Serdes.ByteArray()))
          .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ZERO))
          .aggregate(()->null, aggregationLambda)
          .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

Masukan saya dalam format berikut:

   1 - {"stream_time":"2019-04-09T11:08:36.000-04:00", id:"1", data:"..."}
   2 - {"stream_time":"2019-04-09T11:09:36.000-04:00", id:"1", data:"..."}
   3 - {"stream_time":"2019-04-09T11:18:36.000-04:00", id:"2", data:"..."}
   4 - {"stream_time":"2019-04-09T11:19:36.000-04:00", id:"2", data:"..."}
    .
    .

Sekarang catatan 1 dan 2 milik jendela 10 menit menurut stream_time dan 3 dan 4 milik yang lain. Dalam jendela itu, catatan dikumpulkan sesuai id. Saya berharap bahwa catatan 3 akan menandakan bahwa aliran telah maju dan menyebabkan penekanan untuk memancarkan data yang sesuai dengan jendela pertama. Namun, data tidak dipancarkan sampai saya mengirim catatan dummy dengan id:1 untuk memajukan waktu streaming untuk kunci itu.

Apakah saya salah memahami instruksi pengujian? Apakah ini perilaku yang diharapkan? Apakah kunci dari catatan dummy itu penting?

2
ConfusedSoul 9 Juli 2020, 02:43

1 menjawab

Jawaban Terbaik

Saya minta maaf untuk masalah ini. Ini memang masalah yang rumit. Saya memiliki beberapa ide untuk menambahkan beberapa operasi untuk mendukung pengujian integrasi semacam ini, tetapi sulit dilakukan tanpa melanggar semantik waktu pemrosesan aliran dasar.

Sepertinya Anda sedang menguji aplikasi KafkaStreams "asli", sebagai lawan dari pengujian dengan TopologyTestDriver. Saran pertama saya adalah Anda akan memiliki waktu yang jauh lebih baik untuk memvalidasi semantik aplikasi Anda dengan TopologyTestDriver, jika memenuhi kebutuhan Anda.

Kedengarannya bagi saya seperti Anda mungkin memiliki lebih dari satu partisi dalam topik input Anda (dan karenanya aplikasi Anda). Jika kunci 1 pergi ke satu partisi, dan kunci 3 pergi ke yang lain, Anda akan melihat apa yang telah Anda amati. Setiap partisi aplikasi Anda melacak waktu streaming secara independen. TopologyTestDriver bekerja dengan baik karena hanya menggunakan satu partisi, dan juga karena memproses data secara sinkron. Jika tidak, Anda harus membuat pesan kemajuan waktu "dummy" Anda untuk pergi ke partisi yang sama dengan kunci yang Anda coba hapus.

Ini akan menjadi sangat rumit karena "flatMap().groupByKey()" Anda akan mempartisi ulang data. Anda harus membuat pesan dummy sehingga masuk ke partisi yang tepat setelah partisi ulang. Atau Anda bisa bereksperimen dengan menulis pesan dummy Anda langsung ke topik partisi ulang.

Jika Anda perlu menguji dengan KafkaStreams alih-alih TopologyTestDriver, saya kira hal termudah adalah menulis pesan "kemajuan waktu" per kunci, seperti yang Anda sarankan dalam pertanyaan Anda. Bukan karena itu sangat diperlukan, tetapi karena ini adalah cara termudah untuk memenuhi semua peringatan ini. Saya juga akan menyebutkan bahwa kami sedang mengerjakan beberapa peningkatan umum untuk penanganan waktu streaming di Kafka Streams yang seharusnya menyederhanakan situasi secara signifikan, tetapi itu tidak membantu Anda saat ini, tentu saja.

2
John Roesler 13 Juli 2020, 14:40