
Updated CrowdStrike Connector to handle JSON decode validation #11591

Merged: 5 commits, Jan 16, 2025

Changes from all commits
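Background: the change below wraps the connector's per-line JSON parsing in a decode guard so a single malformed line is logged and skipped instead of failing the whole file. A minimal, hedged sketch of that pattern follows; the helper name parse_event and the sample lines are illustrative only and are not part of the connector's code.

import json
import logging

def parse_event(line):
    """Parse one log line; return None instead of raising on malformed JSON."""
    try:
        return json.loads(line)
    except json.JSONDecodeError as e:
        logging.error("JSON decoding error for line: %s. Error: %s", line, e)
        return None

lines = ['{"aid": "1", "timestamp": 1700000000}', 'not-json']
# Only successfully parsed records survive; the malformed line is skipped.
events = [evt for evt in (parse_event(ln) for ln in lines) if evt is not None]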
@@ -44,39 +44,47 @@
drop_files_array = []
failed_files_array = []


def _create_sqs_client():
    sqs_session = get_session()
    return sqs_session.create_client(
        'sqs',
        region_name=AWS_REGION_NAME,
        aws_access_key_id=AWS_KEY,
        aws_secret_access_key=AWS_SECRET
    )


def _create_s3_client():
    s3_session = get_session()
    boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries={'max_attempts': 10, 'mode': 'standard'})
    return s3_session.create_client(
        's3',
        region_name=AWS_REGION_NAME,
        aws_access_key_id=AWS_KEY,
        aws_secret_access_key=AWS_SECRET,
        config=boto_config
    )


def customize_event(line):
-    element = json.loads(line)
+    try:
+        element = json.loads(line)  # Attempt to parse the line as JSON
+    except json.JSONDecodeError as e:
+        # Log the error and skip this line
+        logging.error(f"JSON decoding error for line: {line}. Error: {str(e)}")
+        return None  # Return None so that this line will be ignored during further processing
    required_fileds = [
        "timestamp", "aip", "aid", "EventType", "LogonType", "HostProcessType", "UserPrincipal", "DomainName",
        "RemoteAddressIP", "ConnectionDirection", "TargetFileName", "LocalAddressIP4", "IsOnRemovableDisk",
        "UserPrincipal", "UserIsAdmin", "LogonTime", "LogonDomain", "RemoteAccount", "UserId", "Prevalence",
        "CurrentProcess", "ConnectionDirection", "event_simpleName", "TargetProcessId", "ProcessStartTime",
        "UserName", "DeviceProductId", "TargetSHA256HashData", "SHA256HashData", "MD5HashData", "TargetDirectoryName",
        "TargetFileName", "FirewallRule", "TaskName", "TaskExecCommand", "TargetAddress", "TargetProcessId",
        "SourceFileName", "RegObjectName", "RegValueName", "ServiceObjectName", "RegistryPath", "RawProcessId",
        "event_platform", "CommandLine", "ParentProcessId", "ParentCommandLine", "ParentBaseFileName",
        "GrandParentBaseFileName", "RemotePort", "VolumeDeviceType", "VolumeName", "ClientComputerName", "ProductId", "ComputerName"
    ]
    required_fields_data = {}
    custom_fields_data = {}
    for key, value in element.items():
@@ -90,6 +98,7 @@ def customize_event(line):
        event["custom_fields_message"] = custom_fields_data_text
    return event


def sort_files_by_bucket(array_obj):
    array_obj = sorted(array_obj, key=itemgetter('bucket'))
    sorted_array = []
@@ -100,6 +109,7 @@ def sort_files_by_bucket(array_obj):
        sorted_array.append({'bucket': key, 'files': temp_array})
    return sorted_array


async def main(mytimer: func.TimerRequest):
    global drop_files_array, failed_files_array
    drop_files_array.clear()
@@ -131,7 +141,8 @@ async def main(mytimer: func.TimerRequest):
                if 'Messages' in response:
                    for msg in response['Messages']:
                        body_obj = json.loads(msg["Body"])
                        logging.info("Got message with MessageId {}. Start processing {} files from Bucket: {}. Path prefix: {}. Timestamp: {}.".format(
                            msg["MessageId"], body_obj["fileCount"], body_obj["bucket"], body_obj["pathPrefix"], body_obj["timestamp"]))
                        await download_message_files(body_obj, session, retrycount=0)
                        logging.info("Finished processing {} files from MessageId {}. Bucket: {}. Path prefix: {}".format(body_obj["fileCount"], msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"]))
                        try:
@@ -151,19 +162,20 @@ async def main(mytimer: func.TimerRequest):

    if len(failed_files_array) > 0:
        logging.info("list of files that were not processed after defined no. of retries: {}".format(failed_files_array))


async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
    async with semaphore:
        total_events = 0
        logging.info("Start processing file {}".format(s3_path))
        sentinel = AzureSentinelConnectorAsync(
            session,
            LOG_ANALYTICS_URI,
            WORKSPACE_ID,
            SHARED_KEY,
            LOG_TYPE,
            queue_size=MAX_BUCKET_SIZE
        )
        try:
            response = await client.get_object(Bucket=bucket, Key=s3_path)
            s = ''
@@ -173,36 +185,34 @@ async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
                for n, line in enumerate(lines):
                    if n < len(lines) - 1:
                        if line:
-                            try:
-                                event = customize_event(line)
-                            except ValueError as e:
-                                logging.error('Error while loading json Event at s value {}. Error: {}'.format(line, str(e)))
-                                raise e
+                            event = customize_event(line)
+                            if event is None:  # Skip malformed lines
+                                continue
                            await sentinel.send(event)
                s = line
            if s:
-                try:
-                    event = customize_event(line)
-                except ValueError as e:
-                    logging.error('Error while loading json Event at s value {}. Error: {}'.format(line, str(e)))
-                    raise e
+                event = customize_event(line)
+                if event is None:  # Skip malformed lines
+                    return
                await sentinel.send(event)
            await sentinel.flush()
            total_events += sentinel.successfull_sent_events_number
            logging.info("Finish processing file {}. Sent events: {}".format(
                s3_path, sentinel.successfull_sent_events_number))
        except Exception as e:
            if (retrycount <= 0):
                logging.warn("Processing file {} was failed. Error: {}".format(s3_path, e))
                drop_files_array.append({'bucket': bucket, 'path': s3_path})
            else:
                logging.warn("Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path, e))
                failed_files_array.append({'bucket': bucket, 'path': s3_path})


async def download_message_files(msg, session, retrycount):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSING_FILES)
    async with _create_s3_client() as client:
        cors = []
        for s3_file in msg['files']:
            cors.append(process_file(
                msg['bucket'], s3_file['path'], client, semaphore, session, retrycount))
        await asyncio.gather(*cors)