From 93ec9247e66f2700e5bc129502f8eff6fe51da85 Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Thu, 16 Jun 2022 13:28:50 -0600 Subject: [PATCH] fixes dynamodb batch dropping missing entities Signed-off-by: Andrew Pope --- sdk/python/feast/infra/online_stores/dynamodb.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 406bee525f..50709fa3d4 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -221,6 +221,9 @@ def online_read( entity_ids_iter = iter(entity_ids) while True: batch = list(itertools.islice(entity_ids_iter, batch_size)) + batch_result: List[ + Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]] + ] = [] # No more items to insert if len(batch) == 0: break @@ -243,20 +246,23 @@ def online_read( for tbl_res in table_responses: entity_id = tbl_res["entity_id"] while entity_id != batch[entity_idx]: - result.append((None, None)) + batch_result.append((None, None)) entity_idx += 1 res = {} for feature_name, value_bin in tbl_res["values"].items(): val = ValueProto() val.ParseFromString(value_bin.value) res[feature_name] = val - result.append((datetime.fromisoformat(tbl_res["event_ts"]), res)) + batch_result.append( + (datetime.fromisoformat(tbl_res["event_ts"]), res) + ) entity_idx += 1 # Not all entities in a batch may have responses # Pad with remaining values in batch that were not found - batch_size_nones = ((None, None),) * (len(batch) - len(result)) - result.extend(batch_size_nones) + batch_size_nones = ((None, None),) * (len(batch) - len(batch_result)) + batch_result.extend(batch_size_nones) + result.extend(batch_result) return result def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):