diff --git a/bulkExport/glueScripts/export-script.py b/bulkExport/glueScripts/export-script.py index 64eb59e0..6a6af44c 100644 --- a/bulkExport/glueScripts/export-script.py +++ b/bulkExport/glueScripts/export-script.py @@ -31,6 +31,9 @@ groupId = None if ('--{}'.format('groupId') in sys.argv): groupId = getResolvedOptions(sys.argv, ['groupId'])['groupId'] +tenantId = None +if ('--{}'.format('tenantId') in sys.argv): + tenantId = getResolvedOptions(sys.argv, ['tenantId'])['tenantId'] job_id = args['jobId'] export_type = args['exportType'] @@ -60,12 +63,21 @@ } ) +print('Start filtering by tenantId') +# Filter by tenantId +if (tenantId is None): + filtered_tenant_id_frame = original_data_source_dyn_frame +else: + filtered_tenant_id_frame = Filter.apply(frame = original_data_source_dyn_frame, + f = lambda x: + x['_tenantId'] == tenantId) + print('Start filtering by transactionTime and Since') # Filter by transactionTime and Since datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ") datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ") -filtered_dates_dyn_frame = Filter.apply(frame = original_data_source_dyn_frame, +filtered_dates_dyn_frame = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time @@ -81,6 +93,7 @@ else x["documentStatus"] in valid_document_state_to_be_read_from and x["resourceType"] in type_list ) + # Drop fields that are not needed print('Dropping fields that are not needed') data_source_cleaned_dyn_frame = DropFields.apply(frame = filtered_dates_resource_dyn_frame, paths = ['documentStatus', 'lockEndTs', 'vid', '_references']) @@ -124,7 +137,8 @@ def add_dup_resource_type(record): source_s3_file_path = item['Key'] match = re.search(regex_pattern, source_s3_file_path) new_s3_file_name = match.group(1) + "-" + match.group(2) + ".ndjson" - new_s3_file_path = job_id + '/' + new_s3_file_name + tenant_specific_path = '' if (tenantId is None) else tenantId + '/' + new_s3_file_path = tenant_specific_path + job_id + '/' + new_s3_file_name copy_source = { 'Bucket': bucket_name, diff --git a/bulkExport/state-machine-definition.yaml b/bulkExport/state-machine-definition.yaml index 7342fb41..9256c92b 100644 --- a/bulkExport/state-machine-definition.yaml +++ b/bulkExport/state-machine-definition.yaml @@ -27,21 +27,21 @@ definition: updateStatusToFailed: Type: Task Resource: !GetAtt updateStatus.Arn - Parameters: {"jobId.$":"$.jobId", "status": "failed"} + Parameters: {"globalParams.$":"$", "status": "failed"} Retry: - ErrorEquals: [ "States.ALL" ] End: true updateStatusToCanceled: Type: Task Resource: !GetAtt updateStatus.Arn - Parameters: {"jobId.$":"$.jobId", "status": "canceled"} + Parameters: {"globalParams.$":"$", "status": "canceled"} Retry: - ErrorEquals: [ "States.ALL" ] End: true updateStatusToCompleted: Type: Task Resource: !GetAtt updateStatus.Arn - Parameters: {"jobId.$":"$.jobId", "status": "completed"} + Parameters: {"globalParams.$":"$", "status": "completed"} Retry: - ErrorEquals: [ "States.ALL" ] End: true