Table of Contents

  1. Overview
  2. How druid and elasticsearch-sql work
    1. Flow
    2. Summary
  3. How a MyBatis query works
    1. Flow
    2. Summary
  4. Integrating elasticsearch with MyBatis
    1. Overview
    2. Building the result set metadata
    3. Overriding physical connection creation
    4. Overriding execute and setXXX in DruidPooledPreparedStatement
    5. Overriding prepareStatement creation, resultSet retrieval and resource release in ElasticSearchConnection
    6. Modifying ElasticSearchDruidPooledConnection.java
    7. Other changes
    8. Creating datasource-es.xml and importing it into applicationContext.xml
  5. Paged queries
    1. ElasticsearchServiceBase.java
    2. ElasticsearchPaginationInterceptor.java
  6. SQL support
  7. Maven dependencies
  8. Follow-up

Overview

Elasticsearch (es) has no JDBC driver of its own, so it cannot be managed the way MyBatis manages a relational database through a connection pool. The elasticsearch-sql project on GitHub, installed as an es plugin, lets you query es with SQL, and it offers an experimental JDBC feature that implements part of the JDBC spec on top of Druid. GitHub currently gives only a single usage example:

Update 2017-08-12:
I have rewritten this project, decoupled it from the Druid connection pool (Druid is no longer required), and added a Driver. The project is shared on GitHub as elasticsearch-jdbc; forks are welcome.

Properties properties = new Properties();
properties.put("url", "jdbc:elasticsearch://192.168.70.128:9300/");
DruidDataSource dds = (DruidDataSource) ElasticSearchDruidDataSourceFactory.createDataSource(properties);
Connection connection = dds.getConnection();
PreparedStatement ps = connection.prepareStatement("SELECT * from radiott");
ResultSet resultSet = ps.executeQuery();
List<String> result = new ArrayList<String>();
while (resultSet.next()) {
    System.out.println(resultSet.getString("id") + "," + resultSet.getString("name"));
}
ps.close();
connection.close();
dds.close();

How druid and elasticsearch-sql work

Flow

The experimental feature mainly extends and adapts the Druid connection pool, implementing part of the JDBC spec.

  1. DruidDataSource dds = (DruidDataSource) ElasticSearchDruidDataSourceFactory.createDataSource(properties);

An ElasticSearchDruidDataSource class extending DruidDataSource is created, and the code that creates physical connections is overridden:

public void init() throws SQLException {
    ...
    // init connections
    for (int i = 0, size = getInitialSize(); i < size; ++i) {
        Connection conn = createPhysicalConnection();
        DruidConnectionHolder holder = new DruidConnectionHolder(this, conn);
        connections[poolingCount] = holder;
        incrementPoolingCount();
    }
    ...
}

@Override
public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
    Connection conn = new ElasticSearchConnection(url);
    createCount.incrementAndGet();
    return conn;
}

ElasticSearchConnection implements the Connection interface. Its constructor instantiates the client, creating an es client. prepareStatement is overridden to return an empty implementation of the PreparedStatement interface; because of that, it has none of PreparedStatement's real functionality and exists only to satisfy the JDBC spec. All actual operations go through the client.
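
The ElasticSearchConnection source is not shown here, but its core can be sketched roughly as below. This is a minimal sketch only, assuming the ES 2.x TransportClient API and a URL of the form jdbc:elasticsearch://host:port/; the real class additionally stubs out the remaining Connection methods and returns the empty PreparedStatement mentioned above.

import java.net.InetAddress;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class EsConnectionSketch {

    private final Client client;

    public EsConnectionSketch(String jdbcUrl) throws Exception {
        // strip the jdbc:elasticsearch:// prefix and split host:port (assumed URL format)
        String hostAndPort = jdbcUrl.substring("jdbc:elasticsearch://".length()).replaceAll("/$", "");
        String host = hostAndPort.split(":")[0];
        int port = Integer.parseInt(hostAndPort.split(":")[1]);
        // build the ES 2.x transport client that statements created from this connection will reuse
        this.client = TransportClient.builder().build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
    }

    public Client getClient() {
        return client;
    }
}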

  2. Connection connection = dds.getConnection();

getConnectionInternal, the method used internally to obtain a pooled connection, is overridden; whenever a connection is requested from the pool, getConnectionInternal is what ultimately gets called:

@Override
public DruidPooledConnection getConnection() throws SQLException {
    return getConnection(maxWait);
}

public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    init();
    return getConnectionDirect(maxWaitMillis);
}

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
    ...
    poolableConnection = getConnectionInternal(maxWaitMillis);
    ...
}

private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
    ...
    DruidConnectionHolder holder;
    ...
    if (maxWait > 0) {
        holder = pollLast(nanos);
    } else {
        holder = takeLast();
    }
    ...
    DruidPooledConnection poolalbeConnection = new ElasticSearchDruidPooledConnection(holder);
    return poolalbeConnection;
}

DruidPooledConnection is an important class. Note that what the DruidDataSource actually keeps in its connections array are DruidConnectionHolder instances, while the real connection created via createPhysicalConnection (at pool initialization or later) ends up in the conn attribute of the DruidPooledConnection instance.

From the holder, a DruidPooledConnection instance is created: poolalbeConnection = new ElasticSearchDruidPooledConnection(holder); the connection we obtain from getConnection() is this poolalbeConnection instance. ==The DruidConnectionHolder instance holder is stored in the DruidPooledConnection's holder attribute==. In the DruidPooledConnection constructor, this.conn = holder.getConnection() is executed, i.e. ==the actual physical Connection held by the holder is stored in the DruidPooledConnection's conn attribute==.
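
Paraphrasing those relationships as a small sketch (not the actual Druid source; only the two assignments that matter here are shown):

import java.sql.Connection;

import com.alibaba.druid.pool.DruidConnectionHolder;

public class PooledConnectionSketch {

    private final DruidConnectionHolder holder; // what the pool keeps in its connections array
    private final Connection conn;              // the physical ElasticSearchConnection wrapped by the holder

    public PooledConnectionSketch(DruidConnectionHolder holder) {
        this.holder = holder;
        this.conn = holder.getConnection(); // the same assignment DruidPooledConnection performs in its constructor
    }

    public Connection getConnection() {
        return conn;
    }
}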

  3. PreparedStatement ps = connection.prepareStatement("SELECT * from radiott");

A class ElasticSearchDruidPooledConnection extending DruidPooledConnection is created, and its prepareStatement method is overridden:

@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
    checkState();
    PreparedStatementHolder stmtHolder = null;
    DruidPooledPreparedStatement.PreparedStatementKey key = new DruidPooledPreparedStatement.PreparedStatementKey(sql, getCatalog(), PreparedStatementPool.MethodType.M1);
    boolean poolPreparedStatements = holder.isPoolPreparedStatements();
    if (poolPreparedStatements) {
        stmtHolder = holder.getStatementPool().get(key);
    }
    if (stmtHolder == null) {
        try {
            stmtHolder = new PreparedStatementHolder(key, conn.prepareStatement(sql));
            holder.getDataSource().incrementPreparedStatementCount();
        } catch (SQLException ex) {
            handleException(ex);
        }
    }
    initStatement(stmtHolder);
    DruidPooledPreparedStatement rtnVal = new ElasticSearchDruidPooledPreparedStatement(this, stmtHolder);
    holder.addTrace(rtnVal);
    return rtnVal;
}

The method creates an important instance: stmtHolder = new PreparedStatementHolder(key, conn.prepareStatement(sql)). ==This instance is passed into the constructor of ElasticSearchDruidPooledPreparedStatement and ends up stored in the DruidPooledPreparedStatement's holder attribute==.

This stmtHolder matters a great deal for the MyBatis integration: it holds the statement of the current connection (created via conn.prepareStatement(sql)), and that statement has several important methods, getResultSet(), getUpdateCount() and getMoreResults(), which MyBatis calls when it finally processes the results.

Finally, the created DruidPooledPreparedStatement instance rtnVal is recorded in holder (the DruidConnectionHolder instance), and rtnVal is returned.

  4. ResultSet resultSet = ps.executeQuery();

A class ElasticSearchDruidPooledPreparedStatement extending DruidPooledPreparedStatement is created, overriding the constructor and the executeQuery method:

Client client = null;

public ElasticSearchDruidPooledPreparedStatement(DruidPooledConnection conn, PreparedStatementHolder holder) throws SQLException {
    super(conn, holder);
    this.client = ((ElasticSearchConnection) conn.getConnection()).getClient();
}

@Override
public ResultSet executeQuery() throws SQLException {
    ...
    try {
        ObjectResult extractor = getObjectResult(true, getSql(), false, false, true);
        List<String> headers = extractor.getHeaders();
        List<List<Object>> lines = extractor.getLines();
        ResultSet rs = new ElasticSearchResultSet(this, headers, lines);
        if (rs == null) {
            return null;
        }
        DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
        addResultSetTrace(poolableResultSet);
        return poolableResultSet;
    } catch (Throwable t) {
        throw checkException(t);
    } finally {
        conn.afterExecute();
    }
}

private ObjectResult getObjectResult(boolean flat, String query, boolean includeScore, boolean includeType, boolean includeId) throws SqlParseException, SQLFeatureNotSupportedException, Exception, CsvExtractorException {
    SearchDao searchDao = new org.nlpcn.es4sql.SearchDao(client);
    //String rewriteSQL = searchDao.explain(getSql()).explain().explain();
    QueryAction queryAction = searchDao.explain(query);
    Object execution = QueryActionElasticExecutor.executeAnyAction(searchDao.getClient(), queryAction);
    return new ObjectResultsExtractor(includeScore, includeType, includeId).extractResults(execution, flat);
}

@Override
public int executeUpdate() throws SQLException {
    throw new SQLException("executeUpdate not support in ElasticSearch");
}

In the constructor, the custom es client is taken from the actual physical Connection instance. The custom getObjectResult method talks to es and returns the real data.

executeQuery is overridden so that, once the actual es data has been fetched, it is wrapped into a ResultSet object and returned for the caller to iterate.

executeQuery relies on a very important class, ElasticSearchResultSet. It implements the ResultSet interface with next(), getMetaData() and a few getXXX() methods; it is an ordinary class that internally uses iterator and current to represent the data set and the row currently being iterated.

public class ElasticSearchResultSet implements ResultSet {

    Iterator<List<Object>> iterator;
    List<Object> current = null;
    List<String> headers = null;
    private ResultSetMetaData metaData;

    public ElasticSearchResultSet(Statement statement, final List<String> headers, final List<List<Object>> lines) {
        this.iterator = lines.iterator();
        this.headers = headers;
        metaData = new ElasticSearchResultSetMetaDataBase(headers);
    }

    @Override
    public boolean next() throws SQLException {
        boolean hasNext = iterator.hasNext();
        if (hasNext) {
            current = iterator.next();
        }
        return hasNext;
    }

    ...

    @Override
    public ResultSetMetaData getMetaData() throws SQLException {
        return metaData;
    }

    @Override
    public String getString(int columnIndex) throws SQLException {
        return (String) current.get(columnIndex);
    }

    ...
}

Inside ElasticSearchResultSet, an ElasticSearchResultSetMetaDataBase instance is constructed as the result set's metadata. That class extends ResultSetMetaDataBase.

public class ElasticSearchResultSetMetaDataBase extends ResultSetMetaDataBase {

    private final List<ColumnMetaData> columns = new ArrayList<ColumnMetaData>();

    public ElasticSearchResultSetMetaDataBase(List<String> headers) {
        for (String column : headers) {
            ColumnMetaData columnMetaData = new ColumnMetaData();
            columnMetaData.setColumnLabel(column);
            columnMetaData.setColumnName(column);
            columns.add(columnMetaData);
        }
    }
}

Tip: the column information is kept in the columns field, which is a private field of ElasticSearchResultSetMetaDataBase with no accessor. In the MyBatis integration later on, MyBatis therefore cannot read the resultset's metadata, so this has to be changed.

Summary

  1. The pool creates the actual physical connection, an ==ElasticSearchConnection instance connection== => return conn
  2. The pool creates a ==DruidConnectionHolder instance connHolder==, whose conn attribute holds the connection created in step 1; connHolder itself is stored in the pool's connections array
  3. getConnection is executed, which internally calls getConnectionInternal; a connHolder is taken from the pool's connections and used to create a ==DruidPooledConnection instance pooledConnection==; connHolder is stored in pooledConnection's holder attribute, and the connection held by connHolder is copied into pooledConnection's conn attribute => return pooledConnection
  4. pooledConnection's prepareStatement method is executed; inside it, conn.prepareStatement(sql) is called to obtain a ==Statement instance stmt==, which is used to create a ==PreparedStatementHolder instance stmtHolder== with stmt stored in its statement attribute. pooledConnection and stmtHolder are then used to create a ==DruidPooledPreparedStatement instance rtnVal==: pooledConnection goes into rtnVal's conn attribute, stmtHolder into its holder attribute, and the statement taken from stmtHolder into its stmt attribute => rtnVal
  5. executeQuery is executed on the DruidPooledPreparedStatement instance rtnVal to obtain the resultSet.
  6. The result set is iterated to read the data.

How a MyBatis query works

Flow

Executing session.selectList(xxx, xxx) => … => DefaultSqlSession.selectList(xxx) => … => BaseExecutor.query() => BaseExecutor.queryFromDatabase() => … => SimpleExecutor.doQuery() => run the interceptors => RoutingStatementHandler.query() => PreparedStatementHandler.query()
Inside PreparedStatementHandler.query():

public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {
    PreparedStatement ps = (PreparedStatement) statement;
    ps.execute(); // execute the SQL
    return resultSetHandler.<E> handleResultSets(ps); // process the results
}

Executing the SQL

ps.execute() => DruidPooledPreparedStatement.execute() => … the JDBC driver runs the SQL, fetches the data from the database, etc. … => returns level by level back to execute()

Processing the results

resultSetHandler.handleResultSets(ps) => DefaultResultSetHandler.handleResultSets(Statement stmt)

DefaultResultSetHandler.java

public List<Object> handleResultSets(Statement stmt) throws SQLException {
    ErrorContext.instance().activity("handling results").object(mappedStatement.getId());
    final List<Object> multipleResults = new ArrayList<Object>();
    int resultSetCount = 0;
    ResultSetWrapper rsw = getFirstResultSet(stmt); // get the result set and process its metadata
    List<ResultMap> resultMaps = mappedStatement.getResultMaps();
    int resultMapCount = resultMaps.size();
    validateResultMapsCount(rsw, resultMapCount);
    while (rsw != null && resultMapCount > resultSetCount) {
        ResultMap resultMap = resultMaps.get(resultSetCount);
        handleResultSet(rsw, resultMap, multipleResults, null);
        rsw = getNextResultSet(stmt);
        cleanUpAfterHandlingResultSet();
        resultSetCount++;
    }
    String[] resultSets = mappedStatement.getResulSets();
    if (resultSets != null) {
        while (rsw != null && resultSetCount < resultSets.length) {
            ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
            if (parentMapping != null) {
                String nestedResultMapId = parentMapping.getNestedResultMapId();
                ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
                handleResultSet(rsw, resultMap, null, parentMapping);
            }
            rsw = getNextResultSet(stmt);
            cleanUpAfterHandlingResultSet();
            resultSetCount++;
        }
    }
    return collapseSingleResultList(multipleResults);
}

Continuing inside the method above (still in the DefaultResultSetHandler class):

ResultSetWrapper rsw = getFirstResultSet(stmt) => DefaultResultSetHandler.getFirstResultSet()

private ResultSetWrapper getFirstResultSet(Statement stmt) throws SQLException {
    ResultSet rs = stmt.getResultSet(); // get the result set
    ...
    return rs != null ? new ResultSetWrapper(rs, configuration) : null; // wrap the result set and return it
}

ResultSet rs = stmt.getResultSet() => DruidStatement.getResultSet() => ResultSet rs = stmt.getResultSet() => this reaches the Connection instance and calls getResultSet() on the statement that was created by prepareStatement, obtaining the resultSet.

Back in the code above, with the result of ResultSet rs = stmt.getResultSet() in hand, the next step runs:

new ResultSetWrapper(rs, configuration) => the ResultSetWrapper constructor

ResultSetWrapper.java

public ResultSetWrapper(ResultSet rs, Configuration configuration) throws SQLException {
    super();
    this.typeHandlerRegistry = configuration.getTypeHandlerRegistry();
    this.resultSet = rs;
    final ResultSetMetaData metaData = rs.getMetaData();
    final int columnCount = metaData.getColumnCount();
    for (int i = 1; i <= columnCount; i++) {
        columnNames.add(configuration.isUseColumnLabel() ? metaData.getColumnLabel(i) : metaData.getColumnName(i)); // column name
        jdbcTypes.add(JdbcType.forCode(metaData.getColumnType(i))); // column JDBC type
        classNames.add(metaData.getColumnClassName(i)); // column Java type
    }
}

The three lines inside the loop above process the result set's metadata. When handling the column JDBC type and the column Java type, getColumnType() and getColumnClassName() of the ResultSetMetaDataBase class are executed respectively.
ResultSetMetaDataBase.java

@Override
public int getColumnType(int column) throws SQLException {
    return getColumn(column).getColumnType();
}

@Override
public String getColumnClassName(int column) throws SQLException {
    return getColumn(column).getColumnClassName();
}

Once this is done, control returns to DefaultResultSetHandler.handleResultSets(), which continues with List<ResultMap> resultMaps = mappedStatement.getResultMaps() to obtain the MyBatis result mappings, and then runs the while loop:

handleResultSet(rsw, resultMap, multipleResults, null) => DefaultResultSetHandler.handleRowValues() => DefaultResultSetHandler.handleRowValuesForSimpleResultMap() => DefaultResultSetHandler.getRowValue() => DefaultResultSetHandler.applyAutomaticMappings()

In applyAutomaticMappings, two lines handle the type and the value respectively:

private boolean applyAutomaticMappings(ResultSetWrapper rsw, ResultMap resultMap, MetaObject metaObject, String columnPrefix) throws SQLException {
    ...
    final TypeHandler<?> typeHandler = rsw.getTypeHandler(propertyType, columnName); // resolve the type handler
    final Object value = typeHandler.getResult(rsw.getResultSet(), columnName); // read the value
    ...
    metaObject.setValue(property, value); // set the value
    ...
}

When reading a value, a getXXX method of the ResultSet instance is ultimately called to obtain the concrete value.
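
To make that last step concrete, here is a rough illustration (not the actual MyBatis source) of what a String-oriented type handler does: it simply forwards to the ResultSet's getters, which is exactly why the es-side ResultSet implementation shown later has to provide its own getXXX methods.

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;

// Illustration only: a String type handler delegates straight to the ResultSet.
public class SimpleStringTypeHandler extends BaseTypeHandler<String> {

    @Override
    public void setNonNullParameter(PreparedStatement ps, int i, String parameter, JdbcType jdbcType) throws SQLException {
        ps.setString(i, parameter); // used when binding ? parameters
    }

    @Override
    public String getNullableResult(ResultSet rs, String columnName) throws SQLException {
        return rs.getString(columnName); // the value ultimately comes from the ResultSet implementation
    }

    @Override
    public String getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
        return rs.getString(columnIndex);
    }

    @Override
    public String getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
        return cs.getString(columnIndex);
    }
}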

After the result set mapping is done, control returns to the while loop for the next iteration until all result sets are processed, and the mapped results are passed back up the call stack. That completes one query.

Summary

The rough flow of a MyBatis query is: selectList() => execute() => handleResultSets() => return the results

handleResultSets() itself consists of three main steps:

  1. Fetch the data and wrap it as a result set
  2. Process the metadata and wrap the result set further
  3. Map the result set to Java types, e.g. Map

Integrating elasticsearch with MyBatis

Overview

Since es has no real JDBC driver, elasticsearch-sql only covers fetching data from es and wrapping it as a ResultSet, i.e. step 1 of the MyBatis flow above.
The integration has to complete MyBatis steps 2 and 3, tidy up resource release, and add the setXXX methods required by the JDBC spec for replacing ? placeholder parameters (this matters a lot when testing the MyBatis pagination interceptor plugin).

The elasticsearch-sql source for this feature consists of seven classes in total.

All classes created during the integration must be placed in the com.alibaba.druid.pool package.

  1. ElasticSearchDruidDataSourceFactory.java - data source factory, used to create data source instances
  2. ElasticSearchDruidDataSource.java - the data source
  3. ElasticSearchConnection.java - the physical Connection
  4. ElasticSearchDruidPooledConnection.java - the pooled connection
  5. ElasticSearchDruidPooledPreparedStatement.java - the prepareStatement
  6. ElasticSearchResultSet.java - the ResultSet
  7. ElasticSearchResultSetMetaDataBase.java - the metadata

I have not yet found the source of this experimental feature on GitHub, so wherever the source needed changes, it was done by subclassing or by copying parts of the code.

Building the result set metadata

In the es-sql source, ElasticSearchResultSetMetaDataBase stores the metadata in a private columns field of its own, but MyBatis actually reads the metadata through the private columns field of the parent class ResultSetMetaDataBase. The constructed metadata therefore has to be pushed into ResultSetMetaDataBase's columns, so ElasticSearchResultSetMetaDataBase.java is rewritten.

MybatisElasticSearchResultSetMetaDataBase.java

public class MybatisElasticSearchResultSetMetaDataBase extends ResultSetMetaDataBase {

    public MybatisElasticSearchResultSetMetaDataBase(List<String> headers) {
        super();
        List<ColumnMetaData> columns = getColumns();
        columns.clear();
        for (String column : headers) {
            ColumnMetaData columnMetaData = new ColumnMetaData();
            columnMetaData.setColumnLabel(column);
            columnMetaData.setColumnName(column);
            columns.add(columnMetaData);
        }
    }

    @Override
    public int getColumnType(int column) throws SQLException {
        return Types.LONGVARCHAR; // report the JDBC type as LONGVARCHAR
    }

    @Override
    public String getColumnClassName(int column) throws SQLException {
        return "java.lang.String"; // mapped to Java String
    }
}

All data types are treated as String.

Copy ElasticSearchResultSet.java to MybatisElasticSearchResultSet.java, change the constructor to this.metaData = new ==MybatisElasticSearchResultSetMetaDataBase==(headers), and override MybatisElasticSearchResultSet.java's close method and some of the getXXX methods.

MybatisElasticSearchResultSet.java

// In testing, overriding close made no observable difference; MybatisElasticSearchResultSet is just a plain class implementing ResultSet, so the fields are cleared here only to be safe.
@Override
public void close() throws SQLException {
    iterator = null;
    current = null;
    headers = null;
    metaData = null;
}

/** LONGVARCHAR is handled as a Clob */
@Override
public Clob getClob(int columnIndex) throws SQLException {
    Object value = current.get(columnIndex);
    if (value == null) {
        return null;
    }
    return new MockClob(value.toString().getBytes());
}

@Override
public Clob getClob(String columnLabel) throws SQLException {
    Object value = current.get(headers.indexOf(columnLabel));
    if (value == null) {
        return null;
    }
    return new MockClob(value.toString().getBytes());
}

Overriding physical connection creation

In the experimental project, the if check in createPhysicalConnection was commented out; during testing, connections could not be released correctly and a class cast error was thrown.

@Override
public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
    Connection conn = new MybatisElasticSearchConnection(url);
    if (getProxyFilters().size() > 0) {
        conn = new FilterChainImpl(this).wrap(conn, info);
    }
    createCount.incrementAndGet();
    return conn;
}

Overriding execute and setXXX in DruidPooledPreparedStatement

By default a MyBatis query calls execute rather than executeQuery, so ElasticSearchDruidPooledPreparedStatement is extended to add that method, and in executeQuery, ResultSet rs = new ElasticSearchResultSet(this, headers, lines); is changed to ResultSet rs = new MybatisElasticSearchResultSet(this, headers, lines);

The setXXX methods also need to be overridden; they are called when prepareStatement.setXXX(index, value) replaces the ? parameters.
The way the client is obtained is changed as well.

MybatisElasticSearchDruidPooledPreparedStatement.java

public class MybatisElasticSearchDruidPooledPreparedStatement extends DruidPooledPreparedStatement {

    LinkedList<String> sqlList = new LinkedList<String>(); // holds the split SQL fragments
    Client client = null;

    public MybatisElasticSearchDruidPooledPreparedStatement(DruidPooledConnection conn, PreparedStatementHolder holder) throws SQLException {
        super(conn, holder);
        this.client = ((ElasticSearchConnection) getOriginalConnection(conn.getConnection())).getClient();
        sqlList.clear();
        buildSqlList(getSql());
    }

    private Connection getOriginalConnection(Connection conn) {
        if (conn instanceof ConnectionProxyImpl) {
            return ((ConnectionProxyImpl) conn).getConnectionRaw();
        } else { // conn instanceof ElasticSearchConnection
            return conn;
        }
    }

    // Split the SQL: e.g. select * from tableName where a=? and b=? becomes ["select * from tableName where a=", "?", " and b=", "?"]
    private void buildSqlList(String sql) {
        int index = sql.indexOf("?");
        if (index < 0) {
            sqlList.add(sql);
            return;
        }
        String preStr = sql.substring(0, index);
        sqlList.add(preStr);
        sqlList.add("?");
        String afterStr = sql.substring(index + 1);
        buildSqlList(afterStr);
    }

    // The setXXX methods below substitute the ? parameters
    @Override
    public void setInt(int parameterIndex, int x) throws SQLException {
        sqlList.set(2 * parameterIndex - 1, String.valueOf(x));
    }

    @Override
    public void setString(int parameterIndex, String x) throws SQLException {
        sqlList.set(2 * parameterIndex - 1, "'" + x + "'");
    }

    @Override
    public void setDouble(int parameterIndex, double x) throws SQLException {
        sqlList.set(2 * parameterIndex - 1, String.valueOf(x));
    }

    @Override
    public void setDate(int parameterIndex, java.sql.Date x) throws SQLException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        setString(parameterIndex, sdf.format(x));
    }

    @Override
    public void close() throws SQLException {
        sqlList.clear(); // clear the fragments
        super.close();
    }

    @Override
    public ResultSet executeQuery() throws SQLException {
        checkOpen();
        incrementExecuteCount();
        transactionRecord(getSql());
        oracleSetRowPrefetch();
        conn.beforeExecute();
        try {
            ObjectResult extractor = getObjectResult(true, StringUtils.join(sqlList.toArray(), ""), false, false, true);
            List<String> headers = extractor.getHeaders();
            List<List<Object>> lines = extractor.getLines();
            ResultSet rs = new MybatisElasticSearchResultSet(this, headers, lines);
            if (rs == null) {
                return null;
            }
            DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
            addResultSetTrace(poolableResultSet);
            return poolableResultSet;
        } catch (Throwable t) {
            throw checkException(t);
        } finally {
            conn.afterExecute();
        }
    }

    @Override
    public boolean execute() throws SQLException {
        checkOpen();
        incrementExecuteCount();
        transactionRecord(getSql());
        oracleSetRowPrefetch();
        conn.beforeExecute();
        DruidPooledResultSet poolableResultSet = null;
        ResultSet rs = null;
        try {
            ObjectResult extractor = getObjectResult(true, StringUtils.join(sqlList.toArray(), ""), false, false, true);
            List<String> headers = extractor.getHeaders();
            List<List<Object>> lines = extractor.getLines();
            rs = new MybatisElasticSearchResultSet(this, headers, lines);
            poolableResultSet = new DruidPooledResultSet(this, rs);
            addResultSetTrace(poolableResultSet);
            PreparedStatement stmt = this.getPreparedStatementHolder().getStatement();
            Method method = stmt.getClass().getDeclaredMethod("setResultSet", ResultSet.class);
            method.invoke(stmt, poolableResultSet);
        } catch (Throwable t) {
            throw checkException(t);
        } finally {
            conn.afterExecute();
        }
        return true;
    }

    // getObjectResult is private in the parent class and cannot be inherited, so copy it here
    private ObjectResult getObjectResult(boolean flat, String query, boolean includeScore, boolean includeType, boolean includeId) throws SqlParseException, SQLFeatureNotSupportedException, Exception, CsvExtractorException {
        SearchDao searchDao = new org.nlpcn.es4sql.SearchDao(client);
        QueryAction queryAction = searchDao.explain(query);
        Object execution = QueryActionElasticExecutor.executeAnyAction(searchDao.getClient(), queryAction);
        return new ObjectResultsExtractor(includeScore, includeType, includeId).extractResults(execution, flat);
    }
}

Overriding prepareStatement creation, resultSet retrieval and resource release in ElasticSearchConnection

When MyBatis retrieves the result set, it calls getResultSet() and getUpdateCount() on the PreparedStatement instance. So when MyBatis runs execute, the data fetched through the es client has to be wrapped as a result set and stashed inside the PreparedStatement instance, and returned later when getResultSet() is called.

MybatisElasticSearchConnection.java

public class MybatisElasticSearchConnection extends ElasticSearchConnection {

    private final static Log LOG = LogFactory.getLog(MybatisElasticSearchConnection.class);

    public MybatisElasticSearchConnection(String jdbcUrl) {
        super(jdbcUrl);
    }

    @Override
    public void close() throws SQLException {
        this.getClient().close();
        LOG.info("connection closed");
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return new PreparedStatement() {

            private ResultSet rs = null; // stash for the result set fetched through the es client

            public void setResultSet(ResultSet rs) { // custom method, not part of the JDBC spec; invoked via reflection from the pooled statement's execute method
                this.rs = rs;
            }

            @Override
            public ResultSet getResultSet() throws SQLException {
                return rs;
            }

            @Override
            public int getUpdateCount() throws SQLException {
                return -1; // es-sql only supports queries, so return -1; otherwise, when the resultset is null MyBatis loops forever (it treats the statement as an update/insert and keeps waiting for a result)
            }

            @Override
            public void close() throws SQLException {
                if (rs != null) {
                    rs.close();
                    rs = null;
                }
                //getClient().close(); // the client should only be closed when the connection itself is closed
            }

            ...
            // the remaining methods keep the original es-sql implementations
            ...
        };
    }
}

In es-sql, the client is closed in the statement's close method. When using a connection pool, the client should be closed together with the connection, not with the statement.

Modifying ElasticSearchDruidPooledConnection.java

Create MybatisElasticSearchDruidPooledConnection, extending ElasticSearchDruidPooledConnection, with a private method getOriginalConnection(conn) that retrieves the real connection.
Copy the two prepareStatement methods from the parent class, change how stmtHolder is created, and change
DruidPooledPreparedStatement rtnVal = new ==ElasticSearchDruidPooledPreparedStatement==(this, stmtHolder); to
DruidPooledPreparedStatement rtnVal = new ==MybatisElasticSearchDruidPooledPreparedStatement==(this, stmtHolder);

@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
    ...
    stmtHolder = new PreparedStatementHolder(key, getOriginalConnection(conn).prepareStatement(sql));
    ...
    DruidPooledPreparedStatement rtnVal = new MybatisElasticSearchDruidPooledPreparedStatement(this, stmtHolder);
    ...
}

@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
    ...
    stmtHolder = new PreparedStatementHolder(key, getOriginalConnection(conn).prepareStatement(sql, resultSetType, resultSetConcurrency));
    ...
    DruidPooledPreparedStatement rtnVal = new MybatisElasticSearchDruidPooledPreparedStatement(this, stmtHolder);
    ...
}

private Connection getOriginalConnection(Connection conn) {
    if (conn instanceof ConnectionProxyImpl) {
        return ((ConnectionProxyImpl) conn).getConnectionRaw();
    } else { // conn instanceof ElasticSearchConnection
        return conn;
    }
}

Other changes

  1. Create MybatisElasticSearchDruidDataSource as a full copy of ElasticSearchDruidDataSource, with a global text replacement of ==ElasticSearch => MybatisElasticSearch==.
  2. Create MybatisElasticSearchDruidDataSourceFactory as a full copy of ElasticSearchDruidDataSourceFactory, changing how the DataSource is created: DruidDataSource dataSource = new ==MybatisElasticSearchDruidDataSource==();
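
With those two copies in place, the data source can also be created programmatically just like in the opening example, only with the renamed factory. A quick sketch, assuming the copied factory keeps the same createDataSource(Properties) entry point as ElasticSearchDruidDataSourceFactory:

import java.sql.Connection;
import java.util.Properties;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.MybatisElasticSearchDruidDataSourceFactory;

public class EsDataSourceDemo {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("url", "jdbc:elasticsearch://192.168.70.128:9300/");
        // same call pattern as the ElasticSearchDruidDataSourceFactory example at the top of this post
        DruidDataSource dds = (DruidDataSource) MybatisElasticSearchDruidDataSourceFactory.createDataSource(properties);
        try (Connection connection = dds.getConnection()) {
            System.out.println("got pooled connection: " + connection);
        }
        dds.close();
    }
}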

Creating datasource-es.xml and importing it into applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:jpa="http://www.springframework.org/schema/data/jpa"
       xmlns:security="http://www.springframework.org/schema/security"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
       http://www.springframework.org/schema/util
       http://www.springframework.org/schema/util/spring-util.xsd">

    <bean id="defaulteDataSource-es" class="com.alibaba.druid.pool.MybatisElasticSearchDruidDataSource" init-method="init" destroy-method="close">
        <property name="url" value="jdbc:elasticsearch://192.168.70.128:9300/" />
        <!-- initial pool size -->
        <property name="initialSize" value="2" />
        <!-- minimum number of idle connections -->
        <property name="minIdle" value="2" />
        <!-- maximum number of active connections -->
        <property name="maxActive" value="100" />
        <!-- maximum wait time when acquiring a connection -->
        <property name="maxWait" value="60000" />
        <!-- how often to run the check that closes idle connections, in milliseconds -->
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <!-- minimum time a connection stays in the pool, in milliseconds -->
        <property name="minEvictableIdleTimeMillis" value="300000" />
        <property name="validationQuery" value="select 1 from dual" />
        <property name="testWhileIdle" value="true" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <!-- enable PSCache and set its size per connection -->
        <property name="poolPreparedStatements" value="false" />
        <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />
        <!-- enable the removeAbandoned feature -->
        <property name="removeAbandoned" value="true" />
        <!-- 1800 seconds, i.e. 30 minutes -->
        <property name="removeAbandonedTimeout" value="1800" />
        <!-- log an error when closing abandoned connections -->
        <property name="logAbandoned" value="true" />
        <!-- monitoring filters -->
        <property name="filters" value="stat,log4j" />
        <property name="connectionProperties" value="druid.stat.slowSqlMillis=5000" />
    </bean>

    <bean id="defaultSQLSessionFactory-es" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="defaulteDataSource-es" />
        <property name="configLocation" value="classpath:mybatis-config.xml"></property>
        <property name="mapperLocations">
            <array>
                <value>classpath*:bigdata/**/model/es/*.xml</value>
            </array>
        </property>
    </bean>

    <bean id="defaultSQLSessionTemplate-es" class="org.mybatis.spring.SqlSessionTemplate">
        <constructor-arg index="0" ref="defaultSQLSessionFactory-es" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="defaulteDataSource-es"/>
    </bean>
</beans>

Paged queries

es-sql supports paged queries, paginated the same way as MySQL. In the project's pagination plugin PaginationInterceptor, when querying table radiott for example, the SQL used to get the total row count is

select count(0) as COUNT_ALL from (select * from radiott) tmp_count

es-sql does not support this form and throws the following error:

Caused by: java.lang.ClassCastException: com.alibaba.druid.sql.ast.statement.SQLSubqueryTableSource cannot be cast to com.alibaba.druid.sql.ast.statement.SQLJoinTableSource
at org.nlpcn.es4sql.parse.SqlParser.findFrom(SqlParser.java:227)
at org.nlpcn.es4sql.parse.SqlParser.parseSelect(SqlParser.java:48)
at org.nlpcn.es4sql.parse.SqlParser.parseSelect(SqlParser.java:35)
at org.nlpcn.es4sql.query.ESActionFactory.create(ESActionFactory.java:61)
at org.nlpcn.es4sql.SearchDao.explain(SearchDao.java:46)
at com.alibaba.druid.pool.MybatisElasticSearchDruidPooledPreparedStatement.getObjectResult(MybatisElasticSearchDruidPooledPreparedStatement.java:86)
at com.alibaba.druid.pool.MybatisElasticSearchDruidPooledPreparedStatement.execute(MybatisElasticSearchDruidPooledPreparedStatement.java:64)
... 32 more

You can use select count(*) as COUNT_ALL from radiott, and count(fieldName) also works, but not count(0): count(0) just returns 0.

ElasticsearchServiceBase.java

Mostly copied from ServiceBase.java, with session replaced by defaultSQLSessionTemplate-es plus a few code changes: a datasource_type attribute set to elasticsearch is added to the input parameters to mark the query as an es query, so the pagination plugin can tell it apart. Note that the maximum number of rows es allows per query when paging is 10000.

protected IMapEntry<String, Object> qryPage(String key, IMapEntry<String, Object> param) {
    param.put("datasource_type", "elasticsearch");
    IMapEntry<String, Object> result = qryPageInfo(key, param);
    result.put("data_list", qryPageList(key, param));
    return result;
}

protected List<IMapEntry<String, Object>> qryPageList(String key, IMapEntry<String, Object> param) {
    int page_req = 1;
    int page_num = 10000; // es allows at most 10000 results per query
    try {
        page_req = param.getInteger("page_req");
    } catch (Exception e) {
        logger.debug("Invalid paging parameter page_req.");
    } finally {
        if (page_req < 1) {
            page_req = 1;
        }
    }
    try {
        page_num = param.getInteger("page_num");
        if (page_num > 10000) page_num = 10000;
    } catch (Exception e) {
        logger.debug("Invalid paging parameter page_num.");
    } finally {
        if (page_num < 1) {
            page_num = 10000;
        }
    }
    return session.selectList(key, param, new RowBounds(page_req, page_num));
}

protected IMapEntry<String, Object> qryPageInfo(String key, IMapEntry<String, Object> param) {
    ...
    // the row count comes back as a string, e.g. "2.0" for 2 rows
    rec_cnt = Double.valueOf(countMap.get("count_all")).intValue();
    ...
}

ElasticsearchPaginationInterceptor.java

Mostly copied from PaginationInterceptor. The row-count SQL takes the form select count(*) from xxx. The Oracle-specific checks are removed, and a data source check is added so that non-elasticsearch queries are not intercepted.

public class ElasticsearchPaginationInterceptor implements Interceptor {

    public static final String DO_QUERY_COUNT_FLG = "_DO_QUERY_COUNT_FLG_";
    private final static Log log = LogFactory.getLog(ElasticsearchPaginationInterceptor.class);
    public static final ObjectFactory DEFAULT_OBJECT_FACTORY = new DefaultObjectFactory();
    public static final ObjectWrapperFactory DEFAULT_OBJECT_WRAPPER_FACTORY = new DefaultObjectWrapperFactory();

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
        MetaObject metaStatementHandler = MetaObject.forObject(statementHandler, DEFAULT_OBJECT_FACTORY, DEFAULT_OBJECT_WRAPPER_FACTORY);
        DefaultParameterHandler defaultParameterHandler = (DefaultParameterHandler) metaStatementHandler.getValue("delegate.parameterHandler");
        Object parameterObj = defaultParameterHandler.getParameterObject();
        if (!(parameterObj instanceof Map)) {
            log.debug("Query parameters are not a Map; passing the query through without interception");
            return invocation.proceed();
        }
        Map parameterMap = (Map) defaultParameterHandler.getParameterObject();
        Object datasource_type = parameterMap.get("datasource_type");
        if (datasource_type == null || StringUtils.isBlank(datasource_type.toString()) || !"elasticsearch".equals(datasource_type.toString())) { // not an elasticsearch query
            log.debug("Not an ElasticSearch query; passing the query through without interception");
            return invocation.proceed();
        }
        Object sidx = parameterMap.get("_sidx");
        Object sord = parameterMap.get("_sord");
        String originalSql = (String) metaStatementHandler.getValue("delegate.boundSql.sql");
        if (sidx != null && sord != null) {
            originalSql = originalSql + " order by " + sidx + " " + sord;
        }
        // get the SqlId
        // String originalSqlId = (String)
        // metaStatementHandler.getValue("delegate.mappedStatement.id");
        // count query for pagination
        Object countFlg = parameterMap.get(DO_QUERY_COUNT_FLG);
        if (countFlg != null && new Boolean(ObjectUtils.toString(countFlg, "false"))) {
            if (log.isDebugEnabled()) {
                log.debug("Paged query: counting total rows.");
            }
            int fromIndex = originalSql.toLowerCase().indexOf("from");
            String countSql = "select count(*) as COUNT_ALL " + originalSql.substring(fromIndex);
            metaStatementHandler.setValue("delegate.boundSql.sql", countSql);
            // remove minRow/maxRow left over from a previous pass
            BoundSql boundSql = statementHandler.getBoundSql();
            List<ParameterMapping> parameterMappings = new ArrayList<ParameterMapping>();
            parameterMappings.addAll(boundSql.getParameterMappings());
            if (parameterMappings != null) {
                for (int i = 0; i < parameterMappings.size(); i++) {
                    ParameterMapping parameterMapping = parameterMappings.get(i);
                    if (parameterMapping.getMode() != ParameterMode.OUT) {
                        Object value;
                        String propertyName = parameterMapping.getProperty();
                        if ("maxRow".equals(propertyName) || "minRow".equals(propertyName)) {
                            boundSql.getParameterMappings().remove(parameterMapping);
                            log.debug("remove:" + propertyName);
                        }
                    }
                }
            }
            return invocation.proceed();
        }
        RowBounds rowBounds = (RowBounds) metaStatementHandler.getValue("delegate.rowBounds");
        if (rowBounds == null || rowBounds == RowBounds.DEFAULT) {
            if (log.isDebugEnabled()) {
                log.debug("Plain list query.");
            }
            BoundSql boundSql = statementHandler.getBoundSql();
            return invocation.proceed();
        }
        if (log.isDebugEnabled()) {
            log.debug("Paged query: page " + rowBounds.getOffset() + ", page size " + rowBounds.getLimit() + ".");
        }
        Configuration configuration = (Configuration) metaStatementHandler.getValue("delegate.configuration");
        String dialectType = configuration.getVariables().getProperty("dialect");
        if (StringUtils.isBlank(dialectType)) {
            throw new RuntimeException("the value of the dialect property in mybatis-config.xml is not defined : " + configuration.getVariables().getProperty("dialect"));
        }
        Dialect.Type databaseType = Dialect.Type.valueOf(Dialect.Type.class, dialectType.toUpperCase());
        Dialect dialect = new MysqlDialect();
        int minRow = rowBounds.getLimit() * (rowBounds.getOffset() - 1);
        int maxRow = rowBounds.getLimit();
        /* compute the row range for the query */
        metaStatementHandler.setValue("delegate.boundSql.sql", dialect.getLimitString(originalSql, rowBounds.getOffset(), rowBounds.getLimit()));
        // reset the paging info
        metaStatementHandler.setValue("delegate.rowBounds.offset", RowBounds.NO_ROW_OFFSET);
        metaStatementHandler.setValue("delegate.rowBounds.limit", RowBounds.NO_ROW_LIMIT);
        // turn the paging parameters into ? placeholders
        BoundSql boundSql = statementHandler.getBoundSql();
        /* add the paging parameters */
        List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
        if (parameterMappings == null || (parameterMappings.size() == 0 && !(parameterMappings instanceof ArrayList))) {
            parameterMappings = Lists.newArrayList();
            Field parameterMappingsField = boundSql.getClass().getDeclaredField("parameterMappings");
            parameterMappingsField.setAccessible(true);
            parameterMappingsField.set(boundSql, parameterMappings);
        }
        parameterMappings.add(new ParameterMapping.Builder(configuration, "minRow", Integer.class).build());
        parameterMappings.add(new ParameterMapping.Builder(configuration, "maxRow", Integer.class).build());
        parameterMap.put("minRow", minRow);
        parameterMap.put("maxRow", maxRow);
        return invocation.proceed();
    }

    @Override
    public Object plugin(Object target) {
        return Plugin.wrap(target, this);
    }

    @Override
    public void setProperties(Properties properties) {
    }

    private String paramMap2String(Map parameterMap) {
        StringBuilder sbReturn = new StringBuilder();
        if (parameterMap != null && !parameterMap.isEmpty()) {
            sbReturn.append("<br/>Parameters:<br/>");
            Set<Map.Entry> en = parameterMap.entrySet();
            for (Map.Entry entry : en) {
                sbReturn.append("&nbsp;&nbsp;&nbsp;&nbsp;").append(entry.getKey()).append(" : [").append(entry.getValue()).append("]<br/>");
            }
        }
        return sbReturn.toString();
    }
}

SQL support

Supported syntax list: https://github.com/NLPchina/elasticsearch-sql

  • The difference between select a as a1 from xxx and select a as "a1" from xxx: the resulting column names differ. The first is a1, read with map.getString("a1"); the second is "a1" (quotes included), read with map.getString("\"a1\"").
  • count over several tables joined with join ... on is not supported; it keeps returning null.

es-sql's SQL coverage is not as complete as a relational database's, so it is best to test your SQL with a client tool first.

Maven dependencies

Besides Spring and MyBatis, the integration depends on the following libraries:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.0.16</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.nlpcn</groupId>
    <artifactId>elasticsearch-sql</artifactId>
    <version>2.4.1.0</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>

Follow-up

The changes above are built on top of the experimental feature of the elasticsearch-sql project on GitHub. A lot of it feels awkward: much of the code does not follow the JDBC spec, and connections are created by modifying the data source code directly rather than by writing a Driver. For now this gets the functional requirements working; when there is time I plan to study the JDBC spec properly and consider a rewrite.