RocketMQ EventBridge下载使用

小小编辑 1年前 ⋅ 213 阅读

依赖RocketMQ和Connect组件

部署Apache RocketMQ Connect

我们使用Apache RocketMQ Connect作为我们的默认Runtime,来连接外部的上下游服务,您可以根据手册完成部署: RocketMQ Connect Quick Start 。在部署 Apache RocketMQ Connect 之前,您应该下载下面的插件,并将其放在rocketmq-connect中配置参数“pluginPaths”所定义的目录下:

rocketmq-connect-eventbridge-jar-with-dependencies.jar
rocketmq-connect-dingtalk-jar-with-dependencies.jar
connect-cloudevent-transform-jar-with-dependencies.jar
connect-filter-transform-jar-with-dependencies.jar
connect-eventbridge-transform-jar-with-dependencies.jar

构建Connect

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

mvn -Prelease-connect -DskipTests clean install -U

运行Worker

cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

runtime启动成功:

The standalone worker boot success.

查看启动日志文件:

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

报错处理

/opt/software/rocketmq-connect-0.0.1-SNAPSHOT/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/bin/runconnect.sh: line 37: bc: command not found
/opt/software/rocketmq-connect-0.0.1-SNAPSHOT/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/bin/runconnect.sh: line 37: bc: command not found
/opt/software/rocketmq-connect-0.0.1-SNAPSHOT/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/bin/runconnect.sh: line 37: bc: command not found

linux缺少bc命令,通过以下命令安装在运行即可

yum -y install bc

命令行测试

touch test-source-file.txt
 echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"topic-test"}'
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}'
cat test-sink-file.txt

输出结果为test-sink-file文件中显示 Hello \r\nRocketMQ\r\n Connect 即为成功

下载地址

Apache CSDN

解压运行

目录信息 1693209596508.jpg

修改配置 EventBridge

运行前,我们需要配置EventBridge的运行环境,修改config/application.properties,参考如下: 需要先把库名建好!!!

# Mysql数据库的连接地址
spring.datasource.url=jdbc:mysql://xxxx:3306/xxxx?characterEncoding=utf8
spring.datasource.username=xxx
spring.datasource.password=xxxx

# RocketMQ nameserver的连接地址
rocketmq.namesrvAddr=xxxxx:9876

# RocketMQ的集群名称.
rocketmq.cluster.name=DefaultCluster

# RocketMQ Connect的连接地址
rocketmq.connect.endpoint=xxxxxx:8082

# log默认配置
log.path=~
log.level=INFO
app.name=rocketmq-eventbridge

启动 EventBridge

sh bin/eventbridge.sh start

log默认目录为~/rocketmq-eventbridge/rocketmq-eventbridge.log,可以修改上述log.path和app.name进行修改。可以通过日志来观察服务是否正常启动

测试 EventBridge

当服务启动后,我们就可以通过下面的Demo用例来测试和验证EventBridge。 Demo

创建事件总线
POST /bus/createEventBus HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"description":"a demo bus."
}
创建事件源
POST /source/createEventSource HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventSourceName":"demo-source",
"description":"A demo source."
}
创建事件规则
POST /rule/createEventRule HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
  "eventBusName":"demo-bus",
  "eventRuleName":"demo-rule",
  "description":"A demo rule.",
  "filterPattern":"{}"
}
创建事件目标

创建一个投递到云上EventBridge的事件目标:

POST /target/createEventTargets HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
    "eventBusName":"demo-bus",
    "eventRuleName":"demo-rule",
    "eventTargets":[
            {
            "eventTargetName":"eventbridge-target",
            "className":"acs.eventbridge",
                "config":{
                "RegionId":"cn-hangzhou",
                "AliyunEventBus":"rocketmq-eventbridge"
                }
            }
        ]
}

创建一个投递到钉钉机器人推送通知的事件目标:

POST /target/createEventTargets HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
    "eventBusName":"demo-bus",
    "eventRuleName":"demo-rule",
    "eventTargets":[
        {
            "eventTargetName":"dingtalk-target",
            "className":"acs.dingtalk",
            "config":{
            "WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867",
            "SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa",
            "Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}"
            }
        }
    ]
}
发送事件到EventBus

最后,我们通过API发送一条事件,并验证Target端是否按预期收到对应的事件。
POST /putEvents HTTP/1.1
Host: demo.eventbridge.com
Content-Type:"application/cloudevents+json; charset=UTF-8"
{
  "specversion" : "1.0",
  "type" : "com.github.pull_request.opened",
  "source" : "https://github.com/cloudevents/spec/pull",
  "subject" : "123",
  "id" : "A234-1234-1234",
  "time" : "2018-04-05T17:31:00Z",
  "datacontenttype" : "application/json",
  "data" : {
    "body":"demo"
  },
  "aliyuneventbusname":"demo-bus"
}