7. 关于ES数据读写那点事儿

【ElasticSearch系列连载】7. 关于ES数据读写那点事儿

1 对文档建索引

1.1 自定义文档ID

如果数据本身有自己的唯一标记,那么在建立索引时可以使用id来指定文档的id。

如下,使用curl在your_index索引下写入一个id=1001的文档。

1
2
3
4
5
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc/1001' -d '
{
"field": "内容"
}
'

返回如下

1
2
3
4
5
6
7
{
"_index": "your_index",
"_type": "_doc",
"_id": "1001",
"_version": 1,
"result": "created"
}

可以看到除了index, type和id字段,还有一个version字段。

在ES中每个文档都有一个自己的version编号,每当文档发生变化时,version就会增长。

1.2 使用自增ID

如果文档没有唯一标识,也可以让ES帮你自动生成文档ID,对应请求的方式也要发生变化:使用POST方法。

1
2
3
4
5
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc' -d'
{
"field": "内容"
}
'

会自动生成一个id,使用base64编码的UUID。

1
2
3
4
5
6
7
{
"_index": "your_index",
"_type": "_doc",
"_id": "cZeQEYIBJnF0z3Du5UvX",
"_version": 1,
"result": "created"
}

2 获取文档

使用GET请求获取id为1001的文档。

1
http://es00:9200/your_index/_doc/1001?pretty

返回的数据体在_source字段中。

如果你查找的文档不存在,found字段会变为false(同时返回的HTTP状态码为404)。

1
2
3
4
5
6
7
8
9
10
{
"_index" : "your_index",
"_type" : "_doc",
"_id" : "1001",
"_version" : 1,
"found" : true,
"_source" : {
"field" : "内容"
}
}

3 只获取文档的部分字段

正常情况下GET请求会将文档的所有字段都进行返回。但如果你只需要部分字段的话,可以通过在URL中使用_source参数来控制需要返回的字段。

1
http://es00:9200/your_index/_doc/1001?pretty&_source=field

_source中只包含了需要的字段部分。

1
2
3
4
5
6
7
8
9
10
{
"_index" : "your_index",
"_type" : "_doc",
"_id" : "1001",
"_version" : 1,
"found" : true,
"_source" : {
"field" : "内容"
}
}

如果只是想要获取数据体而不需要其他辅助信息,那么使用_source即可。

1
http://es00:9200/your_index/_doc/1001/_source

返回如下,直接是数据体

1
2
3
{
"field": "内容"
}

4 文档更新

首先,文档数据在ES中是不可修改的。所以当我们需要对一个已有的文档进行更新时,会自动进行如下的系列操作:

  1. 获取要更新的文档
  2. 根据修改请求对其在临时存储中进行修改
  3. 删掉要更新的文档
  4. 重建文档

即:先删除,再新建

请求方式和新建文档一样。

1
2
3
4
5
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc/1001' -d '
{
"field": "内容"
}
'

返回如下

1
2
3
4
5
6
7
{
"_index": "your_index",
"_type": "_doc",
"_id": "1001",
"_version": 1,
"result": "created"
}

5 创建全新文档

如果我们只希望在文档不存在时新建文档,而不是无条件覆盖的话,可以在请求时添加_create,这样的话,如果这个文档id已经存在则不会新建成功。

比如请求如下,仍然新建id为1001的文档。

1
2
3
4
5
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc/1001/_create' -d '
{
"field": "内容"
}
'

返回如下,409

1
2
3
4
5
6
7
8
9
10
{
"error": {
"type": "version_conflict_engine_exception",
"reason": "[1001]: version conflict, document already exists (current version [2])",
"index_uuid": "a9ZncSd_TJWK411z1Bb76g",
"shard": "0",
"index": "your_index"
},
"status": 409
}

6 删除文档

进行如下DELETE请求即可进行id=1001的文档删除操作。

1
curl -XDELETE 'http://es00:9200/your_index/_doc/1001'

如果文档存在,返回如下(deleted)

1
2
3
4
5
6
7
{
"_index": "your_index",
"_type": "_doc",
"_id": "1001",
"_version": 3,
"result": "deleted"
}

如果文档不存在,返回如下(not_found, 404)

1
2
3
4
5
6
7
{
"_index": "your_index",
"_type": "_doc",
"_id": "1001",
"_version": 3,
"result": "not_found"
}

7 处理冲突

如上文所述,如果我们要更新一个文档,需要先读取到原始的文档,对其进行临时修改,然后重新新建这个文档。

即:如果两个人同时获取 & 修改文档,谁最后修改,谁的就生效。

大部分情况下都不影响:

  1. 比如数据是从关系型数据库同步过来的,无所谓冲突
  2. 比如虽然两个人都是修改,但就是按照先后顺序就好了,业务上可以承受另一个人的修改不生效

但是有时候就会影响,比如售卖一个东西,如图

两个人同时购买时,两个网站此时获取到的库存都是100,购买时两个网站都试图将100减一变成99,一起更新,最后的库存变成99(应该是98)。

不难发现,当更新变动的频率越快,以及读取数据到更新数据的周期越长时,就越容易出现上面问题。

在大数据的领域,通常有两种方式保障并发更新的正确性。

  1. 悲观并发控制

    这个是关系型数据库广泛使用的,假设冲突修改会经常发生,所以会很严格的控制对数据资源的访问来防止冲突的发生。比如:在读取数据时会将目标数据上锁,确保只有这个线程可以修改这条数据,期间其他线程无法对这条数据进行操作

  2. 乐观并发控制

    也是ES使用的并发控制模式。这种模式假设冲突不经常发生,所以在尝试数据修改时不会加锁、不会阻碍其他线程对数据的访问。但是,如果在读取到更新期间发现数据有过更新记录,则可以阻止这次的更新操作并报错。剩下的交给调用方去判断:是继续使用新的数据再来一次更新

8 文档版本控制

ES是一个分布式系统,所以当一个文档发生变化时,ES需要将这个文档的变化推送到各个节点中去,这个过程是并发的,所以ES需要有一个机制保证一个文档的旧版本不会覆盖新版本。通过在更新数据时添加long类型的version字段可以保障这一点:如果ES发现此次更新数据携带的version低于目前数据的version则将会拒绝。

1
2
3
4
5
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc/1001?version=1&version_type=external' -d '
{
"field": "内容"
}
'

9 对文档进行局部更新

通过使用_update方法即可对文档的部分字段进行局部更新,但是我们要知道,这个局部更新也没有违背”ES中的数据都是只读“的原则,也是进行了如下系列的操作:

  1. 获取要更新的文档
  2. 根据修改请求对其在临时存储中进行修改
  3. 删掉要更新的文档
  4. 重建文档
1
2
3
4
5
6
7
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc/1001/_update' -d '
{
"doc":{
"field": "更新内容"
}
}
'

10 使用脚本更新文档

我们可以使用ES自带的脚本语言Groovy进行文档的更新。

1 新建一个条数据

1
2
3
4
5
6
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc/1001' -d '
{
"tag": ["book", "tool"],
"count": 100
}
'

2 count + 20

1
2
3
4
5
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc/1001/_update' -d '
{
"script" : "ctx._source.count+=20"
}
'

3 upsert
如果担心更新的目标没有对应字段(比如上面的count),可以使用upsert指定默认值。

1
2
3
4
5
6
7
8
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc/1002/_update' -d '
{
"script" : "ctx._source.count+=20",
"upsert": {
"count": 1
}
}
'

11 更新冲突重试

通过指定retry_on_conflict字段,即可在更新失败时进行自动重试,减少丢数据的风险。

1
2
3
4
5
6
7
8
curl -H "Content-Type: application/json" -XPOST 'http://es00:9200/your_index/_doc/1002/_update?retry_on_conflict=5' -d '
{
"script" : "ctx._source.count+=20",
"upsert": {
"count": 1
}
}
'

12 批量获取不同类型的文档数据

当你需要同时获取不同索引的数据时,可以使用mget方法,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
curl -H "Content-Type: application/json" -XGET 'http://es00:9200/_mget' -d '
{
"docs": [
{
"_index": "your_index",
"_id": "cZeQEYIBJnF0z3Du5UvX"
},
{
"_index": "test",
"_id": "cJd-EYIBJnF0z3DuYUuz"
}
]
}
'

批量执行建议

通常一次请求文档数量在1000-5000个,请求体大概控制在5-15MB。