Apache Calcite
大约 3 分钟
Apache Calcite
使用 Calcite 解析 SQL
依赖
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.26.0</version>
</dependency>
解析
/**
 * Parses a SQL statement with Calcite and prints it twice: once using the node's
 * default {@code toString()} rendering and once through a customised writer configuration.
 */
public class CalciteParserDemo {
    public static void main(String[] args) throws SqlParseException {
        // The SQL text to parse.
        final String query = "select * from emps where id=1";

        // Build the parser from the configuration and parse the statement into a SqlNode tree.
        final SqlParser.Config config = buildParserConfig();
        final SqlParser sqlParser = SqlParser.create(query, config);
        final SqlNode ast = sqlParser.parseQuery();

        // Print the default rendering of the parsed tree.
        final String rendered = ast.toString();
        System.out.println(rendered);

        // Print the tree again using the customised writer configuration; the
        // configuration handed to the lambda is ignored and replaced wholesale.
        final SqlWriterConfig writerConfig = buildWriterConfig();
        System.out.println();
        System.out.println(ast.toSqlString(ignored -> writerConfig));
    }

    /** Builds the parser configuration used by this demo. */
    private static SqlParser.Config buildParserConfig() {
        return SqlParser.config()
                .withParserFactory(SqlParserImpl.FACTORY)
                // Leave the case of unquoted identifiers unchanged.
                .withUnquotedCasing(Casing.UNCHANGED)
                // Leave the case of quoted identifiers unchanged.
                .withQuotedCasing(Casing.UNCHANGED)
                // Identifiers are matched case-insensitively.
                .withCaseSensitive(false)
                // Parse according to the MySQL 5 conformance level.
                .withConformance(SqlConformanceEnum.MYSQL_5);
    }

    /** Builds the writer configuration used to render the parsed statement. */
    private static SqlWriterConfig buildWriterConfig() {
        return SqlPrettyWriter.config()
                // Always emit parentheses.
                .withAlwaysUseParentheses(true)
                // No newline handling for the items of an UPDATE ... SET list.
                .withUpdateSetListNewline(false)
                // Do not quote every identifier.
                .withQuoteAllIdentifiers(false)
                // Line-folding mode applied to the FROM clause.
                .withFromFolding(SqlWriterConfig.LineFolding.TALL)
                // Indentation width.
                .withIndentation(0)
                // Emit keywords in lower case.
                .withKeywordsLowerCase(true)
                // Target SQL dialect of the output.
                .withDialect(OracleSqlDialect.DEFAULT);
    }
}
连接数据源
两个 MySQL 数据源
/**
 * Connects to Calcite with an inline model that exposes two MySQL databases as JDBC
 * schemas, runs a join across both of them and prints the first three columns of each row.
 */
public class MysqlCalciteDemo {
    public static void main(String[] args) throws SQLException {
        // Inline Calcite model: two custom schemas, each backed by JdbcSchema.Factory.
        String calciteInfo = "{\n" +
                " \"version\": \"1.0\",\n" +
                " \"defaultSchema\": \"logistics_2900\",\n" +
                " \"schemas\": [\n" +
                " {\n" +
                " \"name\": \"logistics_2900\",\n" +
                " \"type\": \"custom\",\n" +
                " \"factory\": \"org.apache.calcite.adapter.jdbc.JdbcSchema$Factory\",\n" +
                " \"operand\": {\n" +
                " \"jdbcDriver\": \"com.mysql.cj.jdbc.Driver\",\n" +
                " \"jdbcUrl\": \"jdbc:mysql://localhost:3306/logistics_2900?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai\",\n" +
                " \"jdbcUser\": \"root\",\n" +
                " \"jdbcPassword\": \"root\"\n" +
                " }\n" +
                " },\n" +
                " {\n" +
                " \"name\": \"hibernate_test\",\n" +
                " \"type\": \"custom\",\n" +
                " \"factory\": \"org.apache.calcite.adapter.jdbc.JdbcSchema$Factory\",\n" +
                " \"operand\": {\n" +
                " \"jdbcDriver\": \"com.mysql.cj.jdbc.Driver\",\n" +
                " \"jdbcUrl\": \"jdbc:mysql://localhost:3306/hibernate_test?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai\",\n" +
                " \"jdbcUser\": \"root\",\n" +
                " \"jdbcPassword\": \"root\"\n" +
                " }\n" +
                " }\n" +
                " ]\n" +
                "}";
        // Join that spans both schemas.
        String query = "select * from logistics_2900.model_dynamic_fields as a " +
                "left join hibernate_test.customer as b on a.id = b.id";
        // try-with-resources closes the result set, statement and connection (in that
        // order) even when the query or the row loop throws.
        try (Connection connection = DriverManager.getConnection("jdbc:calcite:lex=MYSQL;model=inline:" + calciteInfo);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(query)) {
            while (resultSet.next()) {
                System.out.println(resultSet.getString(1) + " " + resultSet.getString(2) + " " + resultSet.getString(3));
            }
        }
    }
}
自定义csv数据源
创建csv文件
classpath:csv/test.csv(首行为表头,每列的格式为 列名:类型)
ID:VARCHAR,NAME:VARCHAR,AGE:INTEGER
1,Alice,25
2,Bob,30
3,Charlie,35
4,David,40
5,Edith,45
6,Fred,50
7,Gina,55
8,Harry,60
9,Irene,65
10,Judy,70
11,Kate,75
12,Larry,80
13,Mary,85
测试代码
/**
 * Connects to Calcite with an inline model whose single schema is produced by the
 * custom CSV schema factory, runs a filter query and prints every column of every row.
 */
public class CsvCalciteDemo {
    public static void main(String[] args) throws SQLException {
        // Inline Calcite model: one custom schema backed by the CSV schema factory.
        String calciteInfo = "{\n" +
                " \"version\": \"1.0\",\n" +
                " \"defaultSchema\": \"CSV_DB\",\n" +
                " \"schemas\": [\n" +
                " {\n" +
                " \"name\": \"CSV_DB\",\n" +
                " \"type\": \"custom\",\n" +
                " \"factory\": \"com.jd.starlink.calcite.csv.CsvSchemaFactory\",\n" +
                " \"operand\": {\n" +
                " \"dataFile\": \"./csv/test.csv\"\n" +
                " }\n" +
                " }\n" +
                " ]\n" +
                "}";
        // try-with-resources closes the result set, statement and connection (in that
        // order) even when the query or the row loop throws.
        try (Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + calciteInfo);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select * from CSV_DB.TEST as a where a.AGE in (55,25,85) ")) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            while (resultSet.next()) {
                // Print each row as "NAME:value" pairs on a single line.
                for (int i = 1; i <= columnCount; i++) {
                    System.out.print(metaData.getColumnName(i) + ":" + resultSet.getString(i) + " ");
                }
                System.out.println();
            }
        }
    }
}
Schema 工厂
我们需要创建一个工厂类CsvSchemaFactory
/** Schema factory that builds a {@link CsvSchema} from the model's operand settings. */
public class CsvSchemaFactory implements SchemaFactory {
    /**
     * Creates the CSV schema.
     *
     * @param schemaPlus parent schema (not used here)
     * @param s schema name from the model (used only in the error message)
     * @param map the {@code operand} settings of the model; must contain {@code "dataFile"}
     * @return a {@link CsvSchema} for the configured data file
     * @throws IllegalArgumentException if the {@code "dataFile"} operand is missing
     */
    @Override
    public Schema create(SchemaPlus schemaPlus, String s, Map<String, Object> map) {
        Object dataFile = map == null ? null : map.get("dataFile");
        if (dataFile == null) {
            // Fail with a message naming the missing key instead of a bare NullPointerException.
            throw new IllegalArgumentException(
                    "CSV schema '" + s + "' requires a \"dataFile\" operand");
        }
        return new CsvSchema(dataFile.toString());
    }
}
Schema 解析实现
创建一个解析Schema的类CsvSchema
/** Schema exposing a single CSV resource as one table named after the file. */
public class CsvSchema extends AbstractSchema {
    private final String dataFile;
    // Built on the first getTableMap() call and reused afterwards.
    private Map<String, Table> tableMap;

    public CsvSchema(String dataFile) {
        this.dataFile = dataFile;
    }

    /**
     * Returns the table map: the key is the table name (upper-cased file name without
     * extension), the value is the {@link CsvTable} reading the file.
     */
    @Override
    protected Map<String, Table> getTableMap() {
        if (tableMap == null) {
            // Resolve the resource only when the map actually has to be built,
            // not on every call.
            URL url = Resources.getResource(dataFile);
            Source source = Sources.of(url);
            ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
            builder.put(tableNameOf(dataFile), new CsvTable(source));
            tableMap = builder.build();
        }
        return tableMap;
    }

    /**
     * Derives the table name from a path: takes the part after the last {@code '/'},
     * drops the extension if there is one, and upper-cases the result.
     */
    private static String tableNameOf(String path) {
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        // Look for the dot inside the file name only, so a dot in a directory
        // segment (e.g. "./csv/test") cannot produce an invalid substring range.
        int dot = fileName.lastIndexOf('.');
        String baseName = dot > 0 ? fileName.substring(0, dot) : fileName;
        // Locale.ROOT keeps the name independent of the JVM's default locale.
        return baseName.toUpperCase(java.util.Locale.ROOT);
    }
}
Table 解析实现
创建一个Table解析类CsvTable
/**
 * Table backed by a CSV source whose first line is a header of {@code NAME:TYPE}
 * entries separated by commas.
 */
public class CsvTable extends AbstractTable implements ScannableTable {
    private final Source source;

    public CsvTable(Source source) {
        this.source = source;
    }

    /**
     * Scans the CSV source.
     *
     * @param dataContext execution context (not used here)
     * @return an enumerable that creates a new {@link CsvEnumerator} per enumeration
     */
    @Override
    public Enumerable<Object[]> scan(DataContext dataContext) {
        return new AbstractEnumerable<Object[]>() {
            @Override
            public Enumerator<Object[]> enumerator() {
                return new CsvEnumerator<>(source);
            }
        };
    }

    /**
     * Builds the row type from the header line.
     *
     * @param relDataTypeFactory type factory supplied by Calcite
     * @return a struct type with one field per header entry
     * @throws IllegalArgumentException if a header entry is not {@code NAME:TYPE} or
     *         names an unknown SQL type
     * @throws IllegalStateException if the source has no header line
     * @throws java.io.UncheckedIOException if the source cannot be read
     */
    @Override
    public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
        JavaTypeFactory typeFactory = (JavaTypeFactory) relDataTypeFactory;
        String header = readHeaderLine();
        List<String> names = Lists.newArrayList();
        List<RelDataType> types = Lists.newArrayList();
        for (String column : header.split(",")) {
            String[] parts = column.split(":");
            if (parts.length < 2) {
                throw new IllegalArgumentException(
                        "Malformed CSV header entry '" + column + "' in " + source
                                + "; expected NAME:TYPE");
            }
            SqlTypeName typeName = SqlTypeName.get(parts[1]);
            if (typeName == null) {
                throw new IllegalArgumentException(
                        "Unknown SQL type '" + parts[1] + "' for column '" + parts[0]
                                + "' in " + source);
            }
            names.add(parts[0]);
            types.add(typeFactory.createSqlType(typeName));
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }

    /**
     * Reads the first line of the source through {@code source.reader()}, the same
     * access path {@link CsvEnumerator} uses for the data rows.
     */
    private String readHeaderLine() {
        try (BufferedReader reader = new BufferedReader(source.reader())) {
            String header = reader.readLine();
            if (header == null) {
                throw new IllegalStateException(
                        "CSV source " + source + " is empty; expected a NAME:TYPE header line");
            }
            return header;
        } catch (java.io.IOException e) {
            // Surface the failure instead of silently producing a table with no columns.
            throw new java.io.UncheckedIOException("Failed to read CSV header from " + source, e);
        }
    }
}
数据读取实现
创建一个数据读取类CsvEnumerator
/**
 * Enumerator over the data rows of a CSV source. The first line is treated as a
 * header and skipped; every following line is split on commas into a {@code String[]}.
 *
 * <p>Instances hold an open reader and are not safe for concurrent use.
 *
 * @param <E> row type; the rows produced are {@code String[]} instances
 */
public class CsvEnumerator<E> implements Enumerator<E> {
    private final Source source;
    private BufferedReader reader;
    private E current;
    // False until the header line has been consumed.
    private boolean headerSkipped;

    public CsvEnumerator(Source source) {
        this.source = source;
        this.reader = openReader(source);
    }

    /** Opens a buffered reader on the source, wrapping any failure in a RuntimeException. */
    private static BufferedReader openReader(Source source) {
        try {
            return new BufferedReader(source.reader());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public E current() {
        return current;
    }

    /**
     * Advances to the next data row.
     *
     * @return {@code true} if a row was read, {@code false} at end of input
     * @throws java.io.UncheckedIOException if reading fails
     */
    @Override
    public boolean moveNext() {
        try {
            if (!headerSkipped) {
                // Discard the header line on the first call.
                reader.readLine();
                headerSkipped = true;
            }
            String line = reader.readLine();
            if (line == null) {
                return false;
            }
            current = toRow(line);
            return true;
        } catch (IOException e) {
            // Report the failure; returning false here would silently truncate the result.
            throw new java.io.UncheckedIOException("Failed to read CSV source " + source, e);
        }
    }

    /** Splits a line into fields; limit -1 keeps trailing empty fields so rows keep their width. */
    @SuppressWarnings("unchecked") // rows are String[]; callers instantiate E as an array type
    private E toRow(String line) {
        return (E) line.split(",", -1);
    }

    /** Returns to the position before the first row by reopening the source. */
    @Override
    public void reset() {
        close();
        reader = openReader(source);
        headerSkipped = false;
    }

    /** Closes the underlying reader. */
    @Override
    public void close() {
        try {
            reader.close();
        } catch (IOException e) {
            throw new java.io.UncheckedIOException("Failed to close CSV source " + source, e);
        }
    }
}
