Create the verticawriter module
DataX modules all share a similar directory layout, so the quickest start is to copy the mysqlwriter module, rename it to verticawriter, and adjust the package names and artifactId from there.
Replace the pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>verticawriter</artifactId>
    <name>verticawriter</name>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>plugin-rdbms-util</artifactId>
            <version>${datax-project-version}</version>
        </dependency>
        <dependency>
            <groupId>com.vertica</groupId>
            <artifactId>vertica-jdbc</artifactId>
            <version>9.3.1-0</version>
        </dependency>
        <!-- hutool -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>6.22.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
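Two details the pom alone does not cover: the new module must also be added to the `<modules>` section of the root datax-all pom.xml so it builds with the rest of the project, and it needs its own src/main/assembly/package.xml (copy the one from mysqlwriter and change the plugin name and paths) so the assembly plugin referenced above can package it under plugin/writer/verticawriter.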
Java code
VerticaConstant
public class VerticaConstant {
    // data_type_id codes as reported by Vertica's "columns" system view;
    // used below to decide how each field is rendered as a SQL literal.
    public interface VERTICA_TYPE {
        String NUMBER = "6";
        String VARCHAR = "9";
        String LONG_VARCHAR = "115";
        String TIMESTAMP = "13";
    }
}
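These string values are the data_type_id codes that Vertica reports in its columns system view (the SCHEMA_SQL query further down reads the same view). If you want to confirm the codes for your own table, a plain-JDBC probe works; the sketch below is only illustrative, assumes the vertica-jdbc driver is on the classpath, and uses placeholder connection details:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TypeIdProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder URL/credentials; point these at your own Vertica instance.
        // The driver registers itself via the JDBC SPI.
        try (Connection c = DriverManager.getConnection(
                "jdbc:vertica://host:5433/db", "user", "pass");
             Statement s = c.createStatement();
             ResultSet rs = s.executeQuery(
                 "select column_name, data_type, data_type_id from columns "
                 + "where table_schema = 'public' and table_name = 'test'")) {
            while (rs.next()) {
                // e.g. "id INT 6", "ds VARCHAR(20) 9"
                System.out.println(rs.getString(1) + " " + rs.getString(2)
                        + " " + rs.getLong(3));
            }
        }
    }
}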
VerticaWriter
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.DbUtil;
import cn.hutool.db.Entity;
import cn.hutool.db.handler.EntityListHandler;
import cn.hutool.db.sql.SqlExecutor;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class VerticaWriter extends Writer {
    private static final DataBaseType DATABASE_TYPE = DataBaseType.Vertica;

    public static class Job extends Writer.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);
        private Configuration originalConfig = null;
        private CommonRdbmsWriter.Job commonRdbmsWriterJob;

        @Override
        public void preCheck() {
            this.init();
            this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);
        }

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
            this.commonRdbmsWriterJob.init(this.originalConfig);
        }

        // In general, pre-SQL execution should be deferred to the task
        // (the single-table case is the exception).
        @Override
        public void prepare() {
            originalConfig.set(Constant.TABLE_NUMBER_MARK, 1);
            // Privilege validation is not supported for now.
            //this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);
            this.commonRdbmsWriterJob.prepare(this.originalConfig);
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);
        }

        // Likewise, post-SQL execution should generally be deferred to the task
        // (the single-table case is the exception).
        @Override
        public void post() {
            this.commonRdbmsWriterJob.post(this.originalConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsWriterJob.destroy(this.originalConfig);
        }
    }
    public static class Task extends Writer.Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
        private Configuration writerSliceConfig;
        private CommonRdbmsWriter.Task commonRdbmsWriterTask;
        private int fetchSize = 500;
        DataSource dataSource;
        static final String JDBC_DRIVER = "com.vertica.jdbc.Driver";
        public static String INSERT_SQL = "insert /*+ direct */ into %s (%s) values (%s)";
        public static String PREFIX_TMP_TABLE = "tmp_";
        public static final String DROP_TABLE_SQL = "drop table if exists %s";
        public static final String BATCH_INSERT_SQL = "insert into %s.%s (%s)\n" +
                "select %s\n";
        public static final String SQL_UNION_ALL = " union all select %s \n";
        public static final String CREATE_TMP_TABLE_SQL = "create table if not exists %s like %s including projections";
        public static final String MERGE_SQL = "merge into %s t using %s s on (%s) when matched then %s when not matched then %s";
        public static final String MERGE_UPDATE_SQL = " UPDATE SET %s";
        public static final String MERGE_INSERT_SQL = " INSERT (%s) VALUES (%s)";
        public static String UPDATE_SQL = "update /*+ direct */ %s set %s where %s";
        public static final String SCHEMA_SQL = "select column_name,data_type,is_nullable,is_identity,data_type_id,data_type_length,table_schema,table_name from columns where table_schema='%s' and table_name='%s'";
        List<String> columns;
        String tables;
        Map<String, String> columnType;
        String schema = "public";
        String primaryKey = "id";
        String whereSql = "";
        String mergeUpdateSql = "";
        String mergeInsertSql = "";

        // Look up the column types of the target table and build the merge join condition.
        public List<Entity> queryColumns(String schema, String table) {
            List<Entity> entities = query(String.format(SCHEMA_SQL, schema, table));
            entities.forEach(e -> columnType.put(e.getStr("column_name"), e.getStr("data_type_id")));
            whereSql = String.format(" t.%s=s.%s ", primaryKey, primaryKey);
            return entities;
        }

        // Read the writer configuration and build the data source and merge SQL fragments.
        public void doCreateParams(Configuration writerSliceConfig) {
            columnType = new HashMap<>();
            String username = writerSliceConfig.getString(Key.USERNAME);
            String password = writerSliceConfig.getString(Key.PASSWORD);
            columns = writerSliceConfig.getList(Key.COLUMN, String.class);
            StringBuilder columnsMergeUpdate = new StringBuilder();
            StringBuilder columnsMergeInsert = new StringBuilder();
            columns.forEach(t -> {
                columnsMergeUpdate.append(t).append("=s.").append(t).append(",");
                columnsMergeInsert.append("s.").append(t).append(",");
            });
            mergeUpdateSql = String.format(MERGE_UPDATE_SQL, StrUtil.removeSuffix(columnsMergeUpdate.toString(), ","));
            mergeInsertSql = String.format(MERGE_INSERT_SQL, StringUtils.join(columns, ","), StrUtil.removeSuffix(columnsMergeInsert.toString(), ","));
            String keySchema = writerSliceConfig.getString(Key.SCHEMA);
            if (StringUtils.isNotBlank(keySchema)) {
                schema = keySchema;
            }
            String primaryColumn = writerSliceConfig.getString(Key.PRIMARY_KEY);
            if (StringUtils.isNotBlank(primaryColumn)) {
                primaryKey = primaryColumn;
            }
            // Default to the current fetchSize when batchSize is not configured.
            int batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE, fetchSize);
            if (batchSize > 1) {
                fetchSize = batchSize;
            }
            String jdbcUrl = writerSliceConfig.getString(Key.JDBC_URL);
            tables = writerSliceConfig.getString(Key.TABLE);
            DruidDataSource druid = new DruidDataSource();
            druid.setUrl(jdbcUrl);
            druid.setUsername(username);
            druid.setPassword(password);
            druid.setDriverClassName(JDBC_DRIVER);
            dataSource = druid;
        }
        @Override
        public void init() {
            this.writerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);
            // this.commonRdbmsWriterTask.init(this.writerSliceConfig);
            doCreateParams(writerSliceConfig);
            // Note: this logs the full slice configuration, credentials included.
            LOG.info("init:" + writerSliceConfig.toJSON());
        }

        @Override
        public void prepare() {
            // this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);
        }

        // Write the incoming records: stage them into a temp table in batches,
        // then merge the temp table into the target table.
        public void startWrite(RecordReceiver recordReceiver) {
            // this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
            //         super.getTaskPluginCollector());
            Record record = null;
            queryColumns(schema, tables);
            // 1. Drop the previous temp table if it exists.
            String tmpTable = PREFIX_TMP_TABLE + tables;
            String dropSql = dropTmpTable(tmpTable);
            LOG.info("dropsql:" + dropSql);
            exec(dropSql);
            // 2. Create the temp table with the same structure (and projections) as the target.
            String genTmpSql = createTmpTable(tmpTable, tables);
            LOG.info("genTmpSql:" + genTmpSql);
            exec(genTmpSql);
            // 3. Batch-insert into the temp table via "insert ... select ... union all select ...".
            StringBuilder batchInsertSql = new StringBuilder();
            boolean firstInsert = true;
            int flag = 0;
            while ((record = recordReceiver.getFromReader()) != null) {
                flag++;
                List<String> values = new ArrayList<>();
                for (int i = 0; i < columns.size(); i++) {
                    String columnName = columns.get(i);
                    String colType = columnType.get(columnName);
                    Object value = record.getColumn(i).getRawData();
                    LOG.debug("value:" + value);
                    if (value != null) {
                        values.add(transformData(colType, value.toString()));
                    } else {
                        values.add("null"); // SQL NULL literal
                    }
                }
                if (firstInsert) {
                    batchInsertSql.append(genBatchInsert(columns, values, tmpTable, schema));
                    firstInsert = false;
                } else {
                    batchInsertSql.append(genUnionSql(values));
                }
                if (flag > fetchSize) {
                    // Flush the accumulated batch.
                    LOG.info("total:" + flag + ",sql:" + batchInsertSql);
                    saveBatchInsert(batchInsertSql.toString());
                    flag = 0;
                    firstInsert = true;
                    batchInsertSql = new StringBuilder();
                }
            }
            // Flush whatever is left over.
            saveBatchInsert(batchInsertSql.toString());
            // 4. Merge the temp table into the target table.
            mergeRecord(tmpTable, tables, whereSql, mergeUpdateSql, mergeInsertSql);
        }

        public void saveBatchInsert(String batchInsertSql) {
            LOG.info("sql:" + batchInsertSql);
            if (StringUtils.isNotBlank(batchInsertSql)) {
                exec(batchInsertSql);
            }
        }
        // Render a raw column value as a SQL literal according to the Vertica type id.
        public String transformData(String colType, String value) {
            switch (colType) {
                case VerticaConstant.VERTICA_TYPE.NUMBER:
                    return value;
                case VerticaConstant.VERTICA_TYPE.VARCHAR:
                case VerticaConstant.VERTICA_TYPE.LONG_VARCHAR:
                    // String column: escape embedded single quotes, then quote the value.
                    StringBuilder valuesClause = new StringBuilder();
                    value = StrUtil.replace(StrUtil.toString(value), "'", "''");
                    valuesClause.append("'").append(value).append("'");
                    return valuesClause.toString();
                case VerticaConstant.VERTICA_TYPE.TIMESTAMP:
                    // Timestamp column: escape quotes, strip the trailing sub-millisecond
                    // zeros, and wrap in to_timestamp().
                    StringBuilder temp = new StringBuilder();
                    value = StrUtil.replace(StrUtil.toString(value), "'", "''");
                    value = StringUtils.removeEnd(value, "000");
                    temp.append("to_timestamp('").append(value).append("')");
                    return temp.toString();
                default:
                    return value;
            }
        }
        public static String dropTmpTable(String tmpTable) {
            return String.format(DROP_TABLE_SQL, tmpTable);
        }

        public static String createTmpTable(String tmpTable, String targetTable) {
            return String.format(CREATE_TMP_TABLE_SQL, tmpTable, targetTable);
        }

        public static String genBatchInsert(List<String> columnHolders, List<String> valueHolders, String table, String schema) {
            return String.format(BATCH_INSERT_SQL, schema,
                    table,
                    StringUtils.join(columnHolders, ","),
                    StringUtils.join(valueHolders, ","));
        }

        public static String genUnionSql(List<String> valueHolders) {
            return String.format(SQL_UNION_ALL, StringUtils.join(valueHolders, ","));
        }

        // The two identical template helpers below are apparently leftovers from the
        // mysqlwriter copy; neither is used by the temp-table write path above.
        public static String getBatchWriteTemplate(List<String> columnHolders, List<String> valueHolders, String table) {
            return String.format(INSERT_SQL,
                    table,
                    StringUtils.join(columnHolders, ","),
                    StringUtils.join(valueHolders, ","));
        }

        public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String table) {
            return String.format(INSERT_SQL,
                    table,
                    StringUtils.join(columnHolders, ","),
                    StringUtils.join(valueHolders, ","));
        }
        public List<Entity> query(String sql, String... params) {
            Connection conn = null;
            List<Entity> entityList = null;
            try {
                conn = dataSource.getConnection();
                conn.setAutoCommit(false);
                entityList = SqlExecutor.query(conn, sql, new EntityListHandler(), params);
                conn.commit();
            } catch (SQLException e) {
                LOG.error("query failed: " + sql, e);
            } finally {
                DbUtil.close(conn);
            }
            return entityList;
        }

        protected void exec(String sql) {
            Connection conn = null;
            try {
                conn = dataSource.getConnection();
                conn.setAutoCommit(false);
                try (Statement statement = conn.createStatement()) {
                    statement.addBatch(sql);
                    statement.executeBatch();
                }
                conn.commit();
            } catch (SQLException e) {
                LOG.error("exec failed: " + sql, e);
            } finally {
                DbUtil.close(conn);
            }
        }
        protected void mergeRecord(String tmpTable, String table, String whereSql, String columnUpdate, String columnInsert) {
            String sql = String.format(MERGE_SQL, table, tmpTable, whereSql, columnUpdate, columnInsert);
            LOG.info("mergeRecord:" + sql);
            try (Connection connection = dataSource.getConnection();
                 Statement sm = connection.createStatement()) {
                connection.setAutoCommit(false);
                int i = sm.executeUpdate(sql);
                LOG.info("merged rows:" + i);
                connection.commit();
            } catch (SQLException e) {
                LOG.error("mergeRecord failed: " + sql, e);
            }
        }
        // Unused during normal runs: a hand-written merge example kept from development,
        // hard-coded against the base_market_info sample tables.
        protected void doBatchInsert(Connection connection, List<Record> buffer)
                throws SQLException {
            PreparedStatement preparedStatement = null;
            try {
                connection.setAutoCommit(false);
                String sql = "MERGE INTO public.base_market_info tg USING public.base_market_info_temp tem\n" +
                        "ON (tg.market_id = tem.market_id)\n" +
                        "WHEN MATCHED THEN UPDATE SET\n" +
                        "  market_name = tem.market_name,\n" +
                        "  market_store_id = tem.market_store_id,\n" +
                        "  updatetime = tem.updatetime\n" +
                        "WHEN NOT MATCHED THEN\n" +
                        "INSERT VALUES (\n" +
                        "  tem.market_id,\n" +
                        "  tem.market_name,\n" +
                        "  tem.updatetime\n" +
                        ")";
                preparedStatement = connection.prepareStatement(sql);
                for (Record record : buffer) {
                    // preparedStatement = fillPreparedStatement(preparedStatement, record);
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                connection.commit();
            } catch (SQLException e) {
                connection.rollback();
                // Note: no row-by-row retry is attempted here.
                LOG.warn("Rolling back this batch write, because: " + e.getMessage());
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                DBUtil.closeDBResources(preparedStatement, null);
            }
        }
        @Override
        public void post() {
            this.commonRdbmsWriterTask.post(this.writerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);
        }

        @Override
        public boolean supportFailOver() {
            String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);
            return "replace".equalsIgnoreCase(writeMode);
        }
    }
}
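To get a feel for the SQL this writer emits, here is a minimal sketch (not part of the plugin) that drives the static template helpers directly; it assumes VerticaWriter is visible from the same package or imported, and all values are made up:
import java.util.Arrays;
import java.util.List;

public class SqlShapeDemo {
    public static void main(String[] args) {
        List<String> cols = Arrays.asList("id", "ds", "process_time");
        // Values already rendered as SQL literals, as transformData would produce them.
        List<String> row1 = Arrays.asList("1", "'2021-06-01'", "to_timestamp('2021-06-01 10:00:00')");
        List<String> row2 = Arrays.asList("2", "'2021-06-02'", "to_timestamp('2021-06-02 10:00:00')");

        StringBuilder sql = new StringBuilder();
        sql.append(VerticaWriter.Task.genBatchInsert(cols, row1, "tmp_test", "public"));
        sql.append(VerticaWriter.Task.genUnionSql(row2));
        // Prints:
        // insert into public.tmp_test (id,ds,process_time)
        // select 1,'2021-06-01',to_timestamp('2021-06-01 10:00:00')
        //  union all select 2,'2021-06-02',to_timestamp('2021-06-02 10:00:00')
        System.out.println(sql);
    }
}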
Configuration files
plugin.json (placed under src/main/resources; DataX locates the writer through the class declared here)
{
    "name": "verticawriter",
    "class": "com.xxxx.data.plugin.writer.verticawriter.VerticaWriter",
    "description": "vertica writer",
    "developer": "cx"
}
plugin_job_template.json
{
    "name": "verticawriter",
    "parameter": {
        "username": "",
        "password": "",
        "column": [],
        "connection": [
            {
                "jdbcUrl": "",
                "table": []
            }
        ]
    }
}
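With both files in src/main/resources, DataX can scaffold a job for this writer the usual way, e.g. python bin/datax.py -r mysqlreader -w verticawriter, which prints a skeleton job combining the reader's template with the plugin_job_template.json above.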
DataX job template
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://xxx:3306/db"
                                ],
                                "querySql": [
                                    "select id as id,ds as ds,log_time as process_time from test"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "verticawriter",
                    "parameter": {
                        "username": "",
                        "password": "",
                        "batchSize": 1000,
                        "column": [
                            "id",
                            "ds",
                            "process_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:vertica://xxxx:5433/test1",
                                "table": [
                                    "xxxx"
                                ]
                            }
                        ],
                        "writeMode": "update"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "byte": 1048576,
                "channel": 1
            }
        }
    }
}
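Once the packaged plugin is unpacked into the DataX distribution under plugin/writer/verticawriter, the job above runs like any other DataX job, e.g. python bin/datax.py path/to/mysql2vertica.json (the job file name is just an example).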
This article skips over many of the details; for step-by-step instructions, see the article "datax扩展vertica插件".
Note: this article belongs to the author and may not be reproduced without permission.