Merge pull request #101 from dlucredativ/postgres
Add postgres cache support
Snawoot authored Aug 3, 2023
2 parents 1273feb + 7b90da1 commit b100b60
Showing 10 changed files with 152 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -22,7 +22,7 @@ WORKDIR /build
RUN true \
    && apk add --no-cache --virtual .build-deps alpine-sdk libffi-dev \
    && apk add --no-cache libffi \
-   && pip3 install --no-cache-dir .[sqlite,redis,uvloop] \
+   && pip3 install --no-cache-dir .[sqlite,redis,postgres,uvloop] \
    && mkdir /var/lib/mta-sts \
    && chown -R "$USER:$USER" /build /var/lib/mta-sts \
    && apk del .build-deps \
6 changes: 3 additions & 3 deletions README.md
@@ -37,7 +37,7 @@ All dependency packages installed automatically if this package is installed via
Run:

```bash
-sudo python3 -m pip install postfix-mta-sts-resolver[redis,sqlite]
+sudo python3 -m pip install postfix-mta-sts-resolver[redis,sqlite,postgres]
```

If you don't need `redis` or `sqlite` support, you may omit one of them in square brackets. If you don't need any of them and you plan to use the internal cache without persistence, you should also omit the square brackets.
@@ -53,10 +53,10 @@ All pip invocations can be run with `--user` option of `pip` installer. In this
Run in project directory:

```bash
-sudo python3 -m pip install .[redis,sqlite]
+sudo python3 -m pip install .[redis,sqlite,postgres]
```

-If you don't need `redis` or `sqlite` support, you may omit one of them in square brackets. If you don't need any of them and you plan to use internal cache without persistence, you should also omit square brackets.
+If you don't need `redis`, `sqlite` or `postgres` support, you may omit any of them in square brackets. If you don't need any of them and you plan to use the internal cache without persistence, you should also omit the square brackets.

Package scripts shall be available in standard executable locations upon completion.

15 changes: 15 additions & 0 deletions config_examples/mta-sts-daemon.yml.postgres
@@ -0,0 +1,15 @@
host: 127.0.0.1
port: 8461
reuse_port: true
shutdown_timeout: 20
cache:
  type: postgres
  options:
    dsn: postgres://user@%2Frun%2Fpostgresql/user
default_zone:
  strict_testing: false
  timeout: 4
zones:
  myzone:
    strict_testing: false
    timeout: 4
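
A note on the `dsn` above: the percent-encoded host `%2Frun%2Fpostgresql` decodes to the Unix socket directory `/run/postgresql`. The daemon hands the `options` mapping through to asyncpg, as in this minimal standalone sketch (both DSNs are illustrative placeholders, not required values):

```python
# Minimal sketch of how the cache options reach asyncpg: the "dsn" value is
# passed to asyncpg.create_pool() unchanged. Both DSNs are placeholders.
import asyncio

import asyncpg

async def main():
    # Unix-socket DSN: %2Frun%2Fpostgresql decodes to /run/postgresql
    pool = await asyncpg.create_pool(dsn="postgres://user@%2Frun%2Fpostgresql/user")
    # TCP alternative: "postgres://user:secret@127.0.0.1:5432/mta_sts"
    async with pool.acquire() as conn:
        print(await conn.fetchval("SELECT version()"))
    await pool.close()

asyncio.run(main())
```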
1 change: 1 addition & 0 deletions postfix_mta_sts_resolver/defaults.py
@@ -11,6 +11,7 @@
INTERNAL_CACHE_SIZE = 10000
SQLITE_THREADS = cpu_count()
SQLITE_TIMEOUT = 5
POSTGRES_TIMEOUT = 5
REDIS_CONNECT_TIMEOUT = 5
REDIS_TIMEOUT = 5
CACHE_GRACE = 60
110 changes: 110 additions & 0 deletions postfix_mta_sts_resolver/postgres_cache.py
@@ -0,0 +1,110 @@
# pylint: disable=invalid-name,protected-access

import asyncio
import json
import logging

import asyncpg

from .defaults import POSTGRES_TIMEOUT
from .base_cache import BaseCache, CacheEntry


class PostgresCache(BaseCache):
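    """MTA-STS policy cache backed by PostgreSQL via an asyncpg connection
    pool. Extra keyword arguments (dsn, min_size, max_size, ...) are passed
    through to asyncpg.create_pool()."""
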
    def __init__(self, *, timeout=POSTGRES_TIMEOUT, **kwargs):
        self._last_proactive_fetch_ts_id = 1
        asyncpglogger = logging.getLogger("asyncpg")
        if not asyncpglogger.hasHandlers():  # pragma: no cover
            asyncpglogger.addHandler(logging.NullHandler())
        self._timeout = timeout
        self._pool = None
        self.kwargs = kwargs

    async def setup(self):
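        # All DDL uses IF NOT EXISTS, so setup() is idempotent and safe to
        # run on every daemon start.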
        queries = [
            "CREATE TABLE IF NOT EXISTS proactive_fetch_ts "
            "(id serial primary key, last_fetch_ts integer)",
            "CREATE TABLE IF NOT EXISTS sts_policy_cache "
            "(id serial primary key, domain text, ts integer, pol_id text, pol_body jsonb)",
            "CREATE UNIQUE INDEX IF NOT EXISTS sts_policy_domain ON sts_policy_cache (domain)",
            "CREATE INDEX IF NOT EXISTS sts_policy_domain_ts ON sts_policy_cache (domain, ts)",
        ]

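        # Register a jsonb codec so pol_body transparently round-trips
        # between Python objects and the jsonb column.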
        async def set_type_codec(conn):
            await conn.set_type_codec(
                'jsonb',
                encoder=json.dumps,
                decoder=json.loads,
                schema='pg_catalog',
            )

        self._pool = await asyncpg.create_pool(init=set_type_codec, **self.kwargs)
        async with self._pool.acquire(timeout=self._timeout) as conn:
            async with conn.transaction():
                for q in queries:
                    await conn.execute(q)

    async def get_proactive_fetch_ts(self):
        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
            cur = await conn.cursor('SELECT last_fetch_ts FROM '
                                    'proactive_fetch_ts WHERE id = $1',
                                    self._last_proactive_fetch_ts_id)
            res = await cur.fetchrow()
            return int(res[0]) if res is not None else 0

    async def set_proactive_fetch_ts(self, timestamp):
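        # Single-row table keyed by the fixed id 1: insert it on first use,
        # afterwards just refresh last_fetch_ts in place.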
        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
            await conn.execute("""
                INSERT INTO proactive_fetch_ts (last_fetch_ts, id)
                VALUES ($1, $2)
                ON CONFLICT (id) DO UPDATE SET last_fetch_ts = EXCLUDED.last_fetch_ts
                """,
                int(timestamp), self._last_proactive_fetch_ts_id,
            )

    async def get(self, key):
        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
            cur = await conn.cursor('SELECT ts, pol_id, pol_body FROM '
                                    'sts_policy_cache WHERE domain=$1',
                                    key)
            res = await cur.fetchrow()
            if res is not None:
                ts, pol_id, pol_body = res
                ts = int(ts)
                return CacheEntry(ts, pol_id, pol_body)
            else:
                return None

    async def set(self, key, value):
        ts, pol_id, pol_body = value
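        # Upsert that only moves forward in time: the WHERE clause keeps the
        # stored policy whenever its timestamp is not older than the incoming
        # one, so concurrent writers cannot roll a policy back.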
        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
            await conn.execute("""
                INSERT INTO sts_policy_cache (domain, ts, pol_id, pol_body) VALUES ($1, $2, $3, $4)
                ON CONFLICT (domain) DO UPDATE
                SET ts = EXCLUDED.ts, pol_id = EXCLUDED.pol_id, pol_body = EXCLUDED.pol_body
                WHERE sts_policy_cache.ts < EXCLUDED.ts
                """, key, int(ts), pol_id, pol_body)

    async def scan(self, token, amount_hint):
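        # Token-based pagination over the serial primary key: the token is
        # the first row id to fetch, and the returned token is one past the
        # highest id seen. (None, []) signals that the scan is exhausted.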
        if token is None:
            token = 1

        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
            res = await conn.fetch('SELECT id, ts, pol_id, pol_body, domain FROM '
                                   'sts_policy_cache WHERE id >= $1 LIMIT $2',
                                   token, amount_hint)
            if res:
                result = []
                new_token = token
                for row in res:
                    rowid, ts, pol_id, pol_body, domain = row
                    ts = int(ts)
                    rowid = int(rowid)
                    new_token = max(new_token, rowid)
                    result.append((domain, CacheEntry(ts, pol_id, pol_body)))
                new_token += 1
                return new_token, result
            else:
                return None, []

    async def teardown(self):
        await self._pool.close()
4 changes: 4 additions & 0 deletions postfix_mta_sts_resolver/utils.py
@@ -231,6 +231,10 @@ def create_cache(cache_type, options):
        # pylint: disable=import-outside-toplevel
        from . import redis_cache
        cache = redis_cache.RedisSentinelCache(**options)
    elif cache_type == "postgres":
        # pylint: disable=import-outside-toplevel
        from . import postgres_cache
        cache = postgres_cache.PostgresCache(**options)
    else:
        raise NotImplementedError("Unsupported cache type!")
    return cache
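
For illustration, a hedged end-to-end sketch of the new branch — roughly what the daemon does with the `cache` section of its config (the DSN is a placeholder and assumes a reachable PostgreSQL server):

```python
# Sketch only: drive the new "postgres" factory branch the way the daemon
# would. The DSN is a placeholder, not a value required by the package.
import asyncio

from postfix_mta_sts_resolver.utils import create_cache

async def main():
    cache = create_cache("postgres",
                         {"dsn": "postgres://user@%2Frun%2Fpostgresql/user"})
    await cache.setup()                           # creates tables and indexes
    print(await cache.get_proactive_fetch_ts())   # 0 on a fresh database
    await cache.teardown()

asyncio.run(main())
```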
1 change: 1 addition & 0 deletions setup.py
@@ -27,6 +27,7 @@
    extras_require={
        'sqlite': 'aiosqlite>=0.10.0',
        'redis': 'redis>=4.2.0rc1',
        'postgres': 'asyncpg>=0.27',
        'dev': [
            'pytest>=3.0.0',
            'pytest-cov',
4 changes: 2 additions & 2 deletions tests/install.debian.sh
@@ -7,7 +7,7 @@ PYTHON="${PYTHON:-python3}"
# run under travis, but not under autopkgtest
if [ -z "${AUTOPKGTEST_TMP+x}" ] ; then
    apt-get update
-   apt-get install -y redis-server dnsmasq lsof nginx-extras tinyproxy \
+   apt-get install -y redis-server postgresql dnsmasq lsof nginx-extras tinyproxy \
        build-essential libssl-dev libffi-dev python3-dev cargo
    systemctl start redis-server || { journalctl -xe ; false ; }
    "$PYTHON" -m pip install cryptography
@@ -17,7 +17,7 @@ fi
install -m 644 tests/resolv.conf /etc/resolv-dnsmasq.conf
cat tests/dnsmasq.conf.appendix >> /etc/dnsmasq.conf
echo 'nameserver 127.0.0.1' > /etc/resolv.conf
-systemctl restart dnsmasq || { journalctl -xe ; false ; }
+systemctl restart dnsmasq postgresql || { journalctl -xe ; false ; }


# certificates for the test cases
14 changes: 14 additions & 0 deletions tests/test_cache.py
@@ -14,6 +14,10 @@ async def setup_cache(cache_type, cache_opts):
    await cache.setup()
    if cache_type == 'redis':
        await cache._pool.flushdb()
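    # Unlike the per-test sqlite tmpfile, the postgres database persists
    # between runs, so wipe both tables for a clean slate (mirrors the
    # redis flushdb above).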
    if cache_type == 'postgres':
        async with cache._pool.acquire() as conn:
            await conn.execute('TRUNCATE sts_policy_cache')
            await conn.execute('TRUNCATE proactive_fetch_ts')
    return cache, tmpfile

@pytest.mark.parametrize("cache_type,cache_opts,safe_set", [
@@ -23,6 +27,8 @@ async def setup_cache(cache_type, cache_opts):
("sqlite", {}, False),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, True),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, False)
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, True),
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, False),
])
@pytest.mark.asyncio
async def test_cache_lifecycle(cache_type, cache_opts, safe_set):
@@ -47,6 +53,8 @@ async def test_cache_lifecycle(cache_type, cache_opts, safe_set):
("internal", {}),
("sqlite", {}),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}),
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}),
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}),
])
@pytest.mark.asyncio
async def test_proactive_fetch_ts_lifecycle(cache_type, cache_opts):
@@ -84,6 +92,12 @@ async def test_proactive_fetch_ts_lifecycle(cache_type, cache_opts):
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 3, 4),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 0, 4),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, constants.DOMAIN_QUEUE_LIMIT*2, constants.DOMAIN_QUEUE_LIMIT),
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 3, 1),
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 3, 2),
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 3, 3),
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 3, 4),
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 0, 4),
("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, constants.DOMAIN_QUEUE_LIMIT*2, constants.DOMAIN_QUEUE_LIMIT),
])
@pytest.mark.timeout(10)
@pytest.mark.asyncio
2 changes: 1 addition & 1 deletion tests/test_utils.py
@@ -27,7 +27,7 @@ def test_populate_cfg_defaults(cfg):
    assert isinstance(res['proactive_policy_fetching']['concurrency_limit'], int)
    assert isinstance(res['proactive_policy_fetching']['grace_ratio'], (int, float))
    assert isinstance(res['cache'], collections.abc.Mapping)
-   assert res['cache']['type'] in ('redis', 'sqlite', 'internal')
+   assert res['cache']['type'] in ('redis', 'sqlite', 'postgres', 'internal')
    assert isinstance(res['default_zone'], collections.abc.Mapping)
    assert isinstance(res['zones'], collections.abc.Mapping)
    for zone in list(res['zones'].values()) + [res['default_zone']]: