Skip to content

Commit

Permalink
Introduce policy list/read capabilities (#70)
Browse files Browse the repository at this point in the history
- ACL endpoint change, consul.acl is now consul.acl.token
- add consul.acl.policy.list and acl.policy.read
  • Loading branch information
cpaillet authored May 15, 2024
1 parent 7501962 commit 579a991
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 50 deletions.
10 changes: 10 additions & 0 deletions consul/api/acl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from consul.api.acl.policy import Policy
from consul.api.acl.token import Token


class ACL:
def __init__(self, agent):
self.agent = agent

self.token = self.tokens = Token(agent)
self.policy = self.policies = Policy(agent)
32 changes: 32 additions & 0 deletions consul/api/acl/policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from consul.callback import CB


class Policy:
def __init__(self, agent):
self.agent = agent

def list(self, token=None):
"""
Lists all the active ACL policies. This is a privileged endpoint, and
requires a management token. *token* will override this client's
default token.
Requires a token with acl:read capability. ACLPermissionDenied raised otherwise
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), "/v1/acl/policies", params=params)

def read(self, uuid, token=None):
"""
Returns the policy information for *id*. Requires a token with acl:read capability.
:param accessor_id: Specifies the UUID of the policy you lookup.
:param token: token with acl:read capability
:return: selected Polic information
"""
params = []
token = token or self.agent.token
if token:
params.append(("token", token))
return self.agent.http.get(CB.json(), f"/v1/acl/policy/{uuid}", params=params)
2 changes: 1 addition & 1 deletion consul/api/acl.py → consul/api/acl/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from consul.callback import CB


class ACL:
class Token:
def __init__(self, agent):
self.agent = agent

Expand Down
125 changes: 76 additions & 49 deletions tests/api/test_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,54 @@


class TestConsulAcl:
def test_acl_permission_denied(self, acl_consul):
def test_acl_token_permission_denied(self, acl_consul):
c, _master_token, _consul_version = acl_consul

# No token
pytest.raises(consul.ACLPermissionDenied, c.acl.list)
pytest.raises(consul.ACLPermissionDenied, c.acl.create)
pytest.raises(consul.ACLPermissionDenied, c.acl.update, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.clone, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.read, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.delete, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.token.list)
pytest.raises(consul.ACLPermissionDenied, c.acl.token.create)
pytest.raises(
consul.ACLPermissionDenied, c.acl.token.update, accessor_id="00000000-0000-0000-0000-000000000002"
)
pytest.raises(consul.ACLPermissionDenied, c.acl.token.clone, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(consul.ACLPermissionDenied, c.acl.token.read, accessor_id="00000000-0000-0000-0000-000000000002")
pytest.raises(
consul.ACLPermissionDenied, c.acl.token.delete, accessor_id="00000000-0000-0000-0000-000000000002"
)

# Token without the right permission (acl:write or acl:read)
pytest.raises(consul.ACLPermissionDenied, c.acl.list, token="anonymous")
pytest.raises(consul.ACLPermissionDenied, c.acl.create, token="anonymous")
pytest.raises(consul.ACLPermissionDenied, c.acl.token.list, token="anonymous")
pytest.raises(consul.ACLPermissionDenied, c.acl.token.create, token="anonymous")
pytest.raises(
consul.ACLPermissionDenied,
c.acl.update,
c.acl.token.update,
accessor_id="00000000-0000-0000-0000-000000000002",
token="anonymous",
)
pytest.raises(
consul.ACLPermissionDenied,
c.acl.clone,
c.acl.token.clone,
accessor_id="00000000-0000-0000-0000-000000000002",
token="anonymous",
)
pytest.raises(
consul.ACLPermissionDenied,
c.acl.read,
c.acl.token.read,
accessor_id="00000000-0000-0000-0000-000000000002",
token="anonymous",
)
pytest.raises(
consul.ACLPermissionDenied,
c.acl.delete,
c.acl.token.delete,
accessor_id="00000000-0000-0000-0000-000000000002",
token="anonymous",
)

def test_acl_list(self, acl_consul):
def test_acl_token_list(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Make sure both master and anonymous tokens are created
acls = c.acl.list(token=master_token)
acls = c.acl.token.list(token=master_token)

master_token_repr = {
"Description": "Initial Management Token",
Expand All @@ -62,33 +66,33 @@ def test_acl_list(self, acl_consul):
assert find_recursive(acls, master_token_repr)
assert find_recursive(acls, anonymous_token_repr)

def test_acl_read(self, acl_consul):
def test_acl_token_read(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Unknown token
pytest.raises(consul.ConsulException, c.acl.read, accessor_id="unknown", token=master_token)
pytest.raises(consul.ConsulException, c.acl.token.read, accessor_id="unknown", token=master_token)

anonymous_token_repr = {
"AccessorID": "00000000-0000-0000-0000-000000000002",
"SecretID": "anonymous",
}
acl = c.acl.read(accessor_id="00000000-0000-0000-0000-000000000002", token=master_token)
acl = c.acl.token.read(accessor_id="00000000-0000-0000-0000-000000000002", token=master_token)
assert find_recursive(acl, anonymous_token_repr)

def test_acl_create(self, acl_consul):
def test_acl_token_create(self, acl_consul):
c, master_token, _consul_version = acl_consul

c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
c.acl.create(secret_id="DEADBEEF-0000-0000-0000-000000000000", token=master_token)
c.acl.create(
c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
c.acl.token.create(secret_id="DEADBEEF-0000-0000-0000-000000000000", token=master_token)
c.acl.token.create(
secret_id="00000000-A5A5-0000-0000-000000000000",
accessor_id="00000000-0000-A5A5-0000-000000000000",
description="some token!",
token=master_token,
)

assert c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert c.acl.read(accessor_id="00000000-0000-A5A5-0000-000000000000", token=master_token)
assert c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert c.acl.token.read(accessor_id="00000000-0000-A5A5-0000-000000000000", token=master_token)

expected = [
{
Expand All @@ -105,20 +109,20 @@ def test_acl_create(self, acl_consul):
"Description": "some token!",
},
]
acl = c.acl.list(token=master_token)
acl = c.acl.token.list(token=master_token)
assert find_recursive(acl, expected)

def test_acl_clone(self, acl_consul):
def test_acl_token_clone(self, acl_consul):
c, master_token, _consul_version = acl_consul

assert len(c.acl.list(token=master_token)) == 2
assert len(c.acl.token.list(token=master_token)) == 2

# Unknown token
pytest.raises(consul.ConsulException, c.acl.clone, accessor_id="unknown", token=master_token)
pytest.raises(consul.ConsulException, c.acl.token.clone, accessor_id="unknown", token=master_token)

c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
c.acl.clone(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="cloned", token=master_token)
assert len(c.acl.list(token=master_token)) == 4
c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
c.acl.token.clone(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="cloned", token=master_token)
assert len(c.acl.token.list(token=master_token)) == 4

expected = [
{
Expand All @@ -128,45 +132,68 @@ def test_acl_clone(self, acl_consul):
"Description": "cloned",
},
]
acl = c.acl.list(token=master_token)
acl = c.acl.token.list(token=master_token)
assert find_recursive(acl, expected)

def test_acl_update(self, acl_consul):
def test_acl_token_update(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Unknown token
pytest.raises(consul.ConsulException, c.acl.update, accessor_id="unknown", token=master_token)
pytest.raises(consul.ConsulException, c.acl.token.update, accessor_id="unknown", token=master_token)

assert len(c.acl.list(token=master_token)) == 2
c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="original", token=master_token)
assert len(c.acl.list(token=master_token)) == 3
c.acl.update(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="updated", token=master_token)
assert len(c.acl.list(token=master_token)) == 3
assert len(c.acl.token.list(token=master_token)) == 2
c.acl.token.create(
accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="original", token=master_token
)
assert len(c.acl.token.list(token=master_token)) == 3
c.acl.token.update(
accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="updated", token=master_token
)
assert len(c.acl.token.list(token=master_token)) == 3

expected = {
"AccessorID": "00000000-DEAD-BEEF-0000-000000000000",
"Description": "updated",
}
acl = c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
acl = c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert find_recursive(acl, expected)

def test_acl_delete(self, acl_consul):
def test_acl_token_delete(self, acl_consul):
c, master_token, _consul_version = acl_consul

assert len(c.acl.list(token=master_token)) == 2
c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.list(token=master_token)) == 3
assert c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.token.list(token=master_token)) == 2
c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.token.list(token=master_token)) == 3
assert c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)

# Delete and ensure it doesn't exist anymore
c.acl.delete(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.list(token=master_token)) == 2
c.acl.token.delete(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token)
assert len(c.acl.token.list(token=master_token)) == 2
pytest.raises(
consul.ConsulException, c.acl.read, accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token
consul.ConsulException,
c.acl.token.read,
accessor_id="00000000-DEAD-BEEF-0000-000000000000",
token=master_token,
)

def test_acl_policy_list(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Make sure both master and anonymous tokens are created
policies = c.acl.policy.list(token=master_token)
assert find_recursive(policies, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"})

def test_acl_policy_read(self, acl_consul):
c, master_token, _consul_version = acl_consul

# Unknown token
pytest.raises(consul.ConsulException, c.acl.policy.read, uuid="unknown", token=master_token)

policy = c.acl.policy.read(uuid="00000000-0000-0000-0000-000000000001", token=master_token)
assert find_recursive(policy, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"})

#
# def test_acl_implicit_token_use(self, acl_consul):
# def test_acl_token_implicit_token_use(self, acl_consul):
# # configure client to use the master token by default
# port, _token, _consul_version = acl_consul
# c = consul.Consul(port=port)
Expand Down

0 comments on commit 579a991

Please sign in to comment.