Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow records with regex #126

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ environments:
- "test.example.com"
```

###### Regex

Additionally to the `records` list a `regex_records` list can be defined.
In this list regex can be to define, which records are allowed.

```yaml
...
environments:
- name: "Test1"
...
zones:
- name: "example.com"
regex_records:
- "_acme-challenge.service-.*.example.com"
```

##### Services

Under a `zone` `services` can be defined.
Expand Down
10 changes: 9 additions & 1 deletion powerdns_api_proxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
RRSETRequest,
ZoneNotAllowedException,
)
from powerdns_api_proxy.utils import check_zones_equal
from powerdns_api_proxy.utils import check_record_in_regex, check_zones_equal


@lru_cache(maxsize=1)
Expand Down Expand Up @@ -147,10 +147,18 @@ def check_rrset_allowed(zone: ProxyConfigZone, rrset: RRSET) -> bool:
if zone.all_records:
return True

if not rrset['name'].rstrip('.').endswith(zone.name.rstrip('.')):
logger.debug('RRSET not allowed, because zone does not match')
return False

for record in zone.records:
if check_zones_equal(rrset['name'], record):
return True

for regex in zone.regex_records:
if check_record_in_regex(rrset['name'], regex):
return True

if check_acme_record_allowed(zone, rrset):
return True

Expand Down
6 changes: 5 additions & 1 deletion powerdns_api_proxy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ class ProxyConfigServices(BaseModel):
class ProxyConfigZone(BaseModel):
'''
`name` is the zone name.
`description` is a description of the zone.
`regex` should be set to `True` if `name` is a regex.
`records` is a list of record names that are allowed.
`regex_records` is a list of record regexes that are allowed.
`admin` enabled creating and deleting the zone.
`subzones` sets the same permissions on all subzones.
`all_records` will be set to `True` if no `records` are defined.
Expand All @@ -30,6 +33,7 @@ class ProxyConfigZone(BaseModel):
regex: bool = False
description: str = ''
records: list[str] = []
regex_records: list[str] = []
services: ProxyConfigServices = ProxyConfigServices(acme=False)
admin: bool = False
subzones: bool = False
Expand All @@ -38,7 +42,7 @@ class ProxyConfigZone(BaseModel):

def __init__(self, **data):
super().__init__(**data)
if len(self.records) == 0:
if len(self.records) == 0 and len(self.regex_records) == 0:
logger.debug(
f'Setting all_records to True for zone {self.name}, because no records are defined'
)
Expand Down
5 changes: 5 additions & 0 deletions powerdns_api_proxy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def check_zone_in_regex(zone: str, regex: str) -> bool:
return re.match(regex, zone.rstrip('.')) is not None


def check_record_in_regex(record: str, regex: str) -> bool:
'''Checks if record is in regex'''
return re.match(regex, record.rstrip('.')) is not None


def check_zones_equal(zone1: str, zone2: str) -> bool:
'''Checks if zones equal with or without trailing dot'''
return zone1.rstrip('.') == zone2.rstrip('.')
125 changes: 119 additions & 6 deletions tests/unit/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ def test_check_rrset_not_allowed_single_entries():
],
)
for item in [
'entry1.test-zone.example.com.',
'entry2.entry1.test-zone.example.com',
'test-zone.example.com.',
'entry100.test-zone.example.com.',
'entry200.entry1.test-zone.example.com',
'test-record.example.com.',
]:
rrset: RRSET = {
'name': item,
Expand All @@ -293,7 +293,7 @@ def test_check_rrset_not_allowed_single_entries():
'records': [],
'comments': [],
}
assert check_rrset_allowed(zone, rrset)
assert not check_rrset_allowed(zone, rrset)


def test_check_rrsets_request_allowed_no_raise():
Expand Down Expand Up @@ -348,8 +348,8 @@ def test_check_rrsets_request_allowed_raise():
)
with pytest.raises(HTTPException) as err:
ensure_rrsets_request_allowed(zone, request)
assert err.value.status_code == 403
assert err.value.detail == 'RRSET entry1.test-zone.example.com. not allowed'
assert err.value.status_code == 403
assert err.value.detail == 'RRSET entry1.test-zone.example.com. not allowed'


def test_check_rrsets_request_not_allowed_read_only():
Expand Down Expand Up @@ -378,6 +378,119 @@ def test_check_rrsets_request_not_allowed_read_only():
assert err.value.detail == 'RRSET update not allowed with read only token'


def test_rrset_request_not_allowed_regex_empty():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[],
)
request: RRSETRequest = {'rrsets': []}
assert ensure_rrsets_request_allowed(zone, request)


def test_rrset_request_allowed_all_regex():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[
'.*',
],
)
request: RRSETRequest = {'rrsets': []}
for item in [
'entry1.test-zone.example.com.',
'entry2.entry1.test-zone.example.com',
]:
request['rrsets'].append(
{
'name': item,
'type': 'TXT',
'changetype': 'REPLACE',
'ttl': 3600,
'records': [],
'comments': [],
}
)
assert ensure_rrsets_request_allowed(zone, request)


def test_rrset_request_allowed_acme_regex():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[
'_acme-challenge.example.*.test-zone.example.com',
],
)
request: RRSETRequest = {'rrsets': []}
for item in [
'_acme-challenge.example-entry.test-zone.example.com.',
]:
request['rrsets'].append(
{
'name': item,
'type': 'TXT',
'changetype': 'REPLACE',
'ttl': 3600,
'records': [],
'comments': [],
}
)
assert ensure_rrsets_request_allowed(zone, request)


def test_rrset_request_not_allowed_false_regex():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[
'example.*.test-zone.example.com',
],
)
request: RRSETRequest = {'rrsets': []}
for item in [
'entry1.test-zone.example.com.',
'entry2.entry1.test-zone.example.com',
]:
request['rrsets'].append(
{
'name': item,
'type': 'TXT',
'changetype': 'REPLACE',
'ttl': 3600,
'records': [],
'comments': [],
}
)
with pytest.raises(HTTPException) as err:
ensure_rrsets_request_allowed(zone, request)
assert err.value.status_code == 403
assert err.value.detail == 'RRSET entry1.test-zone.example.com. not allowed'


def test_rrset_request_not_allowed_false_zone():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[
'example.*.test-zone2.example.com',
],
)
request: RRSETRequest = {'rrsets': []}
for item in [
'example1.test-zone2.example.com.',
]:
request['rrsets'].append(
{
'name': item,
'type': 'TXT',
'changetype': 'REPLACE',
'ttl': 3600,
'records': [],
'comments': [],
}
)
with pytest.raises(HTTPException) as err:
ensure_rrsets_request_allowed(zone, request)
assert err.value.status_code == 403
assert err.value.detail == 'RRSET example1.test-zone2.example.com. not allowed'


def test_check_acme_record_allowed_all_records():
zone = ProxyConfigZone(name='test-zone.example.com', all_records=True)
rrset = RRSET(
Expand Down