Untuk tujuan POC, saya telah membangun aplikasi Spring Boot yang menggunakan ActiveMQ untuk pengiriman pesan melalui JMSTemplate.

Untuk pemantauan, saya ingin mendengarkan pesan yang dimasukkan dan dihapus dari antrian menggunakan "Topik Saran".

Saya telah memperbarui konfigurasi ActiveMQ untuk mengaktifkan nasihat yang relevan:

<!-- activemq.xml -->
 <broker xmlns="http://activemq.apache.org/schema/core" useJmx="true" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" advisoryForConsumed="true" advisoryForDelivery="true">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                <policyEntry queue=">" advisoryForConsumed="true" advisoryForDelivery="true">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

</broker>

Dalam aplikasi saya telah mengonfigurasi pabrik koneksi JMS dan pabrik wadah pendengar JMS untuk mengaktifkan domain penasihat dan pubsub, dan menyiapkan pendengar untuk topik konsultasi:

@Configuration
public class JmsConfig {
    @Autowired
    MessageListener messageListener;

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setWatchTopicAdvisories(true);
        connectionFactory.setBrokerURL("vm://localhost?broker.persistent=false");
        return connectionFactory;
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) throws JMSException {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);

        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue bulkQueue = session.createQueue("bulk");

        Topic deliveredAdvisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(bulkQueue);
        MessageConsumer deliveredAdvisoryTopicConsumer = session.createConsumer(deliveredAdvisoryTopic);
        deliveredAdvisoryTopicConsumer.setMessageListener(messageListener);

        Topic consumedAdvisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic(bulkQueue);
        MessageConsumer consumedAdvisoryTopicConsumer = session.createConsumer(consumedAdvisoryTopic);
        consumedAdvisoryTopicConsumer.setMessageListener(messageListener);

        return factory;
    }

Pendengar yang akan membaca topik nasihat hanya untuk masuk:

@Component
public class AdvisoryMessageListener implements MessageListener {
    @Override public void onMessage(Message message) {
        System.out.println("Received advisory message");
        System.out.println(message);
    }
}

Pendengar sebenarnya yang akan membaca dari antrian mirip dengan pendengar pesan nasihat:

@Component
public class Receiver {

    @JmsListener(destination = "bulk", containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

}

Rest API akan memicu aplikasi untuk menempatkan pesan pada antrian:


@RestController("/emails")
public class EmailController {

    @Autowired
    private JmsTemplate jmsTemplate;

    @PostMapping("/")
    public void persistEmail(@RequestBody Email email) {
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setTimeToLive(0L);
        jmsTemplate.convertAndSend("bulk", email);
    }
}

Setiap kali API dipanggil dan email dimasukkan ke dalam antrean, Receiver.receiveMessage membacanya dan mencatatnya tetapi tidak ada tindakan di AdvisoryMessageListener.

Satu-satunya hal yang muncul di konsol adalah sebagai berikut: Received <Email{to=foo@bar.com, body=Hello}> Dicetak oleh Receiver.receiveMessage

Apa yang saya lakukan salah?

0
FarthVader 5 Desember 2019, 16:53

1 menjawab

Jawaban Terbaik

Ini bekerja dengan baik untuk saya...

@SpringBootApplication
public class So59196698Application {

    public static void main(String[] args) {
        SpringApplication.run(So59196698Application.class, args);
    }

    @JmsListener(destination = "so59196698")
    public void listen(Message in) {
        System.out.println("Received:" + in);
    }

    @JmsListener(destination = "#{advisoryTopicNames.deliveredTopic}", containerFactory = "topicFactory")
    public void delivered(Message in) {
        System.out.println("Delivered:" + in);
    }

    @JmsListener(destination = "#{advisoryTopicNames.consumedTopic}", containerFactory = "topicFactory")
    public void consumed(Message in) {
        System.out.println("Consumed:" + in);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            Thread.sleep(5000);
            template.convertAndSend("so59196698", "test");
        };
    }

    @Bean
    public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }

}

@Component
class AdvisoryTopicNames {

    private static final Destination QUEUE = new ActiveMQQueue("so59196698");

    public String getDeliveredTopic() throws JMSException {
        return AdvisorySupport.getMessageDeliveredAdvisoryTopic(QUEUE).getTopicName();
    }

    public String getConsumedTopic() throws JMSException {
        return AdvisorySupport.getMessageConsumedAdvisoryTopic(QUEUE).getTopicName();
    }

}

Dan

Received:ActiveMQTextMessage {commandId = 11, ...
Delivered:ActiveMQMessage {commandId = 0, ...
Consumed:ActiveMQMessage {commandId = 0, ...
1
Gary Russell 5 Desember 2019, 19:30