I am trying to build a Java Spring Boot application that posts messages to and gets messages from Confluent Cloud Kafka.

I followed an article on publishing Kafka messages to Confluent Cloud, and that part works.

Below is the implementation:

KafkaController.java

package com.seroter.confluentboot.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.seroter.confluentboot.dto.Product;
import com.seroter.confluentboot.engine.Producer;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;
    
    private final com.seroter.confluentboot.engine.Consumer consumer;

    @Autowired
    KafkaController(Producer producer, com.seroter.confluentboot.engine.Consumer consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
   
    
    @PostMapping(value="/publishJson")
    public ResponseEntity<Product> publishJsonMessage(@RequestBody Product product) {
        producer.sendJsonMessage(product);
        return new ResponseEntity<>(product, HttpStatus.CREATED);
    }
    
    
}

Product.java

package com.seroter.confluentboot.dto;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonPropertyOrder(value = {"product_id","product_name","quantity","price"})
public class Product {

    @JsonProperty(value = "product_id")
    private int productId;
    @JsonProperty(value="product_name")
    private String productName;
    
    private int quantity;
    
    private double price;
    
}
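With these annotations, Jackson renames `productId` and `productName` and emits the fields in the order declared by `@JsonPropertyOrder`. For illustration (the values below are made up), a serialized `Product` looks like:

```json
{
  "product_id": 1,
  "product_name": "keyboard",
  "quantity": 2,
  "price": 49.99
}
```

This is also the JSON shape the `/kafka/publishJson` endpoint expects in its request body.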

Producer.java

package com.seroter.confluentboot.engine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;

import com.seroter.confluentboot.dto.Product;

@Service
@EnableBinding(Source.class)
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private Source source;

    public void sendMessage(String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        this.source.output().send(new GenericMessage<>(message));
    }
    
    
    public void sendJsonMessage(Product product)
    {
        logger.info(String.format("#### -> Producing message -> %s",product.toString()));
        this.source.output().send(new GenericMessage<>(product));
    }
    
}

ConfluentBootApplication.java

package com.seroter.confluentboot;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.seroter.confluentboot.dto.Product;


@SpringBootApplication
@EnableBinding(Source.class)
@RestController
@RequestMapping(value = "/confluent")

public class ConfluentBootApplication {
    @Autowired
    private  com.seroter.confluentboot.engine.Consumer consumer;
    
    public static void main(String[] args) {
        SpringApplication.run(ConfluentBootApplication.class, args);
    }
    
     
}

application.properties

spring.cloud.stream.kafka.binder.brokers=pkc-epwny.eastus.azure.confluent.cloud:9092
spring.cloud.stream.bindings.output.destination=test
  
spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

server.port=9000

It works:


And I can verify it:


Now I want to build a Spring Boot consumer REST endpoint. How do I do that?

Update:

ConfluentConsumer.java

package com.seroter.confluentboot.controller;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import com.seroter.confluentboot.dto.Product;

//@RestController
@EnableBinding(Sink.class)
public class ConfluentConsumer {
    
    @StreamListener(Sink.INPUT)
    public void consumeMessage(Product product)
    {
        System.out.println("******************************");
        System.out.println("============= "+product.getProductId()+" ================");
        System.out.println("******************************");
    }

}

Consumer.java

package com.seroter.confluentboot.engine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Service;

@Service
@PropertySource("classpath:application.properties")
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);
}
Karthikeyan Vijayakumar 19 November 2020, 17:18

1 answer

Best answer

I believe what you are trying to do here is fetch the latest message from a Kafka consumer via a REST endpoint, i.e., you want to poll the Kafka topic on demand. Publishing messages through a REST endpoint makes sense, but consuming messages through an endpoint does not seem like a good idea. If you want queue semantics, you should use RabbitMQ rather than Kafka.

Still, if you want to stick with Kafka and poll messages manually, you can use one of the two approaches below.

Approach 1: Create a ConsumerFactory, obtain a Consumer from the factory, and then poll Kafka using that Consumer.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
class KafkaConsumerConfig {

    private static final String TOPIC_NAME = "test";
    private final String userName = "username";
    private final String password = "password";

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "pkc-epwny.eastus.azure.confluent.cloud:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-gp-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Note the escaped quotes and trailing semicolon: the JAAS config is
        // invalid without them.
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + userName + "\" password=\"" + password + "\";");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public Consumer<String, String> createConsumer(ConsumerFactory<String, String> consumerFactory) {
        Consumer<String, String> consumer = consumerFactory.createConsumer("consumer-group-1", "client-1");
        consumer.subscribe(List.of(TOPIC_NAME));
        return consumer;
    }
}

You can read the topic name, group id, bootstrap servers, SSL configuration, etc. from application.properties instead of hard-coding them.
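For example, the hard-coded values above could live in application.properties and be injected into `KafkaConsumerConfig` with `@Value`. The property names below are illustrative, not a convention required by Spring:

```
kafka.consumer.topic=test
kafka.consumer.group-id=consumer-gp-1
kafka.consumer.bootstrap-servers=pkc-epwny.eastus.azure.confluent.cloud:9092
```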

Now you can consume messages by injecting the Consumer into a RestController.

private final Consumer<String, String> consumer;

@Autowired
ConsumerController(Consumer<String, String> consumer) {
    this.consumer = consumer;
}

@GetMapping("retrieveMessage")
public String getMessage() {
    // poll() may return more than one record, so be careful
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
    if (!consumerRecords.isEmpty()) {
        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
        String value = iterator.next().value();
        consumer.commitSync();
        return value;
    } else {
        return "no message";
    }
}

Approach 2: store the messages in an in-memory queue and then poll that queue.

spring.cloud.stream.bindings.input.destination=test
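In addition to the destination, it is usually worth giving the input binding a consumer group, so offsets are tracked per application rather than per anonymous instance (the group name below is illustrative):

```
spring.cloud.stream.bindings.input.group=confluent-boot-group
```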

Then store the messages in a Queue and retrieve them via a REST endpoint:

@RestController
@EnableBinding(Sink.class)
class ConsumerController {

    private final Queue<String> queue;

    ConsumerController() {
        this.queue = new ConcurrentLinkedQueue<>();
    }


    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        this.queue.add(message);
    }

    @GetMapping("getMessage")
    public String retrieveMessage() {
        return this.queue.poll();
    }
}
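A note on the retrieval semantics above: `Queue.poll()` returns `null` once the queue is drained, so repeated GETs will eventually produce an empty response body. A minimal JDK-only sketch of this behavior (the class and method names are illustrative):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueuePollDemo {

    // Drain three elements, exactly as the REST endpoint would on three GETs.
    static String[] drainThree(Queue<String> queue) {
        return new String[] { queue.poll(), queue.poll(), queue.poll() };
    }

    public static void main(String[] args) {
        // Same queue type as in the controller: thread-safe and non-blocking.
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("first message");
        queue.add("second message");

        String[] results = drainThree(queue);
        System.out.println(results[0]); // first message
        System.out.println(results[1]); // second message
        System.out.println(results[2]); // null once the queue is drained
    }
}
```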

Drawback: you will lose all in-memory messages if the application restarts. Storing the messages in a distributed cache such as Redis would therefore be a better solution.

Govinda Sakhare 29 November 2020, 09:17