Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicit mapping for elasticsearch Indexes in request logger component #3166

Merged
merged 5 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/seldon-request-logger/app/default_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import sys
import log_helper
import log_mapping
from collections.abc import Iterable
import array

Expand Down Expand Up @@ -176,6 +177,7 @@ def build_request_id_batched(request_id, no_items_in_batch, item_index):
def upsert_doc_to_elastic(
elastic_object, message_type, upsert_body, request_id, index_name
):
log_mapping.get_log_metadata(elastic_object, message_type, upsert_body, request_id, index_name)
upsert_doc = {
"doc_as_upsert": True,
"doc": upsert_body,
Expand Down Expand Up @@ -427,6 +429,7 @@ def extractRow(


def createElelmentsArray(X: np.ndarray, names: list):
# TODO: Fetch deployment metadata and create nested elements array for PROBA and ONE_HOT types
results = None
if isinstance(X, np.ndarray):
if len(X.shape) == 1:
Expand Down
139 changes: 139 additions & 0 deletions components/seldon-request-logger/app/log_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import copy

import log_helper

# Template mapping used when creating a new request-logger index in
# Elasticsearch.  Header-derived identifiers (Ce-*, RequestId,
# ServingEngine) are indexed as exact-match keywords; the request/response
# ce-source and ce-time fields are stored but not indexed
# ("index": "false").  get_index_mapping() copies this template and may
# attach per-field "elements" mappings under request/response.
default_mapping = {
    "properties": {
        "@timestamp": {
            "type": "date"
        },
        "Ce-Endpoint": {
            "type": "keyword"
        },
        "Ce-Inferenceservicename": {
            "type": "keyword"
        },
        "Ce-Modelid": {
            "type": "keyword"
        },
        "Ce-Namespace": {
            "type": "keyword"
        },
        "RequestId": {
            "type": "keyword"
        },
        "ServingEngine": {
            "type": "keyword"
        },
        "request": {
            "properties": {
                "ce-source": {
                    "type": "keyword",
                    "index": "false"
                },
                "ce-time": {
                    "type": "date",
                    "index": "false"
                },
                "dataType": {
                    "type": "keyword"
                }
            }
        },
        "response": {
            "properties": {
                "ce-source": {
                    "type": "keyword",
                    "index": "false"
                },
                "ce-time": {
                    "type": "date",
                    "index": "false"
                },
                "dataType": {
                    "type": "keyword"
                }
            }
        }
    }
}


def get_log_metadata(elastic_object, message_type, upsert_body, request_id, index_name):
    """Ensure `index_name` exists in Elasticsearch, creating it with an
    explicit mapping when missing.

    Best-effort: creation failures are printed and swallowed so logging
    continues.  `message_type` and `request_id` are currently unused but
    kept for interface stability with the caller in default_logger.py.
    """
    if elastic_object.indices.exists(index=index_name):
        return
    print("Index doesn't exists. Creating index with mapping for ", index_name)
    try:
        mapping_body = get_index_mapping(index_name, upsert_body)
        elastic_object.indices.create(
            index=index_name,
            body={"mappings": mapping_body}
        )
        print("Created index with mapping ->", index_name)
    except Exception as ex:
        print(ex)


def get_index_mapping(index_name, upsert_body):
    """Build the mapping body for a new Elasticsearch index.

    Starts from the module-level `default_mapping` template and, when
    deployment metadata is available, adds nested "elements" field
    mappings for request and response payloads.

    :param index_name: name of the index being created (used for logging
        and metadata lookup).
    :param upsert_body: document about to be upserted; its header fields
        identify the inference service / namespace / serving engine.
    :return: dict suitable for the "mappings" key of an index-create call.
    """
    # deepcopy, not .copy(): the template holds nested dicts, and a
    # shallow copy would let the per-index "elements" assignments below
    # mutate the shared default_mapping, leaking one index's field
    # mappings into every index created afterwards.
    index_mapping = copy.deepcopy(default_mapping)
    inferenceservice_name = upsert_body.get(log_helper.INFERENCESERVICE_HEADER_NAME, "")
    namespace_name = upsert_body.get(log_helper.NAMESPACE_HEADER_NAME, "")
    serving_engine = upsert_body.get("ServingEngine", "seldon")

    metadata = fetch_metadata(
        namespace_name, serving_engine, inferenceservice_name)
    if not metadata:
        # No metadata available: fall back to the plain template.
        return index_mapping

    print("Retrieved metadata for index", index_name)
    if "requests" in metadata:
        req_mapping = get_field_mapping(metadata["requests"])
        if req_mapping is not None:
            index_mapping["properties"]["request"]["properties"]["elements"] = req_mapping

    if "responses" in metadata:
        resp_mapping = get_field_mapping(metadata["responses"])
        if resp_mapping is not None:
            index_mapping["properties"]["response"]["properties"]["elements"] = resp_mapping
    return index_mapping


def get_field_mapping(metadata):
    """Convert a list of metadata elements into an Elasticsearch
    "properties" mapping fragment.

    Returns None when `metadata` is empty/None or yields no properties,
    so callers can skip attaching an "elements" mapping entirely.
    """
    if not metadata:
        return None
    props = {}
    for element in metadata:
        props = update_props_element(props, element)
    if not props:
        return None
    return {"properties": props}


def fetch_metadata(namespace, serving_engine, inferenceservice_name):
    """Return deployment metadata for the given inference service, or None.

    Stub: currently always returns None, which makes get_index_mapping()
    fall back to the default template mapping.
    """
    # TODO: Fetch real metadata
    return None


def update_props_element(props, elm):
    """Add the Elasticsearch field mapping for one metadata element to `props`.

    :param props: mapping-properties dict accumulated so far (mutated).
    :param elm: metadata element; must have "name", may have "type"
        (CATEGORICAL, TEXT, PROBA, ONE_HOT, REAL, TENSOR) and, for
        PROBA/ONE_HOT, a nested "schema" list.
    :return: `props`, with the entry for elm["name"] added.
    """
    name = elm["name"]
    elm_type = elm.get("type")
    if elm_type == "CATEGORICAL":
        props[name] = {"type": "keyword"}
    elif elm_type == "TEXT":
        props[name] = {"type": "text"}
    elif elm_type in ("PROBA", "ONE_HOT"):
        nested = get_field_mapping(elm["schema"])
        # Fix: an empty/missing nested schema used to store None here,
        # which is an invalid mapping value and would make index creation
        # fail; fall back to a plain float field instead.
        props[name] = nested if nested is not None else {"type": "float"}
    else:
        # REAL, TENSOR, or no declared type.
        props[name] = {
            "type": "float"  # TODO: Use data type if available
        }
    return props