diff --git a/components/seldon-request-logger/app/default_logger.py b/components/seldon-request-logger/app/default_logger.py
index 0ff3cc5dd9..74dad8a566 100644
--- a/components/seldon-request-logger/app/default_logger.py
+++ b/components/seldon-request-logger/app/default_logger.py
@@ -16,6 +16,7 @@
 import array
 import traceback
 from flask import jsonify
+from elasticsearch import Elasticsearch, helpers
 
 MAX_PAYLOAD_BYTES = 300000
 app = Flask(__name__)
@@ -146,29 +147,8 @@ def process_and_update_elastic_doc(
         if type(new_content_part["instance"]) == type([]) and not (new_content_part["dataType"] == "json"):
             # if we've a list then this is batch
             # we assume first dimension is always batch
-
-            no_items_in_batch = len(new_content_part["instance"])
-            index = 0
-            elements = None
-            if "elements" in new_content_part:
-                elements = new_content_part["elements"]
-
-            for num, item in enumerate(new_content_part["instance"],start=0):
-
-                item_body = doc_body.copy()
-
-                item_body[message_type]["instance"] = item
-
-                if type(elements) == type([]) and len(elements) > num:
-                    item_body[message_type]["elements"] = elements[num]
-
-                item_request_id = build_request_id_batched(
-                    request_id, no_items_in_batch, index
-                )
-                added_content.append(upsert_doc_to_elastic(
-                    elastic_object, message_type, item_body, item_request_id, index_name
-                ))
-                index = index + 1
+            bulk_upsert_doc_to_elastic(elastic_object, message_type, doc_body,
+                doc_body[message_type].copy(), request_id, index_name)
         else:
             #not batch so don't batch elements either
             if "elements" in new_content_part and type(new_content_part["elements"]) == type([]):
@@ -301,6 +281,51 @@ def upsert_doc_to_elastic(
     return new_content
 
+
+def bulk_upsert_doc_to_elastic(
+    elastic_object: Elasticsearch, message_type, doc_body, new_content_part, request_id, index_name
+):
+    log_mapping.get_log_metadata(elastic_object, message_type, doc_body, request_id, index_name)
+    no_items_in_batch = len(new_content_part["instance"])
+    elements = None
+    if "elements" in new_content_part:
+        elements = new_content_part["elements"]
+
+    def gen_data():
+        for num, item in enumerate(new_content_part["instance"], start=0):
+
+            item_body = doc_body.copy()
+            item_body[message_type]["instance"] = item
+            if type(elements) == type([]) and len(elements) > num:
+                item_body[message_type]["elements"] = elements[num]
+
+            item_request_id = build_request_id_batched(
+                request_id, no_items_in_batch, num
+            )
+
+            print(
+                "bulk upserting to doc "
+                + index_name
+                + "/"
+                + (log_helper.DOC_TYPE_NAME if log_helper.DOC_TYPE_NAME != None else "_doc")
+                + "/"
+                + request_id
+                + " adding "
+                + message_type
+            )
+
+            yield {
+                "_index": index_name,
+                "_type": log_helper.DOC_TYPE_NAME,
+                "_op_type": "update",
+                "_id": item_request_id,
+                "_source": {"doc_as_upsert": True, "doc": item_body},
+            }
+
+    helpers.bulk(
+        elastic_object, gen_data(), refresh=True
+    )
+
+
 # take request or response part and process it by deriving metadata
 def process_content(message_type, content, headers):