
Commit

Merge pull request Azure#11606 from idoshabi07/fixing-32kb-function-issue

Fixing 32kb function issue
v-prasadboke authored Dec 30, 2024
2 parents 01c7d14 + 16d157b commit dd34c18
Showing 6 changed files with 138 additions and 14 deletions.
@@ -24,6 +24,8 @@
Message_Limit = os.getenv('Message_Limit',250)
limit = int(Message_Limit)

FIELD_SIZE_LIMIT_BYTES = 1000 * 32

LOG_ANALYTICS_URI = os.environ.get('logAnalyticsUri')

if not LOG_ANALYTICS_URI or str(LOG_ANALYTICS_URI).isspace():
@@ -110,6 +112,49 @@ def get_cursor_by_partition(client, stream_id, partition):
cursor = response.data.value
return cursor

def check_size(queue):
data_bytes_len = len(json.dumps(queue).encode())
return data_bytes_len < FIELD_SIZE_LIMIT_BYTES


def split_big_request(queue):
if check_size(queue):
return [queue]
else:
middle = int(len(queue) / 2)
queues_list = [queue[:middle], queue[middle:]]
return split_big_request(queues_list[0]) + split_big_request(queues_list[1])

def process_large_field(event_section, field_name, field_size_limit,max_part=10):
"""Process and split large fields in the event data if they exceed the size limit."""
if field_name in event_section:
field_data = event_section[field_name]

if len(json.dumps(field_data).encode()) > field_size_limit:
# Split if field_data is a list
if isinstance(field_data, list):
queue_list = split_big_request(field_data)
for count, item in enumerate(queue_list, 1):
if count > max_part:
break
event_section[f"{field_name}Part{count}"] = item
del event_section[field_name]

# Split if field_data is a dictionary
elif isinstance(field_data, dict):
queue_list = list(field_data.keys())
for count, key in enumerate(queue_list, 1):
if count > max_part:
break
event_section[f"{field_name}Part{key}"] = field_data[key]
del event_section[field_name]
else:
pass

else:
# If within size limit, just serialize it
event_section[field_name] = json.dumps(field_data)


def process_events(client: oci.streaming.StreamClient, stream_id, initial_cursor, limit, sentinel: AzureSentinelConnector, start_ts):
cursor = initial_cursor
@@ -128,21 +173,26 @@ def process_events(client: oci.streaming.StreamClient, stream_id, initial_cursor
event = json.loads(event)
if "data" in event:
if "request" in event["data"] and event["type"] != "com.oraclecloud.loadbalancer.access":
if event["data"]["request"] is not None and "headers" in event["data"]["request"]:
event["data"]["request"]["headers"] = json.dumps(event["data"]["request"]["headers"])
if event["data"]["request"] is not None and "parameters" in event["data"]["request"]:
event["data"]["request"]["parameters"] = json.dumps(
event["data"]["request"]["parameters"])
if "response" in event["data"]:
if event["data"]["response"] is not None and "headers" in event["data"]["response"]:
event["data"]["response"]["headers"] = json.dumps(event["data"]["response"]["headers"])
if event["data"]["request"] is not None:
# Process "headers" and "parameters" in "request"
if "headers" in event["data"]["request"]:
process_large_field(event["data"]["request"], "headers", FIELD_SIZE_LIMIT_BYTES)
if "parameters" in event["data"]["request"]:
process_large_field(event["data"]["request"], "parameters", FIELD_SIZE_LIMIT_BYTES)

if "response" in event["data"] and event["data"]["response"] is not None:
# Process "headers" in "response"
if "headers" in event["data"]["response"]:
process_large_field(event["data"]["response"], "headers", FIELD_SIZE_LIMIT_BYTES)

if "additionalDetails" in event["data"]:
event["data"]["additionalDetails"] = json.dumps(event["data"]["additionalDetails"])
if "stateChange" in event["data"]:
logging.info("In data.stateChange : {}".format(event["data"]["stateChange"]))
if event["data"]["stateChange"] is not None and "current" in event["data"]["stateChange"] :
event["data"]["stateChange"]["current"] = json.dumps(
event["data"]["stateChange"]["current"])
process_large_field(event["data"], "additionalDetails", FIELD_SIZE_LIMIT_BYTES)

if "stateChange" in event["data"] and event["data"]["stateChange"] is not None:
# Process "current" in "stateChange"
if "current" in event["data"]["stateChange"]:
process_large_field(event["data"]["stateChange"], "current", FIELD_SIZE_LIMIT_BYTES)

sentinel.send(event)

sentinel.flush()
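
The helpers added above cap any single serialized field at FIELD_SIZE_LIMIT_BYTES (1000 * 32 = 32,000 bytes), presumably to stay under the Log Analytics per-column size limit, by recursively halving oversized lists and fanning the pieces out into numbered <field>PartN columns. The following is a minimal standalone sketch of that pattern, simplified to the list case; the sample headers payload and variable names are fabricated for illustration.

# Sketch only: mirrors check_size / split_big_request / process_large_field from
# the hunk above (list case only); the sample "headers" payload is made up.
import json

FIELD_SIZE_LIMIT_BYTES = 1000 * 32  # same cap as the connector

def check_size(queue):
    # True when the JSON-serialized value fits under the per-field cap.
    return len(json.dumps(queue).encode()) < FIELD_SIZE_LIMIT_BYTES

def split_big_request(queue):
    # Recursively halve a list until every chunk serializes under the cap.
    if check_size(queue):
        return [queue]
    middle = len(queue) // 2
    return split_big_request(queue[:middle]) + split_big_request(queue[middle:])

def process_large_field(section, field_name, max_part=10):
    # Fan an oversized list out into <field>Part1..<field>PartN, then drop the original.
    field_data = section[field_name]
    if check_size(field_data):
        section[field_name] = json.dumps(field_data)
        return
    for count, part in enumerate(split_big_request(field_data), 1):
        if count > max_part:
            break
        section[f"{field_name}Part{count}"] = part
    del section[field_name]

request = {"headers": [{"name": f"X-Header-{i}", "value": "v" * 400} for i in range(200)]}  # ~86 KB
process_large_field(request, "headers")
print(sorted(request))  # ['headersPart1', 'headersPart2', 'headersPart3', 'headersPart4']
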
Binary file not shown.
Binary file not shown.
@@ -24,6 +24,9 @@
event_types = ["maillog","message"]
logAnalyticsUri = os.environ.get('logAnalyticsUri')

FIELD_SIZE_LIMIT_BYTES = 1000 * 32


if ((logAnalyticsUri in (None, '') or str(logAnalyticsUri).isspace())):
logAnalyticsUri = 'https://' + customer_id + '.ods.opinsights.azure.com'

@@ -32,6 +35,19 @@
if(not match):
raise Exception("ProofpointPOD: Invalid Log Analytics Uri.")

def check_size(queue):
data_bytes_len = len(json.dumps(queue).encode())
return data_bytes_len < FIELD_SIZE_LIMIT_BYTES


def split_big_request(queue):
if check_size(queue):
return [queue]
else:
middle = int(len(queue) / 2)
queues_list = [queue[:middle], queue[middle:]]
return split_big_request(queues_list[0]) + split_big_request(queues_list[1])

def main(mytimer: func.TimerRequest) -> None:
if mytimer.past_due:
logging.info('The timer is past due!')
@@ -107,6 +123,26 @@ def gen_chunks(self,data,event_type):
for row in chunk:
if row != None and row != '':
y = json.loads(row)
if ('msgParts' in y) and (len(json.dumps(y['msgParts']).encode()) > FIELD_SIZE_LIMIT_BYTES):
if isinstance(y['msgParts'],list):
queue_list = split_big_request(y['msgParts'])
count = 1
for q in queue_list:
columnname = 'msgParts' + str(count)
y[columnname] = q
count+=1
del y['msgParts']

elif isinstance(y['msgParts'],dict):
queue_list = list(y['msgParts'].keys())
for count, key in enumerate(queue_list, 1):
if count > 10:
break
y[f"msgParts{key}"] = y['msgParts'][key]

del y['msgParts']
else:
pass
y.update({'event_type': event_type})
obj_array.append(y)

Binary file not shown.
@@ -28,6 +28,8 @@
url = os.environ['SalesforceTokenUri']
logAnalyticsUri = os.environ.get('logAnalyticsUri')

FIELD_SIZE_LIMIT_BYTES = 1000 * 32

if ((logAnalyticsUri in (None, '') or str(logAnalyticsUri).isspace())):
logAnalyticsUri = 'https://' + customer_id + '.ods.opinsights.azure.com'

@@ -63,6 +65,18 @@ def generate_date():
past_time = current_time - datetime.timedelta(days=days_interval, hours=1)
return past_time.strftime("%Y-%m-%dT%H:%M:%SZ")

def check_size(queue):
data_bytes_len = len(json.dumps(queue).encode())
return data_bytes_len < FIELD_SIZE_LIMIT_BYTES


def split_big_request(queue):
if check_size(queue):
return [queue]
else:
middle = int(len(queue) / 2)
queues_list = [queue[:middle], queue[middle:]]
return split_big_request(queues_list[0]) + split_big_request(queues_list[1])

def pull_log_files():
past_time = generate_date()
@@ -136,6 +150,30 @@ def gen_chunks(file_in_tmp_path):
obj_array = []
for row in chunk:
row = enrich_event_with_user_email(row)
if 'action' in row:
if ('message' in row['action']) and (len(json.dumps(row['action']['message']).encode()) > FIELD_SIZE_LIMIT_BYTES):
if isinstance(row['action']['message'],list):
queue_list = split_big_request(row['action']['message'])
count = 1
for q in queue_list:
columnname = 'messagePart' + str(count)
row['action'][columnname] = q
count+=1
del row['action']['message']

elif isinstance(row['action']['message'],dict):
queue_list = list(row['action']['message'].keys())
for count, key in enumerate(queue_list, 1):
if count > 10:
break
row['action'][f"messagePart{key}"] = row['action']['message'][key]
del row['action']['message']

else:
pass



obj_array.append(row)
body = json.dumps(obj_array)
post_data(customer_id, shared_key, body, log_type, len(obj_array))
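
For the Salesforce connector the same treatment is applied to action.message: an oversized list is recursively split and written back as messagePart1..messagePartN before the row is posted. A short sketch of that list branch follows; the helpers repeat the ones added above and the sample row is fabricated.

# Sketch of the list branch above; the sample row is fabricated.
import json

FIELD_SIZE_LIMIT_BYTES = 1000 * 32

def check_size(queue):
    return len(json.dumps(queue).encode()) < FIELD_SIZE_LIMIT_BYTES

def split_big_request(queue):
    if check_size(queue):
        return [queue]
    middle = len(queue) // 2
    return split_big_request(queue[:middle]) + split_big_request(queue[middle:])

row = {"action": {"message": ["entry " + "x" * 900 for _ in range(60)]}}  # ~55 KB serialized

message = row["action"]["message"]
if not check_size(message) and isinstance(message, list):
    for count, part in enumerate(split_big_request(message), 1):
        row["action"][f"messagePart{count}"] = part   # messagePart1, messagePart2, ...
    del row["action"]["message"]

print(sorted(row["action"]))  # ['messagePart1', 'messagePart2']
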
