Linux下载安装RocketMQ以及Java生产消费RocketMQ

小小编辑 1年前 ⋅ 147 阅读

下载安装包

官网地址 RocketMQ下载 csdn rocketmq安装包

解压安装包

unzip rocketmq-all-5.1.3-bin-release.zip 图片1.png

启动命令

启动NameServer

cd rocketmq-all-5.1.3-bin-release

sh bin/mqnamesrv 图片2.png 出现上面日志,说明启动成功,也可通过下面命令后台启动 nohup sh bin/mqnamesrv &

启动Broker

需要另起一个窗口,输入以下命令

sh bin/mqbroker -n 192.168.4.105:9876

图片3.png 后台启动命令

nohup sh bin/mqbroker -n 192.168.44.128:9876 &

创建消息

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL

Java生产消费RocketMQ

引入pom文件

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>5.1.3</version>
        </dependency>

生产者:

package com.rocket.demo;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketProducerDemo {

    private final static String nameServer = "192.168.4.105:9876";

    private final static String producerGroup = "my_group";

    private final static String topic = "topic-test";

    public static void main(String[] args) {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 利用producer进行发送,并同步等待发送结果
            SendResult sendResult = producer.send(msg, 10000);
            System.out.println(sendResult);
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

消费者:

package com.rocket.demo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producer_group");
        consumer.setNamesrvAddr("192.168.4.105:9876"); // RocketMQ 服务器地址
        consumer.subscribe("topic-test", "*"); // 订阅主题和标签,* 表示订阅所有标签
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.println("Received message: " + new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer started");
    }
}