Extending DataX with a Vertica writer plugin in Java


Creating the verticawriter module

DataX plugin modules all share a very similar directory layout, so the quickest start is to copy the existing mysqlwriter module and rename it to verticawriter, then register the new module with the build as sketched below.
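
Without registration Maven will never compile or package the new module. A minimal sketch of the change: the root pom.xml of datax-all already carries a long list of plugin modules, and one entry is appended to it.

<!-- datax-all/pom.xml, inside the existing <modules> section -->
<module>verticawriter</module>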

Replacing the pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>verticawriter</artifactId>
    <name>verticawriter</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>plugin-rdbms-util</artifactId>
            <version>${datax-project-version}</version>
        </dependency>
        <dependency>
            <groupId>com.vertica</groupId>
            <artifactId>vertica-jdbc</artifactId>
            <version>9.3.1-0</version>
        </dependency>
        <!-- hutool -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>6.22.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
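
The assembly plugin above expects a descriptor at src/main/assembly/package.xml. If you copied mysqlwriter it is already in place, but the output paths and jar name must be changed to the new plugin. For reference, a minimal sketch modeled on the descriptors the stock plugins ship with (the jar name follows this module's artifactId and version):

<assembly>
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/verticawriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>verticawriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/verticawriter</outputDirectory>
        </fileSet>
    </fileSets>
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/verticawriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>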

Java code

VerticaConstant

public class VerticaConstant {

    /**
     * Vertica data_type_id values as reported by the "columns" system table
     * (queried via SCHEMA_SQL in VerticaWriter); they decide how a value is
     * rendered as a SQL literal.
     */
    public interface VERTICA_TYPE {
        String NUMBER = "6";
        String VARCHAR = "9";
        String LONG_VARCHAR = "115";
        String TIMESTAMP = "13";
    }
}

VerticaWriter

import cn.hutool.core.util.StrUtil;
import cn.hutool.db.DbUtil;
import cn.hutool.db.Entity;
import cn.hutool.db.handler.EntityListHandler;
import cn.hutool.db.sql.SqlExecutor;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VerticaWriter extends Writer {

    private static final DataBaseType DATABASE_TYPE = DataBaseType.Vertica;

    public static class Job extends Writer.Job {
        private static final Logger LOG = LoggerFactory
                .getLogger(Job.class);
        private Configuration originalConfig = null;
        private CommonRdbmsWriter.Job commonRdbmsWriterJob;

        @Override
        public void preCheck(){
            this.init();
            this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);
        }

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
            this.commonRdbmsWriterJob.init(this.originalConfig);
        }

        // Pre-SQL execution should generally be deferred to the task
        // (the single-table case is the exception)
        @Override
        public void prepare() {
            originalConfig.set(Constant.TABLE_NUMBER_MARK, 1);
            // privilege validation is not supported for now
            //this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);
            this.commonRdbmsWriterJob.prepare(this.originalConfig);
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);
        }

        // Post-SQL execution should generally be deferred to the task
        // (the single-table case is the exception)
        @Override
        public void post() {
            this.commonRdbmsWriterJob.post(this.originalConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsWriterJob.destroy(this.originalConfig);
        }

    }

    public static class Task extends Writer.Task {

        private static final Logger LOG = LoggerFactory
                .getLogger(Task.class);
        private Configuration writerSliceConfig;
        private CommonRdbmsWriter.Task commonRdbmsWriterTask;

        private int fetchSize = 500;

        DataSource dataSource;

        static final String JDBC_DRIVER = "com.vertica.jdbc.Driver";
        // single-row direct insert (kept for reference; unused by the temp-table path)
        public static final String INSERT_SQL = "insert /*+ direct */ into %s (%s) values (%s)";
        public static final String PREFIX_TMP_TABLE = "tmp_";
        public static final String DROP_TABLE_SQL = "drop table if exists %s";
        // opening INSERT ... SELECT of a batch; further rows are appended via SQL_UNION_ALL
        public static final String BATCH_INSERT_SQL = "insert into %s.%s (%s)\n" +
                "select %s\n";
        public static final String SQL_UNION_ALL = " union all select %s \n";
        public static final String CREATE_TMP_TABLE_SQL = "create table if not exists %s like %s including projections";
        public static final String MERGE_SQL = "merge into %s t using %s s on (%s) when matched then %s when not matched then %s";
        public static final String MERGE_UPDATE_SQL = " UPDATE SET %s";
        public static final String MERGE_INSERT_SQL = " INSERT (%s) VALUES (%s)";

        public static final String UPDATE_SQL = "update /*+ direct */ %s set %s where %s";
        // column metadata from Vertica's "columns" system table
        public static final String SCHEMA_SQL = "select column_name,data_type,is_nullable,is_identity,data_type_id,data_type_length,table_schema,table_name from columns where table_schema='%s' and table_name='%s'";


        List<String> columns;
        String tables;
        Map<String, String> columnType;
        String schema = "public";
        String primaryKey = "id";
        String whereSql = "";
        String mergeUpdateSql = "";
        String mergeInsertSql = "";

        // Fetch the target table's column metadata and derive the merge join condition
        public List<Entity> queryColumns(String schema, String table) {
            List<Entity> entities = query(String.format(SCHEMA_SQL, schema, table));
            entities.forEach(e -> {
                columnType.put(e.getStr("column_name"), e.getStr("data_type_id"));

            });
            whereSql = String.format(" t.%s=s.%s ", primaryKey, primaryKey);
            return entities;
        }

        public void doCreateParams(Configuration writerSliceConfig) {
            columnType = new HashMap<>();
            String username = writerSliceConfig.getString(Key.USERNAME);
            String password = writerSliceConfig.getString(Key.PASSWORD);
            columns = writerSliceConfig.getList(Key.COLUMN, String.class);
            StringBuilder columnsMergeUpdate = new StringBuilder();
            StringBuilder columnsMergeInsert = new StringBuilder();
            columns.forEach(t -> {
                columnsMergeUpdate.append(t).append("=s.").append(t).append(",");
                columnsMergeInsert.append("s.").append(t).append(",");
            });
            mergeUpdateSql = String.format(MERGE_UPDATE_SQL, StrUtil.removeSuffix(columnsMergeUpdate.toString(), ","));
            mergeInsertSql = String.format(MERGE_INSERT_SQL, StringUtils.join(columns, ","), StrUtil.removeSuffix(columnsMergeInsert.toString(), ","));

            String keySchema = writerSliceConfig.getString(Key.SCHEMA);
            if (StringUtils.isNotBlank(keySchema)) {
                schema = keySchema;
            }
            String primaryColumn = writerSliceConfig.getString(Key.PRIMARY_KEY);
            if (StringUtils.isNotBlank(primaryColumn)) {
                primaryKey = primaryColumn;
            }
            // BATCH_SIZE may be absent from the job config; supply the current
            // default to avoid an unboxing NullPointerException
            int batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE, fetchSize);
            if (batchSize > 1) {
                fetchSize = batchSize;
            }
            String jdbcUrl = writerSliceConfig.getString(Key.JDBC_URL);
            tables = writerSliceConfig.getString(Key.TABLE);
            DruidDataSource druidDataSource = new DruidDataSource();
            druidDataSource.setUrl(jdbcUrl);
            druidDataSource.setUsername(username);
            druidDataSource.setPassword(password);
            druidDataSource.setDriverClassName(JDBC_DRIVER);
            dataSource = druidDataSource;

        }

        @Override
        public void init() {
            this.writerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);
//            this.commonRdbmsWriterTask.init(this.writerSliceConfig);
            doCreateParams(writerSliceConfig);
            LOG.info("init:"+writerSliceConfig.toJSON());
        }

        @Override
        public void prepare() {
//            this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);
        }

        // Write the records received from the reader into Vertica
        public void startWrite(RecordReceiver recordReceiver) {
//            this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
//                    super.getTaskPluginCollector());
            Record record = null;
            queryColumns(schema, tables);
            // 1. drop any leftover temp table from a previous run
            String tmpTable = PREFIX_TMP_TABLE + tables;
            String dropSql = dropTmpTable(tmpTable);
            LOG.info("dropsql:" + dropSql);
            exec(dropSql);
            // 2. create the temp table with the target table's layout
            String genTmpSql = createTmpTable(tmpTable, tables);
            LOG.info("genTmpSql:" + genTmpSql);
            exec(genTmpSql);
            // 3. batch-write the incoming records into the temp table
            StringBuilder batchInsertSql = new StringBuilder();
            boolean firstInsert = true;
            int flag = 0;
            while ((record = recordReceiver.getFromReader()) != null) {
                flag++;
                List<String> values = new ArrayList<>();
                for (int i = 0; i < columns.size(); i++) {
                    String columnName = columns.get(i);
                    String colType = columnType.get(columnName);
                    Object value = record.getColumn(i).getRawData();
                    LOG.debug("value:" + value);
                    if (value != null) {
                        values.add(transformData(colType, value.toString()));
                    } else {
                        // rendered as an unquoted SQL NULL literal
                        values.add("null");
                    }
                }

                if (firstInsert) {
                    String sql = genBatchInsert(columns, values, tmpTable, schema);
                    firstInsert = false;
                    batchInsertSql.append(sql);
                } else {
                    String sql = genUnionSql(values);
                    batchInsertSql.append(sql);
                }
                if (flag > fetchSize) {
                    // flush the accumulated batch
                    LOG.info("total:" + flag + ",sql:" + batchInsertSql);
                    saveBatchInsert(batchInsertSql.toString());

                    flag = 0;
                    firstInsert = true;
                    batchInsertSql = new StringBuilder();
                }
            }
            // flush whatever remains once the reader is drained
            saveBatchInsert(batchInsertSql.toString());
            // 4. merge the temp table into the target table
            mergeRecord(tmpTable, tables, whereSql, mergeUpdateSql, mergeInsertSql);
        }

        public void saveBatchInsert(String batchInsertSql) {
            if (StringUtils.isNotBlank(batchInsertSql)) {
                LOG.info("sql:" + batchInsertSql);
                exec(batchInsertSql);
            }
        }

        public String transformData(String colType, String value) {
            switch (colType) {
                case VerticaConstant.VERTICA_TYPE.NUMBER:
                    return value;
                case VerticaConstant.VERTICA_TYPE.VARCHAR:
                case VerticaConstant.VERTICA_TYPE.LONG_VARCHAR:
                    // string column: escape embedded single quotes, then quote the value
                    value = StrUtil.replace(StrUtil.toString(value), "'", "''");
                    return "'" + value + "'";
                case VerticaConstant.VERTICA_TYPE.TIMESTAMP:
                    // timestamp column: the raw value is expected as epoch milliseconds;
                    // stripping the trailing "000" only works for second-precision values
                    value = StrUtil.replace(StrUtil.toString(value), "'", "''");
                    value = StringUtils.removeEnd(value, "000");
                    return "to_timestamp('" + value + "')";
                default:
                    return value;
            }
        }

        public static String dropTmpTable(String tmpTable) {
            String sql = String.format(DROP_TABLE_SQL, tmpTable);
            return sql;
        }

        public static String createTmpTable(String tmpTable, String targetTable) {
            String sql = String.format(CREATE_TMP_TABLE_SQL, tmpTable, targetTable);
            return sql;
        }

        public static String genBatchInsert(List<String> columnHolders, List<String> valueHolders, String table, String schema) {
            String sql = String.format(BATCH_INSERT_SQL, schema,
                    table,
                    StringUtils.join(columnHolders, ","),
                    StringUtils.join(valueHolders, ","));
            return sql;
        }

        public static String genUnionSql(List<String> valueHolders) {
            String sql = String.format(SQL_UNION_ALL,  StringUtils.join(valueHolders, ","));
            return sql;
        }

        // Single-row direct-insert template, kept for reference; the temp-table
        // path above does not use it
        public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String table) {
            String sql = String.format(INSERT_SQL,
                    table,
                    StringUtils.join(columnHolders, ","),
                    StringUtils.join(valueHolders, ","));
            return sql;
        }


        public List<Entity> query(String sql, String... params) {
            Connection conn = null;
            List<Entity> entityList = null;
            try {
                conn = dataSource.getConnection();
                entityList = SqlExecutor.query(conn, sql, new EntityListHandler(), params);
            } catch (SQLException e) {
                // fail fast instead of silently returning null
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                DbUtil.close(conn);
            }
            return entityList;
        }

        protected void exec(String sql) {
            Connection conn = null;
            try {
                conn = dataSource.getConnection();
                conn.setAutoCommit(false);
                try (Statement statement = conn.createStatement()) {
                    statement.addBatch(sql);
                    statement.executeBatch();
                }
                conn.commit();
            } catch (SQLException e) {
                // propagate the failure so DataX marks the task as failed
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                DbUtil.close(conn);
            }
        }

        protected void mergeRecord(String tmpTable, String table, String whereSql, String columnUpdate, String columnInsert) {
            String sql = String.format(MERGE_SQL, table, tmpTable, whereSql, columnUpdate, columnInsert);
            LOG.info("mergeRecord:" + sql);
            // try-with-resources closes the statement before the connection
            try (Connection connection = dataSource.getConnection();
                 Statement sm = connection.createStatement()) {
                connection.setAutoCommit(false);
                int affected = sm.executeUpdate(sql);
                LOG.info("merged rows:" + affected);
                connection.commit();
            } catch (SQLException e) {
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            }
        }

        // Early prototype kept for reference: a hard-coded MERGE executed as a
        // prepared-statement batch. The temp-table path in startWrite() is what
        // actually runs; the fillPreparedStatement call is still commented out.
        protected void doBatchInsert(Connection connection, List<Record> buffer)
                throws SQLException {
            PreparedStatement preparedStatement = null;
            try {
                connection.setAutoCommit(false);
                String sql = "MERGE INTO public.base_market_info tg USING public.base_market_info_temp tem\n" +
                        "ON (tg.market_id = tem.market_id)\n" +
                        "WHEN MATCHED THEN UPDATE SET\n" +
                        "    market_name = tem.market_name,\n" +
                        "    market_store_id = tem.market_store_id,\n" +
                        "    updatetime = tem.updatetime\n" +
                        "WHEN NOT MATCHED THEN\n" +
                        "INSERT VALUES (\n" +
                        "        tem.market_id,\n" +
                        "        tem.market_name,\n" +
                        "        tem.updatetime\n" +
                        "    )";
                preparedStatement = connection
                        .prepareStatement(sql);

                for (Record record : buffer) {
//                    preparedStatement = fillPreparedStatement(
//                            preparedStatement, record);
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                connection.commit();
            } catch (SQLException e) {
                connection.rollback();
                LOG.warn("Rolling back this batch write; falling back to row-by-row commits. Cause: " + e.getMessage());
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                DBUtil.closeDBResources(preparedStatement, null);
            }
        }

        @Override
        public void post() {
            this.commonRdbmsWriterTask.post(this.writerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);
        }

        @Override
        public boolean supportFailOver(){
            String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);
            return "replace".equalsIgnoreCase(writeMode);
        }

    }

}
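
To see what the templates in VerticaWriter.Task actually emit, the following stand-alone sketch (not part of the plugin; the orders table and its id/name columns are invented for illustration) replays the same String.format calls for a two-record batch:

// Stand-alone demo, not shipped with the plugin: prints the statement sequence
// that startWrite() would issue for a hypothetical "orders" table
public class SqlTemplateDemo {
    public static void main(String[] args) {
        String schema = "public";
        String table = "orders";
        String tmpTable = "tmp_" + table;
        // 1. drop and recreate the temp table
        System.out.println(String.format("drop table if exists %s", tmpTable));
        System.out.println(String.format(
                "create table if not exists %s like %s including projections", tmpTable, table));
        // 2. batched insert: the first row opens an INSERT ... SELECT,
        //    each further row is appended as UNION ALL SELECT
        StringBuilder batch = new StringBuilder();
        batch.append(String.format("insert into %s.%s (%s)\nselect %s\n",
                schema, tmpTable, "id,name", "1,'foo'"));
        batch.append(String.format(" union all select %s \n", "2,'bar'"));
        System.out.println(batch);
        // 3. merge the temp table into the target on the primary key
        System.out.println(String.format(
                "merge into %s t using %s s on (%s) when matched then %s when not matched then %s",
                table, tmpTable, " t.id=s.id ",
                " UPDATE SET id=s.id,name=s.name", " INSERT (id,name) VALUES (s.id,s.name)"));
    }
}

The output is exactly the four-step sequence startWrite issues: drop, create ... like ... including projections, one multi-row INSERT ... SELECT ... UNION ALL, and a final MERGE keyed on the primary key.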

Configuration files

plugin.json

{
  "name": "verticawriter",
  "class": "com.xxxx.data.plugin.writer.verticawriter.VerticaWriter",
  "description": "vertica writer",
  "developer": "cx"
}
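
At runtime DataX resolves this descriptor from plugin/writer/verticawriter/plugin.json inside the unpacked distribution, and the class value must match the real package of VerticaWriter (the com.xxxx placeholder above is deliberately left as-is).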

plugin_job_template.json

{
    "name": "verticawriter",
    "parameter": {
        "username": "",
        "password": "",
        "column": [],
        "connection": [
            {
            "jdbcUrl": "",
            "table": []
            }
        ]
    }
}
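
This template backs DataX's job scaffolding: running python bin/datax.py -r mysqlreader -w verticawriter prints a job skeleton assembled from the reader's and this writer's plugin_job_template.json.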

DataX job template

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "password": "xxxx",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://xxx:3306/db"
                                ],
                                "querySql": [
                                    "select id as id,ds as ds,log_time as process_time from test"
                                ]
                            }
                        ],
                        "username": "xxxx"
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "parameter": {
                        "password": "",
                        "batchSize": 1000,
                        "column": [
                            "id",
                            "ds",
                            "process_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:vertica://xxxx:5433/test1",
                                "table": [
                                    "xxxx"
                                ]
                            }
                        ],
                        "writeMode": "update",
                        "username": ""
                    },
                    "name": "verticawriter",
                    "writeMode": "update"
                }
            }
        ],
        "setting": {
            "speed": {
                "byte": 1048576,
                "channel": 1
            }
        }
    }
}
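
Once the plugin is packaged (the stock DataX build, mvn -U clean package assembly:assembly -Dmaven.test.skip=true, places it under the distribution's plugin/writer/verticawriter directory) and the job file above is saved, it runs like any other DataX job, for example python bin/datax.py vertica_job.json (the file name is illustrative).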

This post skips over many of the finer details; for the complete step-by-step walkthrough, refer to the full guide on extending DataX with a Vertica plugin.