Apa yang dibutuhkan?

Saya sedang menulis aplikasi (Spring + Kotlin) yang mengambil informasi dengan Kafka. Jika saya menyetel autoStartup = "true" saat mendeklarasikan @KafkaListener maka aplikasi berfungsi dengan baik tetapi hanya jika broker tersedia. Ketika broker tidak tersedia, aplikasi macet saat memulai. Itu perilaku yang tidak diinginkan. Aplikasi harus bekerja dan melakukan fungsi lainnya.

Apa yang saya coba lakukan

Untuk menghindari aplikasi mogok saat memulai, seseorang di situs ini dalam topik lain menyarankan pengaturan autoStartup = "false" saat mendeklarasikan @KafkaListener. Dan itu sangat membantu untuk mencegah crash saat start. Tapi sekarang saya tidak berhasil memulai KafkaListener secara manual. Dalam contoh lain saya melihat pengkabelan otomatis KafkaListenerEndpointRegistry, tetapi ketika saya mencoba melakukannya:

@Service
class KafkaConsumer @Autowired constructor(
        private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {

IntelliJ Idea memperingatkan:

Tidak dapat melakukan autowire. Tidak ditemukan kacang jenis 'KafkaListenerEndpointRegistry'.

Ketika saya mencoba menggunakan KafkaListenerEndpointRegistry tanpa autowiring dan melakukan kode ini:

@Service
class KafkaConsumer {
    private val logger = LoggerFactory.getLogger(this::class.java)
    private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()

    @Scheduled(fixedDelay = 10000)
    fun startCpguListener(){
        val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
        if (!container.isRunning)
            try {
                logger.info("Kafka Consumer is not running. Trying to start...")
                container.start()
            } catch (e: Exception){
                logger.error(e.message)
            }
    }

    @KafkaListener(
            id = "consumer1",
            topics = ["cpgdb.public.user"],
            autoStartup = "false"
    )
    private fun listen(it: ConsumerRecord<JsonNode, JsonNode>, qwe: Consumer<Any, Any>){
        val pay = it.value().get("payload")
        val after = pay.get("after")
        val id = after["id"].asInt()
        
        val receivedUser = CpguUser(
                id = id,
                name = after["name"].asText()
        ) 
        logger.info("received user with id = $id")
        }
    }
}

kafkaListenerEndpointRegistry.getListenerContainer("consumer1") selalu mengembalikan nol. Saya kira itu karena saya tidak melakukan auto wire kafkaListenerEndpointRegistry. Bagaimana saya bisa melakukannya? Atau jika ada solusi lain dari jawaban saya, saya akan menghargai bantuan apa pun! Terima kasih!

Ada konfigurasi Kafka:

@Configuration
@EnableConfigurationProperties(KafkaProperties::class)
class KafkaConfiguration(private val props: KafkaProperties) {

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        factory.setMessageConverter(MessagingMessageConverter())
        factory.setStatefulRetry(true)

        val retryTemplate = RetryTemplate()
        retryTemplate.setRetryPolicy(AlwaysRetryPolicy())
        retryTemplate.setBackOffPolicy(ExponentialBackOffPolicy())
        factory.setRetryTemplate(retryTemplate)
        val handler = SeekToCurrentErrorHandler()
        handler.isAckAfterHandle = false
        factory.setErrorHandler(handler)
        factory.containerProperties.isMissingTopicsFatal = false

        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<Any, Any> {
        return DefaultKafkaConsumerFactory(consumerConfigs())
    }

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        return mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to props.bootstrap.address,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(MonitoringConsumerInterceptor::class.java),
                ConsumerConfig.CLIENT_ID_CONFIG to props.receiver.clientId,
                ConsumerConfig.GROUP_ID_CONFIG to props.receiver.groupId,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
                ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true
        )
    }
}
  • versi boot musim semi: 2.3.0
  • versi spring-kafka: 2.5.3
  • versi kafka-clients: 2.5.0
0
Enbirr 3 Juli 2020, 08:23

1 menjawab

Jawaban Terbaik

Abaikan saja peringatan IntelliJ tentang pengkabelan otomatis; kacang memang ada; hanya saja IntelliJ tidak bisa mendeteksinya.

0
Gary Russell 6 Juli 2020, 14:41