From e19d367c097d790146c783ba3637ee41b2abeb50 Mon Sep 17 00:00:00 2001 From: Narek Galstyan Date: Wed, 17 Apr 2024 03:31:51 +0000 Subject: [PATCH 1/3] Fix permissions for non-superusers to use lantern.async_task - Grant necessary permissions to pg_cron resources and lantern.tasks table - Refactor pg_cron unscheduling logic to bypass pg_cron unscheduling related issue: citusdata/pg_cron#320 https://github.com/citusdata/pg_cron/issues/320 --- CMakeLists.txt | 1 + sql/lantern.sql | 46 +++++++++++++++-- sql/updates/0.2.4--0.2.5.sql | 97 +++++++++++++++++++++++++++++++++++ test/expected/async_tasks.out | 48 +++++++++++++++++ test/sql/async_tasks.sql | 20 ++++++++ 5 files changed, 209 insertions(+), 3 deletions(-) create mode 100644 sql/updates/0.2.4--0.2.5.sql diff --git a/CMakeLists.txt b/CMakeLists.txt index 264b5981..8c542809 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -272,6 +272,7 @@ set (_update_files sql/updates/0.2.1--0.2.2.sql sql/updates/0.2.2--0.2.3.sql sql/updates/0.2.3--0.2.4.sql + sql/updates/0.2.4--0.2.5.sql ) # Generate version information for the binary diff --git a/sql/lantern.sql b/sql/lantern.sql index e68940e6..55f53004 100644 --- a/sql/lantern.sql +++ b/sql/lantern.sql @@ -487,6 +487,7 @@ BEGIN RAISE NOTICE 'pg_cron extension not found. Skipping lantern async task setup'; RETURN; END IF; + GRANT USAGE ON SCHEMA cron TO PUBLIC; CREATE TABLE lantern.tasks ( jobid bigserial primary key, @@ -500,7 +501,8 @@ BEGIN error_message text ); - GRANT SELECT ON lantern.tasks TO public; + GRANT SELECT, INSERT, UPDATE, DELETE ON lantern.tasks TO public; + GRANT USAGE, SELECT ON SEQUENCE lantern.tasks_jobid_seq TO public; ALTER TABLE lantern.tasks ENABLE ROW LEVEL SECURITY; CREATE POLICY lantern_tasks_policy ON lantern.tasks USING (username OPERATOR(pg_catalog.=) current_user); @@ -521,7 +523,7 @@ BEGIN -- Get the job name from the jobid -- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND -- active cron jobs - UPDATE lantern.tasks t SET + UPDATE lantern.tasks t SET (duration, status, error_message, pg_cron_job_name) = (run.end_time - t.started_at, NEW.status, CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END, c.jobname ) @@ -532,9 +534,16 @@ BEGIN t.pg_cron_job_name = c.jobname AND c.jobid = NEW.jobid -- using returning as a trick to run the unschedule function as a side effect - RETURNING cron.unschedule(t.pg_cron_job_name) INTO res; + -- Note: have to unschedule by jobid because of pg_cron#320 https://github.com/citusdata/pg_cron/issues/320 + RETURNING cron.unschedule(t.jobid) INTO res; RETURN NEW; + + EXCEPTION + WHEN OTHERS THEN + RAISE WARNING 'Lantern Async tasks: Unknown job failure in % % %', NEW, SQLERRM, SQLSTATE; + PERFORM cron.unschedule(NEW.jobid); + RETURN NEW; END $$ LANGUAGE plpgsql; @@ -887,3 +896,34 @@ $weighted_vector_search$ LANGUAGE plpgsql; SELECT _lantern_internal.maybe_setup_weighted_vector_search(); DROP FUNCTION _lantern_internal.maybe_setup_weighted_vector_search; + +-- helper function to mask large vectors in explain outputs of queries containing vectors +CREATE OR REPLACE FUNCTION lantern.masked_explain( + query text, + do_analyze boolean = true, + buffers boolean = true, + costs boolean = true, + timing boolean = true +) RETURNS text AS $$ +DECLARE + explain_query text; + explain_output jsonb; + flags text = ''; +BEGIN + IF do_analyze THEN + flags := flags || 'ANALYZE, '; + END IF; + IF buffers THEN + flags := flags || 'BUFFERS, '; + END IF; + IF costs THEN + flags := flags || 'COSTS, '; + END IF; + IF timing THEN + flags := flags || 'TIMING '; + END IF; + explain_query := format('EXPLAIN (%s, FORMAT JSON) %s', flags, query); + EXECUTE explain_query INTO explain_output; + RETURN jsonb_pretty(_lantern_internal.mask_order_by_in_plan(explain_output)); +END $$ LANGUAGE plpgsql; + diff --git a/sql/updates/0.2.4--0.2.5.sql b/sql/updates/0.2.4--0.2.5.sql new file mode 100644 index 00000000..ae1fc0a5 --- /dev/null +++ b/sql/updates/0.2.4--0.2.5.sql @@ -0,0 +1,97 @@ + +DROP TRIGGER IF EXISTS status_change_trigger ON cron.job_run_details; +DROP FUNCTION IF EXISTS _lantern_internal.async_task_finalizer_trigger(); + +DO $async_update$ + +BEGIN + IF NOT (SELECT EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'cron')) + THEN + RAISE NOTICE 'pg_cron extension not found. Skipping lantern async task setup'; + RETURN; + END IF; + + GRANT USAGE ON SCHEMA cron TO PUBLIC; + GRANT SELECT, INSERT, UPDATE, DELETE ON lantern.tasks TO public; + GRANT USAGE, SELECT ON SEQUENCE lantern.tasks_jobid_seq TO public; + + -- create a trigger and added to cron.job_run_details + CREATE OR REPLACE FUNCTION _lantern_internal.async_task_finalizer_trigger() RETURNS TRIGGER AS $$ + DECLARE + res RECORD; + BEGIN + -- if NEW.status is one of "starting", "running", "sending, "connecting", return + IF NEW.status IN ('starting', 'running', 'sending', 'connecting') THEN + RETURN NEW; + END IF; + + IF NEW.status NOT IN ('succeeded', 'failed') THEN + RAISE WARNING 'Lantern Async tasks: Unexpected status %', NEW.status; + END IF; + + -- Get the job name from the jobid + -- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND + -- active cron jobs + UPDATE lantern.tasks t SET + (duration, status, error_message, pg_cron_job_name) = (run.end_time - t.started_at, NEW.status, + CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END, + c.jobname ) + FROM cron.job c + LEFT JOIN cron.job_run_details run + ON c.jobid = run.jobid + WHERE + t.pg_cron_job_name = c.jobname AND + c.jobid = NEW.jobid + -- using returning as a trick to run the unschedule function as a side effect + -- Note: have to unschedule by jobid because of pg_cron#320 https://github.com/citusdata/pg_cron/issues/320 + RETURNING cron.unschedule(t.jobid) INTO res; + + RETURN NEW; + + EXCEPTION + WHEN OTHERS THEN + RAISE WARNING 'Lantern Async tasks: Unknown job failure in % % %', NEW, SQLERRM, SQLSTATE; + PERFORM cron.unschedule(NEW.jobid); + RETURN NEW; + END + $$ LANGUAGE plpgsql; + + CREATE TRIGGER status_change_trigger + AFTER UPDATE OF status + ON cron.job_run_details + FOR EACH ROW + WHEN (OLD.status IS DISTINCT FROM NEW.status) + EXECUTE FUNCTION _lantern_internal.async_task_finalizer_trigger(); + +$async_update$ +LANGUAGE plpgsql; + +-- helper function to mask large vectors in explain outputs of queries containing vectors +CREATE OR REPLACE FUNCTION lantern.masked_explain( + query text, + do_analyze boolean = true, + buffers boolean = true, + costs boolean = true, + timing boolean = true +) RETURNS text AS $$ +DECLARE + explain_query text; + explain_output jsonb; + flags text = ''; +BEGIN + IF do_analyze THEN + flags := flags || 'ANALYZE, '; + END IF; + IF buffers THEN + flags := flags || 'BUFFERS, '; + END IF; + IF costs THEN + flags := flags || 'COSTS, '; + END IF; + IF timing THEN + flags := flags || 'TIMING '; + END IF; + explain_query := format('EXPLAIN (%s, FORMAT JSON) %s', flags, query); + EXECUTE explain_query INTO explain_output; + RETURN jsonb_pretty(_lantern_internal.mask_order_by_in_plan(explain_output)); +END $$ LANGUAGE plpgsql; diff --git a/test/expected/async_tasks.out b/test/expected/async_tasks.out index 9ce3a898..cb25c4ec 100644 --- a/test/expected/async_tasks.out +++ b/test/expected/async_tasks.out @@ -193,3 +193,51 @@ SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done (4 rows) -- NOTE: the test finishes but the async index creation may still be in progress +-- create non superuser and test the function +SET client_min_messages = WARNING; +-- suppress NOTICE: role "test_user" does not exist, skipping +DROP USER IF EXISTS test_user_async; +SET client_min_messages = NOTICE; +CREATE USER test_user_async WITH PASSWORD 'test_password'; +GRANT SELECT ON "sift_base1k_UpperCase" TO test_user_async; +GRANT SELECT ON sift_base1k_id_seq TO test_user_async; +SET ROLE test_user_async; +SELECT lantern.async_task($$SELECT 1$$, 'simple job'); +NOTICE: Job scheduled with pg_cron name: 'async_task_5' + async_task +------------ + 5 +(1 row) + +SELECT lantern.async_task($$CREATE INDEX idx2 ON "sift_base1k_UpperCase" USING lantern_hnsw (v) WITH (dim=128, M=6);$$, 'Indexing Job'); +NOTICE: Job scheduled with pg_cron name: 'async_task_6' + async_task +------------ + 6 +(1 row) + +-- this should fail since test_user does not have permission to drop the table +-- sql line for do not stop on error +SELECT lantern.async_task('DROP TABLE "sift_base1k_UpperCase";', 'Dropping Table Job'); +NOTICE: Job scheduled with pg_cron name: 'async_task_7' + async_task +------------ + 7 +(1 row) + +SELECT pg_sleep(4); + pg_sleep +---------- + +(1 row) + +SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done, status, error_message FROM lantern.tasks ORDER BY jobid; + jobid | query | pg_cron_job_name | job_name | is_done | status | error_message +-------+------------------------------------------------------------------------------------------+------------------+--------------------+---------+-----------+------------------------------------------------------ + 5 | SELECT 1 | async_task_5 | simple job | t | succeeded | + 6 | CREATE INDEX idx2 ON "sift_base1k_UpperCase" USING lantern_hnsw (v) WITH (dim=128, M=6); | async_task_6 | Indexing Job | t | failed | ERROR: must be owner of table sift_base1k_UpperCase+ + | | | | | | + 7 | DROP TABLE "sift_base1k_UpperCase"; | async_task_7 | Dropping Table Job | t | failed | ERROR: must be owner of table sift_base1k_UpperCase+ + | | | | | | +(3 rows) + diff --git a/test/sql/async_tasks.sql b/test/sql/async_tasks.sql index 66fe7c09..cf058e28 100644 --- a/test/sql/async_tasks.sql +++ b/test/sql/async_tasks.sql @@ -40,3 +40,23 @@ SELECT _lantern_internal.validate_index('idx', false); SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done, status, error_message FROM lantern.tasks; -- NOTE: the test finishes but the async index creation may still be in progress + +-- create non superuser and test the function +SET client_min_messages = WARNING; +-- suppress NOTICE: role "test_user" does not exist, skipping +DROP USER IF EXISTS test_user_async; +SET client_min_messages = NOTICE; +CREATE USER test_user_async WITH PASSWORD 'test_password'; +GRANT SELECT ON "sift_base1k_UpperCase" TO test_user_async; +GRANT SELECT ON sift_base1k_id_seq TO test_user_async; + +SET ROLE test_user_async; + +SELECT lantern.async_task($$SELECT 1$$, 'simple job'); + +SELECT lantern.async_task($$CREATE INDEX idx2 ON "sift_base1k_UpperCase" USING lantern_hnsw (v) WITH (dim=128, M=6);$$, 'Indexing Job'); +-- this should fail since test_user does not have permission to drop the table +-- sql line for do not stop on error +SELECT lantern.async_task('DROP TABLE "sift_base1k_UpperCase";', 'Dropping Table Job'); +SELECT pg_sleep(4); +SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done, status, error_message FROM lantern.tasks ORDER BY jobid; From d1cfcacdfbf389c1154656d4d72656edd9680b8e Mon Sep 17 00:00:00 2001 From: Narek Galstyan Date: Wed, 17 Apr 2024 03:40:56 +0000 Subject: [PATCH 2/3] Fix relocation test --- test/expected/ext_relocation.out | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/expected/ext_relocation.out b/test/expected/ext_relocation.out index 7ad36497..f73e577d 100644 --- a/test/expected/ext_relocation.out +++ b/test/expected/ext_relocation.out @@ -44,6 +44,7 @@ ORDER BY 1, 3, 2; schema1 | quantize_vector | _lantern_internal schema1 | reindex_lantern_indexes | _lantern_internal schema1 | validate_index | _lantern_internal + schema1 | masked_explain | lantern schema1 | cos_dist | schema1 schema1 | create_pq_codebook | schema1 schema1 | dequantize_vector | schema1 @@ -62,7 +63,7 @@ ORDER BY 1, 3, 2; schema1 | ldb_pqvec_send | schema1 schema1 | quantize_table | schema1 schema1 | quantize_vector | schema1 -(28 rows) +(29 rows) -- show all the extension operators SELECT ne.nspname AS extschema, op.oprname, np.nspname AS proschema From cac228bfb7e64e67802aec7fa86fa62412df0726 Mon Sep 17 00:00:00 2001 From: Narek Galstyan Date: Wed, 17 Apr 2024 03:56:49 +0000 Subject: [PATCH 3/3] Fix update --- sql/updates/0.2.4--0.2.5.sql | 66 ++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/sql/updates/0.2.4--0.2.5.sql b/sql/updates/0.2.4--0.2.5.sql index ae1fc0a5..f8c96344 100644 --- a/sql/updates/0.2.4--0.2.5.sql +++ b/sql/updates/0.2.4--0.2.5.sql @@ -2,8 +2,37 @@ DROP TRIGGER IF EXISTS status_change_trigger ON cron.job_run_details; DROP FUNCTION IF EXISTS _lantern_internal.async_task_finalizer_trigger(); -DO $async_update$ +-- helper function to mask large vectors in explain outputs of queries containing vectors +CREATE OR REPLACE FUNCTION lantern.masked_explain( + query text, + do_analyze boolean = true, + buffers boolean = true, + costs boolean = true, + timing boolean = true +) RETURNS text AS $$ +DECLARE + explain_query text; + explain_output jsonb; + flags text = ''; +BEGIN + IF do_analyze THEN + flags := flags || 'ANALYZE, '; + END IF; + IF buffers THEN + flags := flags || 'BUFFERS, '; + END IF; + IF costs THEN + flags := flags || 'COSTS, '; + END IF; + IF timing THEN + flags := flags || 'TIMING '; + END IF; + explain_query := format('EXPLAIN (%s, FORMAT JSON) %s', flags, query); + EXECUTE explain_query INTO explain_output; + RETURN jsonb_pretty(_lantern_internal.mask_order_by_in_plan(explain_output)); +END $$ LANGUAGE plpgsql; +DO $async_update$ BEGIN IF NOT (SELECT EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'cron')) THEN @@ -63,35 +92,6 @@ BEGIN WHEN (OLD.status IS DISTINCT FROM NEW.status) EXECUTE FUNCTION _lantern_internal.async_task_finalizer_trigger(); -$async_update$ -LANGUAGE plpgsql; - --- helper function to mask large vectors in explain outputs of queries containing vectors -CREATE OR REPLACE FUNCTION lantern.masked_explain( - query text, - do_analyze boolean = true, - buffers boolean = true, - costs boolean = true, - timing boolean = true -) RETURNS text AS $$ -DECLARE - explain_query text; - explain_output jsonb; - flags text = ''; -BEGIN - IF do_analyze THEN - flags := flags || 'ANALYZE, '; - END IF; - IF buffers THEN - flags := flags || 'BUFFERS, '; - END IF; - IF costs THEN - flags := flags || 'COSTS, '; - END IF; - IF timing THEN - flags := flags || 'TIMING '; - END IF; - explain_query := format('EXPLAIN (%s, FORMAT JSON) %s', flags, query); - EXECUTE explain_query INTO explain_output; - RETURN jsonb_pretty(_lantern_internal.mask_order_by_in_plan(explain_output)); -END $$ LANGUAGE plpgsql; +END; +$async_update$ LANGUAGE plpgsql; +-- N.B.: the block above may return early. DO NOT put anything down here