Skip to content

Commit

Permalink
feat: add support for TeamUsers and UserGroups APIs
Browse files Browse the repository at this point in the history
Enable managing users with the new, experimental, TeamUsers and
UserGroups API
  • Loading branch information
ipcrm committed Feb 27, 2023
1 parent 9b0022c commit 274410b
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 0 deletions.
79 changes: 79 additions & 0 deletions examples/example_team_users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
"""
Example script showing how to use the LaceworkClient class.
"""

import logging

from dotenv import load_dotenv
from laceworksdk import LaceworkClient

logging.basicConfig(level=logging.DEBUG)

load_dotenv()


def standard_user_example(client: LaceworkClient):
"""
Example of create/update/delete and group management for standard user
"""

# Create user
data = client.team_users.create("test user", "[email protected]", "test-company")
guid = data["data"]["userGuid"]
logging.debug(f'user guid created:\n{guid}')

# Get one user
client.team_users.get(guid)

# Update user
client.team_users.update(guid, user_enabled=0)

# Add user to group
client.user_groups.add_users("LACEWORK_USER_GROUP_POWER_USER", [guid])

# Remove user from group
client.user_groups.remove_users("LACEWORK_USER_GROUP_POWER_USER", [guid])

# Delete user
client.team_users.delete(guid)

def service_user_example(client: LaceworkClient):
"""
Example of create/update/delete and group management for service user
"""

# Create user
data = client.team_users.create("test service user", description="test service user", type="ServiceUser")
guid = data["data"]["userGuid"]
logging.debug(f'user guid created:\n{guid}')

# Get one user
client.team_users.get(guid)

# Update user
client.team_users.update(guid, user_enabled=0)

# Add user to group
client.user_groups.add_users("LACEWORK_USER_GROUP_POWER_USER", [guid])

# Remove user from group
client.user_groups.remove_users("LACEWORK_USER_GROUP_POWER_USER", [guid])

# Delete user
client.team_users.delete(guid)



if __name__ == "__main__":
# Instantiate a LaceworkClient instance
lacework_client = LaceworkClient()

# TeamUsers API

# Get users
lacework_client.team_users.get()

standard_user_example(lacework_client)
service_user_example(lacework_client)

4 changes: 4 additions & 0 deletions laceworksdk/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
from .v2.resource_groups import ResourceGroupsAPI
from .v2.schemas import SchemasAPI
from .v2.team_members import TeamMembersAPI
from .v2.team_users import TeamUsersAPI
from .v2.user_groups import UserGroupsAPI
from .v2.user_profile import UserProfileAPI
from .v2.vulnerabilities import VulnerabilitiesAPI
from .v2.vulnerability_exceptions import VulnerabilityExceptionsAPI
Expand Down Expand Up @@ -178,7 +180,9 @@ def __init__(self,
self.schemas = SchemasAPI(self._session)
self.suppressions = SuppressionsAPI(self._session)
self.team_members = TeamMembersAPI(self._session)
self.team_users = TeamUsersAPI(self._session)
self.tokens = TokenAPI(self._session)
self.user_groups = UserGroupsAPI(self._session)
self.user_profile = UserProfileAPI(self._session)
self.vulnerabilities = VulnerabilitiesAPI(self._session)
self.vulnerability_exceptions = VulnerabilityExceptionsAPI(self._session)
Expand Down
105 changes: 105 additions & 0 deletions laceworksdk/api/v2/team_users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
"""
Lacework TeamUsers API wrapper.
"""

from laceworksdk.api.crud_endpoint import CrudEndpoint
import logging
logger = logging.getLogger(__name__)

class TeamUsersAPI(CrudEndpoint):
def __init__(self, session):
super().__init__(session, "TeamUsers")

def get(self, guid=None):
"""
A method to get TeamUsers objects.
:param guid: A string representing the object GUID.
:return response json
"""

return super().get(id=guid)

def get_by_guid(self, guid):
"""
A method to get a TeamUsers object by GUID.
:param guid: A string representing the object GUID.
:return response json
"""

return self.get(guid=guid)

def create(self,
name,
email=None,
company=None,
description=None,
type="StandardUser",
**request_params):
"""
A method to create a new TeamUsers standard user object.
:param name: A string representing the friendly name of the user.
:param email: A string representing the email address of the user (valid only for StandardUser).
:param company: A string representing the company of the user (valid only for StandardUser).
:param description: A description text for describing service accounts (valid only for ServiceUser).
:param type: A string representing the type of the user to create.
(StandardUser or ServiceUser)
:param request_params: Additional request parameters.
(provides support for parameters that may be added in the future)
:return response json
"""

return super().create(
name=name,
email=email,
description=description,
company=company,
type=type,
**request_params
)

def update(self,
guid,
name=None,
user_enabled=None,
description=None,
**request_params):
"""
A method to update a TeamUsers object.
:param guid: A string representing the object GUID.
:param name: A string representing the friendly name of the object.
:param userEnabled: A boolean/integer representing whether the object is enabled.
(0 or 1)
:param description: A description text for describing service accounts (only valid for service accounts).
:return response json
"""

if user_enabled is not None:
user_enabled = int(bool(user_enabled))

return super().update(
id=guid,
name=name,
user_enabled=user_enabled,
description=description,
**request_params
)

def delete(self, guid):
"""
A method to delete a TeamUsers object.
:param guid: A string representing the object GUID.
:return response json
"""

return super().delete(id=guid)
41 changes: 41 additions & 0 deletions laceworksdk/api/v2/user_groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""
Lacework UserGroups API wrapper.
"""

from laceworksdk.api.base_endpoint import BaseEndpoint

class UserGroupsAPI(BaseEndpoint):
def __init__(self, session):
super().__init__(session, "UserGroups")

def __modify_members(self, guid, user_guids, action):
json = self.build_dict_from_items(
user_guids=user_guids,
)

response = self._session.post(self.build_url(resource=guid, action=action), json=json, params=None)

return response.json()

def add_users(self, guid, user_guids):
"""
A method to add users to existing UserGroup object.
:param guid: A string representing the GUID of the UserGroup to modify.
:param user_guids: An array of user guids to add to the UserGroup object.
:return response json
"""
return self.__modify_members(guid, user_guids, "addUsers")

def remove_users(self, guid, user_guids):
"""
A method to remove users from an existing UserGroup object.
:param guid: A string representing the GUID of the UserGroup object to modify.
:param user_guids: An array of user guids to add to the UserGroup object.
:return response json
"""
return self.__modify_members(guid, user_guids, "removeUsers")
6 changes: 6 additions & 0 deletions laceworksdk/http_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ def _request(self, method, uri, **kwargs):
if "/api/v1/" in uri:
logger.warning("Lacework's v1 APIs are scheduled to be deprecated and will not allow usage of after December 2022.")

if "/api/v2/TeamUsers" in uri:
logger.warning("TeamUsers APIs is currently experimental and subject to change")

if "/api/v2/UserGroups" in uri:
logger.warning("UserGroups API is currently experimental and subject to change")

# Check for 'org' - if True, make an organization-level API call
# TODO: Remove this on v1.0 release - this is done for back compat
org = kwargs.pop("org", None)
Expand Down
45 changes: 45 additions & 0 deletions tests/api/v2/test_team_users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
"""
Test suite for the community-developed Python SDK for interacting with Lacework APIs.
"""

import pytest

from laceworksdk.api.v2.team_users import TeamUsersAPI
from tests.api.test_crud_endpoint import CrudEndpoint


# Tests

@pytest.fixture(scope="module")
def api_object(api):
return api.team_users


@pytest.fixture(scope="module")
def api_object_create_body(random_text):
return {
"name": "John Doe",
"email": f"{random_text.lower()}@lacework.net",
"company": "Lacework",
}


@pytest.fixture(scope="module")
def api_object_update_body():
return {
"user_enabled": 0
}


class TestTeamUsers(CrudEndpoint):

OBJECT_ID_NAME = "userGuid"
OBJECT_TYPE = TeamUsersAPI

def test_api_search(self, api_object, request):
"Not implemented"
pass

def test_api_get_by_guid(self, api_object):
self._get_object_classifier_test(api_object, "guid", self.OBJECT_ID_NAME)
49 changes: 49 additions & 0 deletions tests/api/v2/test_user_groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

# -*- coding: utf-8 -*-
"""
Test suite for the community-developed Python SDK for interacting with Lacework APIs.
"""

import pytest

from laceworksdk.api.v2.user_groups import UserGroupsAPI
from laceworksdk.exceptions import ApiError
from tests.api.test_base_endpoint import BaseEndpoint
from laceworksdk import LaceworkClient


# Tests

@pytest.fixture(scope="module")
def api_object(api):
return api.user_groups

@pytest.fixture(scope="module")
def test_user(api, random_text):
res = api.team_users.create(f"test{random_text}", f"noreply+{random_text}@lacework.com", "test")
guid = res["data"]["userGuid"]
yield guid
api.team_users.delete(guid)


class TestUserProfile(BaseEndpoint):

OBJECT_TYPE = UserGroupsAPI

def test_add_user(self, api_object, test_user):
response = api_object.add_users("LACEWORK_USER_GROUP_POWER_USER", [test_user])
assert "data" in response.keys()

def test_remove_user(self, api_object, test_user):
response = api_object.remove_users("LACEWORK_USER_GROUP_POWER_USER", [test_user])
assert "data" in response.keys()

def test_add_user_should_fail_with_invalid_data(self, api_object):
with pytest.raises(ApiError) as e:
api_object.add_users("LACEWORK_USER_GROUP_POWER_USER", ["fake"])
assert "400" in str(e.value)

def test_remove_user_should_fail_with_invalid_data(self, api_object):
with pytest.raises(ApiError) as e:
api_object.remove_users("LACEWORK_USER_GROUP_POWER_USER", ["fake"])
assert "500" in str(e.value)

0 comments on commit 274410b

Please sign in to comment.