下载安装包
官网地址 RocketMQ下载 csdn rocketmq安装包
解压安装包
unzip rocketmq-all-5.1.3-bin-release.zip
启动命令
启动NameServer
cd rocketmq-all-5.1.3-bin-release
sh bin/mqnamesrv 出现上面日志,说明启动成功,也可通过下面命令后台启动 nohup sh bin/mqnamesrv &
启动Broker
需要另起一个窗口,输入以下命令
sh bin/mqbroker -n 192.168.4.105:9876
后台启动命令
nohup sh bin/mqbroker -n 192.168.44.128:9876 &
创建消息
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL
Java生产消费RocketMQ
引入pom文件
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.3</version>
</dependency>
生产者:
package com.rocket.demo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketProducerDemo {
private final static String nameServer = "192.168.4.105:9876";
private final static String producerGroup = "my_group";
private final static String topic = "topic-test";
public static void main(String[] args) {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(msg, 10000);
System.out.println(sendResult);
// 一旦producer不再使用,关闭producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者:
package com.rocket.demo;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketConsumerDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producer_group");
consumer.setNamesrvAddr("192.168.4.105:9876"); // RocketMQ 服务器地址
consumer.subscribe("topic-test", "*"); // 订阅主题和标签,* 表示订阅所有标签
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
System.out.println("Received message: " + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started");
}
}
注意:本文归作者所有,未经作者允许,不得转载