alias esrt='uvx esrt@8.2.0'
esrt -V
alias esrt='pipx run esrt==8.2.0'
esrt -V
search
scan
request
bulk
ping
You can start an es service with docker.
docker run --name "esrt-es" --rm -itd --platform=linux/amd64 -p 9200:9200 elasticsearch:5.6.9-alpine
# install sql command and restart container:
docker exec "esrt-es" elasticsearch-plugin install https://github.com/NLPchina/elasticsearch-sql/releases/download/5.6.9.0/elasticsearch-sql-5.6.9.0.zip
docker restart "esrt-es"
Check server:
esrt es request localhost -X HEAD
# ->
# true
Create a index:
esrt es request localhost -X PUT /my-index
# ->
# {
# "acknowledged": true,
# "shards_acknowledged": true,
# "index": "my-index"
# }
If you want to esrt
quote url path for you, add flag: -Q
(--quote-url
)
Cat it:
esrt es request localhost -X GET /_cat/indices
# ->
# yellow open my-index NMHssX4qTgeMFrA3cXPoKg 5 1 0 0 324b 324b
esrt es request localhost -X GET /_cat/indices -p v=
# ->
# health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
# yellow open my-index NMHssX4qTgeMFrA3cXPoKg 5 1 0 0 810b 810b
esrt es request localhost -X GET /_cat/indices -p v= -p format=json
# ->
# [
# {
# "health": "yellow",
# "status": "open",
# "index": "my-index",
# "uuid": "NMHssX4qTgeMFrA3cXPoKg",
# "pri": "5",
# "rep": "1",
# "docs.count": "0",
# "docs.deleted": "0",
# "store.size": "810b",
# "pri.store.size": "810b"
# }
# ]
Bulk with data from file examples/bulk.ndjson
:
{ "_op_type": "index", "_index": "my-index", "_type": "type1", "_id": "1", "field1": "ii" }
{ "_op_type": "delete", "_index": "my-index", "_type": "type1", "_id": "1" }
{ "_op_type": "create", "_index": "my-index", "_type": "type1", "_id": "1", "field1": "cc" }
{ "_op_type": "update", "_index": "my-index", "_type": "type1", "_id": "1", "doc": {"field2": "uu"} }
esrt es bulk localhost -y -f examples/bulk.ndjson
# ->
# ⠋ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 4/? ?
Read payload from stdin
. And -d
can be omitted.
esrt es bulk localhost -y <<EOF
{ "_op_type": "index", "_index": "my-index-2", "_type": "type1", "_id": "1", "field1": "11" }
{ "_op_type": "index", "_index": "my-index-2", "_type": "type1", "_id": "2", "field1": "22" }
{ "_op_type": "index", "_index": "my-index-2", "_type": "type1", "_id": "3", "field1": "33" }
EOF
# ->
# ⠋ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 3/? ?
Piping heredoc
also works.
cat <<EOF | esrt es bulk localhost -y
{ "_op_type": "index", "_index": "my-index-2", "_type": "type1", "_id": "1", "field1": "11" }
{ "_op_type": "index", "_index": "my-index-2", "_type": "type1", "_id": "2", "field1": "22" }
{ "_op_type": "index", "_index": "my-index-2", "_type": "type1", "_id": "3", "field1": "33" }
EOF
# ->
# ⠋ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 3/? ?
Pipe _search
result and update _index
with customized handler
to do more operations before bulk!
alias jq_es_hits="jq '.hits.hits[]'"
esrt es request localhost -X GET /my-index-2/_search | jq_es_hits -c | esrt es bulk localhost -y -w examples.my-handlers:handle # <- `examples/my-handlers.py`
# ->
# ⠹ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 3/? ?
# examples/my-handlers.py
import json
import typing as t
from esrt es import DocHandler
# function style
def my_handler(actions: t.Iterable[str]):
for action in actions:
obj = json.loads(action)
prefix = 'new-'
if not t.cast(str, obj['_index']).startswith(prefix):
obj['_index'] = prefix + obj['_index']
yield obj
# class style
class MyHandler(DocHandler):
def handle(self, actions: t.Iterable[str]):
for action in actions:
yield self.handle_one(action)
def handle_one(self, action: str):
obj = super().handle_one(action)
prefix = 'new-'
if not t.cast(str, obj['_index']).startswith(prefix):
obj['_index'] = prefix + obj['_index']
return obj
esrt es search localhost | jq_es_hits -c
# ->
# {"_index":"my-index-2","_type":"type1","_id":"2","_score":1.0,"_source":{"field1":"22"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"2","_score":1.0,"_source":{"field1":"22"}}
# {"_index":"my-index","_type":"type1","_id":"1","_score":1.0,"_source":{"field1":"cc","field2":"uu"}}
# {"_index":"my-index-2","_type":"type1","_id":"1","_score":1.0,"_source":{"field1":"11"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"1","_score":1.0,"_source":{"field1":"11"}}
# {"_index":"my-index-2","_type":"type1","_id":"3","_score":1.0,"_source":{"field1":"33"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"3","_score":1.0,"_source":{"field1":"33"}}
esrt es search localhost -f - <<EOF | jq_es_hits -c
{"query": {"term": {"_index": "new-my-index-2"}}}
EOF
# ->
# {"_index":"new-my-index-2","_type":"type1","_id":"2","_score":1.0,"_source":{"field1":"22"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"1","_score":1.0,"_source":{"field1":"11"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"3","_score":1.0,"_source":{"field1":"33"}}
esrt es scan localhost -y
# ->
# {"_index": "my-index-2", "_type": "type1", "_id": "2", "_score": null, "_source": {"field1": "22"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "2", "_score": null, "_source": {"field1": "22"}, "sort": [0]}
# {"_index": "my-index", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "cc", "field2": "uu"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "11"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "11"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "3", "_score": null, "_source": {"field1": "33"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "3", "_score": null, "_source": {"field1": "33"}, "sort": [0]}
esrt es scan localhost -y -f - <<EOF
{"query": {"term": {"field1": "cc"}}}
EOF
# ->
# {"_index": "my-index", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "cc", "field2": "uu"}, "sort": [0]}
# examples/create-massive-docs.py
import json
import uuid
if __name__ == '__main__':
for i, _ in enumerate(range(54321), start=1):
d = {
'_index': 'my-index-a',
'_id': i,
'_type': 'type1',
'_source': {'field1': str(uuid.uuid4())},
}
print(json.dumps(d))
python examples/create-massive-docs.py | tee _.ndjson | esrt es bulk localhost -y -c 10000
# ->
# ⠋ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:26 654321/? 24504/s
cat _.ndjson # <- 79M
head _.ndjson
# ->
# {"_index": "my-index-a", "_type": "type1", "_id": "70004", "_score": null, "_source": {"field1": "7fc553c1-d09f-4793-bc28-2a16a6050ef4"}, "sort": [0]}
# {"_index": "my-index-a", "_type": "type1", "_id": "80002", "_score": null, "_source": {"field1": "7fddf2f7-195f-4964-81f1-bb32d63be8b0"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "2", "_score": null, "_source": {"field1": "22"}, "sort": [0]}
# {"_index": "my-index-a", "_type": "type1", "_id": "70003", "_score": null, "_source": {"field1": "2a08f0e0-cdbd-47d3-b7e3-ee8fd1e27ff8"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "2", "_score": null, "_source": {"field1": "22"}, "sort": [0]}
# {"_index": "my-index", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "cc", "field2": "uu"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "11"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "11"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "3", "_score": null, "_source": {"field1": "33"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "3", "_score": null, "_source": {"field1": "33"}, "sort": [0]}
# examples/copy-more-docs.py
from copy import deepcopy
import json
import typing as t
import uuid
if __name__ == '__main__':
for i, _ in enumerate(range(54321), start=1):
d = {
'_index': 'my-index-b',
'_id': i,
'_type': 'type1',
'_source': {'field1': str(uuid.uuid4())},
}
print(json.dumps(d))
def handle(actions: t.Iterable[str]):
for action in actions:
d: dict[str, t.Any] = json.loads(action)
yield d
d2 = deepcopy(d)
d2['_source']['field1'] += '!!!'
d2['_source']['field2'] = str(uuid.uuid4())
yield d2
python examples/copy-more-docs.py | esrt es bulk localhost -y -w examples.copy-more-docs:handle
# ->
# ⠏ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:05 108642/? 18963/s