跳至主要內容

Apache Calcite

程序员李某某 · 约 3 分钟阅读

Apache Calcite

Calcite 解析SQL

依赖

<!-- Apache Calcite core: SQL parser/unparser, the jdbc:calcite: JDBC driver and the
     adapter APIs (JdbcSchema, AbstractSchema, AbstractTable, ...) used in the examples below.
     NOTE(review): the two-MySQL example additionally needs a MySQL JDBC driver providing
     com.mysql.cj.jdbc.Driver (e.g. MySQL Connector/J 8.x) on the classpath; it is not
     declared here. -->
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.26.0</version>
</dependency>

解析

/**
 * Demonstrates parsing a SQL string into a Calcite {@link SqlNode} tree and unparsing it
 * back to SQL text with a custom {@link SqlWriterConfig}.
 */
public class CalciteParserDemo {
    public static void main(String[] args) throws SqlParseException {
        // The SQL statement to parse.
        String sql = "select * from emps where id=1";
        // Parser configuration.
        SqlParser.Config parserConfig = SqlParser.config()
                // Use Calcite's standard parser implementation.
                .withParserFactory(SqlParserImpl.FACTORY)
                // Leave unquoted identifiers as written (no upper/lower-case conversion).
                .withUnquotedCasing(Casing.UNCHANGED)
                // Leave quoted identifiers as written as well.
                .withQuotedCasing(Casing.UNCHANGED)
                // Treat identifiers as case-insensitive.
                .withCaseSensitive(false)
                // Use the MYSQL_5 conformance level: it controls which MySQL-style,
                // non-standard constructs are accepted; it is not a full MySQL grammar.
                .withConformance(SqlConformanceEnum.MYSQL_5);

        // Create the parser for this statement.
        SqlParser parser = SqlParser.create(sql, parserConfig);
        // Parse the text into a SqlNode tree.
        SqlNode sqlNode = parser.parseQuery();
        // Print the tree using SqlNode's default unparse.
        System.out.println(sqlNode.toString());
        // Writer (unparse) configuration.
        SqlWriterConfig sqlWriterConfig = SqlPrettyWriter.config()
                // Wrap sub-expressions in parentheses.
                .withAlwaysUseParentheses(true)
                // Do not put each item of an UPDATE ... SET list on its own line.
                .withUpdateSetListNewline(false)
                // Quote only identifiers the dialect requires, not every identifier.
                .withQuoteAllIdentifiers(false)
                // Use TALL line folding for the FROM clause.
                .withFromFolding(SqlWriterConfig.LineFolding.TALL)
                // Indent nested clauses by 0 spaces.
                .withIndentation(0)
                // Emit keywords in lower case.
                .withKeywordsLowerCase(true)
                // Generate SQL for the Oracle dialect.
                .withDialect(OracleSqlDialect.DEFAULT);
        System.out.println();
        // The lambda ignores the default config it receives and uses ours instead.
        System.out.println(sqlNode.toSqlString(c -> sqlWriterConfig));

    }
}

连接数据源

两个 MySQL 数据源

/**
 * Queries across two MySQL databases through a single Calcite connection.
 *
 * <p>The inline Calcite model registers each database as a {@code JdbcSchema}, so one SQL
 * statement can join tables from both schemas.
 */
public class MysqlCalciteDemo {

    public static void main(String[] args) throws SQLException {
        // Inline Calcite model: two JDBC schemas, one per MySQL database.
        // NOTE: credentials are hard-coded for the demo only; load them from configuration in real code.
        String calciteInfo = "{\n" +
                "  \"version\": \"1.0\",\n" +
                "  \"defaultSchema\": \"logistics_2900\",\n" +
                "  \"schemas\": [\n" +
                "    {\n" +
                "      \"name\": \"logistics_2900\",\n" +
                "      \"type\": \"custom\",\n" +
                "      \"factory\": \"org.apache.calcite.adapter.jdbc.JdbcSchema$Factory\",\n" +
                "      \"operand\": {\n" +
                "        \"jdbcDriver\": \"com.mysql.cj.jdbc.Driver\",\n" +
                "        \"jdbcUrl\": \"jdbc:mysql://localhost:3306/logistics_2900?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai\",\n" +
                "        \"jdbcUser\": \"root\",\n" +
                "        \"jdbcPassword\": \"root\"\n" +
                "      }\n" +
                "    },\n" +
                "    {\n" +
                "      \"name\": \"hibernate_test\",\n" +
                "      \"type\": \"custom\",\n" +
                "      \"factory\": \"org.apache.calcite.adapter.jdbc.JdbcSchema$Factory\",\n" +
                "      \"operand\": {\n" +
                "        \"jdbcDriver\": \"com.mysql.cj.jdbc.Driver\",\n" +
                "        \"jdbcUrl\": \"jdbc:mysql://localhost:3306/hibernate_test?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai\",\n" +
                "        \"jdbcUser\": \"root\",\n" +
                "        \"jdbcPassword\": \"root\"\n" +
                "      }\n" +
                "    }\n" +
                "  ]\n" +
                "}";
        // try-with-resources closes the ResultSet, Statement and Connection (in reverse order)
        // even when the connection, the query or reading a row throws.
        try (Connection connection = DriverManager.getConnection("jdbc:calcite:lex=MYSQL;model=inline:" + calciteInfo);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select * from logistics_2900.model_dynamic_fields as a "  +
                     "left join hibernate_test.customer as b on a.id = b.id")) {
            while (resultSet.next()) {
                // Print the first three columns of each joined row.
                System.out.println(resultSet.getString(1) + " " + resultSet.getString(2) + " " + resultSet.getString(3));
            }
        }

    }
}

自定义 CSV 数据源

创建csv文件

classpath:csv/test.csv

ID:VARCHAR,NAME:VARCHAR,AGE:INTEGER
1,Alice,25
2,Bob,30
3,Charlie,35
4,David,40
5,Edith,45
6,Fred,50
7,Gina,55
8,Harry,60
9,Irene,65
10,Judy,70
11,Kate,75
12,Larry,80
13,Mary,85

测试代码

/**
 * Queries a CSV file through Calcite using the custom {@code CsvSchemaFactory} adapter.
 *
 * <p>Prints every column of each matching row as {@code NAME:value}.
 */
public class CsvCalciteDemo {

    public static void main(String[] args) throws SQLException {
        // Inline Calcite model: one custom schema whose "dataFile" operand names the CSV
        // resource that CsvSchema resolves on the classpath.
        String calciteInfo = "{\n" +
                "  \"version\": \"1.0\",\n" +
                "  \"defaultSchema\": \"CSV_DB\",\n" +
                "  \"schemas\": [\n" +
                "    {\n" +
                "      \"name\": \"CSV_DB\",\n" +
                "      \"type\": \"custom\",\n" +
                "      \"factory\": \"com.jd.starlink.calcite.csv.CsvSchemaFactory\",\n" +
                "      \"operand\": {\n" +
                "        \"dataFile\": \"./csv/test.csv\"\n" +
                "      }\n" +
                "    }\n" +
                "  ]\n" +
                "}";
        // try-with-resources closes all JDBC objects even if the query or a read fails.
        try (Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + calciteInfo);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select * from CSV_DB.TEST as a where a.AGE in (55,25,85) ")) {

            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();

            while (resultSet.next()) {
                // JDBC columns are 1-based.
                for (int i = 1; i <= columnCount; i++) {
                    System.out.print(metaData.getColumnName(i) + ":" + resultSet.getString(i) + " ");
                }
                System.out.println();
            }
        }

    }
}

Schema 工厂

我们需要创建一个工厂类 CsvSchemaFactory,也就是上面模型 JSON 中 factory 字段所指向的类,Calcite 会调用它来创建 Schema。

/**
 * Calcite {@link SchemaFactory} that builds a {@link CsvSchema} from a model's operand map.
 *
 * <p>The {@code "dataFile"} operand names the CSV resource to expose as a table.
 */
public class CsvSchemaFactory implements SchemaFactory {


    /**
     * Creates the CSV schema.
     *
     * @param schemaPlus parent schema (not used)
     * @param s          name of the schema being created; used only in error messages
     * @param map        operand map from the model JSON; must contain {@code dataFile}
     * @return a {@link CsvSchema} for the configured data file
     * @throws IllegalArgumentException if the {@code dataFile} operand is missing
     */
    @Override
    public Schema create(SchemaPlus schemaPlus, String s, Map<String, Object> map) {
        Object dataFile = map == null ? null : map.get("dataFile");
        if (dataFile == null) {
            // Fail with an actionable message instead of a bare NullPointerException.
            throw new IllegalArgumentException("CSV schema '" + s + "' requires a 'dataFile' operand");
        }
        return new CsvSchema(dataFile.toString());
    }
}

Schema 解析实现

创建一个解析Schema的类CsvSchema

/**
 * Schema exposing a single CSV resource as one table.
 *
 * <p>The table name is the file name without directory or extension, upper-cased; for
 * example {@code ./csv/test.csv} becomes {@code TEST}.
 */
public class CsvSchema extends AbstractSchema {
    private final String dataFile;
    // Lazily built table map; null until the first getTableMap() call.
    private Map<String, Table> tableMap;

    /**
     * @param dataFile path of the CSV resource, resolved via {@code Resources.getResource}
     */
    public CsvSchema(String dataFile) {
        this.dataFile = dataFile;
    }

    /**
     * Returns this schema's tables, keyed by table name.
     *
     * <p>The resource is resolved and the map built only on the first call; later calls
     * return the cached map.
     */
    @Override
    protected Map<String, Table> getTableMap() {
        if (tableMap == null) {
            // Only resolve the resource when the map actually needs to be built.
            URL url = Resources.getResource(dataFile);
            Source source = Sources.of(url);
            tableMap = ImmutableMap.of(tableNameOf(dataFile), new CsvTable(source));
        }
        return tableMap;
    }

    /** Derives a table name from a path: file name minus directory and extension, upper-cased. */
    private static String tableNameOf(String path) {
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        int dot = fileName.lastIndexOf('.');
        // Files without an extension keep their whole name instead of failing.
        String baseName = dot > 0 ? fileName.substring(0, dot) : fileName;
        // Locale.ROOT keeps casing independent of the JVM's default locale.
        return baseName.toUpperCase(java.util.Locale.ROOT);
    }
}

Table 解析实现

创建一个Table解析类CsvTable

/**
 * Table backed by a CSV source whose first line is a header of {@code NAME:TYPE} column
 * specs, e.g. {@code ID:VARCHAR,NAME:VARCHAR,AGE:INTEGER}.
 *
 * <p>As a {@link ScannableTable}, it returns every row from {@link #scan}, and Calcite
 * applies filters and projections on top.
 */
public class CsvTable extends AbstractTable implements ScannableTable {

    private final Source source;

    public CsvTable(Source source) {
        this.source = source;
    }

    /**
     * Returns all data rows of the CSV source.
     *
     * <p>Each enumeration creates a new {@link CsvEnumerator}, which opens its own reader.
     * NOTE(review): the enumerator yields raw {@code String} values even for non-VARCHAR
     * columns; this relies on Calcite converting them at runtime — confirm.
     *
     * @param dataContext query context (not used)
     * @return enumerable over the rows as {@code Object[]}
     */
    @Override
    public Enumerable<Object[]> scan(DataContext dataContext) {
        return new AbstractEnumerable<Object[]>() {
            @Override
            public Enumerator<Object[]> enumerator() {
                return new CsvEnumerator<>(source);
            }
        };
    }

    /**
     * Builds the row type from the CSV header line.
     *
     * @param relDataTypeFactory type factory supplied by Calcite (cast to {@link JavaTypeFactory})
     * @return struct type with one field per header column, in header order
     * @throws IllegalStateException if the source is empty or a column spec is malformed or
     *                               names an unknown SQL type
     * @throws RuntimeException      wrapping any I/O error while reading the header
     */
    @Override
    public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
        JavaTypeFactory typeFactory = (JavaTypeFactory) relDataTypeFactory;
        List<String> names = Lists.newArrayList();
        List<RelDataType> types = Lists.newArrayList();
        // Read through source.reader(), as CsvEnumerator does, so the header and data rows
        // are decoded the same way and the source need not be a plain file on disk.
        try (BufferedReader reader = new BufferedReader(source.reader())) {
            String header = reader.readLine();
            if (header == null) {
                throw new IllegalStateException("CSV source has no header line: " + source);
            }
            for (String column : header.split(",")) {
                String[] parts = column.split(":");
                if (parts.length != 2) {
                    throw new IllegalStateException(
                            "Bad column spec '" + column + "' in " + source + "; expected NAME:TYPE");
                }
                String name = parts[0].trim();
                String typeName = parts[1].trim().toUpperCase(java.util.Locale.ROOT);
                SqlTypeName sqlTypeName = SqlTypeName.get(typeName);
                if (sqlTypeName == null) {
                    throw new IllegalStateException(
                            "Unknown SQL type '" + parts[1] + "' for column '" + name + "' in " + source);
                }
                names.add(name);
                types.add(typeFactory.createSqlType(sqlTypeName));
            }
        } catch (java.io.IOException e) {
            // Don't return a partial/empty row type on I/O failure; surface the error.
            throw new RuntimeException("Failed to read CSV header from " + source, e);
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
}

数据读取实现

创建一个数据读取类CsvEnumerator

/**
 * Enumerator over the data rows of a CSV {@link Source}.
 *
 * <p>The first line (the {@code NAME:TYPE} header) is skipped. Each following non-empty
 * line is split on commas and exposed via {@link #current()} as a {@code String[]}.
 * Quoting and escaping are not supported. The class has no synchronization, so use
 * each instance from a single thread.
 *
 * @param <E> element type; the runtime value is always a {@code String[]}
 */
public class CsvEnumerator<E> implements Enumerator<E> {
    private final BufferedReader reader;
    private E current;
    // Whether the header line has already been consumed.
    private boolean headerSkipped;


    /**
     * Opens a reader on the source.
     *
     * @param source CSV source to read
     * @throws RuntimeException if the source cannot be opened
     */
    public CsvEnumerator(Source source) {
        try {
            this.reader = new BufferedReader(source.reader());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the row produced by the last successful {@link #moveNext()}. */
    @Override
    public E current() {
        return current;
    }

    /**
     * Advances to the next data row.
     *
     * @return {@code true} if a row is available, {@code false} at end of input
     * @throws RuntimeException wrapping an I/O error, so results are never silently truncated
     */
    @Override
    @SuppressWarnings("unchecked") // E is instantiated as Object[]; the row is a String[]
    public boolean moveNext() {
        try {
            if (!headerSkipped) {
                // The first line is the NAME:TYPE header, not data.
                reader.readLine();
                headerSkipped = true;
            }
            String line;
            do {
                line = reader.readLine();
                if (line == null) {
                    return false;
                }
            } while (line.isEmpty()); // skip blank lines, e.g. extra empty lines at end of file
            // Limit -1 keeps trailing empty fields, so "1,Alice," still yields 3 columns.
            current = (E) line.split(",", -1);
            return true;
        } catch (IOException e) {
            throw new RuntimeException("Failed to read CSV data", e);
        }
    }

    /** Not supported: the underlying reader cannot be rewound. */
    @Override
    public void reset() {
        throw new UnsupportedOperationException("CsvEnumerator does not support reset");
    }

    /** Closes the underlying reader. */
    @Override
    public void close() {
        try {
            reader.close();
        } catch (IOException e) {
            throw new RuntimeException("Error closing CSV reader", e);
        }
    }
}
上次编辑于:
贡献者: 李元昊