-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathface_images.py
93 lines (77 loc) · 3.08 KB
/
face_images.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import builtins
from infra.interfaces import IVpcRivStack
import aws_cdk as core
from constructs import Construct
from aws_cdk import (
aws_s3 as s3,
aws_sqs as sqs,
aws_sns as sns,
aws_s3_notifications as s3n,
aws_sns_subscriptions as subs,
)
from infra.configsettings import ConfigManager
config_mgr = ConfigManager()
class RivStorageImageStore(Construct):
'''
Represents the ImageStore construct.
'''
@property
def image_bucket(self)->s3.IBucket:
'''
Gets the bucket holding the images.
'''
return self.__image_bucket
@image_bucket.setter
def image_bucket(self,value:s3.IBucket)->None:
self.__image_bucket = value
if config_mgr.use_inventory_bucket:
@property
def inventory_bucket(self)->s3.IBucket:
'''
Gets the inventory bucket associated with the image bucket
'''
return self.__inventory_bucket
@inventory_bucket.setter
def inventory_bucket(self,value:s3.IBucket)->None:
self.__inventory_bucket = value
@property
def inventory_created(self)->sns.ITopic:
'''
Gets the notification topic that an Amazon S3 Inventory finished.
'''
return self.__inventory_created
@inventory_created.setter
def inventory_created(self,value:sns.ITopic)->None:
self.__inventory_created = value
def __init__(self, scope: Construct, id: builtins.str, riv_stack:IVpcRivStack) -> None:
super().__init__(scope, id)
self.image_bucket = s3.Bucket(self,'ImageBucket',
removal_policy= core.RemovalPolicy.RETAIN)
if config_mgr.use_inventory_bucket:
# Create the inventory bucket...
self.inventory_bucket = s3.Bucket(self,'InventoryBucket',
removal_policy= core.RemovalPolicy.DESTROY)
self.image_bucket.add_inventory(
#objects_prefix='images/',
inventory_id='{}-InventoryReport'.format('Full'),
format =s3.InventoryFormat.CSV,
frequency= s3.InventoryFrequency.DAILY,
include_object_versions= s3.InventoryObjectVersion.CURRENT,
destination= s3.InventoryDestination(
bucket=self.inventory_bucket,
bucket_owner= core.Aws.ACCOUNT_ID,
prefix=None))
# Broadcast inventory creation events...
self.inventory_created = sns.Topic(self,'InventoryCreated',
display_name='{}-ImageStore-InventoryCreated'.format(riv_stack.riv_stack_name),
topic_name='{}-ImageStore-InventoryCreated'.format(riv_stack.riv_stack_name))
self.inventory_bucket.add_event_notification(
s3.EventType.OBJECT_CREATED,
s3n.SnsDestination(topic=self.inventory_created),
s3.NotificationKeyFilter(suffix='manifest.json'))
# Persist the notification in an SQS topic to simplify debugging.
self.inventory_created_debug_queue:sqs.IQueue = sqs.Queue(self,'InventoryCreatedDebugQueue',
retention_period=core.Duration.days(14))
self.inventory_created.add_subscription(subs.SqsSubscription(
queue=self.inventory_created_debug_queue,
raw_message_delivery=True))