Saya menggunakan Kafka Streams 2.6.0 (dalam Spring Boot) dan mengalami masalah yang sangat aneh. Saya mencoba melakukan operasi stateful (pengelompokan dan agregasi) di aliran:

        freeTextSignPartialUpdateStream
                .groupBy((key, value) -> value.getObjectId(), Grouped.with(Serdes.Long(), FTS_PARTIAL_UPDATE_MSG_SERDE))
                .aggregate(
                        ArrayList::new,
                        freeTextSignUtils::updateFreeTextSignUpdateList,
                        Materialized.<Long, List<FreeTextSignPartialUpdate>>as(Stores.inMemoryKeyValueStore("STORE_NAME"))
                                .withKeySerde(Serdes.Long())
                                .withValueSerde(FTS_PARTIAL_UPDATE_LIST_SERDE)
                                .withCachingDisabled()
                                .withLoggingDisabled()
                )
                .toStream()
                .to(
                        storesService.getFreeTextSignUpdatesStoreTopicName(),
                        Produced.with(Serdes.Long(), FTS_PARTIAL_UPDATE_LIST_SERDE)
                );

FTS_PARTIAL_UPDATE_LIST adalah implementasi Serdes yang tepat yang didefinisikan sebagai konstan (mirip dengan FTS_PARTIAL_UPDATE_MSG_SERDE yang berfungsi tanpa masalah).

Yang aneh bukanlah bahwa saya mendapatkan kesalahan ser/des, tetapi nilai yang didefinisikan dalam withValueSerde benar-benar diabaikan dan sebagai gantinya StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG digunakan (yang saat ini disetel ke JsonSerdes).

Saya menggunakan pola yang sama seperti di atas di bagian pemrosesan aliran lain dari aplikasi tanpa masalah. Juga, ketika saya mengganti JsonSerdes default dengan FTS_PARTIAL_UPDATE_LIST saya, ini berfungsi.

Saya telah memeriksa dokumentasi withValueSerde di mana dikatakan bahwa itu akan mundur ke default Serdes ketika nilai input adalah null, yang jelas tidak (saya telah memeriksanya di debuger).

0
rapasoft 10 Mei 2021, 12:30

1 menjawab

Jawaban Terbaik

Jadi sebenarnya, itu adalah kesalahan saya, karena deserializer dalam implementasi SerDes mengembalikan nol.

0
rapasoft 12 Mei 2021, 12:36