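Contents

  1. Overview
  2. Defining the REST structure
  3. Maven dependency
  4. Wrapping Jest in JestUtil.java
  5. Building the metadata and result set
    1. JestMetaData.java
    2. JestResultSet.java
  6. Modifying the JDBC execution logic
    1. Defining the operation types
    2. Modifying the statement operations
      1. Modifying executeQuery
      2. Modifying execute
      3. Other changes
  7. Required configuration entries
  8. Testing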

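Overview

Elasticsearch officially provides a complete Java API and a REST API (see the official documentation). The Java API ships with the TransportClient as its client. The REST API is called over HTTP, and no corresponding client is provided for it.

An earlier post, "Notes on integrating elasticsearch-sql with mybatis", integrated elasticsearch-sql with mybatis. However, elasticsearch-sql is quite limited: it supports only very simple queries and no inserts, deletes, or updates. The REST API is far more capable. In a real application the query conditions often vary with the parameters sent by the front end, and concatenating strings by hand is tedious. Integrating the REST API with mybatis lets mybatis's dynamic conditions assemble the REST request, which makes the code simpler and easier to maintain.

The REST client used here is Jest; see the Jest home page on GitHub for details and documentation. The integration builds on the code from the earlier post.

Update, 2017-08-12:
The project has been rewritten. The Druid connection pool is now decoupled, so Druid is no longer required, and a Driver has been added. The project is shared on GitHub as elasticsearch-jdbc; forks are welcome.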

Defining the REST structure

The basic structure looks like this:

<!-- Insert -->
<insert id="jestInsert" parameterType="java.lang.Object">
PUT ${index}/${type}/${_id}?op_type=create
{
"id":"${id}",
"name":"${name}"
}
</insert>
<!-- Query -->
<select id="qryTest2" parameterType="java.lang.Object" resultType="com.xxx.util.HashMapLowerCase">
POST ${index}/${type}/_search?
{
"query": {
"prefix": {
"name": "te"
}
}
}
</select>
<!-- Query 2: fetch by document _id (id renamed so it does not clash with qryTest2 above) -->
<select id="qryTest3" parameterType="java.lang.Object" resultType="com.xxx.util.HashMapLowerCase">
GET ${index}/${type}/${id}
</select>
<!-- Update -->
<update id="jestUpdate" parameterType="java.lang.Object">
POST ${index}/${type}/${id}/_update?
{
"doc": {
"name": "${name}"
}
}
</update>
<!-- Delete -->
<delete id="jestDelete" parameterType="java.lang.Object">
DELETE ${index}/${type}/${id}
</delete>

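The snippets above cover the basic CRUD operations. For ease of description, the content inside each XML element is called the ==restBody==. Taking the update as an example: POST is the ==protocol==; index/type/id/_update is the ==uri==; whatever sits between ? and the first { is the ==parameters== (optional; for example op_type=create in the insert example); and {xxx} is the ==restMapping==, the JSON string that is sent to elasticsearch as the request body.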
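To make the four parts concrete, here is a minimal sketch that splits a restBody into protocol, uri, parameters, and restMapping. The class name `RestBodyParts` and its `parse` method are hypothetical and are not part of the project; unlike the JestUtil helpers shown later, this sketch looks for the first `{` before looking for `?`, which is an assumption made here so that a `?` inside the JSON body is not mistaken for the parameter separator.

```java
import java.util.Locale;

/** Hypothetical holder for the four parts of a restBody; not part of the original project. */
final class RestBodyParts {
    final String protocol;    // HTTP verb, e.g. POST
    final String uri;         // e.g. index/type/id/_update
    final String parameters;  // text between '?' and the first '{'; empty if absent
    final String restMapping; // JSON body starting at the first '{'; empty if absent

    private RestBodyParts(String protocol, String uri, String parameters, String restMapping) {
        this.protocol = protocol;
        this.uri = uri;
        this.parameters = parameters;
        this.restMapping = restMapping;
    }

    static RestBodyParts parse(String restBody) {
        String text = restBody.trim();
        // Everything from the first '{' onward is treated as the JSON body.
        int brace = text.indexOf('{');
        String head = brace < 0 ? text : text.substring(0, brace);
        String body = brace < 0 ? "" : text.substring(brace).trim();
        // Within the head, '?' separates "<protocol> <uri>" from the parameters.
        int question = head.indexOf('?');
        String request = (question < 0 ? head : head.substring(0, question)).trim();
        String params = question < 0 ? "" : head.substring(question + 1).trim();
        String[] tokens = request.split("\\s+");
        if (tokens.length != 2) {
            throw new IllegalArgumentException("expected '<protocol> <uri>' but got: " + request);
        }
        // The protocol is case-insensitive, so normalise it to upper case.
        return new RestBodyParts(tokens[0].toUpperCase(Locale.ROOT), tokens[1], params, body);
    }

    public static void main(String[] args) {
        RestBodyParts p = parse("put radio/artists/1?op_type=create\n{\"id\":\"1\",\"name\":\"test\"}");
        System.out.println(p.protocol + " | " + p.uri + " | " + p.parameters + " | " + p.restMapping);
    }
}
```

A GET body such as `GET radio/artists/1` yields empty parameters and an empty restMapping, which matches the rule that both are optional.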

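The uri depends on the protocol, and parameters are always optional. The protocols map to uris as follows:

  • PUT: insert.

    The uri must have the form index/type[/id]. The id is optional; when omitted, ES generates the document id. The restMapping must not be empty.
    
  • POST: select or update.

    For an update, the uri must have the form `index/type/id/_update`. Several ids may be given for a batch update (for example, updating the state of several documents), separated by an ASCII comma `,`.
    For a select, index and type are both optional, and several indexes or types may be given, separated by `,`, for example index1,index2/type1,type2/_search. The uri must not contain an id and must end with _search, so a select can only use one of the following uris:
    
    • _search: neither index nor type specified
    • index/_search: only the index specified
    • index/type/_search: both index and type specified
  • DELETE: delete.

    The uri must have the form `index/type/id`. Several ids may be given for a batch delete, separated by `,`. Deleting by query is not supported, so any restMapping passed in is ignored.
    
  • GET: fetch by document id.

    Only a single document can be fetched this way. The uri must have the form `index/type/id` with exactly one id. To look up several ids at once, use an [idsQuery](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl-ids-query.html). Any restMapping passed in is ignored.
    

==The protocol is case-insensitive and parameters are optional. However, whenever a restMapping is present, the ? must be included, whether or not there are parameters.==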
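The uri rules above can be sketched as a small validator. This is an illustration under stated assumptions, not the project's code: `RestUriRules.isValid` is a hypothetical name, and it follows the prose rules strictly (the `checkOperateType` method shown later is more lenient and treats any other POST as an insert).

```java
import java.util.Locale;

/** Hypothetical validator for the protocol/uri rules described above. */
final class RestUriRules {

    static boolean isValid(String protocol, String uri) {
        String[] seg = uri.split("/");
        // No segment may be blank, e.g. "index//id" is rejected.
        for (String s : seg) {
            if (s.trim().isEmpty()) {
                return false;
            }
        }
        switch (protocol.toUpperCase(Locale.ROOT)) {
            case "PUT":
                // index/type or index/type/id
                return seg.length == 2 || seg.length == 3;
            case "POST": {
                String last = seg[seg.length - 1];
                if (last.equals("_search")) {
                    // _search, index/_search, index/type/_search
                    return seg.length <= 3;
                }
                // index/type/id/_update (the id segment may hold several comma-separated ids)
                return last.equals("_update") && seg.length == 4;
            }
            case "DELETE":
                // index/type/id (the id segment may hold several comma-separated ids)
                return seg.length == 3;
            case "GET":
                // index/type/id with exactly one id
                return seg.length == 3 && !seg[2].contains(",");
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("post", "index1,index2/type1,type2/_search"));
        System.out.println(isValid("GET", "radio/artists/1,2"));
    }
}
```

Running such a check before handing the restBody to the client would surface a malformed uri as an explicit error instead of a failed HTTP call.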

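Note
Every insert, delete, update, and query can be wrapped in a `<select>` element.

If an insert, update, or delete is wrapped in its corresponding `<insert>`, `<update>`, or `<delete>` element, the integer returned by session.insert, session.update, or session.delete is not the number of affected rows, as it would be with a relational database, but the HTTP status code. A 2xx status code means the operation succeeded. If the operation fails, no detailed error message is returned and the program does not throw. If you need the detailed result of an operation, you have to use `<select>`.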
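Because the returned integer is an HTTP status code rather than a row count, callers need a different success test. A minimal sketch, assuming a hypothetical helper named `JestStatusCheck`; the sample values are illustrative only:

```java
/** Hypothetical helper: interprets the int returned by session.insert/update/delete. */
final class JestStatusCheck {

    /** True when the returned value is a 2xx HTTP status code. */
    static boolean isSuccess(int returned) {
        return returned >= 200 && returned <= 299;
    }

    public static void main(String[] args) {
        // -1 is what the executeUpdate shown later in this post returns when the Jest call throws.
        int[] samples = {200, 201, 404, -1};
        for (int code : samples) {
            System.out.println(code + " -> " + (isSuccess(code) ? "ok" : "failed"));
        }
    }
}
```

A check such as `returned > 0`, which is common for row counts, would wrongly treat a 4xx status as success.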

Maven dependency

<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<version>2.0.0</version>
</dependency>

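Wrapping Jest in JestUtil.java

Create a package named jest under com.alibaba.druid; every new Java class described below goes in that package. JestUtil wraps the Jest client and provides helpers for parsing a restBody. The code is as follows: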

JestUtil.java

package com.alibaba.druid.jest;
import com.tydic.exception.SystemException;
import com.tydic.util.SpringBeanUtil;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.*;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Created by wjj on 2017/6/21.
*/
public class JestUtil {
private JestUtil(){}
private static class ClientHolder{
public static JestClient jestClient = createJestClient();
private static JestClient createJestClient(){
String jdbcUrl = SpringBeanUtil.getInstance().getLocalProperty("datasource.es.http.host");// read the setting from the configuration file
if(StringUtils.isBlank(jdbcUrl)){
throw new SystemException("Missing configuration entry datasource.es.http.host; failed to create JestClient");
}
JestClientFactory factory = new JestClientFactory();
String[] hostAndPortArray = jdbcUrl.split(",");
String maxActive = SpringBeanUtil.getInstance().getLocalProperty("datasource.es.maxActive","20");
factory.setHttpClientConfig(new HttpClientConfig
.Builder(Arrays.asList(hostAndPortArray))
.multiThreaded(true)
//Per default this implementation will create no more than 2 concurrent connections per given route
// .defaultMaxTotalConnectionPerRoute(5)
// and no more 20 connections in total
.maxTotalConnection(Integer.parseInt(maxActive))
.build());
return factory.getObject();
}
}
public static JestClient getJestClient(){
return ClientHolder.jestClient;
}
/**
* Returns the result of an insert, update, or delete
* @param map
* @return
*/
public static JestResult getExecuteResult(Map map){
return (JestResult) map.get(JestMetaData.COLUMN_LABEL);
}
/**
* Returns the result of a query
* @param map
* @return
*/
public static SearchResult getQueryResult(Map map){
return (SearchResult) map.get(JestMetaData.COLUMN_LABEL);
}
/**
* Applies extra parameters to a builder
* @param builder
* @param parameter
* @throws Exception
*/
private static void setParameters(Object builder,Map parameter) throws Exception{
if(parameter==null||parameter.isEmpty()){
return;
}
Method method = builder.getClass().getMethod("setParameter",String.class,Object.class);
Iterator<Map.Entry<String,Object>> i = parameter.entrySet().iterator();
while (i.hasNext()){
Map.Entry<String,Object> e = i.next();
method.invoke(builder,e.getKey(),e.getValue());
}
}
/**
* Inserts a document; the document _id is generated by elasticsearch
* @param jsonStr the document as a JSON string
* @param index index
* @param type type
* @return
* @throws IOException
*/
public static JestResult insert(String jsonStr,String index,String type)throws Exception{
return insert(jsonStr,index,type,new HashMap());
}
/**
* Inserts a document; the document _id is generated by elasticsearch
* @param jsonStr the document as a JSON string
* @param index index
* @param type type
* @param parameter extra parameters
* @return
* @throws IOException
*/
public static JestResult insert(String jsonStr,String index,String type,Map<String,Object> parameter)throws Exception{
Index.Builder builder = new Index.Builder(jsonStr).index(index).type(type).refresh(true);
setParameters(builder,parameter);
Index jIndex = builder.build();
return getJestClient().execute(jIndex);
}
/**
* Inserts a document, using the given id as the document _id
* @param jsonStr the document as a JSON string
* @param index index
* @param type type
* @param id
* @return
* @throws IOException
*/
public static JestResult insert(String jsonStr,String index,String type,String id)throws Exception{
return insert(jsonStr,index,type,id,null);
}
/**
* Inserts a document, using the given id as the document _id
* @param jsonStr the document as a JSON string
* @param index index
* @param type type
* @param id
* @param parameter extra parameters
* @return
* @throws IOException
*/
public static JestResult insert(String jsonStr,String index,String type,String id,Map<String,Object> parameter)throws Exception{
Index.Builder builder = new Index.Builder(jsonStr).index(index).type(type).id(id).refresh(true);
setParameters(builder,parameter);
Index jIndex = builder.build();
return getJestClient().execute(jIndex);
}
/**
* Runs a search
* @param jsonStr the query as a JSON string
* @param indexes indexes
* @param types types
* @return
* @throws IOException
*/
public static SearchResult query(String jsonStr,String[]indexes,String[] types)throws Exception{
return query(jsonStr,indexes,types,null);
}
/**
* Runs a search
* @param jsonStr the query as a JSON string
* @param indexes indexes
* @param types types
* @param parameter extra parameters
* @return
* @throws IOException
*/
public static SearchResult query(String jsonStr,String[]indexes,String[] types,Map<String,Object> parameter)throws Exception{
Search.Builder searchBuilder = new Search.Builder(jsonStr);
if(indexes!=null&&indexes.length>0){
searchBuilder.addIndex(Arrays.asList(indexes));
}
if(types!=null&&types.length>0){
searchBuilder.addType(Arrays.asList(types));
}
setParameters(searchBuilder,parameter);
SearchResult result = getJestClient().execute(searchBuilder.build());
return result;
}
/**
* Fetches a document by its id
* @param index index
* @param type type
* @param id document id
* @return
* @throws IOException
*/
public static JestResult get(String index,String type,String id)throws IOException{
Get get = new Get.Builder(index,id).type(type).build();
return getJestClient().execute(get);
}
/**
* Updates a document
* @param jsonStr the update as a JSON string
* @param index index
* @param type type
* @param id document id
* @return
* @throws IOException
*/
public static JestResult update(String jsonStr,String index,String type,String id)throws IOException{
Update update = new Update.Builder(jsonStr).index(index).type(type).id(id).refresh(true).build();
JestResult result = getJestClient().execute(update);
return result;
}
/**
* Updates several documents in one bulk request
* @param jsonStr the update as a JSON string
* @param index index
* @param type type
* @param ids document ids
* @return
* @throws IOException
*/
public static JestResult update(String jsonStr,String index,String type,String ...ids)throws IOException{
Bulk.Builder bulkBuilder = new Bulk.Builder().defaultIndex(index).defaultType(type).refresh(true);
for(String id:ids){
bulkBuilder.addAction(new Update.Builder(jsonStr).index(index).type(type).id(id).build());
}
JestResult result = getJestClient().execute(bulkBuilder.build());
return result;
}
/**
* Deletes a document
* @param index index
* @param type type
* @param id document id
* @return
* @throws IOException
*/
public static JestResult delete(String index,String type,String id) throws IOException{
Delete delete = new Delete.Builder(id).index(index).type(type).refresh(true).build();
return getJestClient().execute(delete);
}
/**
* Deletes several documents in one bulk request
* @param index index
* @param type type
* @param ids document ids
* @return
* @throws IOException
*/
public static JestResult delete(String index,String type,String ...ids) throws IOException{
Bulk.Builder bulkBuilder = new Bulk.Builder().defaultIndex(index).defaultType(type).refresh(true);
for(String id:ids){
bulkBuilder.addAction(new Delete.Builder(id).index(index).type(type).build());
}
return getJestClient().execute(bulkBuilder.build());
}
/**
* Determines the rest operation type
* @param restBody
* @return JestType
*/
public static JestType checkOperateType(String restBody){
if(StringUtils.isBlank(restBody)){
throw new SystemException("rest body must not be empty");
}
int i = StringUtils.indexOf(restBody,"?");
if(i==-1){
i=restBody.length();
}
String head = StringUtils.substring(restBody,0,i).trim();
if(StringUtils.startsWithIgnoreCase(head,"put")){
return JestType.ADD;
}else if(StringUtils.startsWithIgnoreCase(head,"post")){
if(head.contains("_search")){
return JestType.SELECT;
}else if(head.contains("_update")){
return JestType.UPDATE;
}else{
return JestType.ADD;
}
}else if(StringUtils.startsWithIgnoreCase(head,"get")){
return JestType.GET;
}else if(StringUtils.startsWithIgnoreCase(head,"delete")){
return JestType.DELETE;
}else{
throw new SystemException("Invalid rest body format; it must start with put, post, get, or delete");
}
}
/**
* Extracts the restMapping (the JSON body)
* @param restBody
* @return
*/
public static String getRestMapping(String restBody){
int i = StringUtils.indexOf(restBody,"{");
return StringUtils.substring(restBody,i).trim();
}
/**
* Extracts the parameters
* @param restBody
* @return
*/
public static Map<String,Object> getParameters(String restBody){
int i1 = StringUtils.indexOf(restBody,"?");
int i2 = StringUtils.indexOf(restBody,"{");
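if(i1==-1)return null; // no '?' means no parameters; previously the text after the first character was parsed as parameters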
if(i2==-1)i2=restBody.length();
String pStr = StringUtils.substring(restBody,i1+1,i2).trim();
Map<String,Object> map = null;
if(StringUtils.isNoneBlank(pStr)){
map = new HashMap<>();
String[] pairs = pStr.split("&");
for(String pair:pairs){
String[] ps = pair.split("=",2);
String key = ps[0].trim();
String value = ps.length==2?ps[1].trim():"";
map.put(key,value);
}
}
return map;
}
/**
* Extracts the URI
* @param restBody
* @return
*/
private static String getUri(String restBody){
int i = StringUtils.indexOf(restBody,"?");
if(i==-1){
i=restBody.length();
}
String head = StringUtils.substring(restBody,0,i).trim();
String regx = "(put|post|get|delete)\\s+(\\S+)";
Pattern pattern = Pattern.compile(regx,Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(head);
if(matcher.find()){
return matcher.group(2).trim();
}else{
return "";
}
}
/**
* Extracts the indexes
* @param restBody
* @return
*/
public static String[] getIndexes(String restBody){
String uri = getUri(restBody);
if(StringUtils.isBlank(uri)||uri.equals("_search")){
return null;
}
String [] strs = uri.split("/");
return strs[0].trim().split(",");
}
/**
* Extracts the types
* @param restBody
* @return
*/
public static String[] getTypes(String restBody){
String uri = getUri(restBody);
String [] strs = uri.split("/");
if(strs.length==2&&StringUtils.isNotBlank(strs[1])&&strs[1].equals("_search")){
return null;
}
if(strs.length>=2){ // also covers index/type (insert without id), which previously returned null
return strs[1].trim().split(",");
}
return null;
}
/**
* Extracts the document id
* @param restBody
* @return
*/
public static String getId(String restBody){
String uri = getUri(restBody);
String [] strs = uri.split("/");
JestType op_type = checkOperateType(restBody);
if(op_type==JestType.SELECT){
return null;
}else if(op_type==JestType.UPDATE&&uri.contains("_update")){// an update must have the form index/type/id/_update
if(strs.length!=4){
throw new SystemException("Malformed rest body uri");
}else{
return strs[2].trim();
}
}else if(op_type==JestType.ADD&&strs.length==2){ // insert without an id: return null
return null;
}else{ // delete and get must have the form index/type/id; put may supply an id instead of an es-generated one
if(strs.length!=3){
throw new SystemException("Malformed rest body uri");
}else {
return strs[2].trim();
}
}
}
}

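Building the metadata and result set

A rest operation executed through the Jest client returns a JestResult or SearchResult object. Unlike sql against a relational database, elasticsearch errors are not thrown as exceptions, and the data returned by a query is not a two-dimensional table; it may be nested several levels deep, depending on the restMapping. To make handling uniform, the returned object is wrapped in a Map under the key "jest_result".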
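The wrapping idea can be sketched without the Jest library. In the sketch below, `FakeResult` is a stand-in for `io.searchbox.client.JestResult`, and the class and method names are hypothetical; only the key name "jest_result" comes from the text above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a single-column "row" that carries the whole result object. */
final class JestResultRowSketch {
    static final String COLUMN_LABEL = "jest_result";

    /** Stand-in for io.searchbox.client.JestResult, used only in this sketch. */
    static final class FakeResult {
        final int responseCode;
        final String json;

        FakeResult(int responseCode, String json) {
            this.responseCode = responseCode;
            this.json = json;
        }
    }

    /** Wraps the result the way a one-row, one-column result set would be mapped to a Map. */
    static Map<String, Object> wrap(FakeResult result) {
        Map<String, Object> row = new HashMap<>();
        row.put(COLUMN_LABEL, result);
        return row;
    }

    /** Pulls the result back out, failing loudly if the row does not carry one. */
    static FakeResult unwrap(Map<String, Object> row) {
        Object value = row.get(COLUMN_LABEL);
        if (!(value instanceof FakeResult)) {
            throw new IllegalStateException("row has no '" + COLUMN_LABEL + "' entry");
        }
        return (FakeResult) value;
    }

    public static void main(String[] args) {
        Map<String, Object> row = wrap(new FakeResult(200, "{\"hits\":{}}"));
        FakeResult back = unwrap(row);
        System.out.println(back.responseCode + " " + back.json);
    }
}
```

The caller keeps access to the full, possibly nested response instead of a flattened table, which is the reason for carrying the object itself as the only column value.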

JestMetaData.java

Building the metadata

package com.alibaba.druid.jest;
import com.alibaba.druid.util.jdbc.ResultSetMetaDataBase;
import io.searchbox.client.JestResult;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
/**
* Created by wjj on 2017/6/22.
*/
public class JestMetaData extends ResultSetMetaDataBase {
public static final String COLUMN_LABEL = "jest_result";
public static final String COLUMN_NAME = "jest_result";
public JestMetaData(JestResult jestResult){
super();
List<ColumnMetaData> columns = getColumns();
columns.clear();
ColumnMetaData columnMetaData = new ColumnMetaData();
columnMetaData.setColumnLabel(COLUMN_LABEL);
columnMetaData.setColumnName(COLUMN_NAME);
columns.add(columnMetaData);
}
@Override
public int getColumnType(int column) throws SQLException {
return Types.OTHER; // with Types.OTHER the value is read through getObject
}
@Override
public String getColumnClassName(int column) throws SQLException {
return "java.lang.Object";
}
}

JestResultSet.java

Building the result set and overriding the iteration methods

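package com.alibaba.druid.jest;
import com.alibaba.druid.util.jdbc.ResultSetMetaDataBase;
import io.searchbox.client.JestResult;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
public class JestResultSet implements ResultSet {
private JestResult jestResult; // holds the result returned by the jest operation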
private ResultSetMetaData metaData;
private boolean hasNext = false;
public JestResultSet(JestResult jestResult){
this.jestResult = jestResult;
hasNext = true;
this.metaData = new JestMetaData(jestResult);
}
@Override
public boolean next() throws SQLException {
if(hasNext){
hasNext = false;
return true;
}
return false;
}
@Override
public void close() throws SQLException {
this.metaData = null;
this.jestResult = null;
this.hasNext = false;
}
@Override
public ResultSetMetaData getMetaData() throws SQLException {
return metaData;
}
@Override
public Object getObject(int columnIndex) throws SQLException {
return this.jestResult;
}
@Override
public Object getObject(String columnLabel) throws SQLException {
return this.jestResult;
}
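@Override
public int findColumn(String columnLabel) throws SQLException {
return ((ResultSetMetaDataBase) metaData).findColumn(columnLabel);
}
// For reference only: this appears to be the lookup in ResultSetMetaDataBase that the call above delegates to.
// It is commented out here because a second findColumn(String) in this class would not compile.
// public int findColumn(String columnName) throws SQLException {
// for (int i = 0; i < columns.size(); ++i) {
// ColumnMetaData column = columns.get(i);
// if (column.getColumnName().equals(columnName)) {
// return i + 1;
// }
// }
// throw new SQLException("column '" + columnName + "' not found.");
// }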
@Override
public <T> T getObject(int columnIndex, Class<T> type) throws SQLException {
return (T)jestResult;
}
@Override
public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
return (T)jestResult;
}
// the remaining methods use default implementations
...
}

Modifying the JDBC execution logic

Defining the operation types

JestType.java

/**
* Created by wjj on 2017/6/22.
*/
public enum JestType {
ADD,
SELECT,
GET,
UPDATE,
DELETE
}

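Modifying the statement operations

Most of the changes are in the MybatisElasticSearchDruidPooledPreparedStatement.java class.

Modifying executeQuery

The method checks whether the input is a sql statement or a rest api structure. For a rest operation it calls the rest client, wraps the returned result in a result set, and returns it.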

@Override
public ResultSet executeQuery() throws SQLException {
String sql = getSql();
checkOpen();
incrementExecuteCount();
transactionRecord(sql);
oracleSetRowPrefetch();
conn.beforeExecute();
try {
if(isExecuteSql(sql)){ // run as sql
ObjectResult extractor = getObjectResult(true, StringUtils.join(sqlList.toArray(),""), false, false, true);
List<String> headers = extractor.getHeaders();
List<List<Object>> lines = extractor.getLines();
ResultSet rs = new MybatisElasticSearchResultSet(this, headers, lines);
if (rs == null) {
return null;
}
DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
addResultSetTrace(poolableResultSet);
return poolableResultSet;
}else{ // run as rest api
JestResult result = getJestResult(sql);
ResultSet rs = new JestResultSet(result);
DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
addResultSetTrace(poolableResultSet);
return poolableResultSet;
}
} catch (Throwable t) {
throw checkException(t);
} finally {
conn.afterExecute();
}
}

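Modifying execute

Whether the statement is a select, insert, update, or delete, mybatis calls the execute method. For rest operations this method therefore has to determine the operation type from the protocol defined above and call the corresponding method in JestUtil.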

@Override
public boolean execute() throws SQLException {
String sql = getSql();
if(isExecuteSql(sql)){
executeSql(sql);
}else{
executeJest(sql);
}
return true;
}
/**
* Runs the rest request through the jest client
*/
private void executeJest(String restBody)throws SQLException{
checkOpen();
incrementExecuteCount();
oracleSetRowPrefetch();
conn.beforeExecute();
try {
JestResult result = getJestResult(restBody);
ResultSet rs = new JestResultSet(result);
DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
addResultSetTrace(poolableResultSet);
PreparedStatement stmt = this.getPreparedStatementHolder().getStatement();
Method method = stmt.getClass().getDeclaredMethod("setResultSet",ResultSet.class);
method.invoke(stmt,poolableResultSet);
} catch (Throwable t) {
throw checkException(t);
} finally {
conn.afterExecute();
}
}
/**
* Calls the jest client to run the request
* @param restBody
* @return
* @throws Exception
*/
public JestResult getJestResult(String restBody) throws Exception{
JestType op_type = JestUtil.checkOperateType(restBody);
String restMapping = JestUtil.getRestMapping(restBody);
String[] indexes = JestUtil.getIndexes(restBody);
String[] types = JestUtil.getTypes(restBody);
String id = JestUtil.getId(restBody);
Map<String,Object> parameter = JestUtil.getParameters(restBody);
JestResult result = null;
switch (op_type){
case ADD:
result = StringUtils.isBlank(id)?
JestUtil.insert(restMapping,indexes[0],types[0],parameter):
JestUtil.insert(restMapping,indexes[0],types[0],id,parameter);
break;
case SELECT:
result = JestUtil.query(restMapping,indexes,types,parameter);
break;
case GET:
result = JestUtil.get(indexes[0],types[0],id);
break;
case UPDATE:
result = JestUtil.update(restMapping,indexes[0],types[0],id.split(","));
break;
case DELETE:
result = JestUtil.delete(indexes[0],types[0],id.split(","));
break;
}
return result;
}

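Other changes

The executeUpdate method is also modified. elasticsearch-sql supports queries only, so if the input is sql the method throws an exception; if it is rest, the corresponding operation is executed.

The complete MybatisElasticSearchDruidPooledPreparedStatement.java ends up as follows: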

/**
* Created by wjj on 2017/5/25.
*/
public class MybatisElasticSearchDruidPooledPreparedStatement extends ElasticSearchDruidPooledPreparedStatement {
private final static Log LOG = LogFactory.getLog(MybatisElasticSearchDruidPooledPreparedStatement.class);
LinkedList<String> sqlList = new LinkedList<String>();// holds the fragments the sql is split into
public MybatisElasticSearchDruidPooledPreparedStatement(DruidPooledConnection conn, PreparedStatementHolder holder)throws SQLException {
super(conn,holder);
sqlList.clear();
buildSqlList(getSql());
}
// Splits the sql, e.g. select * from tableName where a=? and b=? becomes ["select * from tableName where a=","?"," and b=","?"]
private void buildSqlList(String sql){
int index = sql.indexOf("?");
if(index<0){
sqlList.add(sql);
return;
}
String preStr = sql.substring(0,index);
sqlList.add(preStr);
sqlList.add("?");
String afterStr = sql.substring(index+1);
buildSqlList(afterStr);
}
// The setXXX methods below replace the ? placeholders
@Override
public void setInt(int parameterIndex, int x) throws SQLException {
sqlList.set(2*parameterIndex-1,String.valueOf(x));
}
@Override
public void setString(int parameterIndex, String x) throws SQLException {
sqlList.set(2*parameterIndex-1,"'"+x+"'");
}
@Override
public void setDouble(int parameterIndex, double x) throws SQLException {
sqlList.set(2*parameterIndex-1,String.valueOf(x));
}
@Override
public void setDate(int parameterIndex, java.sql.Date x) throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
setString(parameterIndex,sdf.format(x));
}
@Override
public void close() throws SQLException {
sqlList.clear();// drop the stored fragments
super.close();
}
@Override
public ResultSet executeQuery() throws SQLException {
String sql = getSql();
checkOpen();
incrementExecuteCount();
transactionRecord(sql);
oracleSetRowPrefetch();
conn.beforeExecute();
try {
if(isExecuteSql(sql)){ // run as sql
ObjectResult extractor = getObjectResult(true, StringUtils.join(sqlList.toArray(),""), false, false, true);
List<String> headers = extractor.getHeaders();
List<List<Object>> lines = extractor.getLines();
ResultSet rs = new MybatisElasticSearchResultSet(this, headers, lines);
if (rs == null) {
return null;
}
DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
addResultSetTrace(poolableResultSet);
return poolableResultSet;
}else{ // run as rest api
JestResult result = getJestResult(sql);
ResultSet rs = new JestResultSet(result);
DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
addResultSetTrace(poolableResultSet);
return poolableResultSet;
}
} catch (Throwable t) {
throw checkException(t);
} finally {
conn.afterExecute();
}
}
@Override
public boolean execute() throws SQLException {
String sql = getSql();
if(isExecuteSql(sql)){
executeSql(sql);
}else{
executeJest(sql);
}
return true;
}
@Override
public int executeUpdate() throws SQLException {
if(isExecuteSql(getSql())){
throw new SQLException("executeUpdate not support in ElasticSearch sql");
}else{
try{
JestResult result = getJestResult(getSql());
return result.getResponseCode();
}catch (Exception e){
LOG.error("Error while executing the jest operation",e);
}
}
return -1;
}
/**
* Runs the sql query
* @throws SQLException
*/
private void executeSql(String sql)throws SQLException{
checkOpen();
incrementExecuteCount();
transactionRecord(sql);
oracleSetRowPrefetch();
conn.beforeExecute();
DruidPooledResultSet poolableResultSet = null;
ResultSet rs = null;
try {
ObjectResult extractor = getObjectResult(true, StringUtils.join(sqlList.toArray(),""), false, false, true);
List<String> headers = extractor.getHeaders();
List<List<Object>> lines = extractor.getLines();
rs = new MybatisElasticSearchResultSet(this, headers, lines);
poolableResultSet = new DruidPooledResultSet(this, rs);
addResultSetTrace(poolableResultSet);
PreparedStatement stmt = this.getPreparedStatementHolder().getStatement();
Method method = stmt.getClass().getDeclaredMethod("setResultSet",ResultSet.class);
method.invoke(stmt,poolableResultSet);
} catch (Throwable t) {
throw checkException(t);
} finally {
conn.afterExecute();
}
}
/**
* Runs the rest request through the jest client
*/
private void executeJest(String restBody)throws SQLException{
checkOpen();
incrementExecuteCount();
oracleSetRowPrefetch();
conn.beforeExecute();
try {
JestResult result = getJestResult(restBody);
ResultSet rs = new JestResultSet(result);
DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
addResultSetTrace(poolableResultSet);
PreparedStatement stmt = this.getPreparedStatementHolder().getStatement();
Method method = stmt.getClass().getDeclaredMethod("setResultSet",ResultSet.class);
method.invoke(stmt,poolableResultSet);
} catch (Throwable t) {
throw checkException(t);
} finally {
conn.afterExecute();
}
}
/**
* Decides whether the input should be run as sql
* @param sql
* @return true: sql operation; false: rest operation
*/
private boolean isExecuteSql(String sql){
int i = StringUtils.indexOf(sql,"?");
if(i==-1){
i=sql.length();
}
String head = StringUtils.substring(sql,0,i).trim();
if(StringUtils.startsWithIgnoreCase(head,"select")){
return true;
}
return false;
}
private ObjectResult getObjectResult(boolean flat, String query, boolean includeScore, boolean includeType, boolean includeId) throws SqlParseException, SQLFeatureNotSupportedException, Exception, CsvExtractorException {
SearchDao searchDao = new org.nlpcn.es4sql.SearchDao(client);
//String rewriteSQL = searchDao.explain(getSql()).explain().explain();
QueryAction queryAction = searchDao.explain(query);
Object execution = QueryActionElasticExecutor.executeAnyAction(searchDao.getClient(), queryAction);
return new ObjectResultsExtractor(includeScore, includeType, includeId).extractResults(execution, flat);
}
/**
* Calls the jest client to run the request
* @param restBody
* @return
* @throws Exception
*/
public JestResult getJestResult(String restBody) throws Exception{
JestType op_type = JestUtil.checkOperateType(restBody);
String restMapping = JestUtil.getRestMapping(restBody);
String[] indexes = JestUtil.getIndexes(restBody);
String[] types = JestUtil.getTypes(restBody);
String id = JestUtil.getId(restBody);
Map<String,Object> parameter = JestUtil.getParameters(restBody);
JestResult result = null;
switch (op_type){
case ADD:
result = StringUtils.isBlank(id)?
JestUtil.insert(restMapping,indexes[0],types[0],parameter):
JestUtil.insert(restMapping,indexes[0],types[0],id,parameter);
break;
case SELECT:
result = JestUtil.query(restMapping,indexes,types,parameter);
break;
case GET:
result = JestUtil.get(indexes[0],types[0],id);
break;
case UPDATE:
result = JestUtil.update(restMapping,indexes[0],types[0],id.split(","));
break;
case DELETE:
result = JestUtil.delete(indexes[0],types[0],id.split(","));
break;
}
return result;
}
}

Required configuration entries

<!-- es http address; separate multiple addresses with an ASCII comma "," -->
<entry key="datasource.es.http.host">http://192.168.70.128:9200</entry>
<!-- maximum number of connections -->
<entry key="datasource.es.maxActive">20</entry>
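As a rough illustration of how these two entries are consumed, the sketch below reads them from a `java.util.Properties` object. That is an assumption made for the example: the project reads them through its own SpringBeanUtil, and `EsHttpConfigSketch` is a hypothetical name. The comma splitting and the default of 20 mirror what JestUtil does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical reader for the two configuration entries above. */
final class EsHttpConfigSketch {

    /** Splits datasource.es.http.host on commas, trimming and skipping blank items. */
    static List<String> hosts(Properties props) {
        String raw = props.getProperty("datasource.es.http.host");
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalStateException("missing datasource.es.http.host");
        }
        List<String> result = new ArrayList<>();
        for (String host : raw.split(",")) {
            String trimmed = host.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    /** Reads datasource.es.maxActive, falling back to 20 when the entry is absent. */
    static int maxActive(Properties props) {
        return Integer.parseInt(props.getProperty("datasource.es.maxActive", "20").trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("datasource.es.http.host", "http://192.168.70.128:9200, http://192.168.70.129:9200");
        System.out.println(hosts(props) + " maxActive=" + maxActive(props));
    }
}
```

Trimming each host is a small addition over a plain `split(",")`, so that a space after a comma in the configuration does not end up inside a URL.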

Testing

mybatis xml

<select id="qryTest2" parameterType="java.lang.Object" resultType="java.util.Map">
POST ${index}/${type}/_search?
{
"query": {
"prefix": {
"name": "${name}"
}
}
}
</select>

java

ESTestServiceImpl esTestService = SpringBeanUtil.getInstance().getBean("esTestService",ESTestServiceImpl.class);
Map param = new HashMap();
param.put("index","radiott");
param.put("type","artiststt");
param.put("name","te");
Map map = esTestService.qryJest(param);
JestResult result = JestUtil.getQueryResult(map);
System.out.println(JSONUtil.toJSONString(result.getSourceAsObject(Map.class)));

Printed output

{"name":"test333","id":"3","es_metadata_id":"AVx7JhglJRLJihedvp1b"}