Skip to content

Commit

Permalink
Merge pull request #6 from code42/fix-test-command
Browse files Browse the repository at this point in the history
  • Loading branch information
Juliya Smith authored Jun 26, 2020
2 parents 3c67ed7 + ff8860e commit 7b67339
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 47 deletions.
60 changes: 37 additions & 23 deletions Packs/Code42/Integrations/Code42/Code42.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,89 +162,98 @@ class Code42Client(BaseClient):

def __init__(self, sdk, base_url, auth, verify=True, proxy=False):
super().__init__(base_url, verify=verify, proxy=proxy)
# Create the Code42 SDK instance
self._sdk = sdk or py42.sdk.from_local_account(base_url, auth[0], auth[1])
# Allow sdk parameter for unit testing.
# Otherwise, lazily load the SDK so that the TEST Command can effectively check auth.
self._sdk = sdk
self._sdk_factory = lambda: py42.sdk.from_local_account(base_url, auth[0], auth[1]) if not self._sdk else None
py42.settings.set_user_agent_suffix("Cortex XSOAR")

def _get_sdk(self):
if self._sdk is None:
self._sdk = self._sdk_factory()
return self._sdk

def add_user_to_departing_employee(self, username, departure_date=None, note=None):
user_id = self.get_user_id(username)
self._sdk.detectionlists.departing_employee.add(user_id, departure_date=departure_date)
self._get_sdk().detectionlists.departing_employee.add(
user_id, departure_date=departure_date
)
if note:
self._sdk.detectionlists.update_user_notes(user_id, note)
self._get_sdk().detectionlists.update_user_notes(user_id, note)
return user_id

def remove_user_from_departing_employee(self, username):
user_id = self.get_user_id(username)
self._sdk.detectionlists.departing_employee.remove(user_id)
self._get_sdk().detectionlists.departing_employee.remove(user_id)
return user_id

def get_all_departing_employees(self):
res = []
pages = self._sdk.detectionlists.departing_employee.get_all()
pages = self._get_sdk().detectionlists.departing_employee.get_all()
for page in pages:
employees = page["items"]
res.extend(employees)
return res

def add_user_to_high_risk_employee(self, username, note=None):
user_id = self.get_user_id(username)
self._sdk.detectionlists.high_risk_employee.add(user_id)
self._get_sdk().detectionlists.high_risk_employee.add(user_id)
if note:
self._sdk.detectionlists.update_user_notes(user_id, note)
self._get_sdk().detectionlists.update_user_notes(user_id, note)
return user_id

def remove_user_from_high_risk_employee(self, username):
user_id = self.get_user_id(username)
self._sdk.detectionlists.high_risk_employee.remove(user_id)
self._get_sdk().detectionlists.high_risk_employee.remove(user_id)
return user_id

def add_user_risk_tags(self, username, risk_tags):
risk_tags = _try_convert_str_list_to_list(risk_tags)
user_id = self.get_user_id(username)
self._sdk.detectionlists.add_user_risk_tags(user_id, risk_tags)
self._get_sdk().detectionlists.add_user_risk_tags(user_id, risk_tags)
return user_id

def remove_user_risk_tags(self, username, risk_tags):
risk_tags = _try_convert_str_list_to_list(risk_tags)
user_id = self.get_user_id(username)
self._sdk.detectionlists.remove_user_risk_tags(user_id, risk_tags)
self._get_sdk().detectionlists.remove_user_risk_tags(user_id, risk_tags)
return user_id

def get_all_high_risk_employees(self, risk_tags=None):
risk_tags = _try_convert_str_list_to_list(risk_tags)
res = []
pages = self._sdk.detectionlists.high_risk_employee.get_all()
pages = self._get_sdk().detectionlists.high_risk_employee.get_all()
for page in pages:
res.extend(_get_all_high_risk_employees_from_page(page, risk_tags))
return res

def fetch_alerts(self, start_time, event_severity_filter):
query = _create_alert_query(event_severity_filter, start_time)
res = self._sdk.alerts.search(query)
res = self._get_sdk().alerts.search(query)
return res["alerts"]

def get_alert_details(self, alert_id):
res = self._sdk.alerts.get_details(alert_id)["alerts"]
res = self._get_sdk().alerts.get_details(alert_id)["alerts"]
if not res:
raise Exception("No alert found with ID {0}.".format(alert_id))
return res[0]

def resolve_alert(self, id):
self._sdk.alerts.resolve(id)
self._get_sdk().alerts.resolve(id)
return id

def get_current_user(self):
res = self._sdk.users.get_current()
res = self._get_sdk().users.get_current()
return res

def get_user_id(self, username):
res = self._sdk.users.get_by_username(username)["users"]
res = self._get_sdk().users.get_by_username(username)["users"]
if not res:
raise Exception("No user found with username {0}.".format(username))
return res[0]["userUid"]

def search_file_events(self, payload):
res = self._sdk.securitydata.search_file_events(payload)
res = self._get_sdk().securitydata.search_file_events(payload)
return res["fileEvents"]


Expand Down Expand Up @@ -357,7 +366,7 @@ class ObservationToSecurityQueryMapper(object):
exposure_type_map = {
"PublicSearchableShare": ExposureType.IS_PUBLIC,
"PublicLinkShare": ExposureType.SHARED_VIA_LINK,
"SharedOutsideTrustedDomain": "OutsideTrustedDomains"
"SharedOutsideTrustedDomain": "OutsideTrustedDomains",
}

def __init__(self, observation, actor):
Expand Down Expand Up @@ -811,11 +820,16 @@ def securitydata_search_command(client, args):


def test_module(client):
if client.get_current_user():
try:
# Will fail if unauthorized
client.get_current_user()
return "ok"
return "Invalid credentials or host address. Check that the username and password are correct, \
that the host is available and reachable, and that you have supplied the full scheme, \
domain, and port (e.g. https://myhost.code42.com:4285)"
except Exception:
return (
"Invalid credentials or host address. Check that the username and password are correct, that the host "
"is available and reachable, and that you have supplied the full scheme, domain, and port "
"(e.g. https://myhost.code42.com:4285)."
)


def main():
Expand Down
12 changes: 12 additions & 0 deletions Packs/Code42/Integrations/Code42/Code42_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,18 @@ def get_empty_detectionlist_response(mocker, base_text):
"""TESTS"""


def test_client_lazily_inits_sdk(mocker):
mocker.patch("py42.sdk.from_local_account")

# test that sdk does not init during ctor
client = Code42Client(sdk=None, base_url=MOCK_URL, auth=MOCK_AUTH, verify=False, proxy=None)
assert client._sdk is None

# test that sdk init from first method call
client.get_user_id("Test")
assert client._sdk is not None


def test_client_when_no_alert_found_raises_exception(code42_sdk_mock):
code42_sdk_mock.alerts.get_details.return_value = (
"""{'type$': 'ALERT_DETAILS_RESPONSE', 'alerts': []}"""
Expand Down
62 changes: 38 additions & 24 deletions Packs/Code42/Integrations/Code42/integration-Code42.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ configuration:
defaultvalue: console.us.code42.com
type: 0
required: true
- display: ""
- display: "Username"
name: credentials
defaultvalue: ""
type: 9
Expand Down Expand Up @@ -227,88 +227,97 @@ script:
def __init__(self, sdk, base_url, auth, verify=True, proxy=False):
super().__init__(base_url, verify=verify, proxy=proxy)
# Create the Code42 SDK instance
self._sdk = sdk or py42.sdk.from_local_account(base_url, auth[0], auth[1])
# Allow sdk parameter for unit testing.
# Otherwise, lazily load the SDK so that the TEST Command can effectively check auth.
self._sdk = sdk
self._sdk_factory = lambda: py42.sdk.from_local_account(base_url, auth[0], auth[1]) if not self._sdk else None
py42.settings.set_user_agent_suffix("Cortex XSOAR")
def _get_sdk(self):
if self._sdk is None:
self._sdk = self._sdk_factory()
return self._sdk
def add_user_to_departing_employee(self, username, departure_date=None, note=None):
user_id = self.get_user_id(username)
self._sdk.detectionlists.departing_employee.add(user_id, departure_date=departure_date)
self._get_sdk().detectionlists.departing_employee.add(
user_id, departure_date=departure_date
)
if note:
self._sdk.detectionlists.update_user_notes(user_id, note)
self._get_sdk().detectionlists.update_user_notes(user_id, note)
return user_id
def remove_user_from_departing_employee(self, username):
user_id = self.get_user_id(username)
self._sdk.detectionlists.departing_employee.remove(user_id)
self._get_sdk().detectionlists.departing_employee.remove(user_id)
return user_id
def get_all_departing_employees(self):
res = []
pages = self._sdk.detectionlists.departing_employee.get_all()
pages = self._get_sdk().detectionlists.departing_employee.get_all()
for page in pages:
employees = page["items"]
res.extend(employees)
return res
def add_user_to_high_risk_employee(self, username, note=None):
user_id = self.get_user_id(username)
self._sdk.detectionlists.high_risk_employee.add(user_id)
self._get_sdk().detectionlists.high_risk_employee.add(user_id)
if note:
self._sdk.detectionlists.update_user_notes(user_id, note)
self._get_sdk().detectionlists.update_user_notes(user_id, note)
return user_id
def remove_user_from_high_risk_employee(self, username):
user_id = self.get_user_id(username)
self._sdk.detectionlists.high_risk_employee.remove(user_id)
self._get_sdk().detectionlists.high_risk_employee.remove(user_id)
return user_id
def add_user_risk_tags(self, username, risk_tags):
user_id = self.get_user_id(username)
self._sdk.detectionlists.add_user_risk_tags(user_id, risk_tags)
self._get_sdk().detectionlists.add_user_risk_tags(user_id, risk_tags)
return user_id
def remove_user_risk_tags(self, username, risk_tags):
user_id = self.get_user_id(username)
self._sdk.detectionlists.remove_user_risk_tags(user_id, risk_tags)
self._get_sdk().detectionlists.remove_user_risk_tags(user_id, risk_tags)
return user_id
def get_all_high_risk_employees(self, risk_tags=None):
if isinstance(risk_tags, str):
risk_tags = [risk_tags]
res = []
pages = self._sdk.detectionlists.high_risk_employee.get_all()
pages = self._get_sdk().detectionlists.high_risk_employee.get_all()
for page in pages:
res.extend(_get_all_high_risk_employees_from_page(page, risk_tags))
return res
def fetch_alerts(self, start_time, event_severity_filter):
query = _create_alert_query(event_severity_filter, start_time)
res = self._sdk.alerts.search(query)
res = self._get_sdk().alerts.search(query)
return res["alerts"]
def get_alert_details(self, alert_id):
res = self._sdk.alerts.get_details(alert_id)["alerts"]
res = self._get_sdk().alerts.get_details(alert_id)["alerts"]
if not res:
raise Exception("No alert found with ID {0}.".format(alert_id))
return res[0]
def resolve_alert(self, id):
self._sdk.alerts.resolve(id)
self._get_sdk().alerts.resolve(id)
return id
def get_current_user(self):
res = self._sdk.users.get_current()
res = self._get_sdk().users.get_current()
return res
def get_user_id(self, username):
res = self._sdk.users.get_by_username(username)["users"]
res = self._get_sdk().users.get_by_username(username)["users"]
if not res:
raise Exception("No user found with username {0}.".format(username))
return res[0]["userUid"]
def search_file_events(self, payload):
res = self._sdk.securitydata.search_file_events(payload)
res = self._get_sdk().securitydata.search_file_events(payload)
return res["fileEvents"]
Expand Down Expand Up @@ -422,7 +431,7 @@ script:
exposure_type_map = {
"PublicSearchableShare": ExposureType.IS_PUBLIC,
"PublicLinkShare": ExposureType.SHARED_VIA_LINK,
"SharedOutsideTrustedDomain": "OutsideTrustedDomains"
"SharedOutsideTrustedDomain": "OutsideTrustedDomains",
}
def __init__(self, observation, actor):
Expand Down Expand Up @@ -892,11 +901,16 @@ script:
def test_module(client):
if client.get_current_user():
try:
# Will fail if unauthorized
client.get_current_user()
return "ok"
return "Invalid credentials or host address. Check that the username and password are correct, \
that the host is available and reachable, and that you have supplied the full scheme, \
domain, and port (e.g. https://myhost.code42.com:4285)"
except Exception:
return (
"Invalid credentials or host address. Check that the username and password are correct, that the host "
"is available and reachable, and that you have supplied the full scheme, domain, and port "
"(e.g. https://myhost.code42.com:4285)."
)
def main():
Expand Down

0 comments on commit 7b67339

Please sign in to comment.