Saya memiliki Layanan yang menggunakan pesan kafka dan memicu proses yang panjang. Diperlukan waktu lebih dari 10 menit untuk memproses satu pesan. Saat ini tidak ada pesan baru yang digunakan sampai metode "doSomething()" selesai. Bagaimana saya bisa menjadikan ini sebagai pemrosesan bersamaan dan memproses pesan secara paralel?

@Service
public class MyService {

    @KafkaListener(topics = "request-topic", groupId = "group_id"),
        containerFactory = "requirementsKafkaListenerFactory")    
    private void consumeKafkaRequirementsDataJson(KafkaRequirementsData kafkaRequirementsData) {
        System.out.println("Consumed JSON Message from kafka Topic: " + kafkaRequirementsData);
        doSomething(kafkaRequirementsData);
    }
1
Pablo Delgado 5 April 2021, 10:27

1 menjawab

Jawaban Terbaik

Anda dapat mengatur properti konkurensi untuk ini, tetapi jumlah utas per konsumen dibatasi oleh jumlah partisi untuk topik itu. Jika Anda sudah memiliki jumlah partisi yang layak, ini bisa menjadi cara yang harus dilakukan, tetapi saya tidak akan menambah jumlah partisi hanya demi paralelisme tugas yang panjang.

Anda dapat mencoba menerapkan multithreading pada aplikasi Anda, tetapi mengelola sumber daya akan berada di pihak Anda, selain mempertimbangkan bahwa setelah pesan digunakan dan utas dimulai, Anda akan bertanggung jawab atas pengelolaannya (termasuk mencoba ulang jika terjadi sesuatu salah).

Lihat https://howtoprogram.xyz/2016/ 29/05/create-multi-threaded-Apache-kafka-consumer/ dan https://www.confluent.io/blog/tutorial-getting-started-with-the-new-Apache-kafka-0- 9-consumer-client/ untuk info lebih lanjut

TL;DR, saya akan membuat keputusan berdasarkan:

  1. Jika cukup banyak partisi untuk dijalankan secara paralel tanpa menyumbat tugas Anda, setel properti konkurensi.
  2. Jika tidak cukup partisi, buat tugas Anda dan jalankan menggunakan kumpulan utas.
1
Gastón Marchetta 5 April 2021, 15:37