
Commit

update es query script
shaunahu committed Nov 13, 2024
1 parent d0a5939 commit 06ee59c
Showing 3 changed files with 100 additions and 18 deletions.
94 changes: 77 additions & 17 deletions data_discovery_ai/utils/es_connector.py
@@ -43,32 +43,92 @@ def connect_es(config: configparser.ConfigParser) -> Elasticsearch:

def search_es(client: Elasticsearch):
index = "es-indexer-staging"
start_index = 0
start = 0
dataframes = []
batch_size = 100
resp = client.search(index=index, body={"query": {"match_all": {}}})
total_number = resp["hits"]["total"]["value"]

# get document count
count_resp = client.count(index=index)
total_number = count_resp["count"]
rounds = (total_number + batch_size - 1) // batch_size

# TODO: refactor the search query method.
for round in tqdm(range(rounds), desc="searching elasticsearch"):
search_query = {
"size": batch_size,
"from": start_index,
"query": {"match_all": {}},
}

resp = client.search(index=index, body=search_query)

data = resp["hits"]["hits"]
# open a point in time (PIT) to keep a stable view of the index while paginating
first_pit_resp = client.open_point_in_time(
index=index,
keep_alive="1m",
)
first_pit = first_pit_resp["id"]

# the first search
first_query_body = {
"size": 100,
"query": {
"match_all": {}
},
"pit": {
"id": first_pit,
"keep_alive": "1m"
},
"sort": [
{
"id.keyword": "asc"
}
]
}
first_query_resp = client.search(body=first_query_body)

data = first_query_resp["hits"]["hits"]
# remember the sort values of the last hit to use as search_after
current_last_result = data[-1]["sort"]

# save the first search result
df = pd.json_normalize(data)
dataframes.append(df)


# set current pit to the id returned by the first search
current_pit = first_query_resp["pit_id"]

# conduct the remaining searches with search_after
for round in tqdm(range(1, rounds), desc="searching elasticsearch"):
query_body = {
"size": 100,
"query": {
"match_all": {}
},
"pit": {
"id": current_pit,
"keep_alive": "1m"
},
"sort": [
{
"id.keyword": "asc"
}
],
"search_after": current_last_result,
"track_total_hits": False
}
query_resp = client.search(body=query_body)

data = query_resp["hits"]["hits"]
# update the search_after value from the last hit of this batch
current_last_result = data[-1]["sort"]

# save this batch of results
df = pd.json_normalize(data)
dataframes.append(df)

start_index += 1
# carry forward the pit id returned by this response
current_pit = query_resp["pit_id"]

round += 1
time.sleep(1)
time.sleep(10)

raw_data = pd.concat(dataframes, ignore_index=True)
# close pit
resp = client.close_point_in_time(
id=current_pit,
)
print(resp)

raw_data = pd.concat(dataframes, ignore_index=True)
return raw_data
22 changes: 22 additions & 0 deletions extras/es_script.md
@@ -32,5 +32,27 @@ POST /es-indexer-edge/_search
}
```

## Find all records with paginated search
1. The initial search with a sort
```
POST /es-indexer-staging/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"summaries.creation": "asc"
}
]
}
```
2. Set a PIT (point in time)
```
POST /es-indexer-staging/_pit?keep_alive=1m
```

3. Search with `search_after`, as sketched below
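
For example, a minimal sketch of the follow-up request (the `pit` id and `search_after` values are placeholders for the `pit_id` returned by the previous response and the `sort` values of its last hit):
```
# sketch only: substitute the real pit_id and the last hit's sort values
POST /_search
{
  "size": 100,
  "query": {
    "match_all": {}
  },
  "pit": {
    "id": "<pit_id from the previous response>",
    "keep_alive": "1m"
  },
  "sort": [
    {
      "summaries.creation": "asc"
    }
  ],
  "search_after": ["<sort value of the last hit>"],
  "track_total_hits": false
}
```
A request that carries a `pit` targets `/_search` without an index name, because the PIT already pins the index. Repeat this step, feeding each response's last `sort` values into the next `search_after`, until a page comes back with no hits.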

References:
[1] [how do elastic search show all the hits for query](https://stackoverflow.com/questions/64871466/how-do-elastic-search-show-all-the-hits-for-query)
2 changes: 1 addition & 1 deletion tests/test_pipeline.py
@@ -12,7 +12,7 @@ def test():
isDataChanged=False,
usePretrainedModel=False,
description=item_description,
selected_model="development",
selected_model="experimental",
)


