Saya mencoba mengatur konsumen asinkron untuk Apache Pulsar, tetapi masalah saya hanya 1 pesan yang diterima dan tidak ada pesan lain yang masuk kecuali saya me-restart aplikasi Spring Boot saya. Sayangnya, dokumentasi seputar penggunaan CompletableFuture dengan Pulsar tidak bagus dan saya baru menggunakannya.

Kode saya di bawah ini:

@EventListener(ApplicationReadyEvent.class)
public void subscribe() throws PulsarClientException {

Consumer consumer = this.pulsarClient.newConsumer(JSONSchema.of(TenantMicroserviceProvisioningRequestSchema.class))
            .subscriptionName(this.pulsarSubscriptionName)
            .topic(this.pulsarTopicName)
            .ackTimeout(240, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Exclusive)
            .subscribe();

while(true) {
    CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

asyncMessage.thenAcceptAsync(incomingMessage -> {
        TenantMicroserviceProvisioningRequestSchema schema = (TenantMicroserviceProvisioningRequestSchema) incomingMessage.getValue();
        
       LOGGER.info(String.format("***New provisioning request recieved for tenant database [%s] with instance code [%s]", schema.getDatabaseName(), schema.getInstanceCode()));
                
       consumer.acknowledgeAsync(incomingMessage.getMessageId());
       
    });
 }
}

Dalam Java Docs dikatakan "receiveAsync() harus dipanggil kemudian setelah CompletableFuture yang dikembalikan selesai dengan pesan yang diterima. Jika tidak, itu akan membuat simpanan permintaan penerimaan dalam aplikasi.", tapi saya tidak yakin bagaimana melakukan ini. Saya pikir ini yang menyebabkan masalah.

Dokumen: Dokumen Pulsar: https://pulsar.Apache.org/ docs/en/client-libraries-java/#async-receive Pulsar Java Doc untuk receieveAsync(): https: //pulsar.apache.org/api/client/org/apache/pulsar/client/api/consumer.html

*** PEMBARUAN *** Saya telah menambahkan loop sementara, tetapi ketika saya melakukan ini, konsumsi memori Boot Musim Semi saya mengapung hingga 10 GB. Tidak yakin, tetapi bertanya-tanya apakah itu pengaturan masa depan.

1
user521990 20 April 2021, 04:06

1 menjawab

Jawaban Terbaik

Untuk mencegah konsumsi memori berlebih seperti itu, Anda perlu menambahkan struktur data perantara ke kelas Anda yang membatasi jumlah CompletableFutures yang beredar seperti LinkedBlockingQueue seperti yang ditunjukkan di bawah ini

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class AsyncConsumerDemo {
    
    private static final LinkedBlockingQueue<CompletableFuture<Message<String>>> outstandingMessages = 
      new LinkedBlockingQueue<CompletableFuture<Message<String>>>(1000);

    public static void main(String[] args) throws PulsarClientException, InterruptedException, ExecutionException {
        
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://broker:6650")
                .build();

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .subscriptionName("pulsarSubscriptionName")
                    .topic("pulsarTopicName")
                    .ackTimeout(240, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
        
        new Thread(() -> { // Message Retrieval Thread
            CompletableFuture<Message<String>> future;
            
            while ( (future = consumer.receiveAsync()) != null) {
                outstandingMessages.add(future);
            }
          }).start();
        
        new Thread(() -> { // Message Consumer Thread
            while(true) {
                try {
                    CompletableFuture<Message<String>> future = outstandingMessages.take();
                    Message<String> msg = future.get();
                    
                    // Process the message
                    
                    consumer.acknowledgeAsync(msg.getMessageId());
                    
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
          }).start();
    }
    
}
0
David Kjerrumgaard 21 April 2021, 16:09