Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix permissions for non-superusers to use lantern.async_task #314

Merged
merged 3 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ set (_update_files
sql/updates/0.2.1--0.2.2.sql
sql/updates/0.2.2--0.2.3.sql
sql/updates/0.2.3--0.2.4.sql
sql/updates/0.2.4--0.2.5.sql
)

# Generate version information for the binary
Expand Down
46 changes: 43 additions & 3 deletions sql/lantern.sql
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ BEGIN
RAISE NOTICE 'pg_cron extension not found. Skipping lantern async task setup';
RETURN;
END IF;
GRANT USAGE ON SCHEMA cron TO PUBLIC;

CREATE TABLE lantern.tasks (
jobid bigserial primary key,
Expand All @@ -500,7 +501,8 @@ BEGIN
error_message text
);

GRANT SELECT ON lantern.tasks TO public;
GRANT SELECT, INSERT, UPDATE, DELETE ON lantern.tasks TO public;
GRANT USAGE, SELECT ON SEQUENCE lantern.tasks_jobid_seq TO public;
ALTER TABLE lantern.tasks ENABLE ROW LEVEL SECURITY;
CREATE POLICY lantern_tasks_policy ON lantern.tasks USING (username OPERATOR(pg_catalog.=) current_user);

Expand All @@ -521,7 +523,7 @@ BEGIN
-- Get the job name from the jobid
-- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND
-- active cron jobs
UPDATE lantern.tasks t SET
UPDATE lantern.tasks t SET
(duration, status, error_message, pg_cron_job_name) = (run.end_time - t.started_at, NEW.status,
CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END,
c.jobname )
Expand All @@ -532,9 +534,16 @@ BEGIN
t.pg_cron_job_name = c.jobname AND
c.jobid = NEW.jobid
-- using returning as a trick to run the unschedule function as a side effect
RETURNING cron.unschedule(t.pg_cron_job_name) INTO res;
-- Note: have to unschedule by jobid because of pg_cron#320 https://github.com/citusdata/pg_cron/issues/320
RETURNING cron.unschedule(t.jobid) INTO res;

RETURN NEW;

EXCEPTION
WHEN OTHERS THEN
RAISE WARNING 'Lantern Async tasks: Unknown job failure in % % %', NEW, SQLERRM, SQLSTATE;
PERFORM cron.unschedule(NEW.jobid);
RETURN NEW;
END
$$ LANGUAGE plpgsql;

Expand Down Expand Up @@ -887,3 +896,34 @@ $weighted_vector_search$ LANGUAGE plpgsql;

SELECT _lantern_internal.maybe_setup_weighted_vector_search();
DROP FUNCTION _lantern_internal.maybe_setup_weighted_vector_search;

-- Helper function to mask large vectors in explain outputs of queries containing vectors.
--
-- Runs EXPLAIN (FORMAT JSON) on `query`, passes the resulting plan through
-- _lantern_internal.mask_order_by_in_plan and returns it pretty-printed.
--
-- Parameters:
--   query      - text of the statement to explain. It is interpolated verbatim into the EXPLAIN
--                command, and when do_analyze is true the statement is actually executed.
--   do_analyze - value for the EXPLAIN ANALYZE option
--   buffers    - value for the EXPLAIN BUFFERS option (only applied together with ANALYZE)
--   costs      - value for the EXPLAIN COSTS option
--   timing     - value for the EXPLAIN TIMING option (only applied together with ANALYZE)
-- Returns: the masked plan as pretty-printed JSON text.
--
-- Every option is emitted with an explicit boolean value. Concatenating bare option names left a
-- dangling comma whenever `timing` was false, could emit TIMING without ANALYZE (which EXPLAIN
-- rejects), and could not switch COSTS off because COSTS defaults to on.
CREATE OR REPLACE FUNCTION lantern.masked_explain(
    query text,
    do_analyze boolean = true,
    buffers boolean = true,
    costs boolean = true,
    timing boolean = true
) RETURNS text AS $$
DECLARE
    explain_query text;
    explain_output jsonb;
    -- a NULL flag is treated as "off", like the IF checks this replaces
    with_analyze boolean := COALESCE(do_analyze, false);
BEGIN
    explain_query := format(
        'EXPLAIN (ANALYZE %s, BUFFERS %s, COSTS %s, TIMING %s, FORMAT JSON) %s',
        with_analyze::text,
        -- BUFFERS without ANALYZE is rejected by older PostgreSQL releases
        (with_analyze AND COALESCE(buffers, false))::text,
        COALESCE(costs, false)::text,
        -- TIMING without ANALYZE is rejected by EXPLAIN
        (with_analyze AND COALESCE(timing, false))::text,
        query
    );
    EXECUTE explain_query INTO explain_output;
    RETURN jsonb_pretty(_lantern_internal.mask_order_by_in_plan(explain_output));
END $$ LANGUAGE plpgsql;

97 changes: 97 additions & 0 deletions sql/updates/0.2.4--0.2.5.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@

-- Drop the old finalizer trigger first, then the function it executes; both are re-created by the
-- DO block further down in this script when the cron schema exists.
DROP TRIGGER IF EXISTS status_change_trigger ON cron.job_run_details;
DROP FUNCTION IF EXISTS _lantern_internal.async_task_finalizer_trigger();

-- Helper function to mask large vectors in explain outputs of queries containing vectors.
--
-- Runs EXPLAIN (FORMAT JSON) on `query`, passes the resulting plan through
-- _lantern_internal.mask_order_by_in_plan and returns it pretty-printed.
--
-- Parameters:
--   query      - text of the statement to explain. It is interpolated verbatim into the EXPLAIN
--                command, and when do_analyze is true the statement is actually executed.
--   do_analyze - value for the EXPLAIN ANALYZE option
--   buffers    - value for the EXPLAIN BUFFERS option (only applied together with ANALYZE)
--   costs      - value for the EXPLAIN COSTS option
--   timing     - value for the EXPLAIN TIMING option (only applied together with ANALYZE)
-- Returns: the masked plan as pretty-printed JSON text.
--
-- Every option is emitted with an explicit boolean value. Concatenating bare option names left a
-- dangling comma whenever `timing` was false, could emit TIMING without ANALYZE (which EXPLAIN
-- rejects), and could not switch COSTS off because COSTS defaults to on.
CREATE OR REPLACE FUNCTION lantern.masked_explain(
    query text,
    do_analyze boolean = true,
    buffers boolean = true,
    costs boolean = true,
    timing boolean = true
) RETURNS text AS $$
DECLARE
    explain_query text;
    explain_output jsonb;
    -- a NULL flag is treated as "off", like the IF checks this replaces
    with_analyze boolean := COALESCE(do_analyze, false);
BEGIN
    explain_query := format(
        'EXPLAIN (ANALYZE %s, BUFFERS %s, COSTS %s, TIMING %s, FORMAT JSON) %s',
        with_analyze::text,
        -- BUFFERS without ANALYZE is rejected by older PostgreSQL releases
        (with_analyze AND COALESCE(buffers, false))::text,
        COALESCE(costs, false)::text,
        -- TIMING without ANALYZE is rejected by EXPLAIN
        (with_analyze AND COALESCE(timing, false))::text,
        query
    );
    EXECUTE explain_query INTO explain_output;
    RETURN jsonb_pretty(_lantern_internal.mask_order_by_in_plan(explain_output));
END $$ LANGUAGE plpgsql;

-- Extends privileges so that non-superusers can use lantern async tasks and re-creates the
-- finalizer trigger on cron.job_run_details. The whole block is a no-op (with a NOTICE) when the
-- cron schema is not present.
DO $async_update$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'cron') THEN
        RAISE NOTICE 'pg_cron extension not found. Skipping lantern async task setup';
        RETURN;
    END IF;

    GRANT USAGE ON SCHEMA cron TO PUBLIC;
    GRANT SELECT, INSERT, UPDATE, DELETE ON lantern.tasks TO public;
    GRANT USAGE, SELECT ON SEQUENCE lantern.tasks_jobid_seq TO public;

    -- Trigger function attached to cron.job_run_details (see CREATE TRIGGER below).
    -- When a run reaches a final status it records duration, status and error message in the
    -- matching lantern.tasks row and unschedules the pg_cron job. Always returns NEW.
    CREATE OR REPLACE FUNCTION _lantern_internal.async_task_finalizer_trigger() RETURNS TRIGGER AS $$
    DECLARE
        res RECORD;
    BEGIN
        -- if NEW.status is one of "starting", "running", "sending", "connecting", return
        IF NEW.status IN ('starting', 'running', 'sending', 'connecting') THEN
            RETURN NEW;
        END IF;

        -- any other non-final status is reported but still processed below
        IF NEW.status NOT IN ('succeeded', 'failed') THEN
            RAISE WARNING 'Lantern Async tasks: Unexpected status %', NEW.status;
        END IF;

        -- Get the job name from the jobid
        -- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND
        -- active cron jobs
        UPDATE lantern.tasks t SET
            (duration, status, error_message, pg_cron_job_name) = (
                run.end_time - t.started_at,
                NEW.status,
                CASE WHEN NEW.status = 'failed' THEN run.return_message ELSE NULL END,
                c.jobname
            )
        FROM cron.job c
        LEFT JOIN cron.job_run_details run
            ON c.jobid = run.jobid
            -- only the run that fired this trigger; a job may have more than one run recorded
            AND run.runid = NEW.runid
        WHERE
            t.pg_cron_job_name = c.jobname AND
            c.jobid = NEW.jobid
        -- using returning as a trick to run the unschedule function as a side effect
        -- Note: have to unschedule by jobid because of pg_cron#320 https://github.com/citusdata/pg_cron/issues/320
        -- The id passed must be the pg_cron job id (c.jobid); t.jobid is the lantern.tasks id,
        -- which is generated by a different sequence.
        RETURNING cron.unschedule(c.jobid) INTO res;

        RETURN NEW;

    EXCEPTION
        WHEN OTHERS THEN
            -- never fail the update on cron.job_run_details; report and stop the job instead
            RAISE WARNING 'Lantern Async tasks: Unknown job failure in % % %', NEW, SQLERRM, SQLSTATE;
            PERFORM cron.unschedule(NEW.jobid);
            RETURN NEW;
    END
    $$ LANGUAGE plpgsql;

    -- fire the finalizer only when the status column actually changes
    CREATE TRIGGER status_change_trigger
    AFTER UPDATE OF status
    ON cron.job_run_details
    FOR EACH ROW
    WHEN (OLD.status IS DISTINCT FROM NEW.status)
    EXECUTE FUNCTION _lantern_internal.async_task_finalizer_trigger();

END;
$async_update$ LANGUAGE plpgsql;
-- N.B.: the block above may return early. DO NOT put anything down here
48 changes: 48 additions & 0 deletions test/expected/async_tasks.out
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,51 @@ SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done
(4 rows)

-- NOTE: the test finishes but the async index creation may still be in progress
-- create non superuser and test the function
SET client_min_messages = WARNING;
-- suppress NOTICE: role "test_user" does not exist, skipping
DROP USER IF EXISTS test_user_async;
SET client_min_messages = NOTICE;
CREATE USER test_user_async WITH PASSWORD 'test_password';
GRANT SELECT ON "sift_base1k_UpperCase" TO test_user_async;
GRANT SELECT ON sift_base1k_id_seq TO test_user_async;
SET ROLE test_user_async;
SELECT lantern.async_task($$SELECT 1$$, 'simple job');
NOTICE: Job scheduled with pg_cron name: 'async_task_5'
async_task
------------
5
(1 row)

SELECT lantern.async_task($$CREATE INDEX idx2 ON "sift_base1k_UpperCase" USING lantern_hnsw (v) WITH (dim=128, M=6);$$, 'Indexing Job');
NOTICE: Job scheduled with pg_cron name: 'async_task_6'
async_task
------------
6
(1 row)

-- this should fail since test_user does not have permission to drop the table
-- sql line for do not stop on error
SELECT lantern.async_task('DROP TABLE "sift_base1k_UpperCase";', 'Dropping Table Job');
NOTICE: Job scheduled with pg_cron name: 'async_task_7'
async_task
------------
7
(1 row)

SELECT pg_sleep(4);
pg_sleep
----------

(1 row)

SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done, status, error_message FROM lantern.tasks ORDER BY jobid;
jobid | query | pg_cron_job_name | job_name | is_done | status | error_message
-------+------------------------------------------------------------------------------------------+------------------+--------------------+---------+-----------+------------------------------------------------------
5 | SELECT 1 | async_task_5 | simple job | t | succeeded |
6 | CREATE INDEX idx2 ON "sift_base1k_UpperCase" USING lantern_hnsw (v) WITH (dim=128, M=6); | async_task_6 | Indexing Job | t | failed | ERROR: must be owner of table sift_base1k_UpperCase+
| | | | | |
7 | DROP TABLE "sift_base1k_UpperCase"; | async_task_7 | Dropping Table Job | t | failed | ERROR: must be owner of table sift_base1k_UpperCase+
| | | | | |
(3 rows)

3 changes: 2 additions & 1 deletion test/expected/ext_relocation.out
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ORDER BY 1, 3, 2;
schema1 | quantize_vector | _lantern_internal
schema1 | reindex_lantern_indexes | _lantern_internal
schema1 | validate_index | _lantern_internal
schema1 | masked_explain | lantern
schema1 | cos_dist | schema1
schema1 | create_pq_codebook | schema1
schema1 | dequantize_vector | schema1
Expand All @@ -62,7 +63,7 @@ ORDER BY 1, 3, 2;
schema1 | ldb_pqvec_send | schema1
schema1 | quantize_table | schema1
schema1 | quantize_vector | schema1
(28 rows)
(29 rows)

-- show all the extension operators
SELECT ne.nspname AS extschema, op.oprname, np.nspname AS proschema
Expand Down
20 changes: 20 additions & 0 deletions test/sql/async_tasks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,23 @@ SELECT _lantern_internal.validate_index('idx', false);

SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done, status, error_message FROM lantern.tasks;
-- NOTE: the test finishes but the async index creation may still be in progress

-- create non superuser and test the function
SET client_min_messages = WARNING;
-- suppress NOTICE: role "test_user" does not exist, skipping
DROP USER IF EXISTS test_user_async;
SET client_min_messages = NOTICE;
CREATE USER test_user_async WITH PASSWORD 'test_password';
GRANT SELECT ON "sift_base1k_UpperCase" TO test_user_async;
GRANT SELECT ON sift_base1k_id_seq TO test_user_async;

SET ROLE test_user_async;

SELECT lantern.async_task($$SELECT 1$$, 'simple job');

SELECT lantern.async_task($$CREATE INDEX idx2 ON "sift_base1k_UpperCase" USING lantern_hnsw (v) WITH (dim=128, M=6);$$, 'Indexing Job');
-- this should fail since test_user does not have permission to drop the table
-- sql line for do not stop on error
SELECT lantern.async_task('DROP TABLE "sift_base1k_UpperCase";', 'Dropping Table Job');
SELECT pg_sleep(4);
SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done, status, error_message FROM lantern.tasks ORDER BY jobid;
Loading