• 大小: 51.13MB
    文件类型: .gz
    金币: 1
    下载: 0 次
    发布日期: 2022-05-25
  • 语言: 其他
  • 标签: activemq  

资源简介

apache-activemq-5.13.0-bin.tar.gz,处理分布式事务

资源截图

代码片段和文件信息

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License Version 2.0
 * (the “License“); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing software
 * distributed under the License is distributed on an “AS IS“ BASIS
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package example;

import org.apache.qpid.jms.*;
import javax.jms.*;

class Listener {

    public static void main(String[] args) throws JMSException {

        final String TOPIC_PREFIX = “topic://“;

        String user = env(“ACTIVEMQ_USER“ “admin“);
        String password = env(“ACTIVEMQ_PASSWORD“ “password“);
        String host = env(“ACTIVEMQ_HOST“ “localhost“);
        int port = Integer.parseInt(env(“ACTIVEMQ_PORT“ “5672“));

        String connectionURI = “amqp://“ + host + “:“ + port;
        String destinationName = arg(args 0 “topic://event“);

        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);

        Connection connection = factory.createConnection(user password);
        connection.start();
        Session session = connection.createSession(false Session.AUTO_ACKNOWLEDGE);

        Destination destination = null;
        if (destinationName.startsWith(TOPIC_PREFIX)) {
            destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length()));
        } else {
            destination = session.createQueue(destinationName);
        }

        MessageConsumer consumer = session.createConsumer(destination);
        long start = System.currentTimeMillis();
        long count = 1;
        System.out.println(“Waiting for messages...“);
        while (true) {
            Message msg = consumer.receive();
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                if (“SHUTDOWN“.equals(body)) {
                    long diff = System.currentTimeMillis() - start;
                    System.out.println(String.format(“Received %d in %.2f seconds“ count (1.0 * diff / 1000.0)));
                    connection.close();
                    try {
                        Thread.sleep(10);
                    } catch (Exception e) {}
                    System.exit(1);
                } else {
                    try {
                        if (count != msg.getIntProperty(“id“)) {
                            System.out.println(“mismatch: “ + count + “!=“ + msg.getIntProperty(“id“));
                        }
                    } catch (

评论

共有 条评论