Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(IP-1950): msk -> rds로 가는 람다 추가 #7

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
# pip install aws-cdk
aws-cdk-lib==2.10.0
aws-cdk-lib==2.41.0
constructs>=10.0.0,<11.0.0

# pip install boto3
boto3==1.14.13
boto3~=1.25.3
botocore==1.17.13

# pip install elasticsearch
elasticsearch>=7.0.0,<7.11
opensearch-py==1.1.0
requests==2.23.0
requests-aws4auth==0.9

# pip install crypto
pycryptodome==3.15.0

setuptools~=60.2.0

awswrangler~=2.17.0
pandas~=1.5.1
23 changes: 23 additions & 0 deletions src/main/python/MskToRds/config/ad_payment_by_creative_rt.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"tableName": "ad_payment_by_creative_rt_test02",
"numericColumns": [
"vat_amount"
],
"datetimeColumns": [
"window_start"
],
"datetimeFormat": "%Y-%m-%d %H:00:00",
"groupBy": [
"ws_idx",
"creative_idx",
"product_idx",
"vat_amount",
"window_start"
],
"columnToSum": "vat_amount",
"renameColumn": {
"window_start": "dt",
"vat_amount": "price"
},
"secretId": "beluga-data-mart/master"
}
83 changes: 83 additions & 0 deletions src/main/python/MskToRds/msk_to_rds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import base64
import itertools
import json
import os
import traceback

import awswrangler as wr
import pandas as pd

CONFIG_FILE = os.getenv("CONFIG_FILE") # 파일경로/파일명.json


def lambda_handler(event, context):
"""
kafka topic -> rds
ad_payment_by_creative -> ad_payment_by_creative_rt

MSK 권한 추가 필요
https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/lambda-intro-execution-role.html#permissions-executionrole-features
https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/with-msk.html#msk-permissions-iam-policy#msk-permissions-iam-policy

Secret Manager 추가 필요
https://docs.aws.amazon.com/mediaconnect/latest/ug/iam-policy-examples-asm-secrets.html

RDS 권한 필요
"""
try:
config = json.load(open(CONFIG_FILE, "r"))

"""
현재 카프카만 고려하여 데이터프레임 추출 중
다른 이벤트 소스 고려해야할 경우 event.eventSource 사용해서 분기 가능
"""
records = list(event["records"].values())
messages = list(
map(
lambda record: json.loads(base64.b64decode(
record["value"]).decode("utf-8")),
itertools.chain.from_iterable(records)
)
)
print(f"number of messages: {len(messages)}")
df = pd.DataFrame(
messages
)

for datetime_column in config["datetimeColumns"]:
df[datetime_column] = pd.to_datetime(
arg=df[datetime_column],
utc=True # KST 원할 경우 False
)
df[datetime_column] = df[datetime_column].dt.strftime(config["datetimeFormat"])

for numeric_column in config["numericColumns"]:
df[numeric_column] = pd.to_numeric(df[numeric_column])

df = df.groupby(
by=config["groupBy"],
group_keys=True,
as_index=False
)
df = df[config["columnToSum"]].sum()
df.rename(
columns=config["renameColumn"],
inplace=True
)

secret_id = config["secretId"]
con = wr.mysql.connect(secret_id=secret_id)
wr.mysql.to_sql(
df=df,
table=config["tableName"],
schema=con.db.decode("utf-8"),
con=con,
use_column_names=True,
mode="upsert_duplicate_key" # 단순 insert는 append 사용
)
con.close()

except Exception as ex:
print(event)
print(context)
traceback.print_exc()