Skip to content

Commit

Permalink
Fix keycloak client
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-pimenta-DME committed Dec 7, 2023
1 parent 87dc0a4 commit 98d1a96
Showing 1 changed file with 78 additions and 17 deletions.
95 changes: 78 additions & 17 deletions identityutils/keycloak_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def __init__(self, server_url, realm, username, password):
verify=self.server_url.startswith('https'),
timeout=10)
self.keycloak_admin = KeycloakAdmin(connection=openid_connection)
self.keycloak_uma = KeycloakUMA(connection=openid_connection)
self.set_realm(realm)
admin_client_id = self.keycloak_admin.get_client_id('admin-cli')
admin_client = self.keycloak_admin.get_client(admin_client_id)
Expand All @@ -31,13 +30,11 @@ def __init__(self, server_url, realm, username, password):
verify=self.server_url.startswith('https'),
timeout=10)
self.keycloak_admin = KeycloakAdmin(connection=openid_connection)
self.keycloak_uma = KeycloakUMA(connection=openid_connection)

def set_realm(self, realm, skip_exists=True):
if realm != 'master':
self.keycloak_admin.create_realm(payload={"realm": self.realm, "enabled": True}, skip_exists=skip_exists)
self.keycloak_admin.realm_name = self.realm
self.keycloak_uma.realm_name = self.realm

def import_realm(self, realm: dict) -> dict:
return self.keycloak_admin.import_realm(realm)
Expand Down Expand Up @@ -241,7 +238,10 @@ def get_resources(self, client_id):
_client_id = self.keycloak_admin.get_client_id(client_id)
return self.keycloak_admin.get_client_authz_resources(_client_id)

def get_resource_id(self, name: str = "",
def get_resource_id(self,
client_id,
client_secret,
name: str = "",
exact_name: bool = False,
uri: str = "",
owner: str = "",
Expand All @@ -251,11 +251,22 @@ def get_resource_id(self, name: str = "",
maximum: int = -1, ) -> [str]:
"""Gets a resource id from attributes
"""
return self.keycloak_uma.resource_set_list_ids(name=name, exact_name=exact_name, uri=uri, owner=owner,
openid_connection = KeycloakOpenIDConnection(
server_url=self.server_url,
client_id=client_id,
client_secret_key=client_secret,
realm_name=self.keycloak_admin.realm_name,
verify=self.server_url.startswith('https'),
timeout=10)
client_uma = KeycloakUMA(connection=openid_connection)
return client_uma.resource_set_list_ids(name=name, exact_name=exact_name, uri=uri, owner=owner,
resource_type=resource_type, scope=scope, first=first,
maximum=maximum)

def __query_resources(self, name: str = "",
def __query_resources(self,
client_id,
client_secret,
name: str = "",
exact_name: bool = False,
uri: str = "",
owner: str = "",
Expand Down Expand Up @@ -306,22 +317,46 @@ def __query_resources(self, name: str = "",
query["max"] = maximum
query["deep"] = True

data_raw = self.keycloak_uma.connection.raw_get(
self.keycloak_uma.uma_well_known["resource_registration_endpoint"], **query
openid_connection = KeycloakOpenIDConnection(
server_url=self.server_url,
client_id=client_id,
client_secret_key=client_secret,
realm_name=self.keycloak_admin.realm_name,
verify=self.server_url.startswith('https'),
timeout=10)
client_uma = KeycloakUMA(connection=openid_connection)
data_raw = client_uma.connection.raw_get(
client_uma.uma_well_known["resource_registration_endpoint"], **query
)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])

def get_resource(self, resource_id: str):
return self.keycloak_uma.resource_set_read(resource_id)
def get_resource(self, client_id, client_secret, resource_id: str):
openid_connection = KeycloakOpenIDConnection(
server_url=self.server_url,
client_id=client_id,
client_secret_key=client_secret,
realm_name=self.keycloak_admin.realm_name,
verify=self.server_url.startswith('https'),
timeout=10)
client_uma = KeycloakUMA(connection=openid_connection)
return client_uma.resource_set_read(resource_id)

def get_permission_ticket(self, resources: list[str]):
def get_permission_ticket(self, client_id, client_secret, resources: list[str]):
if not isinstance(resources, list):
resources = [resources]
payload = [
{"resource_id": resource} for resource in resources
]
data = self.keycloak_uma.connection.raw_post(
f"${self.keycloak_uma.connection.base_url}realms/{self.realm}/authz/protection/permission",
openid_connection = KeycloakOpenIDConnection(
server_url=self.server_url,
client_id=client_id,
client_secret_key=client_secret,
realm_name=self.keycloak_admin.realm_name,
verify=self.server_url.startswith('https'),
timeout=10)
client_uma = KeycloakUMA(connection=openid_connection)
data = client_uma.connection.raw_post(
f"${client_uma.connection.base_url}realms/{self.realm}/authz/protection/permission",
data=json.dumps(payload)
)
return raise_error_from_response(data, KeycloakPostError)
Expand Down Expand Up @@ -372,13 +407,23 @@ def __get_service_account_user(self, client_id: str):
)

def get_policies(self,
client_id,
client_secret,
resource: str = "",
name: str = "",
scope: str = "",
first: int = 0,
maximum: int = -1, ) -> list[str]:

return self.keycloak_uma.policy_query(resource, name, scope, first, maximum)
openid_connection = KeycloakOpenIDConnection(
server_url=self.server_url,
client_id=client_id,
client_secret_key=client_secret,
realm_name=self.keycloak_admin.realm_name,
verify=self.server_url.startswith('https'),
timeout=10)
client_uma = KeycloakUMA(connection=openid_connection)
return client_uma.policy_query(resource, name, scope, first, maximum)

def get_client_authz_policies(self, client_id):
_client_id = self.keycloak_admin.get_client_id(client_id)
Expand Down Expand Up @@ -497,9 +542,17 @@ def generate_protection_pat(self, client_id, client_secret):
"client_id": client_id,
"client_secret": client_secret,
}
connection = ConnectionManager(self.keycloak_uma.connection.base_url)
openid_connection = KeycloakOpenIDConnection(
server_url=self.server_url,
client_id=client_id,
client_secret_key=client_secret,
realm_name=self.keycloak_admin.realm_name,
verify=self.server_url.startswith('https'),
timeout=10)
client_uma = KeycloakUMA(connection=openid_connection)
connection = ConnectionManager(client_uma.connection.base_url)
connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = connection.raw_post(self.keycloak_uma.uma_well_known["token_endpoint"], data=payload)
data_raw = connection.raw_post(client_uma.uma_well_known["token_endpoint"], data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)


Expand All @@ -526,7 +579,15 @@ def get_rpt(self, client_id, client_secret, token, ticket):
params_path = {
"realm-name": self.realm
}
connection = ConnectionManager(self.keycloak_uma.connection.base_url)
openid_connection = KeycloakOpenIDConnection(
server_url=self.server_url,
client_id=client_id,
client_secret_key=client_secret,
realm_name=self.keycloak_admin.realm_name,
verify=self.server_url.startswith('https'),
timeout=10)
client_uma = KeycloakUMA(connection=openid_connection)
connection = ConnectionManager(client_uma.connection.base_url)
connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data = connection.raw_post(urls_patterns.URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data, KeycloakPostError)

0 comments on commit 98d1a96

Please sign in to comment.