From a70a6948c97091488247962f7404e01176f1d7cb Mon Sep 17 00:00:00 2001 From: voidshard Date: Sun, 17 Mar 2024 10:35:32 +0000 Subject: [PATCH] Rework cmds into single binary - Move igor commands into single binary - Adds DB migrate tool - Respeced DB setup into migrations for https://github.com/golang-migrate/migrate - Moved User/Password setting to env vars for easier integration - Fixed flaky unit test --- .gitignore | 2 +- Dockerfile | 39 ++++ Dockerfile.apiserver | 22 -- Dockerfile.worker | 20 -- cmd/apiserver/main.go | 90 -------- cmd/db_migrate/01.tmpl | 110 ---------- cmd/db_migrate/run.sh | 40 ---- cmd/igor/api.go | 56 +++++ cmd/igor/main.go | 46 +++++ cmd/igor/migrate.go | 195 ++++++++++++++++++ cmd/igor/worker.go | 50 +++++ cmd/worker/main.go | 71 ------- go.mod | 15 +- go.sum | 22 ++ migrations/README.md | 6 + migrations/dev/000000_setup_database.down.sql | 4 + .../dev/000000_setup_database.up.sql | 9 +- migrations/prod/100000_create_tables.down.sql | 11 + .../prod/100000_create_tables.up.sql | 2 - pkg/database/options.go | 2 +- pkg/database/postgres.go | 45 ++-- pkg/database/postgres_test.go | 47 ++--- pkg/queue/asynq.go | 2 +- pkg/queue/options.go | 2 +- tests/{Dockerfile.dummyworker => Dockerfile} | 0 tests/compose.yaml | 14 +- tests/dummy_worker.go | 2 +- tests/run.sh | 27 ++- 28 files changed, 518 insertions(+), 433 deletions(-) create mode 100644 Dockerfile delete mode 100644 Dockerfile.apiserver delete mode 100644 Dockerfile.worker delete mode 100644 cmd/apiserver/main.go delete mode 100644 cmd/db_migrate/01.tmpl delete mode 100755 cmd/db_migrate/run.sh create mode 100644 cmd/igor/api.go create mode 100644 cmd/igor/main.go create mode 100644 cmd/igor/migrate.go create mode 100644 cmd/igor/worker.go delete mode 100644 cmd/worker/main.go create mode 100644 migrations/README.md create mode 100644 migrations/dev/000000_setup_database.down.sql rename cmd/db_migrate/00.tmpl => migrations/dev/000000_setup_database.up.sql (70%) create mode 100644 migrations/prod/100000_create_tables.down.sql rename cmd/db_migrate/01.sql => migrations/prod/100000_create_tables.up.sql (98%) rename tests/{Dockerfile.dummyworker => Dockerfile} (100%) diff --git a/.gitignore b/.gitignore index b5d5dca..6effb2d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ -cmd/db_migrate/*.sql +cmd/db_migrate/00.sql **/__pycache__ release.sh diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..33b6df8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,39 @@ +# Build the application +FROM golang:1.21 + +WORKDIR /go/src/github.com/voidshard/igor +COPY go.mod ./ +COPY go.sum ./ +COPY cmd cmd +COPY pkg pkg +COPY internal internal +RUN go mod download + +RUN CGO_ENABLED=0 GOOS=linux go build -o /igor ./cmd/igor/*.go + +# Create a minimal image +FROM alpine + +ARG USER=app +ARG GROUPNAME=$USER +ARG UID=12345 +ARG GID=23456 + +RUN addgroup \ + --gid "$GID" \ + "$GROUPNAME" \ +&& adduser \ + --disabled-password \ + --gecos "" \ + --home "$(pwd)" \ + --ingroup "$GROUPNAME" \ + --no-create-home \ + --uid "$UID" \ + $USER + +COPY --from=0 /igor /igor +COPY migrations migrations +RUN chown -R $USER:$GROUPNAME /igor migrations + +USER $USER +ENTRYPOINT ["/igor"] diff --git a/Dockerfile.apiserver b/Dockerfile.apiserver deleted file mode 100644 index 9a5b8b1..0000000 --- a/Dockerfile.apiserver +++ /dev/null @@ -1,22 +0,0 @@ -FROM golang:1.21 - -ARG USERNAME=app -ARG USER_UID=1000 -ARG USER_GID=$USER_UID - -RUN groupadd --gid $USER_GID $USERNAME && useradd --uid $USER_UID --gid $USER_GID -m $USERNAME - -WORKDIR 
/go/src/github.com/voidshard/igor -COPY go.mod ./ -COPY go.sum ./ -COPY cmd cmd -COPY pkg pkg -COPY internal internal -RUN go mod download - -EXPOSE 8100 - -RUN CGO_ENABLED=0 GOOS=linux go build -o /app ./cmd/apiserver/main.go && chown -R $USER_UID:$USER_GID /app - -USER $USERNAME -CMD ["/app"] diff --git a/Dockerfile.worker b/Dockerfile.worker deleted file mode 100644 index 1dbd414..0000000 --- a/Dockerfile.worker +++ /dev/null @@ -1,20 +0,0 @@ -FROM golang:1.21 - -ARG USERNAME=app -ARG USER_UID=1000 -ARG USER_GID=$USER_UID - -RUN groupadd --gid $USER_GID $USERNAME && useradd --uid $USER_UID --gid $USER_GID -m $USERNAME - -WORKDIR /go/src/github.com/voidshard/igor -COPY go.mod ./ -COPY go.sum ./ -COPY cmd cmd -COPY pkg pkg -COPY internal internal -RUN go mod download - -RUN CGO_ENABLED=0 GOOS=linux go build -o /app ./cmd/worker/main.go && chown -R $USER_UID:$USER_GID /app - -USER $USERNAME -CMD ["/app"] diff --git a/cmd/apiserver/main.go b/cmd/apiserver/main.go deleted file mode 100644 index 2d0a9ad..0000000 --- a/cmd/apiserver/main.go +++ /dev/null @@ -1,90 +0,0 @@ -package main - -import ( - "os" - - "github.com/jessevdk/go-flags" - - "github.com/voidshard/igor/internal/utils" - "github.com/voidshard/igor/pkg/api" - "github.com/voidshard/igor/pkg/api/http/server" - "github.com/voidshard/igor/pkg/database" - "github.com/voidshard/igor/pkg/queue" -) - -const ( - // defaults are set in the cmd/db_migrate/run.sh file, which obviously you should set - defaultDatabaseURL = "postgres://igorreadwrite:readwrite@localhost:5432/igor?sslmode=disable&search_path=igor" - - // default to local redis no pass - defaultRedisURL = "redis://localhost:6379/0" -) - -var CLI struct { - Addr string `long:"addr" env:"ADDR" description:"Address to bind to" default:"localhost:8100"` - TLSCert string `long:"cert" env:"CERT" description:"Path to TLS certificate"` - TLSKey string `long:"key" env:"KEY" description:"Path to TLS key"` - - DatabaseURL string `long:"database-url" env:"DATABASE_URL" description:"Database connection string"` - - QueueURL string `long:"queue-url" env:"QUEUE_URL" description:"Queue connection string"` - QueueTLSCert string `long:"queue-tls-cert" env:"QUEUE_TLS_CERT" description:"Queue TLS certificate"` - QueueTLSKey string `long:"queue-tls-key" env:"QUEUE_TLS_KEY" description:"Queue TLS key"` - QueueTLSCaCert string `long:"queue-tls-ca-cert" env:"QUEUE_TLS_CA_CERT" description:"Queue TLS CA certificate"` - - Debug bool `long:"debug" env:"DEBUG" description:"Enable debug logging"` - - StaticDir string `long:"static-dir" env:"STATIC_DIR" default:"" description:"Serve static files from this directory"` -} - -func main() { - // This main runs an API server (in this case, http) so that callers can interact with Igor over HTTP. - // Since this is configured with OptionsClientDefault it does not run any background routines - // that Igor needs to function (ie, to process events, queue tasks etc). - // - // This is intended purely to serve Igor's service API to clients over the network. Though you could - // have one server type do both if you wanted. - // - // If you wished to interact with Igor via. importing the pkg libraries, then you don't need to run this. - // - // Alternatively, you could add more servers under pkg/api/ to serve Igor's API over other protocols like - // gRPC, thrift or whatever you like and modifiy this to serve them all. 
- - var parser = flags.NewParser(&CLI, flags.Default) - if _, err := parser.Parse(); err != nil { - switch flagsErr := err.(type) { - case flags.ErrorType: - if flagsErr == flags.ErrHelp { - os.Exit(0) - } - os.Exit(1) - default: - os.Exit(1) - } - } - - if CLI.DatabaseURL == "" { - CLI.DatabaseURL = defaultDatabaseURL - } - - if CLI.QueueURL == "" { - CLI.QueueURL = defaultRedisURL - } - tlsCfg, err := utils.TLSConfig(CLI.QueueTLSCaCert, CLI.QueueTLSCert, CLI.QueueTLSKey) - if err != nil { - panic(err) - } - qOpts := &queue.Options{URL: CLI.QueueURL, TLSConfig: tlsCfg} - - api, err := api.New( - &database.Options{URL: CLI.DatabaseURL}, - qOpts, - api.OptionsClientDefault(), - ) - if err != nil { - panic(err) - } - - s := server.NewServer(CLI.Addr, CLI.StaticDir, CLI.TLSCert, CLI.TLSKey, CLI.Debug) - s.ServeForever(api) -} diff --git a/cmd/db_migrate/01.tmpl b/cmd/db_migrate/01.tmpl deleted file mode 100644 index c9c8f5a..0000000 --- a/cmd/db_migrate/01.tmpl +++ /dev/null @@ -1,110 +0,0 @@ --- Should be run as igor read-write user --- (so this user owns tables) - --- notify_event function sends a JSON notification to channel 'igor_events' on insert, update, or delete. --- We only need this on Layer and Task tables - this is obviously expensive to do, but considered less --- expensive than polling the database for changes (and a great deal easier / less promlematic). -CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$ - DECLARE - old_data json; - new_data json; - notification json; - BEGIN - -- Convert the old or new row to JSON, based on the kind of action. - -- Args as bytea is not a valid JSON type .. so we remove it - -- we actually don't need it for deciding what to do with the event anyway - IF (TG_OP = 'DELETE') THEN - old_data = row_to_json(OLD)::jsonb - 'args'; - new_data = NULL; - ELSIF (TG_OP = 'INSERT') THEN - old_data = NULL; - new_data = row_to_json(NEW)::jsonb - 'args'; - ELSE - old_data = row_to_json(OLD)::jsonb - 'args'; - new_data = row_to_json(NEW)::jsonb - 'args'; - END IF; - -- Contruct the notification as a JSON string. - notification = json_build_object('table',TG_TABLE_NAME, 'old', old_data, 'new', new_data); - -- Execute pg_notify(channel, notification) - PERFORM pg_notify('igor_events',notification::text); - -- Result is ignored since this is an AFTER trigger - RETURN NULL; - END; -$$ LANGUAGE plpgsql; - --- create_partition_and_insert function creates a partition table if it doesn't exist and inserts the row. --- This is easier to deal with than installing partman or using cronjobs to create partitions, though, less --- performant. --- Partitioning makes is easy to remove old data from the DB (either DROP or some kind of archiving). 
-CREATE OR REPLACE FUNCTION create_partition_and_insert() RETURNS trigger AS $$ - DECLARE - partition_date TEXT; - partition TEXT; - BEGIN - partition_date = to_char(to_timestamp(NEW.created_at)::date, 'YYYY_MM_DD'); - partition = TG_TABLE_NAME || '_' || partition_date; - IF NOT EXISTS(SELECT relname FROM pg_class WHERE relname=partition) THEN - EXECUTE 'CREATE TABLE ' || partition || ' (check (to_timestamp(created_at)::date = ''' || to_timestamp(NEW.created_at)::date || ''')) INHERITS (' || TG_TABLE_NAME || ');'; - IF (TG_TABLE_NAME = LOWER('${TABLE_LAYERS}')) THEN - EXECUTE 'CREATE TRIGGER ' || partition || '_notify_event AFTER INSERT OR UPDATE OR DELETE ON ' || partition || ' FOR EACH ROW EXECUTE PROCEDURE notify_event();'; - ELSIF (TG_TABLE_NAME = LOWER('${TABLE_TASKS}')) THEN - EXECUTE 'CREATE TRIGGER ' || partition || '_notify_event AFTER INSERT OR UPDATE OR DELETE ON ' || partition || ' FOR EACH ROW EXECUTE PROCEDURE notify_event();'; - END IF; - EXECUTE 'GRANT SELECT ON TABLE ' || partition || ' TO igorreadonly;'; - EXECUTE 'GRANT ALL ON TABLE ' || partition || ' TO igorreadwrite;'; - END IF; - EXECUTE 'INSERT INTO ' || partition || ' SELECT(' || TG_TABLE_NAME || ' ' || quote_literal(NEW) || ').*;'; - RETURN NULL; - END; -$$ LANGUAGE plpgsql VOLATILE COST 101; - -CREATE TABLE IF NOT EXISTS ${TABLE_JOBS} ( - name VARCHAR(255), - id VARCHAR(36) PRIMARY KEY, - status VARCHAR(12) NOT NULL DEFAULT 'PENDING', - etag VARCHAR(36) NOT NULL, - created_at BIGINT NOT NULL DEFAULT 0, - updated_at BIGINT NOT NULL DEFAULT 0, - UNIQUE (id, created_at) -) WITH (OIDS=FALSE); -CREATE OR REPLACE TRIGGER ${TABLE_JOBS}_partition_trigger BEFORE INSERT ON ${TABLE_JOBS} FOR EACH ROW EXECUTE PROCEDURE create_partition_and_insert(); - --- Layer table has no FK to Job as we can only ever create Layers in CreateJob API call, in which we create --- the Job and it's Layers in a transaction (and optionally Tasks too) -CREATE TABLE IF NOT EXISTS ${TABLE_LAYERS} ( - name VARCHAR(255), - paused_at BIGINT NOT NULL DEFAULT 0, - priority INT NOT NULL DEFAULT 0, - id VARCHAR(36) PRIMARY KEY, - status VARCHAR(12) NOT NULL DEFAULT 'PENDING', - etag VARCHAR(36) NOT NULL, - job_id VARCHAR(36) NOT NULL, - created_at BIGINT NOT NULL DEFAULT 0, - updated_at BIGINT NOT NULL DEFAULT 0, - UNIQUE (id, created_at) -) WITH (OIDS=FALSE); -CREATE OR REPLACE TRIGGER ${TABLE_LAYERS}_notify_event AFTER INSERT OR UPDATE OR DELETE ON ${TABLE_LAYERS} FOR EACH ROW EXECUTE PROCEDURE notify_event(); -CREATE OR REPLACE TRIGGER ${TABLE_LAYERS}_partition_trigger BEFORE INSERT ON ${TABLE_LAYERS} FOR EACH ROW EXECUTE PROCEDURE create_partition_and_insert(); - -CREATE TABLE IF NOT EXISTS ${TABLE_TASKS} ( - type VARCHAR(255) NOT NULL, - args BYTEA, - name VARCHAR(255), - paused_at BIGINT NOT NULL DEFAULT 0, - retries INT NOT NULL DEFAULT 0, - id VARCHAR(36) PRIMARY KEY, - status VARCHAR(12) NOT NULL DEFAULT 'PENDING', - etag VARCHAR(36) NOT NULL, - job_id VARCHAR(36) NOT NULL, - layer_id VARCHAR(36) NOT NULL, - queue_task_id VARCHAR(255) NOT NULL, - message TEXT, - created_at BIGINT NOT NULL DEFAULT 0, - updated_at BIGINT NOT NULL DEFAULT 0, - CONSTRAINT fk_layer_id FOREIGN KEY (layer_id) REFERENCES ${TABLE_LAYERS}(id) ON DELETE CASCADE, - UNIQUE (id, created_at) -) WITH (OIDS=FALSE); -CREATE OR REPLACE TRIGGER ${TABLE_TASKS}_notify_event AFTER INSERT OR UPDATE OR DELETE ON ${TABLE_TASKS} FOR EACH ROW EXECUTE PROCEDURE notify_event(); -CREATE OR REPLACE TRIGGER ${TABLE_TASKS}_partition_trigger BEFORE INSERT ON ${TABLE_TASKS} FOR EACH ROW EXECUTE 
PROCEDURE create_partition_and_insert(); - -GRANT SELECT ON ALL TABLES IN SCHEMA igor TO igorreadonly; diff --git a/cmd/db_migrate/run.sh b/cmd/db_migrate/run.sh deleted file mode 100755 index ff2a734..0000000 --- a/cmd/db_migrate/run.sh +++ /dev/null @@ -1,40 +0,0 @@ -#!/bin/bash - -# Sets up a Postgres DB for Igor - -set -eu - -# Expects env variables: -# -## How to reach postgres -# - PGHOST -# - PGPORT -# - PGUSER -# - PGPASS -# -## What we're going to set -# - PGUSER_READWRITE_PASS -# - PGUSER_READONLY_PASS - -SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) -cd $SCRIPT_DIR - -# Load env variables -PGHOST=${PGHOST:-localhost} -PGPORT=${PGPORT:-5432} -PGUSER=${PGUSER:-postgres} - -# Template user passwords -RW_PASS=${PGUSER_READWRITE_PASS:-readwrite} \ -RO_PASS=${PGUSER_READONLY_PASS:-readonly} \ -envsubst <00.tmpl >00.sql - -# Template user passwords -TABLE_JOBS=Job \ -TABLE_LAYERS=Layer \ -TABLE_TASKS=Task \ -envsubst <01.tmpl >01.sql - -# Run -PGPASSWORD=${PGPASSWORD:-test} psql -h $PGHOST -p $PGPORT -U $PGUSER -d postgres -f 00.sql -PGPASSWORD=${PGUSER_READWRITE_PASS:-readwrite} psql -h $PGHOST -p $PGPORT -U igorreadwrite -d igor -f 01.sql diff --git a/cmd/igor/api.go b/cmd/igor/api.go new file mode 100644 index 0000000..2edd901 --- /dev/null +++ b/cmd/igor/api.go @@ -0,0 +1,56 @@ +package main + +import ( + "github.com/voidshard/igor/internal/utils" + "github.com/voidshard/igor/pkg/api" + "github.com/voidshard/igor/pkg/api/http/server" + "github.com/voidshard/igor/pkg/database" + "github.com/voidshard/igor/pkg/queue" +) + +const ( + docApi = `Run the API server` +) + +type optsAPI struct { + optsGeneral + optsDatabase + optsQueue + + Addr string `long:"addr" env:"ADDR" description:"Address to bind to" default:"localhost:8100"` + TLSCert string `long:"cert" env:"CERT" description:"Path to TLS certificate"` + TLSKey string `long:"key" env:"KEY" description:"Path to TLS key"` + + StaticDir string `long:"static-dir" env:"STATIC_DIR" default:"" description:"Serve static files from this directory"` +} + +func (c *optsAPI) Execute(args []string) error { + // This main runs an API server (in this case, http) so that callers can interact with Igor over HTTP. + // Since this is configured with OptionsClientDefault it does not run any background routines + // that Igor needs to function (ie, to process events, queue tasks etc). + // + // This is intended purely to serve Igor's service API to clients over the network. Though you could + // have one server type do both if you wanted. + // + // If you wished to interact with Igor via. importing the pkg libraries, then you don't need to run this. + // + // Alternatively, you could add more servers under pkg/api/ to serve Igor's API over other protocols like + // gRPC, thrift or whatever you like and modifiy this to serve them all. 
+ tlsCfg, err := utils.TLSConfig(c.QueueTLSCaCert, c.QueueTLSCert, c.QueueTLSKey) + if err != nil { + panic(err) + } + qOpts := &queue.Options{URL: c.QueueURL, TLSConfig: tlsCfg} + + api, err := api.New( + &database.Options{URL: c.DatabaseURL}, + qOpts, + api.OptionsClientDefault(), + ) + if err != nil { + panic(err) + } + + s := server.NewServer(c.Addr, c.StaticDir, c.TLSCert, c.TLSKey, c.Debug) + return s.ServeForever(api) +} diff --git a/cmd/igor/main.go b/cmd/igor/main.go new file mode 100644 index 0000000..c045208 --- /dev/null +++ b/cmd/igor/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "os" + + "github.com/jessevdk/go-flags" +) + +type optsDatabase struct { + DatabaseURL string `long:"database-url" env:"DATABASE_URL" description:"Database connection string" default:"postgres://$DATABASE_USER:$DATABASE_PASSWORD@localhost:5432/igor?sslmode=disable&search_path=igor"` +} + +type optsQueue struct { + QueueURL string `long:"queue-url" env:"QUEUE_URL" description:"Queue connection string" default:"redis://localhost:6379/0"` + QueueTLSCert string `long:"queue-tls-cert" env:"QUEUE_TLS_CERT" description:"Queue TLS certificate"` + QueueTLSKey string `long:"queue-tls-key" env:"QUEUE_TLS_KEY" description:"Queue TLS key"` + QueueTLSCaCert string `long:"queue-tls-ca-cert" env:"QUEUE_TLS_CA_CERT" description:"Queue TLS CA certificate"` +} + +type optsGeneral struct { + Debug bool `long:"debug" env:"DEBUG" description:"Enable debug logging"` +} + +var cmdAPI optsAPI +var cmdWorker optsWorker +var parser = flags.NewParser(nil, flags.Default) + +func init() { + parser.AddCommand("api", "Run Igor API Server", docApi, &cmdAPI) + parser.AddCommand("worker", "Run Igor background worker", docWorker, &cmdWorker) + parser.AddCommand("migrate", "Postgres Migration Operations", docMigrate, &optsMigrate{}) +} + +func main() { + if _, err := parser.Parse(); err != nil { + switch flagsErr := err.(type) { + case flags.ErrorType: + if flagsErr == flags.ErrHelp { + os.Exit(0) + } + os.Exit(1) + default: + os.Exit(1) + } + } +} diff --git a/cmd/igor/migrate.go b/cmd/igor/migrate.go new file mode 100644 index 0000000..e84a5e8 --- /dev/null +++ b/cmd/igor/migrate.go @@ -0,0 +1,195 @@ +package main + +import ( + "errors" + "fmt" + "os" + "strings" + "time" + + "database/sql" + + _ "github.com/lib/pq" + + "github.com/golang-migrate/migrate/v4" + "github.com/golang-migrate/migrate/v4/database/postgres" + _ "github.com/golang-migrate/migrate/v4/source/file" + "github.com/voidshard/igor/pkg/database" +) + +const ( + docMigrate = `Run the migration` +) + +type optsMigrateGeneral struct { + Source string `long:"source" env:"MIGRATIONS_SOURCE" description:"Source of the migrations" default:"file://migrations/prod"` +} + +type cmdMigrateUp struct { + optsMigrateGeneral + optsGeneral + optsDatabase + + Steps int `long:"steps" env:"MIGRATIONS_UP_STEPS" description:"Number of steps to migrate up (defaults to all)"` +} + +type cmdMigrateDown struct { + optsMigrateGeneral + optsGeneral + optsDatabase + + Steps int `long:"steps" env:"MIGRATIONS_DOWN_STEPS" description:"Number of steps to migrate down (defaults to all)"` +} + +type cmdMigrateVersion struct { + optsMigrateGeneral + optsGeneral + optsDatabase +} + +type cmdMigrateForce struct { + optsMigrateGeneral + optsGeneral + optsDatabase + + Version int `long:"version" env:"MIGRATIONS_FORCE_VERSION" description:"Force-set the current version of the database"` +} + +type cmdMigrateWait struct { + optsMigrateGeneral + optsGeneral + optsDatabase + + Version int `long:"version" 
env:"MIGRATIONS_WAIT_VERSION" description:"Min version to wait for"` + Timeout time.Duration `long:"timeout" env:"MIGRATIONS_WAIT_TIMEOUT" description:"Time to wait before erroring (optional)"` +} + +type optsMigrate struct { + Up cmdMigrateUp `command:"up" description:"Up version the database"` + + Down cmdMigrateDown `command:"down" description:"Down version the database"` + + Version cmdMigrateVersion `command:"version" description:"Get or force-set the current version of the database"` + + Wait cmdMigrateWait `command:"wait" description:"Wait (block) for the database to be at least the given version"` +} + +func (c *cmdMigrateForce) Execute(args []string) error { + m, db, err := buildMigrate(c.Source, &database.Options{URL: c.DatabaseURL}) + if err != nil { + return err + } + defer db.Close() + + return m.Force(c.Version) +} + +func (c *cmdMigrateUp) Execute(args []string) error { + m, db, err := buildMigrate(c.Source, &database.Options{URL: c.DatabaseURL}) + if err != nil { + return err + } + defer db.Close() + + if c.Steps < 0 { // force steps to be positive + c.Steps = -1 * c.Steps + } + + if c.Steps == 0 { + return m.Up() + } else { + return m.Steps(c.Steps) + } +} + +func (c *cmdMigrateDown) Execute(args []string) error { + m, db, err := buildMigrate(c.Source, &database.Options{URL: c.DatabaseURL}) + if err != nil { + return err + } + defer db.Close() + + if c.Steps > 0 { // force steps to be negative + c.Steps = -1 * c.Steps + } + + if c.Steps == 0 { + return m.Down() + } else { + return m.Steps(c.Steps) + } +} + +func (c *cmdMigrateVersion) Execute(args []string) error { + m, db, err := buildMigrate(c.Source, &database.Options{URL: c.DatabaseURL}) + if err != nil { + return err + } + defer db.Close() + + v, err := getVersion(m) + if err != nil { + return err + } + fmt.Println(v) + return nil +} + +func (c *cmdMigrateWait) Execute(args []string) error { + m, db, err := buildMigrate(c.Source, &database.Options{URL: c.DatabaseURL}) + if err != nil { + return err + } + defer db.Close() + + end := time.Now().Add(c.Timeout) + for { + v, err := getVersion(m) + if err != nil { + return err + } + if v >= c.Version { + return nil + } + if c.Timeout > 0 && time.Now().After(end) { + return errors.New(fmt.Sprintf("timeout waiting for version %d", c.Version)) + } + time.Sleep(1 * time.Second) + } +} + +func getVersion(m *migrate.Migrate) (int, error) { + vuint, _, err := m.Version() + if errors.Is(err, migrate.ErrNilVersion) { + return -1, nil + } + if err != nil { + return -1, err + } + return int(vuint), nil +} + +func buildMigrate(source string, opts *database.Options) (*migrate.Migrate, *sql.DB, error) { + opts.SetDefaults() + opts.URL = strings.Replace(opts.URL, "$"+opts.UsernameEnvVar, os.Getenv(opts.UsernameEnvVar), 1) + opts.URL = strings.Replace(opts.URL, "$"+opts.PasswordEnvVar, os.Getenv(opts.PasswordEnvVar), 1) + + db, err := sql.Open("postgres", opts.URL) + if err != nil { + return nil, nil, err + } + + driver, err := postgres.WithInstance(db, &postgres.Config{}) + if err != nil { + defer db.Close() + return nil, nil, err + } + + m, err := migrate.NewWithDatabaseInstance(source, "postgres", driver) + if err != nil { + defer db.Close() + return nil, nil, err + } + + return m, db, nil +} diff --git a/cmd/igor/worker.go b/cmd/igor/worker.go new file mode 100644 index 0000000..00eae9d --- /dev/null +++ b/cmd/igor/worker.go @@ -0,0 +1,50 @@ +package main + +import ( + "os" + "os/signal" + + "github.com/voidshard/igor/internal/utils" + "github.com/voidshard/igor/pkg/api" + 
"github.com/voidshard/igor/pkg/database" + "github.com/voidshard/igor/pkg/queue" +) + +const ( + docWorker = `Run Igor background worker` +) + +type optsWorker struct { + optsGeneral + optsDatabase + optsQueue +} + +func (c *optsWorker) Execute(args []string) error { + // This main runs an Igor internal server. That is, it runs some number of internal worker routines + // to process igor events, queue tasks, push status updates or whatever else. + // + // This is intended to be run internal background processes, not to serve Igor's API to clients. + // Though you could have one server type do both if you wanted. + tlsCfg, err := utils.TLSConfig(c.QueueTLSCaCert, c.QueueTLSCert, c.QueueTLSKey) + if err != nil { + panic(err) + } + + api, err := api.New( + &database.Options{URL: c.DatabaseURL}, + &queue.Options{URL: c.QueueURL, TLSConfig: tlsCfg}, + api.OptionsServerDefault(), + ) + if err != nil { + panic(err) + } + + defer api.Close() + + exit := make(chan os.Signal, 1) + signal.Notify(exit, os.Interrupt) + <-exit + + return nil +} diff --git a/cmd/worker/main.go b/cmd/worker/main.go deleted file mode 100644 index 9350182..0000000 --- a/cmd/worker/main.go +++ /dev/null @@ -1,71 +0,0 @@ -package main - -import ( - "os" - "os/signal" - - "github.com/jessevdk/go-flags" - - "github.com/voidshard/igor/pkg/api" - "github.com/voidshard/igor/pkg/database" - "github.com/voidshard/igor/pkg/queue" -) - -const ( - // defaults are set in the cmd/db_migrate/run.sh file, which obviously you should set - defaultDatabaseURL = "postgres://igorreadwrite:readwrite@localhost:5432/igor?sslmode=disable&search_path=igor" - - // default to local redis no pass - defaultRedisURL = "redis://localhost:6379/0" -) - -var CLI struct { - DatabaseURL string `long:"database-url" env:"DATABASE_URL" description:"Database connection string"` - - RedisURL string `long:"redis-url" env:"REDIS_URL" description:"Redis connection string"` - - Debug bool `long:"debug" env:"DEBUG" description:"Enable debug logging"` -} - -func main() { - // This main runs an Igor internal server. That is, it runs some number of internal worker routines - // to process igor events, queue tasks, push status updates or whatever else. - // - // This is intended to be run internal background processes, not to serve Igor's API to clients. - // Though you could have one server type do both if you wanted. 
- - var parser = flags.NewParser(&CLI, flags.Default) - if _, err := parser.Parse(); err != nil { - switch flagsErr := err.(type) { - case flags.ErrorType: - if flagsErr == flags.ErrHelp { - os.Exit(0) - } - os.Exit(1) - default: - os.Exit(1) - } - } - - if CLI.DatabaseURL == "" { - CLI.DatabaseURL = defaultDatabaseURL - } - if CLI.RedisURL == "" { - CLI.RedisURL = defaultRedisURL - } - - api, err := api.New( - &database.Options{URL: CLI.DatabaseURL}, - &queue.Options{URL: CLI.RedisURL}, - api.OptionsServerDefault(), - ) - if err != nil { - panic(err) - } - - defer api.Close() - - exit := make(chan os.Signal, 1) - signal.Notify(exit, os.Interrupt) - <-exit -} diff --git a/go.mod b/go.mod index f7668e2..1dc0ddb 100644 --- a/go.mod +++ b/go.mod @@ -15,21 +15,26 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/golang/protobuf v1.5.2 // indirect - github.com/google/uuid v1.2.0 // indirect + github.com/golang-migrate/migrate/v4 v4.17.0 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/uuid v1.4.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/redis/go-redis/v9 v9.0.3 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/spf13/cast v1.3.1 // indirect + go.uber.org/atomic v1.7.0 // indirect golang.org/x/crypto v0.17.0 // indirect - golang.org/x/sync v0.1.0 // indirect + golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect - google.golang.org/protobuf v1.26.0 // indirect + golang.org/x/time v0.3.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 21a0725..491ad7a 100644 --- a/go.sum +++ b/go.sum @@ -9,16 +9,27 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/golang-migrate/migrate/v4 v4.17.0 h1:rd40H3QXU0AA4IoLllFcEAEo9dYKRHYND2gB4p7xcaU= +github.com/golang-migrate/migrate/v4 v4.17.0/go.mod h1:+Cp2mtLP4/aXDTKb9wmXYitdrNx2HGs45rbWAo6OsKM= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= 
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hibiken/asynq v0.24.1 h1:+5iIEAyA9K/lcSPvx3qoPtsKJeKI5u9aOIvUmSsazEw= github.com/hibiken/asynq v0.24.1/go.mod h1:u5qVeSbrnfT+vtG5Mq8ZPzQu/BmCKMHvTGb91uy9Tts= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= @@ -37,6 +48,8 @@ github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NB github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.0.3 h1:+7mmR26M0IvyLxGZUHxu4GiBkJkVDid0Un+j4ScYu4k= @@ -54,6 +67,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo= @@ -72,6 +87,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys 
v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -88,6 +105,8 @@ golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -97,9 +116,12 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/migrations/README.md b/migrations/README.md new file mode 100644 index 0000000..585d148 --- /dev/null +++ b/migrations/README.md @@ -0,0 +1,6 @@ +DB Migrations for Postgres. + + +/dev contains migrations suitable for testing; it creates a database, user(s) with default weak passwords (again, for testing), schema etc. + +/prod creates our main tables & functions, implies database, schema & users exist already. 
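A minimal local sequence for applying these migrations, mirroring what tests/run.sh does later in this patch; the host, port, users and passwords below are the throwaway dev defaults, not values to reuse elsewhere:

    # golang-migrate cannot follow psql's \connect, so the dev bootstrap (database, users, schema)
    # is applied directly with psql as the postgres admin user
    PGPASSWORD=test psql -h localhost -p 5432 -U postgres -f migrations/dev/000000_setup_database.up.sql

    # the prod migrations (tables, functions, triggers) are then applied with the igor binary's
    # migrate subcommand, connecting as the read-write user created above
    export DATABASE_URL="postgres://igorreadwrite:readwrite@localhost:5432/igor?sslmode=disable&search_path=igor"
    igor migrate up --source file://migrations/prod
    igor migrate version --source file://migrations/prod   # prints the current schema version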
diff --git a/migrations/dev/000000_setup_database.down.sql b/migrations/dev/000000_setup_database.down.sql new file mode 100644 index 0000000..e12a32d --- /dev/null +++ b/migrations/dev/000000_setup_database.down.sql @@ -0,0 +1,4 @@ +--- teardown database +DROP DATABASE IF EXISTS igor; +DROP USER IF EXISTS igorreadwrite; +DROP USER IF EXISTS igorreadonly; diff --git a/cmd/db_migrate/00.tmpl b/migrations/dev/000000_setup_database.up.sql similarity index 70% rename from cmd/db_migrate/00.tmpl rename to migrations/dev/000000_setup_database.up.sql index 89a4dba..7198440 100644 --- a/cmd/db_migrate/00.tmpl +++ b/migrations/dev/000000_setup_database.up.sql @@ -1,16 +1,17 @@ -- Should be run as pg admin user (to set up schema & roles) ---- Setup database, schema and roles +--- Setup database CREATE DATABASE igor; -CREATE USER igorreadwrite WITH PASSWORD '${RW_PASS}'; +CREATE USER igorreadwrite WITH PASSWORD 'readwrite'; GRANT CONNECT ON DATABASE igor TO igorreadwrite; GRANT ALL PRIVILEGES ON DATABASE igor TO igorreadwrite; -CREATE USER igorreadonly WITH PASSWORD '${RO_PASS}'; +CREATE USER igorreadonly WITH PASSWORD 'readonly'; GRANT CONNECT ON DATABASE igor TO igorreadonly; -\c igor +--- Setup schema and roles, implies we are in the igor database +\connect igor CREATE SCHEMA IF NOT EXISTS igor; ALTER SCHEMA igor OWNER TO igorreadwrite; diff --git a/migrations/prod/100000_create_tables.down.sql b/migrations/prod/100000_create_tables.down.sql new file mode 100644 index 0000000..585b86a --- /dev/null +++ b/migrations/prod/100000_create_tables.down.sql @@ -0,0 +1,11 @@ +DROP TRIGGER IF EXISTS Job_notify_event ON Job; +DROP TRIGGER IF EXISTS Layer_notify_event ON Layer; +DROP TRIGGER IF EXISTS Task_notify_event ON Task; +DROP TRIGGER IF EXISTS Job_partition_trigger ON Job; +DROP TRIGGER IF EXISTS Layer_partition_trigger ON Layer; +DROP TRIGGER IF EXISTS Task_partition_trigger ON Task; +DROP FUNCTION IF EXISTS notify_event(); +DROP FUNCTION IF EXISTS create_partition_and_insert(); +DROP TABLE IF EXISTS Job; +DROP TABLE IF EXISTS Layer; +DROP TABLE IF EXISTS Task; diff --git a/cmd/db_migrate/01.sql b/migrations/prod/100000_create_tables.up.sql similarity index 98% rename from cmd/db_migrate/01.sql rename to migrations/prod/100000_create_tables.up.sql index d8d1a5f..1845ec8 100644 --- a/cmd/db_migrate/01.sql +++ b/migrations/prod/100000_create_tables.up.sql @@ -106,5 +106,3 @@ CREATE TABLE IF NOT EXISTS Task ( ) WITH (OIDS=FALSE); CREATE OR REPLACE TRIGGER Task_notify_event AFTER INSERT OR UPDATE OR DELETE ON Task FOR EACH ROW EXECUTE PROCEDURE notify_event(); CREATE OR REPLACE TRIGGER Task_partition_trigger BEFORE INSERT ON Task FOR EACH ROW EXECUTE PROCEDURE create_partition_and_insert(); - -GRANT SELECT ON ALL TABLES IN SCHEMA igor TO igorreadonly; diff --git a/pkg/database/options.go b/pkg/database/options.go index a96344a..a93e6ad 100644 --- a/pkg/database/options.go +++ b/pkg/database/options.go @@ -21,7 +21,7 @@ type Options struct { UsernameEnvVar string } -func (o *Options) setDefaults() { +func (o *Options) SetDefaults() { if o.PasswordEnvVar == "" { o.PasswordEnvVar = defaultPasswordEnvVar } diff --git a/pkg/database/postgres.go b/pkg/database/postgres.go index e142e9e..6f7a46d 100644 --- a/pkg/database/postgres.go +++ b/pkg/database/postgres.go @@ -22,7 +22,7 @@ type Postgres struct { // NewPostgres returns a new Postgres database connection. 
func NewPostgres(opts *Options) (*Postgres, error) { - opts.setDefaults() + opts.SetDefaults() opts.URL = strings.Replace(opts.URL, "$"+opts.UsernameEnvVar, os.Getenv(opts.UsernameEnvVar), 1) opts.URL = strings.Replace(opts.URL, "$"+opts.PasswordEnvVar, os.Getenv(opts.PasswordEnvVar), 1) pool, err := pgxpool.New(context.Background(), opts.URL) @@ -199,12 +199,7 @@ func (p *Postgres) SetTaskQueueID(taskID, etag, newEtag, queueTaskID string, sta // Jobs returns jobs matching the given query func (p *Postgres) Jobs(q *structs.Query) ([]*structs.Job, error) { - where, args := toSqlQuery(map[string][]string{ - "id": q.JobIDs, - "status": statusToStrings(q.Statuses), - }, - q.UpdatedBefore, q.UpdatedAfter, q.CreatedBefore, q.CreatedAfter, - ) + where, args := toSqlQuery([]string{"id", "status"}, [][]string{q.JobIDs, statusToStrings(q.Statuses)}, q.UpdatedBefore, q.UpdatedAfter, q.CreatedBefore, q.CreatedAfter) args = append(args, q.Limit, q.Offset) // TODO: prepare statement @@ -246,13 +241,7 @@ func (p *Postgres) Jobs(q *structs.Query) ([]*structs.Job, error) { // Layers returns layers matching the given query func (p *Postgres) Layers(q *structs.Query) ([]*structs.Layer, error) { - where, args := toSqlQuery(map[string][]string{ - "job_id": q.JobIDs, - "id": q.LayerIDs, - "status": statusToStrings(q.Statuses), - }, - q.UpdatedBefore, q.UpdatedAfter, q.CreatedBefore, q.CreatedAfter, - ) + where, args := toSqlQuery([]string{"job_id", "id", "status"}, [][]string{q.JobIDs, q.LayerIDs, statusToStrings(q.Statuses)}, q.UpdatedBefore, q.UpdatedAfter, q.CreatedBefore, q.CreatedAfter) args = append(args, q.Limit, q.Offset) qstr := fmt.Sprintf(`SELECT name, paused_at, priority, id, status, etag, job_id, created_at, updated_at FROM %s %s @@ -297,14 +286,7 @@ func (p *Postgres) Layers(q *structs.Query) ([]*structs.Layer, error) { // Tasks returns tasks matching the given query func (p *Postgres) Tasks(q *structs.Query) ([]*structs.Task, error) { - where, args := toSqlQuery(map[string][]string{ - "job_id": q.JobIDs, - "layer_id": q.LayerIDs, - "id": q.TaskIDs, - "status": statusToStrings(q.Statuses), - }, - q.UpdatedBefore, q.UpdatedAfter, q.CreatedBefore, q.CreatedAfter, - ) + where, args := toSqlQuery([]string{"job_id", "layer_id", "id", "status"}, [][]string{q.JobIDs, q.LayerIDs, q.TaskIDs, statusToStrings(q.Statuses)}, q.UpdatedBefore, q.UpdatedAfter, q.CreatedBefore, q.CreatedAfter) args = append(args, q.Limit, q.Offset) qstr := fmt.Sprintf(`SELECT type, args, name, paused_at, id, status, etag, job_id, layer_id, queue_task_id, message, created_at, updated_at FROM %s %s @@ -408,13 +390,24 @@ func (p *Postgres) setPaused(table string, at int64, newTag string, ids []*struc } // toSqlQuery converts query data into a SQL query string & args -func toSqlQuery(in map[string][]string, upB, upA, crB, crA int64) (string, []interface{}) { - if in == nil { - in = map[string][]string{} +func toSqlQuery(keys []string, values [][]string, upB, upA, crB, crA int64) (string, []interface{}) { + // Accepting two lists for keys and values means our order of iteration is assured. + // ie. as opposed to using a simplier map[string][]string + if keys == nil { + keys = []string{} + } + if values == nil { + values = [][]string{} + } + if len(keys) != len(values) { + // This would imply our code is busted inside the postgres package + // Unit tests should catch this .. 
+ return "", []interface{}{} } and := []string{} args := []interface{}{} - for k, v := range in { + for index, k := range keys { + v := values[index] if v == nil || len(v) == 0 { continue } diff --git a/pkg/database/postgres_test.go b/pkg/database/postgres_test.go index 50fc15c..407fcd6 100644 --- a/pkg/database/postgres_test.go +++ b/pkg/database/postgres_test.go @@ -11,7 +11,8 @@ import ( func TestToSqlQuery(t *testing.T) { cases := []struct { Name string - In map[string][]string + Keys []string + Values [][]string ExpectQuery string ExpectArgs []interface{} UpdatedBefore int64 @@ -21,19 +22,22 @@ func TestToSqlQuery(t *testing.T) { }{ { Name: "Nil", - In: nil, + Keys: nil, + Values: nil, ExpectQuery: "", ExpectArgs: []interface{}{}, }, { Name: "Empty", - In: map[string][]string{}, + Keys: []string{}, + Values: [][]string{}, ExpectQuery: "", ExpectArgs: []interface{}{}, }, { Name: "TimeFilters", - In: map[string][]string{}, + Keys: []string{}, + Values: [][]string{}, ExpectQuery: "WHERE updated_at >= $1 AND updated_at <= $2 AND created_at >= $3 AND created_at <= $4", ExpectArgs: []interface{}{int64(100), int64(200), int64(300), int64(400)}, UpdatedBefore: 100, @@ -42,37 +46,30 @@ func TestToSqlQuery(t *testing.T) { CreatedAfter: 400, }, { - Name: "OneField", - In: map[string][]string{ - "field": []string{"a"}, - }, + Name: "OneField", + Keys: []string{"field"}, + Values: [][]string{{"a"}}, ExpectQuery: "WHERE field IN ($1)", ExpectArgs: []interface{}{"a"}, }, { - Name: "OneFieldMultipleArgs", - In: map[string][]string{ - "field": []string{"a", "b", "c"}, - }, + Name: "OneFieldMultipleArgs", + Keys: []string{"field"}, + Values: [][]string{{"a", "b", "c"}}, ExpectQuery: "WHERE field IN ($1, $2, $3)", ExpectArgs: []interface{}{"a", "b", "c"}, }, { - Name: "MultipleFieldsMultipleArgs", - In: map[string][]string{ - "field1": []string{"a", "b", "c"}, - "field2": []string{"d", "e", "f"}, - }, - // TODO: we iterate a map; priority is not guaranteed + Name: "MultipleFieldsMultipleArgs", + Keys: []string{"field1", "field2"}, + Values: [][]string{{"a", "b", "c"}, {"d", "e", "f"}}, ExpectQuery: "WHERE field1 IN ($1, $2, $3) AND field2 IN ($4, $5, $6)", ExpectArgs: []interface{}{"a", "b", "c", "d", "e", "f"}, }, { - Name: "MultipleFieldsMultipleArgsWithTimeFilters", - In: map[string][]string{ - "field1": []string{"a", "b", "c"}, - "field2": []string{"d", "e", "f"}, - }, + Name: "MultipleFieldsMultipleArgsWithTimeFilters", + Keys: []string{"field1", "field2"}, + Values: [][]string{{"a", "b", "c"}, {"d", "e", "f"}}, ExpectQuery: "WHERE field1 IN ($1, $2, $3) AND field2 IN ($4, $5, $6) AND updated_at >= $7 AND updated_at <= $8 AND created_at >= $9 AND created_at <= $10", ExpectArgs: []interface{}{"a", "b", "c", "d", "e", "f", int64(100), int64(200), int64(300), int64(400)}, UpdatedBefore: 100, @@ -84,7 +81,7 @@ func TestToSqlQuery(t *testing.T) { for _, c := range cases { t.Run(c.Name, func(t *testing.T) { - qstr, args := toSqlQuery(c.In, c.UpdatedBefore, c.UpdatedAfter, c.CreatedBefore, c.CreatedAfter) + qstr, args := toSqlQuery(c.Keys, c.Values, c.UpdatedBefore, c.UpdatedAfter, c.CreatedBefore, c.CreatedAfter) assert.Equal(t, c.ExpectQuery, qstr) assert.Equal(t, c.ExpectArgs, args) @@ -215,7 +212,7 @@ func TestToLayerSqlArgs(t *testing.T) { LayerSpec: structs.LayerSpec{ Name: "name", PausedAt: 100, - Priority: 12, + Priority: 12, }, ID: "id", Status: structs.PENDING, diff --git a/pkg/queue/asynq.go b/pkg/queue/asynq.go index 3271272..48fa8ee 100644 --- a/pkg/queue/asynq.go +++ b/pkg/queue/asynq.go @@ 
-47,7 +47,7 @@ type Asynq struct { // NewAsynqQueue returns a new Asynq queue with the given settings func NewAsynqQueue(svc database.QueueDB, opts *Options) (*Asynq, error) { - opts.setDefaults() + opts.SetDefaults() opts.URL = strings.Replace(opts.URL, "$"+opts.UsernameEnvVar, os.Getenv(opts.UsernameEnvVar), 1) opts.URL = strings.Replace(opts.URL, "$"+opts.PasswordEnvVar, os.Getenv(opts.PasswordEnvVar), 1) redisOpts := asynq.RedisClientOpt{ diff --git a/pkg/queue/options.go b/pkg/queue/options.go index 8f646de..35528b2 100644 --- a/pkg/queue/options.go +++ b/pkg/queue/options.go @@ -26,7 +26,7 @@ type Options struct { TLSConfig *tls.Config } -func (o *Options) setDefaults() { +func (o *Options) SetDefaults() { if o.PasswordEnvVar == "" { o.PasswordEnvVar = defaultPasswordEnvVar } diff --git a/tests/Dockerfile.dummyworker b/tests/Dockerfile similarity index 100% rename from tests/Dockerfile.dummyworker rename to tests/Dockerfile diff --git a/tests/compose.yaml b/tests/compose.yaml index 7f7ecc1..cbf004c 100644 --- a/tests/compose.yaml +++ b/tests/compose.yaml @@ -3,10 +3,10 @@ services: # supplies some tasks (sleep, die, maybe_die) for testing build: context: ../ - dockerfile: tests/Dockerfile.dummyworker + dockerfile: tests/Dockerfile environment: - DATABASE_URL=postgres://igorreadwrite:readwrite@postgres:5432/igor?sslmode=disable&search_path=igor - - REDIS_URL=redis:6379 + - QUEUE_URL=redis:6379 depends_on: - postgres - redis @@ -17,12 +17,13 @@ services: # serves Igor's API over HTTP (currently the only method, but more can be supported) build: context: ../ - dockerfile: Dockerfile.apiserver + dockerfile: Dockerfile + command: ["api"] ports: - "8100:8100" environment: - DATABASE_URL=postgres://igorreadwrite:readwrite@postgres:5432/igor?sslmode=disable&search_path=igor - - REDIS_URL=redis:6379 + - QUEUE_URL=redis:6379 - ADDR=0.0.0.0:8100 - DEBUG=true depends_on: @@ -35,10 +36,11 @@ services: # performs the inner logic of Igor, running background tasks, queuing tasks and generally managing things build: context: ../ - dockerfile: Dockerfile.worker + dockerfile: Dockerfile + command: ["worker"] environment: - DATABASE_URL=postgres://igorreadwrite:readwrite@postgres:5432/igor?sslmode=disable&search_path=igor - - REDIS_URL=redis:6379 + - QUEUE_URL=redis:6379 - DEBUG=true depends_on: - postgres diff --git a/tests/dummy_worker.go b/tests/dummy_worker.go index dc9efac..fe9e346 100644 --- a/tests/dummy_worker.go +++ b/tests/dummy_worker.go @@ -18,7 +18,7 @@ const ( func main() { pgURL := os.Getenv("DATABASE_URL") - rdURL := os.Getenv("REDIS_URL") + rdURL := os.Getenv("QUEUE_URL") svc, err := api.New( &database.Options{URL: pgURL}, diff --git a/tests/run.sh b/tests/run.sh index c19af03..e4602ef 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -6,9 +6,13 @@ PGHOST=${PGHOST:-localhost} PGPORT=${PGPORT:-5432} -PGUSER=${PGUSER:-postgres} PGDATABASE=${PGDATABASE:-igor} -PGPASSWORD=${PGPASSWORD:-test} + +# these test user/passwords are made in migrations/dev/ +OWNUSER=${OWNUSER:-postgres} # owner +OWNPASS=${OWNPASS:-test} +RWUSER=${RWUSER:-igorreadwrite} # readwrite +RWPASS=${RWPASS:-readwrite} REDISHOST=${REDISHOST:-localhost} REDISPORT=${REDISPORT:-6379} @@ -21,7 +25,9 @@ set -eux SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) cd $SCRIPT_DIR -MIGRATE=$SCRIPT_DIR/../cmd/db_migrate/run.sh +# build the igor binary so we can access & test the migration tool +IGOR=/tmp/igor +go build -o $IGOR $SCRIPT_DIR/../cmd/igor/*.go # stand up the test infra docker compose build @@ 
-29,17 +35,24 @@ docker compose up -d # wait for the postgres server to be ready RETRIES=5 -until PGPASSWORD=$PGPASSWORD psql -h $PGHOST -U $PGUSER -d $PGDATABASE -c "select 1" > /dev/null 2>&1 || [ $RETRIES -eq 0 ]; do +until PGPASSWORD=$OWNPASS psql -h $PGHOST -p $PGPORT -U $OWNUSER -c "select 1" > /dev/null 2>&1 || [ $RETRIES -eq 0 ]; do echo "Waiting for postgres server, $((RETRIES--)) remaining attempts..." sleep 5 done -# apply the db migration -PGHOST=$PGHOST PGPORT=$PGPORT PGUSER=$PGUSER PGDATABASE=$PGDATABASE PGPASSWORD=$PGPASSWORD $MIGRATE +# apply db migrations +# Create the DB +# Nb. "migrate" refuses to acknowledge '\connect' to make & connect to the DB to make a schema .. so we have to do the first step manually +PGHOST=$PGHOST PGPORT=$PGPORT PGUSER=$OWNUSER PGPASSWORD=$OWNPASS psql -f ${SCRIPT_DIR}/../migrations/dev/000000_setup_database.up.sql + +# Apply the migrations +DATABASE_URL="postgres://${RWUSER}:${RWPASS}@${PGHOST}:${PGPORT}/${PGDATABASE}?sslmode=disable&search_path=igor" $IGOR migrate up --source file://${SCRIPT_DIR}/../migrations/prod +# Print the version +DATABASE_URL="postgres://${RWUSER}:${RWPASS}@${PGHOST}:${PGPORT}/${PGDATABASE}?sslmode=disable&search_path=igor" $IGOR migrate version --source file://${SCRIPT_DIR}/../migrations/prod # run the tests set +e -IGOR_TEST_API="http://localhost:${APIPORT}/api/v1" IGOR_TEST_DATA=${SCRIPT_DIR}/data IGOR_TEST_PG_URL="postgres://${PGUSER}:${PGPASSWORD}@${PGHOST}:${PGPORT}/${PGDATABASE}?sslmode=disable&search_path=igor" IGOR_TEST_RD_URL="redis://${REDISHOST}:${REDISPORT}/${REDISDB}" go test -v ./... +IGOR_TEST_API="http://localhost:${APIPORT}/api/v1" IGOR_TEST_DATA=${SCRIPT_DIR}/data IGOR_TEST_PG_URL="postgres://${RWUSER}:${RWPASS}@${PGHOST}:${PGPORT}/${PGDATABASE}?sslmode=disable&search_path=igor" IGOR_TEST_RD_URL="redis://${REDISHOST}:${REDISPORT}/${REDISDB}" go test -v ./... # tear down & remove the test infra docker compose stop
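A sketch of how the consolidated binary introduced by this patch is invoked. Subcommand and flag names are taken from cmd/igor/*.go; the DATABASE_USER / DATABASE_PASSWORD variable names are assumed from the default connection URL, and the concrete values here are illustrative only:

    # build the single binary (the new Dockerfile does the equivalent)
    go build -o igor ./cmd/igor/*.go

    # HTTP API server; flags may also be supplied as env vars (ADDR, DATABASE_URL, QUEUE_URL, DEBUG)
    DATABASE_USER=igorreadwrite DATABASE_PASSWORD=readwrite ./igor api --addr 0.0.0.0:8100 --debug

    # background worker against the same database and queue
    DATABASE_USER=igorreadwrite DATABASE_PASSWORD=readwrite ./igor worker

    # migration operations (up / down / version / wait); --source defaults to file://migrations/prod
    ./igor migrate up --source file://migrations/prod
    ./igor migrate down --steps 1
    ./igor migrate wait --version 100000 --timeout 300s   # block until the DB is at least this version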