bulk insert batch requests (#3628)
* bulk insert batch requests

* address PR comments
michaelcheah authored Oct 2, 2021
1 parent 25fcabc commit 62d9b55
Showing 1 changed file with 48 additions and 23 deletions.
components/seldon-request-logger/app/default_logger.py (48 additions, 23 deletions)
@@ -16,6 +16,7 @@
 import array
 import traceback
 from flask import jsonify
+from elasticsearch import Elasticsearch, helpers
 
 MAX_PAYLOAD_BYTES = 300000
 app = Flask(__name__)
@@ -146,29 +147,8 @@ def process_and_update_elastic_doc(
     if type(new_content_part["instance"]) == type([]) and not (new_content_part["dataType"] == "json"):
         # if we've a list then this is batch
         # we assume first dimension is always batch
-
-        no_items_in_batch = len(new_content_part["instance"])
-        index = 0
-        elements = None
-        if "elements" in new_content_part:
-            elements = new_content_part["elements"]
-
-        for num, item in enumerate(new_content_part["instance"], start=0):
-
-            item_body = doc_body.copy()
-
-            item_body[message_type]["instance"] = item
-
-            if type(elements) == type([]) and len(elements) > num:
-                item_body[message_type]["elements"] = elements[num]
-
-            item_request_id = build_request_id_batched(
-                request_id, no_items_in_batch, index
-            )
-            added_content.append(upsert_doc_to_elastic(
-                elastic_object, message_type, item_body, item_request_id, index_name
-            ))
-            index = index + 1
+        bulk_upsert_doc_to_elastic(elastic_object, message_type, doc_body,
+                                   doc_body[message_type].copy(), request_id, index_name)
     else:
         #not batch so don't batch elements either
         if "elements" in new_content_part and type(new_content_part["elements"]) == type([]):
@@ -301,6 +281,51 @@ def upsert_doc_to_elastic(
     return new_content
 
 
+def bulk_upsert_doc_to_elastic(
+    elastic_object: Elasticsearch, message_type, doc_body, new_content_part, request_id, index_name
+):
+    log_mapping.get_log_metadata(elastic_object, message_type, doc_body, request_id, index_name)
+    no_items_in_batch = len(new_content_part["instance"])
+    elements = None
+    if "elements" in new_content_part:
+        elements = new_content_part["elements"]
+
+    def gen_data():
+        for num, item in enumerate(new_content_part["instance"], start=0):
+
+            item_body = doc_body.copy()
+            item_body[message_type]["instance"] = item
+            if type(elements) == type([]) and len(elements) > num:
+                item_body[message_type]["elements"] = elements[num]
+
+            item_request_id = build_request_id_batched(
+                request_id, no_items_in_batch, num
+            )
+
+            print(
+                "bulk upserting to doc "
+                + index_name
+                + "/"
+                + (log_helper.DOC_TYPE_NAME if log_helper.DOC_TYPE_NAME != None else "_doc")
+                + "/"
+                + request_id
+                + " adding "
+                + message_type
+            )
+
+            yield {
+                "_index": index_name,
+                "_type": log_helper.DOC_TYPE_NAME,
+                "_op_type": "update",
+                "_id": item_request_id,
+                "_source": {"doc_as_upsert": True, "doc": item_body},
+            }
+
+    helpers.bulk(
+        elastic_object, gen_data(), refresh=True
+    )
+
+
 # take request or response part and process it by deriving metadata
 def process_content(message_type, content, headers):
 
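The heart of the change is replacing one upsert_doc_to_elastic round trip per batch item with a single helpers.bulk call fed by a generator of update actions; "doc_as_upsert" makes each action create the document when it is missing and merge fields when it already exists. Below is a minimal, self-contained sketch of that pattern — the index name, ids, and payload are hypothetical stand-ins, and it assumes the official elasticsearch Python client against a reachable local cluster.

# Minimal sketch of the bulk upsert pattern introduced above. Assumes the
# official `elasticsearch` client and a local cluster; the index name, ids,
# and payload are hypothetical.
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["http://localhost:9200"])
batch = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]  # stand-in batch payload

def gen_actions():
    # Yield one update-with-upsert action per batch item, lazily, so that
    # helpers.bulk can chunk the stream instead of materialising a list.
    for num, item in enumerate(batch):
        yield {
            "_index": "inference-log",        # hypothetical index
            "_op_type": "update",             # update action, not plain index
            "_id": "req-123-item-%d" % num,   # hypothetical per-item id
            "_source": {
                "doc_as_upsert": True,  # create if absent, merge if present
                "doc": {"request": {"instance": item}},
            },
        }

# refresh=True makes the writes searchable immediately, as the logger
# requests; it trades indexing throughput for read-your-writes visibility.
success, errors = helpers.bulk(es, gen_actions(), refresh=True)
print("upserted %d items, errors: %s" % (success, errors))

The sketch omits the "_type" field the logger still sets: document types are deprecated from Elasticsearch 7 onward and default to "_doc", the same fallback the logger's print statement uses.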
