依赖RocketMQ和Connect组件
部署Apache RocketMQ Connect
我们使用Apache RocketMQ Connect作为我们的默认Runtime,来连接外部的上下游服务,您可以根据手册完成部署: RocketMQ Connect Quick Start 。在部署 Apache RocketMQ Connect 之前,您应该下载下面的插件,并将其放在rocketmq-connect中配置参数“pluginPaths”所定义的目录下:
rocketmq-connect-eventbridge-jar-with-dependencies.jar
rocketmq-connect-dingtalk-jar-with-dependencies.jar
connect-cloudevent-transform-jar-with-dependencies.jar
connect-filter-transform-jar-with-dependencies.jar
connect-eventbridge-transform-jar-with-dependencies.jar
构建Connect
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
mvn -Prelease-connect -DskipTests clean install -U
运行Worker
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
runtime启动成功:
The standalone worker boot success.
查看启动日志文件:
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
报错处理
/opt/software/rocketmq-connect-0.0.1-SNAPSHOT/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/bin/runconnect.sh: line 37: bc: command not found
/opt/software/rocketmq-connect-0.0.1-SNAPSHOT/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/bin/runconnect.sh: line 37: bc: command not found
/opt/software/rocketmq-connect-0.0.1-SNAPSHOT/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/bin/runconnect.sh: line 37: bc: command not found
linux缺少bc命令,通过以下命令安装在运行即可
yum -y install bc
命令行测试
touch test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"topic-test"}'
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}'
cat test-sink-file.txt
输出结果为test-sink-file文件中显示 Hello \r\nRocketMQ\r\n Connect 即为成功
下载地址
解压运行
目录信息
修改配置 EventBridge
运行前,我们需要配置EventBridge的运行环境,修改config/application.properties,参考如下: 需要先把库名建好!!!
# Mysql数据库的连接地址
spring.datasource.url=jdbc:mysql://xxxx:3306/xxxx?characterEncoding=utf8
spring.datasource.username=xxx
spring.datasource.password=xxxx
# RocketMQ nameserver的连接地址
rocketmq.namesrvAddr=xxxxx:9876
# RocketMQ的集群名称.
rocketmq.cluster.name=DefaultCluster
# RocketMQ Connect的连接地址
rocketmq.connect.endpoint=xxxxxx:8082
# log默认配置
log.path=~
log.level=INFO
app.name=rocketmq-eventbridge
启动 EventBridge
sh bin/eventbridge.sh start
log默认目录为~/rocketmq-eventbridge/rocketmq-eventbridge.log,可以修改上述log.path和app.name进行修改。可以通过日志来观察服务是否正常启动
测试 EventBridge
当服务启动后,我们就可以通过下面的Demo用例来测试和验证EventBridge。 Demo
创建事件总线
POST /bus/createEventBus HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"description":"a demo bus."
}
创建事件源
POST /source/createEventSource HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventSourceName":"demo-source",
"description":"A demo source."
}
创建事件规则
POST /rule/createEventRule HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventRuleName":"demo-rule",
"description":"A demo rule.",
"filterPattern":"{}"
}
创建事件目标
创建一个投递到云上EventBridge的事件目标:
POST /target/createEventTargets HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventRuleName":"demo-rule",
"eventTargets":[
{
"eventTargetName":"eventbridge-target",
"className":"acs.eventbridge",
"config":{
"RegionId":"cn-hangzhou",
"AliyunEventBus":"rocketmq-eventbridge"
}
}
]
}
创建一个投递到钉钉机器人推送通知的事件目标:
POST /target/createEventTargets HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventRuleName":"demo-rule",
"eventTargets":[
{
"eventTargetName":"dingtalk-target",
"className":"acs.dingtalk",
"config":{
"WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867",
"SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa",
"Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}"
}
}
]
}
发送事件到EventBus
最后,我们通过API发送一条事件,并验证Target端是否按预期收到对应的事件。
POST /putEvents HTTP/1.1
Host: demo.eventbridge.com
Content-Type:"application/cloudevents+json; charset=UTF-8"
{
"specversion" : "1.0",
"type" : "com.github.pull_request.opened",
"source" : "https://github.com/cloudevents/spec/pull",
"subject" : "123",
"id" : "A234-1234-1234",
"time" : "2018-04-05T17:31:00Z",
"datacontenttype" : "application/json",
"data" : {
"body":"demo"
},
"aliyuneventbusname":"demo-bus"
}
注意:本文归作者所有,未经作者允许,不得转载