-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlambda_mediaconvert_job.py
83 lines (72 loc) · 3.22 KB
/
lambda_mediaconvert_job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import json
import os
import boto3
from botocore.exceptions import ClientError
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['DYNAMODB_TABLE'])
sqs = boto3.client('sqs')
s3 = boto3.client('s3')
def handler(event, context):
for record in event['Records']:
message = json.loads(record['body'])
detail = message['detail']
object_key = detail['userMetadata']['id']
bucket = detail['userMetadata']['bucket']
try:
# video has been processed, delete it from raw video bucket
s3.delete_object(Bucket=bucket, Key=object_key)
print(f"Deleted object {object_key} from bucket {bucket}")
except ClientError as e:
print(f"Error deleting object from S3: {e.response['Error']['Message']}")
# update the processing status in DynamoDB
if detail['status'] == 'COMPLETE':
status = 'done'
elif detail['status'] == 'ERROR':
status = 'failed'
else:
print(f"Unknown status: {detail['status']}")
return {
'statusCode': 500,
'body': json.dumps('Unknown MediaConvert job status')
}
# Extract duration from HLS_GROUP output details
duration = None
for output_group in detail.get('outputGroupDetails', []):
if output_group.get('type') == 'HLS_GROUP':
for output in output_group.get('outputDetails', []):
duration_ms = output.get('durationInMs')
if duration_ms:
duration = int(duration_ms / 1000) # Convert to seconds and remove fractional part
break
if duration:
break
try:
update_expression = 'SET #status = :status'
expression_attribute_values = {':status': status}
expression_attribute_names = {'#status': 'status'}
if duration is not None:
update_expression += ', videoDuration = :duration'
expression_attribute_values[':duration'] = duration
response = table.update_item(
Key={'id': object_key},
UpdateExpression=update_expression,
ConditionExpression='attribute_exists(id)',
ExpressionAttributeNames=expression_attribute_names,
ExpressionAttributeValues=expression_attribute_values,
ReturnValues='UPDATED_NEW'
)
print(f"Updated item {object_key} status to {status} and duration to {duration} seconds")
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
print(f"Item with id {object_key} does not exist in the table")
else:
print(f"Error updating DynamoDB: {e.response['Error']['Message']}")
# Remove the processed message from the queue
sqs.delete_message(
QueueUrl=os.environ['SQS_QUEUE_URL'],
ReceiptHandle=record['receiptHandle']
)
return {
'statusCode': 200,
'body': json.dumps('MediaConvert job details processed and removed from queue')
}