Saya mencoba menerapkan transaksi terdistribusi (XA) di Apache TomEE. Dengan kata lain, alurnya adalah:

  • Pembaca pesan (yaitu message-driven bean) membaca dari antrean (1) dan memproses satu pesan, yang memicu:
    • satu atau lebih sisipan/pembaruan pada database (2)
    • mengirim pesan ke antrian lain (3)

Operasi 1, 2, & 3 semuanya merupakan bagian dari transaksi XA yang sama yang dikendalikan oleh TomEE. Oleh karena itu, dalam keadaan apa pun ketiganya harus gagal semua atau berhasil semua.

Tomee.xml

<?xml version="1.0" encoding="UTF-8"?>
<tomee>
    <!-- this resource adapter is just necessary to tell tomee to not start internal ActiveMq instance -->
    <Resource id="MyAdapter" type="ActiveMQResourceAdapter">
        BrokerXmlConfig
        ServerUrl tcp://fakehost:666 
    </Resource> 

    <!-- XA connection factories instantiated directly from the ActiveMQ client class (class-name)
         and pointed at the external broker; redelivery is disabled via maximumRedeliveries=0.
         NOTE(review): neither factory references the resource adapter above; confirm that
         connections created this way are enlisted in the container's JTA transaction. -->
     <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
        BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0 
    </Resource>

    <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
        BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
    </Resource>

    <!-- Queues, each bound to an explicit physical queue name on the broker. -->
    <Resource id="jms/MyOutgoingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
        PhysicalName MY_OUTGOING_QUEUE 
    </Resource>

    <Resource id="jms/MyIncomingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
        PhysicalName MY_INCOMING_QUEUE 
    </Resource>

    <!-- JTA-managed DBCP pool built on top of the XA data source "myDBXA" declared below. -->
    <Resource id="jdbc/myDBXAPooled" type="DataSource">
        XaDataSource myDBXA
        DataSourceCreator dbcp
        JtaManaged true
        UserName TestUser
        Password TestPassword
        MaxWait 2000
        ValidationQuery SELECT 1    
        MaxActive 15
    </Resource> 

    <!-- MySQL XA data source wrapped by the pool above. -->
    <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test
        User TestUser
        Password TestPassword
    </Resource>
</tomee>

Springconfig.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
            xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">

    <!-- Exposes the container-provided JMS connection factories, queues and data source as
         Spring beans by looking them up in JNDI. The jndi-name values match the Resource ids
         declared in tomee.xml. -->

    <!-- <jee:jndi-lookup jndi-name="myDBXAPooled" id="myDatasource" resource-ref="true" />  -->
    <jee:jndi-lookup jndi-name="jms/MyOutgoingConnFactory" id="myOutgoingConnFactory" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jms/MyIncomingConnFactory" id="myIncomingConnFactory" resource-ref="true" />  
    <jee:jndi-lookup jndi-name="jms/MyOutgoingQueue" id="myOutgoingQueue" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jms/MyIncomingQueue" id="myIncomingQueue" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jdbc/myDBXAPooled" id="myDatasource" resource-ref="true" />

    <!-- Registers the JTA transaction manager bean; per the author's note below it resolves
         to the container's Geronimo transaction manager. -->
    <tx:jta-transaction-manager/>
    <!-- <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> -->
    <!-- the previous two ways of getting the transactionManager seems equivalent and both get Geronimo -->


</beans>

SpringConfig.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
            xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">

    <!-- Wires the message listener container and the message processor. The referenced beans
         myIncomingConnFactory, myIncomingQueue, myOutgoingConnFactory, myOutgoingQueue,
         myDatasource and transactionManager are not declared in this file and must come from
         another context definition. -->

    <!-- Listener container (com.test.MyListener) consuming from myIncomingQueue with 1 to 6
         concurrent consumers; each message is handed to myMessageProcessor. The container is
         given the transactionManager bean; sessionTransacted is left commented out. -->
    <bean id="messageListener" class="com.test.MyListener">
        <property name="connectionFactory" ref="myIncomingConnFactory" />
        <property name="destination" ref="myIncomingQueue" />
        <!-- <property name="sessionTransacted" value="true" /> -->
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="6" />
        <property name="messageListener" ref="myMessageProcessor" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="taskExecutor" ref="msgListenersTaskExecutor" />
    </bean>

    <!-- Message processor: writes to myDatasource and forwards to myOutgoingQueue through
         myOutgoingConnFactory. -->
    <bean id="myMessageProcessor" class="com.test.MyMessageReceiver">
        <property name="forwardConnectionFactory" ref="myOutgoingConnFactory" />
        <property name="forwardQueue" ref="myOutgoingQueue" />
        <property name="datasource" ref="myDatasource" />
    </bean>

    <!-- Task executor for the listener container; no properties are set, so its defaults apply. -->
    <bean id="msgListenersTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"/>



</beans>

MyMessageReceiver.java:

package com.test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * JMS listener that stores the text of each received message in the database and then
 * forwards the same text to another queue.
 *
 * <p>Any failure while processing is rethrown as a {@link RuntimeException} so that the
 * caller can roll the surrounding transaction back. In its current form {@link #onMessage}
 * always throws after doing the work, which is a deliberate rollback test.
 *
 * <p>NOTE(review): the {@code @Transactional} annotation only takes effect if annotation-driven
 * transactions are enabled in the Spring context; that is not visible here, so presumably the
 * listener container's transaction manager is what starts the transaction. Confirm.
 */
public class MyMessageReceiver implements MessageListener {

    static Logger log = Logger.getLogger(MyMessageReceiver.class);

    private ConnectionFactory forwardConnectionFactory;
    private Queue forwardQueue;
    private DataSource datasource;

    /** Sets the connection factory used to forward messages. */
    public void setForwardConnectionFactory(ConnectionFactory connFactory) {
        forwardConnectionFactory=connFactory;
    }

    /**
     * Sets the queue that messages are forwarded to.
     * The method name keeps its original (lower-case "f") spelling so existing callers and
     * bean wiring continue to work.
     */
    public void setforwardQueue(Queue queue) {
        forwardQueue=queue;
    }

    /** Sets the data source used for the database insert. */
    public void setDatasource(DataSource ds) {
        datasource=ds;
    }

    /**
     * Processes one incoming message: logs transaction diagnostics, inserts the message text
     * into the database, forwards it to the outgoing queue, and then deliberately fails.
     *
     * @param message the received message; it is cast to {@link TextMessage}, so any other
     *                message type fails with a {@link ClassCastException}
     * @throws RuntimeException always; either the deliberate test failure or a wrapper around
     *                          the exception raised by the database or JMS work (kept as cause)
     */
    @Override
    @Transactional(propagation=Propagation.REQUIRED)
    public void onMessage(Message message) {

        log.info("************************************");
        MyListener listener = (MyListener)SpringContext.getBean("messageListener");
        listener.printInfo();
        log.info("************************************");

        TextMessage msg = (TextMessage) message;
        try {
            String text = msg.getText();

            if (text != null) log.info("MESSAGE RECEIVED: "+ text);

            updateDB(text); // insert the text into the database

            sendMsg(text);   // forward the text to the outgoing queue

            System.out.println("****************Rollback");
            // Deliberate failure to test rollback: the DB insert should be undone, the
            // forwarded message should not be published, and the consumed message should
            // end up in a DLQ (DLQ configuration is done on the broker side).
            throw new RuntimeException();
            //if (text!=null && text.indexOf("rollback")!=-1) throw new RuntimeException("Message content includes the word rollback");

        } catch (Exception e) {
            // Log with the throwable so the stack trace is not lost, and keep it as the cause.
            log.error("Rolling back the entire XA transaction", e);
            throw new RuntimeException("Rolled back because of "+e.getMessage(), e);
        }

    }

    /**
     * Inserts {@code text} as a new row into {@code MY_TABLE}.
     *
     * @param text value for the {@code name} column
     * @throws SQLException if obtaining the connection or executing the insert fails
     */
    private void updateDB(String text) throws SQLException {

        Connection conn = null;
        PreparedStatement ps = null;
        try {
            System.out.println("*******datasource "+datasource);
            conn = datasource.getConnection();
            System.out.println("*******conn "+conn.getMetaData().getUserName());
            ps = conn.prepareStatement("INSERT INTO MY_TABLE (name) VALUES(?)");
            ps.setString(1, text);
            ps.executeUpdate();
        } finally {
            // Closing is best effort: a failure here is logged but must not mask the
            // outcome of the insert itself.
            if (ps!=null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    log.error("Could not close the prepared statement", e);
                }
            }
            if (conn!=null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    log.error("Could not close the database connection", e);
                }
            }
        }

    }

    /**
     * Sends {@code msgToBeSent} as a text message to the forward queue.
     *
     * @param msgToBeSent text of the message to publish
     * @throws JMSException if creating the connection, session or producer, or sending, fails
     */
    private void sendMsg(String msgToBeSent) throws JMSException {

        javax.jms.Connection conn = null;
        Session session = null;
        try {
            System.out.println("*************forwardConnectionFactory"+forwardConnectionFactory);
            conn = forwardConnectionFactory.createConnection();
            // Transacted session; the acknowledge mode argument is ignored when transacted.
            // NOTE(review): whether this session joins the surrounding JTA transaction depends
            // on how the container provides the connection factory; confirm.
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = session.createProducer(forwardQueue);
            TextMessage msg = session.createTextMessage(msgToBeSent);
            messageProducer.send(msg);

        } finally {
            // Closing is best effort, but failures are logged instead of silently dropped.
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    log.error("Could not close the JMS session", e);
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    log.error("Could not close the JMS connection", e);
                }
            }
        }
    }

}

MyListener.java:

package com.test;

import javax.transaction.Status;
import javax.transaction.SystemException;

import org.apache.log4j.Logger;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.jta.JtaTransactionManager;

/**
 * Message listener container with two diagnostic helpers for the JTA transaction bound to
 * the calling thread: one logs its status, the other marks it rollback-only.
 *
 * <p>Both helpers assume the container's transaction manager is a
 * {@link JtaTransactionManager}; any other type fails with a {@link ClassCastException}.
 */
public class MyListener extends DefaultMessageListenerContainer {

    static Logger log = Logger.getLogger(MyListener.class);

    /**
     * Logs the underlying JTA transaction manager, the current transaction status code and
     * whether the session is transacted, followed by the numeric value of every
     * {@link Status} constant so the status code can be decoded from the log.
     * A failure to read the status is logged and not propagated.
     */
    public void printInfo() {

        try {
            JtaTransactionManager jta = currentJtaManager();

            log.info("trans manager="+jta.getTransactionManager()+","+jta.getTransactionManager().getStatus()+", this.isSessionTransacted()="+this.isSessionTransacted());
            log.info("STATUS_ACTIVE="+Status.STATUS_ACTIVE);
            log.info("STATUS_COMMITTED="+Status.STATUS_COMMITTED);
            log.info("STATUS_COMMITTING="+Status.STATUS_COMMITTING);
            log.info("STATUS_MARKED_ROLLBACK="+Status.STATUS_MARKED_ROLLBACK);
            log.info("STATUS_NO_TRANSACTION="+Status.STATUS_NO_TRANSACTION);
            log.info("STATUS_PREPARED="+Status.STATUS_PREPARED);
            log.info("STATUS_PREPARING="+Status.STATUS_PREPARING);
            log.info("STATUS_ROLLEDBACK="+Status.STATUS_ROLLEDBACK);
            log.info("STATUS_ROLLING_BACK="+Status.STATUS_ROLLING_BACK);
            log.info("STATUS_UNKNOWN="+Status.STATUS_UNKNOWN);

        } catch (SystemException e) {
            log.error("Could not read the JTA transaction status", e);
        }

    }

    /**
     * Marks the current JTA transaction as rollback-only.
     * This is best effort: failures are logged and not propagated to the caller.
     */
    public void forceRollback() {
        try {
            currentJtaManager().getTransactionManager().setRollbackOnly();
        } catch (IllegalStateException e) {
            log.error("Could not mark the transaction rollback-only", e);
        } catch (SecurityException e) {
            log.error("Could not mark the transaction rollback-only", e);
        } catch (SystemException e) {
            log.error("Could not mark the transaction rollback-only", e);
        }
    }

    /** Returns this container's transaction manager cast to {@link JtaTransactionManager}. */
    private JtaTransactionManager currentJtaManager() {
        return (JtaTransactionManager) this.getTransactionManager();
    }

}

Setelah memperbarui database dan mengirim pesan ke antrian keluar, saya sengaja melemparkan RuntimeException hanya untuk menguji rollback transaksi dari database dan broker pesan.

Jika berhasil, ketiga operasi di-commit; tetapi jika gagal, hanya operasi database yang di-rollback, sementara kedua operasi JMS tetap di-commit.

Bisa jadi:

  • pengaturan yang salah di tomee.xml saya (terutama connection factory untuk antrean) atau di tempat lain
  • bug??

Saya sudah menghabiskan cukup banyak waktu berkutat dengan masalah ini dan mencari solusi yang mungkin.

Akan sangat bagus untuk mendengar pendapat Anda tentang ini dan, sekali lagi, permintaan maaf yang mendalam jika ternyata ada kesalahan di pihak saya.

2
Ramden 11 Maret 2020, 04:05

1 jawaban

Jawaban Terbaik

Saya yakin Anda perlu menggunakan resource adapter JCA ActiveMQ untuk memastikan bahwa koneksi secara otomatis didaftarkan (enlisted) ke dalam transaksi XA. Coba ini:

<tomee>
    <!-- Resource adapter pointing at the external broker. Both connection factories below
         are bound to it through their resourceAdapter property. -->
    <Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter">
        # Do not start the embedded ActiveMQ broker
        BrokerXmlConfig  =
        ServerUrl = tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
    </Resource>

    <!-- Connection factories obtained from the resource adapter with XA transaction support,
         instead of being instantiated from an ActiveMQ class via class-name. -->
    <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory">
        resourceAdapter = MyJmsResourceAdapter
        transactionSupport = xa
    </Resource>

    <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory">
        resourceAdapter = MyJmsResourceAdapter
        transactionSupport = xa
    </Resource>

    <!-- Queues declared by type only, with no explicit physical name.
         NOTE(review): presumably the broker-side queue name is then derived from the id;
         verify it matches the intended queue. -->
    <Resource id="jms/MyOutgoingQueue" type="javax.jms.Queue"/>
    <Resource id="jms/MyIncomingQueue" type="javax.jms.Queue"/>

    <!-- JTA-managed DBCP pool built on top of the XA data source "myDBXA" declared below. -->
    <Resource id="jdbc/myDBXAPooled" type="DataSource">
        XaDataSource myDBXA
        DataSourceCreator dbcp
        JtaManaged true
        UserName TestUser
        Password TestPassword
        MaxWait 2000
        ValidationQuery SELECT 1    
        MaxActive 15
    </Resource> 

    <!-- MySQL XA data source wrapped by the pool above. -->
    <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test
        User TestUser
        Password TestPassword
    </Resource>
</tomee>
1
Justin Bertram 11 Maret 2020, 03:01