diff --git a/consul/api/acl/__init__.py b/consul/api/acl/__init__.py new file mode 100644 index 0000000..d19ceda --- /dev/null +++ b/consul/api/acl/__init__.py @@ -0,0 +1,10 @@ +from consul.api.acl.policy import Policy +from consul.api.acl.token import Token + + +class ACL: + def __init__(self, agent): + self.agent = agent + + self.token = self.tokens = Token(agent) + self.policy = self.policies = Policy(agent) diff --git a/consul/api/acl/policy.py b/consul/api/acl/policy.py new file mode 100644 index 0000000..6787ac1 --- /dev/null +++ b/consul/api/acl/policy.py @@ -0,0 +1,32 @@ +from consul.callback import CB + + +class Policy: + def __init__(self, agent): + self.agent = agent + + def list(self, token=None): + """ + Lists all the active ACL policies. This is a privileged endpoint, and + requires a management token. *token* will override this client's + default token. + Requires a token with acl:read capability. ACLPermissionDenied raised otherwise + """ + params = [] + token = token or self.agent.token + if token: + params.append(("token", token)) + return self.agent.http.get(CB.json(), "/v1/acl/policies", params=params) + + def read(self, uuid, token=None): + """ + Returns the policy information for *id*. Requires a token with acl:read capability. + :param accessor_id: Specifies the UUID of the policy you lookup. + :param token: token with acl:read capability + :return: selected Polic information + """ + params = [] + token = token or self.agent.token + if token: + params.append(("token", token)) + return self.agent.http.get(CB.json(), f"/v1/acl/policy/{uuid}", params=params) diff --git a/consul/api/acl.py b/consul/api/acl/token.py similarity index 99% rename from consul/api/acl.py rename to consul/api/acl/token.py index 017b04a..a0ad534 100644 --- a/consul/api/acl.py +++ b/consul/api/acl/token.py @@ -3,7 +3,7 @@ from consul.callback import CB -class ACL: +class Token: def __init__(self, agent): self.agent = agent diff --git a/tests/api/test_acl.py b/tests/api/test_acl.py index 989615e..367d5e4 100644 --- a/tests/api/test_acl.py +++ b/tests/api/test_acl.py @@ -5,50 +5,54 @@ class TestConsulAcl: - def test_acl_permission_denied(self, acl_consul): + def test_acl_token_permission_denied(self, acl_consul): c, _master_token, _consul_version = acl_consul # No token - pytest.raises(consul.ACLPermissionDenied, c.acl.list) - pytest.raises(consul.ACLPermissionDenied, c.acl.create) - pytest.raises(consul.ACLPermissionDenied, c.acl.update, accessor_id="00000000-0000-0000-0000-000000000002") - pytest.raises(consul.ACLPermissionDenied, c.acl.clone, accessor_id="00000000-0000-0000-0000-000000000002") - pytest.raises(consul.ACLPermissionDenied, c.acl.read, accessor_id="00000000-0000-0000-0000-000000000002") - pytest.raises(consul.ACLPermissionDenied, c.acl.delete, accessor_id="00000000-0000-0000-0000-000000000002") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.list) + pytest.raises(consul.ACLPermissionDenied, c.acl.token.create) + pytest.raises( + consul.ACLPermissionDenied, c.acl.token.update, accessor_id="00000000-0000-0000-0000-000000000002" + ) + pytest.raises(consul.ACLPermissionDenied, c.acl.token.clone, accessor_id="00000000-0000-0000-0000-000000000002") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.read, accessor_id="00000000-0000-0000-0000-000000000002") + pytest.raises( + consul.ACLPermissionDenied, c.acl.token.delete, accessor_id="00000000-0000-0000-0000-000000000002" + ) # Token without the right permission (acl:write or acl:read) - pytest.raises(consul.ACLPermissionDenied, c.acl.list, token="anonymous") - pytest.raises(consul.ACLPermissionDenied, c.acl.create, token="anonymous") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.list, token="anonymous") + pytest.raises(consul.ACLPermissionDenied, c.acl.token.create, token="anonymous") pytest.raises( consul.ACLPermissionDenied, - c.acl.update, + c.acl.token.update, accessor_id="00000000-0000-0000-0000-000000000002", token="anonymous", ) pytest.raises( consul.ACLPermissionDenied, - c.acl.clone, + c.acl.token.clone, accessor_id="00000000-0000-0000-0000-000000000002", token="anonymous", ) pytest.raises( consul.ACLPermissionDenied, - c.acl.read, + c.acl.token.read, accessor_id="00000000-0000-0000-0000-000000000002", token="anonymous", ) pytest.raises( consul.ACLPermissionDenied, - c.acl.delete, + c.acl.token.delete, accessor_id="00000000-0000-0000-0000-000000000002", token="anonymous", ) - def test_acl_list(self, acl_consul): + def test_acl_token_list(self, acl_consul): c, master_token, _consul_version = acl_consul # Make sure both master and anonymous tokens are created - acls = c.acl.list(token=master_token) + acls = c.acl.token.list(token=master_token) master_token_repr = { "Description": "Initial Management Token", @@ -62,33 +66,33 @@ def test_acl_list(self, acl_consul): assert find_recursive(acls, master_token_repr) assert find_recursive(acls, anonymous_token_repr) - def test_acl_read(self, acl_consul): + def test_acl_token_read(self, acl_consul): c, master_token, _consul_version = acl_consul # Unknown token - pytest.raises(consul.ConsulException, c.acl.read, accessor_id="unknown", token=master_token) + pytest.raises(consul.ConsulException, c.acl.token.read, accessor_id="unknown", token=master_token) anonymous_token_repr = { "AccessorID": "00000000-0000-0000-0000-000000000002", "SecretID": "anonymous", } - acl = c.acl.read(accessor_id="00000000-0000-0000-0000-000000000002", token=master_token) + acl = c.acl.token.read(accessor_id="00000000-0000-0000-0000-000000000002", token=master_token) assert find_recursive(acl, anonymous_token_repr) - def test_acl_create(self, acl_consul): + def test_acl_token_create(self, acl_consul): c, master_token, _consul_version = acl_consul - c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - c.acl.create(secret_id="DEADBEEF-0000-0000-0000-000000000000", token=master_token) - c.acl.create( + c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + c.acl.token.create(secret_id="DEADBEEF-0000-0000-0000-000000000000", token=master_token) + c.acl.token.create( secret_id="00000000-A5A5-0000-0000-000000000000", accessor_id="00000000-0000-A5A5-0000-000000000000", description="some token!", token=master_token, ) - assert c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - assert c.acl.read(accessor_id="00000000-0000-A5A5-0000-000000000000", token=master_token) + assert c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + assert c.acl.token.read(accessor_id="00000000-0000-A5A5-0000-000000000000", token=master_token) expected = [ { @@ -105,20 +109,20 @@ def test_acl_create(self, acl_consul): "Description": "some token!", }, ] - acl = c.acl.list(token=master_token) + acl = c.acl.token.list(token=master_token) assert find_recursive(acl, expected) - def test_acl_clone(self, acl_consul): + def test_acl_token_clone(self, acl_consul): c, master_token, _consul_version = acl_consul - assert len(c.acl.list(token=master_token)) == 2 + assert len(c.acl.token.list(token=master_token)) == 2 # Unknown token - pytest.raises(consul.ConsulException, c.acl.clone, accessor_id="unknown", token=master_token) + pytest.raises(consul.ConsulException, c.acl.token.clone, accessor_id="unknown", token=master_token) - c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - c.acl.clone(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="cloned", token=master_token) - assert len(c.acl.list(token=master_token)) == 4 + c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + c.acl.token.clone(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="cloned", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 4 expected = [ { @@ -128,45 +132,68 @@ def test_acl_clone(self, acl_consul): "Description": "cloned", }, ] - acl = c.acl.list(token=master_token) + acl = c.acl.token.list(token=master_token) assert find_recursive(acl, expected) - def test_acl_update(self, acl_consul): + def test_acl_token_update(self, acl_consul): c, master_token, _consul_version = acl_consul # Unknown token - pytest.raises(consul.ConsulException, c.acl.update, accessor_id="unknown", token=master_token) + pytest.raises(consul.ConsulException, c.acl.token.update, accessor_id="unknown", token=master_token) - assert len(c.acl.list(token=master_token)) == 2 - c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="original", token=master_token) - assert len(c.acl.list(token=master_token)) == 3 - c.acl.update(accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="updated", token=master_token) - assert len(c.acl.list(token=master_token)) == 3 + assert len(c.acl.token.list(token=master_token)) == 2 + c.acl.token.create( + accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="original", token=master_token + ) + assert len(c.acl.token.list(token=master_token)) == 3 + c.acl.token.update( + accessor_id="00000000-DEAD-BEEF-0000-000000000000", description="updated", token=master_token + ) + assert len(c.acl.token.list(token=master_token)) == 3 expected = { "AccessorID": "00000000-DEAD-BEEF-0000-000000000000", "Description": "updated", } - acl = c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + acl = c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) assert find_recursive(acl, expected) - def test_acl_delete(self, acl_consul): + def test_acl_token_delete(self, acl_consul): c, master_token, _consul_version = acl_consul - assert len(c.acl.list(token=master_token)) == 2 - c.acl.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - assert len(c.acl.list(token=master_token)) == 3 - assert c.acl.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 2 + c.acl.token.create(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 3 + assert c.acl.token.read(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) # Delete and ensure it doesn't exist anymore - c.acl.delete(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) - assert len(c.acl.list(token=master_token)) == 2 + c.acl.token.delete(accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token) + assert len(c.acl.token.list(token=master_token)) == 2 pytest.raises( - consul.ConsulException, c.acl.read, accessor_id="00000000-DEAD-BEEF-0000-000000000000", token=master_token + consul.ConsulException, + c.acl.token.read, + accessor_id="00000000-DEAD-BEEF-0000-000000000000", + token=master_token, ) + def test_acl_policy_list(self, acl_consul): + c, master_token, _consul_version = acl_consul + + # Make sure both master and anonymous tokens are created + policies = c.acl.policy.list(token=master_token) + assert find_recursive(policies, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"}) + + def test_acl_policy_read(self, acl_consul): + c, master_token, _consul_version = acl_consul + + # Unknown token + pytest.raises(consul.ConsulException, c.acl.policy.read, uuid="unknown", token=master_token) + + policy = c.acl.policy.read(uuid="00000000-0000-0000-0000-000000000001", token=master_token) + assert find_recursive(policy, {"ID": "00000000-0000-0000-0000-000000000001", "Name": "global-management"}) + # - # def test_acl_implicit_token_use(self, acl_consul): + # def test_acl_token_implicit_token_use(self, acl_consul): # # configure client to use the master token by default # port, _token, _consul_version = acl_consul # c = consul.Consul(port=port)