From dcfc2b3e76db1f0e9fa28197ce597e6784c241d4 Mon Sep 17 00:00:00 2001 From: Sarah Funkhouser <147884153+golanglemonade@users.noreply.github.com> Date: Wed, 25 Sep 2024 13:59:25 -0600 Subject: [PATCH] feat: add interface for insert only client Signed-off-by: Sarah Funkhouser <147884153+golanglemonade@users.noreply.github.com> --- .github/labeler.yml | 3 + go.mod | 14 +- go.sum | 33 ++-- internal/river/river.go | 40 ++-- pkg/riverqueue/client.go | 186 ++++++++++++++++++ pkg/riverqueue/doc.go | 2 + pkg/riverqueue/errors.go | 5 + {internal/river => pkg/riverqueue}/migrate.go | 12 +- pkg/riverqueue/options.go | 70 +++++++ 9 files changed, 319 insertions(+), 46 deletions(-) create mode 100644 pkg/riverqueue/client.go create mode 100644 pkg/riverqueue/doc.go create mode 100644 pkg/riverqueue/errors.go rename {internal/river => pkg/riverqueue}/migrate.go (60%) create mode 100644 pkg/riverqueue/options.go diff --git a/.github/labeler.yml b/.github/labeler.yml index b5f34c9..88142e6 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -22,3 +22,6 @@ local-development: jobs: - changed-files: - any-glob-to-any-file: pkg/jobs/** +client: + - changed-files: + - any-glob-to-any-file: pkg/riverqueue/** diff --git a/go.mod b/go.mod index c31fb87..30ad237 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,9 @@ require ( github.com/knadh/koanf/providers/posflag v0.1.0 github.com/knadh/koanf/v2 v2.1.1 github.com/mcuadros/go-defaults v1.2.0 - github.com/riverqueue/river v0.11.4 - github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4 + github.com/riverqueue/river v0.12.0 + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.12.0 + github.com/riverqueue/river/rivertype v0.12.0 github.com/rs/zerolog v1.33.0 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 @@ -52,14 +53,17 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/redis/go-redis/v9 v9.6.1 // indirect github.com/resend/resend-go/v2 v2.11.0 // indirect - github.com/riverqueue/river/riverdriver v0.11.4 // indirect - github.com/riverqueue/river/rivershared v0.11.4 // indirect - github.com/riverqueue/river/rivertype v0.11.4 // indirect + github.com/riverqueue/river/riverdriver v0.12.0 // indirect + github.com/riverqueue/river/rivershared v0.12.0 // indirect github.com/sosodev/duration v1.3.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/theopenlane/echox v0.2.0 // indirect github.com/theopenlane/iam v0.1.6 // indirect + github.com/tidwall/gjson v1.17.3 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/sjson v1.2.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect github.com/vektah/gqlparser/v2 v2.5.16 // indirect diff --git a/go.sum b/go.sum index 2dee70d..7262dee 100644 --- a/go.sum +++ b/go.sum @@ -109,18 +109,18 @@ github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0 github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/resend/resend-go/v2 v2.11.0 h1:Ja5eXizUCbvyLgbiP8sFsJW/UN1b7d6IEUqi80IlgiU= github.com/resend/resend-go/v2 v2.11.0/go.mod h1:ihnxc7wPpSgans8RV8d8dIF4hYWVsqMK5KxXAr9LIos= -github.com/riverqueue/river v0.11.4 h1:NMRsODhRgFztf080RMCjI377jldLXsx41E2r7+c0lPE= -github.com/riverqueue/river v0.11.4/go.mod h1:HvgBkqon7lYKm9Su4lVOnn1qx8Q4FnSMJjf5auVial4= -github.com/riverqueue/river/riverdriver v0.11.4 h1:kBg68vfTnRuSwsgcZ7UbKC4ocZ+KSCGnuZw/GwMMMP4= -github.com/riverqueue/river/riverdriver v0.11.4/go.mod h1:+NxTrldRYYsdTbZSxX7L2LuWU/B0IAtAActDJcNbcPs= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.4 h1:QBegZQrB59dafWaiNphJC85KTA0CmeGYcpCqu52qbnI= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.4/go.mod h1:CQC2a/+GRtN6b67IA7jFCvcCtOBWRz3lWqyNxDggKSM= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4 h1:rRY8WabllXRsLp8U+gxUpYgTgI8dveF3UWnZJu965Lg= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4/go.mod h1:GgWsTnC7V7lanQLyj8W1UuYuzyDoJZc4bhhDomtYr30= -github.com/riverqueue/river/rivershared v0.11.4 h1:XGfzJKG7hhwd0MwImF/4r+t6F9aq2Q7e6NNYifStnus= -github.com/riverqueue/river/rivershared v0.11.4/go.mod h1:vZc9tRvSZ9spLqcz9UUuKbZGuDRwBhS3LuzLY7d/jkw= -github.com/riverqueue/river/rivertype v0.11.4 h1:TAdi4CQEYukveYneAqm5LupRVZjvSfB8tL3xKR13wi4= -github.com/riverqueue/river/rivertype v0.11.4/go.mod h1:3WRQEDlLKZky/vGwFcZC3uKjC+/8izE6ucHwCsuir98= +github.com/riverqueue/river v0.12.0 h1:ZkzSH0wyXtg8lSJdLSVl4sdZ9aNUiCi/CG9LzSbanpA= +github.com/riverqueue/river v0.12.0/go.mod h1:uMocd8FYK6YFHMyYzptAhGETtrjh9oTzXdQBzBOlIT0= +github.com/riverqueue/river/riverdriver v0.12.0 h1:ONm3MfrmA/Yg6oaus5LP9w8vj3OvTlS5y+F45JmJR3I= +github.com/riverqueue/river/riverdriver v0.12.0/go.mod h1:/gc34Xm61CSZl/m96LcreD4L8LPAlN7VadFDTOVsgEs= +github.com/riverqueue/river/riverdriver/riverdatabasesql v0.12.0 h1:P8rgOutF3DqDiLKgetUy38X9JZKWOzsEGW/oC0qkOjM= +github.com/riverqueue/river/riverdriver/riverdatabasesql v0.12.0/go.mod h1:QGsr8DbI/DTDZCsmdDVnKQfltnzcFwj3TgVmj1N25Fc= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.12.0 h1:2Y77a6pe/UXQU3vn2ombU8sA1iIy2c7GJJ3XXM9Qqds= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.12.0/go.mod h1:qYxaATboRtB3tAPlBGF4zxUpfcPo61jPPO3xAa+j450= +github.com/riverqueue/river/rivershared v0.12.0 h1:zLAh2A9quwBbGIXzUtedQUupz1VnWwV/R3biV1UVIfY= +github.com/riverqueue/river/rivershared v0.12.0/go.mod h1:PbbRy9ZgezvfFSpLC+TwRCEd6+dgsvkWXfqaSQO+4SQ= +github.com/riverqueue/river/rivertype v0.12.0 h1:fgAIUc09QUjW/8seNJ7BL5GvNvdpqX9ggPk4sPxa7Vc= +github.com/riverqueue/river/rivertype v0.12.0/go.mod h1:3WRQEDlLKZky/vGwFcZC3uKjC+/8izE6ucHwCsuir98= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -159,6 +159,15 @@ github.com/theopenlane/newman v0.1.1 h1:pxGPRcy8kXQplfv4Sp1N3XUkWmx/scZvp7oj+y2l github.com/theopenlane/newman v0.1.1/go.mod h1:A37pInKEYsdvUmjQzTDv7x5T4KhMxoFW105DA3XvH4Y= github.com/theopenlane/utils v0.2.1 h1:T6VfvOQDcAXBa1NFVL4QCsCbHvVQkp6Tl4hGJVd7TwQ= github.com/theopenlane/utils v0.2.1/go.mod h1:ydEtwhmEvkVt3KKmNqiQiSY5b3rKH7U4umZ3QbFDsxU= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94= +github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= diff --git a/internal/river/river.go b/internal/river/river.go index 112d133..eed0163 100644 --- a/internal/river/river.go +++ b/internal/river/river.go @@ -9,10 +9,10 @@ import ( "syscall" "time" - "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" - "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/rs/zerolog/log" + + "github.com/theopenlane/riverboat/pkg/riverqueue" ) const ( @@ -21,17 +21,6 @@ const ( // Start the river server with the given configuration func Start(ctx context.Context, c Config) error { - // Create a new database connection pool - dbPool, err := pgxpool.New(ctx, c.DatabaseHost) - if err != nil { - log.Fatal().Err(err).Msg("failed to connect to database") - } - - // Run migrations on startup - if err := runMigrations(ctx, dbPool); err != nil { - log.Fatal().Err(err).Msg("failed to run migrations") - } - // Create workers based on the configuration worker, err := createWorkers(c.Workers) if err != nil { @@ -46,22 +35,25 @@ func Start(ctx context.Context, c Config) error { log.Debug().Interface("queues", queues).Msg("queues created") // create a new river client - client, err := river.NewClient( - riverpgxv5.New(dbPool), - &river.Config{ - Workers: worker, - Queues: queues, - Logger: createLogger(c.Logger), - }, + client, err := riverqueue.New( + ctx, + riverqueue.WithConnectionURI(c.DatabaseHost), + riverqueue.WithRunMigrations(true), + riverqueue.WithLogger(createLogger(c.Logger)), + riverqueue.WithWorkers(worker), + riverqueue.WithQueues(queues), ) if err != nil { log.Fatal().Err(err).Msg("failed to create river client") } + // get the underlying river client + rc := client.GetRiverClient() + log.Info().Msg(startBlock) // run the client - if err := client.Start(ctx); err != nil { + if err := rc.Start(ctx); err != nil { log.Fatal().Err(err).Msg("failed to start river client") } @@ -91,7 +83,7 @@ func Start(ctx context.Context, c Config) error { } }() - err := client.Stop(softStopCtx) + err := rc.Stop(softStopCtx) if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { panic(err) @@ -110,14 +102,14 @@ func Start(ctx context.Context, c Config) error { // always work. However, in the case of a bug where a job blocks despite // being cancelled, it may be necessary to either ignore River's stop // result (what's shown here) or have a supervisor kill the process. - err = client.StopAndCancel(hardStopCtx) + err = rc.StopAndCancel(hardStopCtx) if err != nil && errors.Is(err, context.DeadlineExceeded) { log.Info().Msg("hard stop timeout; ignoring stop procedure and exiting unsafely") } else if err != nil { log.Panic().Err(err).Msg("hard stop failed") } }() - <-client.Stopped() + <-rc.Stopped() return nil } diff --git a/pkg/riverqueue/client.go b/pkg/riverqueue/client.go new file mode 100644 index 0000000..0cb2123 --- /dev/null +++ b/pkg/riverqueue/client.go @@ -0,0 +1,186 @@ +package riverqueue + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivertype" + "github.com/rs/zerolog/log" +) + +// JobClient is an interface for the river client to insert jobs +// this interface is only used for inserting new jobs and will not contain any other methods +type JobClient interface { + // InsertMany inserts many jobs at once. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided + // by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout. + InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) + // InsertManyTx inserts many jobs at once. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided + // by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout. + InsertManyTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) + // Insert inserts a new job with the provided args. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts, + // as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout. + Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) + // InsertTx inserts a new job with the provided args on the given transaction. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts, + // as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout. + InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) + // InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple, + // which takes job args along with an optional set of insert options, which override insert options provided by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. + // The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout. + InsertManyFast(ctx context.Context, params []river.InsertManyParams) (int, error) + // InsertManyTx inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple, + // which takes job args along with an optional set of insert options, which override insert options provided by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. + // The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout. + InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error) + // JobCancel cancels the job with the given ID. If possible, the job is cancelled immediately and will not be retried. + // The provided context is used for the underlying Postgres update and can be used to cancel the operation or apply a timeout. + JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) + // JobCancelTx cancels the job with the given ID within the specified transaction. This variant lets a caller cancel a job atomically alongside other database changes. + // A cancelled job doesn't take effect until the transaction commits, and if the transaction rolls back, so too is the cancelled job. + JobCancelTx(ctx context.Context, tx pgx.Tx, jobID int64) (*rivertype.JobRow, error) + + // GetPool returns the underlying pgx pool + GetPool() *pgxpool.Pool + // TruncateRiverTables truncates River tables in the target database. This is for test cleanup and should obviously only be used in tests. + TruncateRiverTables(ctx context.Context) error + // GetRiverClient returns the underlying river client + // this can be used to interact directly with the river client for more advanced use cases (e.g. starting the river server) + // which are outside the scope of the insert-only client interface + GetRiverClient() *river.Client[pgx.Tx] +} + +// Config settings for the river client +type Config struct { + // ConnectionURI is the connection URI for the database + ConnectionURI string `koanf:"connectionURI" json:"connectionURI" default:""` + // RunMigrations is a flag to determine if migrations should be run + RunMigrations bool `koanf:"runMigrations" json:"runMigrations" default:"false"` + // RiverConf is the river configuration + RiverConf river.Config `koanf:"riverConf" json:"riverConf"` +} + +// Client is a river Client that implements the JobClient interface +type Client struct { + config Config + + pool *pgxpool.Pool + + // riverClient is the river client that is used to interact with the river server + // using the pgx driver + riverClient *river.Client[pgx.Tx] +} + +// ensure the client implements the JobClient interface +var _ JobClient = &Client{} + +// New creates a new river client with the options provided +func New(ctx context.Context, opts ...Option) (c *Client, err error) { + // Initialize the Client struct + c = &Client{} + + // apply the options to the client + for _, opt := range opts { + opt(c) + } + + if c.config.ConnectionURI == "" { + return nil, ErrConnectionURIRequired + } + + // create a new river client with the given connection URI + c.pool, err = pgxpool.New(ctx, c.config.ConnectionURI) + if err != nil { + log.Error().Err(err).Msg("error creating job queue database connection") + return nil, err + } + + // run migrations if the flag is set + if c.config.RunMigrations { + if err := RunMigrations(ctx, c.pool); err != nil { + log.Error().Err(err).Msg("error running migrations") + return nil, err + } + } + + // create a new river client with the given connection URI + c.riverClient, err = river.NewClient(riverpgxv5.New(c.pool), &c.config.RiverConf) + if err != nil { + log.Error().Err(err).Msg("error creating river client") + return nil, err + } + + return c, nil +} + +// GetPool returns the underlying pgx pool +func (c *Client) GetPool() *pgxpool.Pool { + return c.pool +} + +// GetRiverClient returns the underlying river client +func (c *Client) GetRiverClient() *river.Client[pgx.Tx] { + return c.riverClient +} + +// Insert satisfies the JobClient interface +func (c *Client) Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) { + return c.riverClient.Insert(ctx, args, opts) +} + +// InsertMany satisfies the JobClient interface +func (c *Client) InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) { + return c.riverClient.InsertMany(ctx, params) +} + +// InsertManyTx satisfies the JobClient interface +func (c *Client) InsertManyTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) { + return c.riverClient.InsertManyTx(ctx, tx, params) +} + +// InsertTx satisfies the JobClient interface +func (c *Client) InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) { + return c.riverClient.InsertTx(ctx, tx, args, opts) +} + +// InsertManyFast satisfies the JobClient interface +func (c *Client) InsertManyFast(ctx context.Context, params []river.InsertManyParams) (int, error) { + return c.riverClient.InsertManyFast(ctx, params) +} + +// InsertManyFastTx satisfies the JobClient interface +func (c *Client) InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error) { + return c.riverClient.InsertManyFastTx(ctx, tx, params) +} + +// JobCancel satisfies the JobClient interface +func (c *Client) JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) { + return c.riverClient.JobCancel(ctx, jobID) +} + +// JobCancelTx satisfies the JobClient interface +func (c *Client) JobCancelTx(ctx context.Context, tx pgx.Tx, jobID int64) (*rivertype.JobRow, error) { + return c.riverClient.JobCancelTx(ctx, tx, jobID) +} + +// TruncateRiverTables truncates River tables in the target database. This is +// for test cleanup and should obviously only be used in tests. +func (c *Client) TruncateRiverTables(ctx context.Context) error { + pool := c.GetPool() + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // nolint:mnd + defer cancel() + + tables := []string{"river_job", "river_leader", "river_queue"} + + for _, table := range tables { + if _, err := pool.Exec(ctx, fmt.Sprintf("TRUNCATE TABLE %s;", table)); err != nil { + return fmt.Errorf("error truncating %q: %w", table, err) + } + } + + return nil +} diff --git a/pkg/riverqueue/doc.go b/pkg/riverqueue/doc.go new file mode 100644 index 0000000..73cb3c1 --- /dev/null +++ b/pkg/riverqueue/doc.go @@ -0,0 +1,2 @@ +// package riverqueue is an insert-only wrapper for the river client +package riverqueue diff --git a/pkg/riverqueue/errors.go b/pkg/riverqueue/errors.go new file mode 100644 index 0000000..4c88a4b --- /dev/null +++ b/pkg/riverqueue/errors.go @@ -0,0 +1,5 @@ +package riverqueue + +import "errors" + +var ErrConnectionURIRequired = errors.New("connection URI is required to initialize the client") diff --git a/internal/river/migrate.go b/pkg/riverqueue/migrate.go similarity index 60% rename from internal/river/migrate.go rename to pkg/riverqueue/migrate.go index c2dc7fa..4c42cd1 100644 --- a/internal/river/migrate.go +++ b/pkg/riverqueue/migrate.go @@ -1,19 +1,21 @@ -package river +package riverqueue import ( "context" "github.com/jackc/pgx/v5/pgxpool" - "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivermigrate" ) -// runMigrations runs the migrations for the river server +// RunMigrations runs the migrations for the river server // see https://riverqueue.com/docs/migrations for more information -func runMigrations(ctx context.Context, dbPool *pgxpool.Pool) error { +func RunMigrations(ctx context.Context, dbPool *pgxpool.Pool) error { // run migrations here - migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil) + if err != nil { + return err + } if _, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil); err != nil { return err diff --git a/pkg/riverqueue/options.go b/pkg/riverqueue/options.go new file mode 100644 index 0000000..09a6fa4 --- /dev/null +++ b/pkg/riverqueue/options.go @@ -0,0 +1,70 @@ +package riverqueue + +import ( + "log/slog" + "time" + + "github.com/riverqueue/river" +) + +// Option is a function that configures a client +type Option func(*Client) + +// WithConnectionURI sets the connection URI for the client +func WithConnectionURI(uri string) Option { + return func(c *Client) { + c.config.ConnectionURI = uri + } +} + +// WithLogger sets the logger for the client +func WithLogger(l *slog.Logger) Option { + return func(c *Client) { + c.config.RiverConf.Logger = l + } +} + +// WithMaxRetries sets the maximum number of retries for the client +func WithMaxRetries(maxRetries int) Option { + return func(c *Client) { + c.config.RiverConf.MaxAttempts = maxRetries + } +} + +// WithJobTimeout sets the job timeout for the client +func WithJobTimeout(jobTimeout time.Duration) Option { + return func(c *Client) { + c.config.RiverConf.JobTimeout = jobTimeout + } +} + +// WithRunMigrations sets the run migrations flag for the client +func WithRunMigrations(runMigrations bool) Option { + return func(c *Client) { + c.config.RunMigrations = runMigrations + } +} + +// WithWorkers sets the workers for the client +// this should be omitted when creating an insert only client +func WithWorkers(workers *river.Workers) Option { + return func(c *Client) { + c.config.RiverConf.Workers = workers + } +} + +// WithQueues sets the queues for the client +// this should be omitted when creating an insert only client +func WithQueues(q map[string]river.QueueConfig) Option { + return func(c *Client) { + c.config.RiverConf.Queues = q + } +} + +// WithRiverConfig sets the entire river configuration for the client +// prefer using the other options when possible +func WithRiverConfig(conf river.Config) Option { + return func(c *Client) { + c.config.RiverConf = conf + } +}