Skip to content

Commit

Permalink
feat: add interface for insert only client
Browse files Browse the repository at this point in the history
Signed-off-by: Sarah Funkhouser <[email protected]>
  • Loading branch information
golanglemonade committed Sep 25, 2024
1 parent 84da28a commit dcfc2b3
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 46 deletions.
3 changes: 3 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ local-development:
jobs:
- changed-files:
- any-glob-to-any-file: pkg/jobs/**
client:
- changed-files:
- any-glob-to-any-file: pkg/riverqueue/**
14 changes: 9 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ require (
github.com/knadh/koanf/providers/posflag v0.1.0
github.com/knadh/koanf/v2 v2.1.1
github.com/mcuadros/go-defaults v1.2.0
github.com/riverqueue/river v0.11.4
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4
github.com/riverqueue/river v0.12.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.12.0
github.com/riverqueue/river/rivertype v0.12.0
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -52,14 +53,17 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/redis/go-redis/v9 v9.6.1 // indirect
github.com/resend/resend-go/v2 v2.11.0 // indirect
github.com/riverqueue/river/riverdriver v0.11.4 // indirect
github.com/riverqueue/river/rivershared v0.11.4 // indirect
github.com/riverqueue/river/rivertype v0.11.4 // indirect
github.com/riverqueue/river/riverdriver v0.12.0 // indirect
github.com/riverqueue/river/rivershared v0.12.0 // indirect
github.com/sosodev/duration v1.3.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/theopenlane/echox v0.2.0 // indirect
github.com/theopenlane/iam v0.1.6 // indirect
github.com/tidwall/gjson v1.17.3 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/vektah/gqlparser/v2 v2.5.16 // indirect
Expand Down
33 changes: 21 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,18 @@ github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/resend/resend-go/v2 v2.11.0 h1:Ja5eXizUCbvyLgbiP8sFsJW/UN1b7d6IEUqi80IlgiU=
github.com/resend/resend-go/v2 v2.11.0/go.mod h1:ihnxc7wPpSgans8RV8d8dIF4hYWVsqMK5KxXAr9LIos=
github.com/riverqueue/river v0.11.4 h1:NMRsODhRgFztf080RMCjI377jldLXsx41E2r7+c0lPE=
github.com/riverqueue/river v0.11.4/go.mod h1:HvgBkqon7lYKm9Su4lVOnn1qx8Q4FnSMJjf5auVial4=
github.com/riverqueue/river/riverdriver v0.11.4 h1:kBg68vfTnRuSwsgcZ7UbKC4ocZ+KSCGnuZw/GwMMMP4=
github.com/riverqueue/river/riverdriver v0.11.4/go.mod h1:+NxTrldRYYsdTbZSxX7L2LuWU/B0IAtAActDJcNbcPs=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.4 h1:QBegZQrB59dafWaiNphJC85KTA0CmeGYcpCqu52qbnI=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.4/go.mod h1:CQC2a/+GRtN6b67IA7jFCvcCtOBWRz3lWqyNxDggKSM=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4 h1:rRY8WabllXRsLp8U+gxUpYgTgI8dveF3UWnZJu965Lg=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4/go.mod h1:GgWsTnC7V7lanQLyj8W1UuYuzyDoJZc4bhhDomtYr30=
github.com/riverqueue/river/rivershared v0.11.4 h1:XGfzJKG7hhwd0MwImF/4r+t6F9aq2Q7e6NNYifStnus=
github.com/riverqueue/river/rivershared v0.11.4/go.mod h1:vZc9tRvSZ9spLqcz9UUuKbZGuDRwBhS3LuzLY7d/jkw=
github.com/riverqueue/river/rivertype v0.11.4 h1:TAdi4CQEYukveYneAqm5LupRVZjvSfB8tL3xKR13wi4=
github.com/riverqueue/river/rivertype v0.11.4/go.mod h1:3WRQEDlLKZky/vGwFcZC3uKjC+/8izE6ucHwCsuir98=
github.com/riverqueue/river v0.12.0 h1:ZkzSH0wyXtg8lSJdLSVl4sdZ9aNUiCi/CG9LzSbanpA=
github.com/riverqueue/river v0.12.0/go.mod h1:uMocd8FYK6YFHMyYzptAhGETtrjh9oTzXdQBzBOlIT0=
github.com/riverqueue/river/riverdriver v0.12.0 h1:ONm3MfrmA/Yg6oaus5LP9w8vj3OvTlS5y+F45JmJR3I=
github.com/riverqueue/river/riverdriver v0.12.0/go.mod h1:/gc34Xm61CSZl/m96LcreD4L8LPAlN7VadFDTOVsgEs=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.12.0 h1:P8rgOutF3DqDiLKgetUy38X9JZKWOzsEGW/oC0qkOjM=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.12.0/go.mod h1:QGsr8DbI/DTDZCsmdDVnKQfltnzcFwj3TgVmj1N25Fc=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.12.0 h1:2Y77a6pe/UXQU3vn2ombU8sA1iIy2c7GJJ3XXM9Qqds=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.12.0/go.mod h1:qYxaATboRtB3tAPlBGF4zxUpfcPo61jPPO3xAa+j450=
github.com/riverqueue/river/rivershared v0.12.0 h1:zLAh2A9quwBbGIXzUtedQUupz1VnWwV/R3biV1UVIfY=
github.com/riverqueue/river/rivershared v0.12.0/go.mod h1:PbbRy9ZgezvfFSpLC+TwRCEd6+dgsvkWXfqaSQO+4SQ=
github.com/riverqueue/river/rivertype v0.12.0 h1:fgAIUc09QUjW/8seNJ7BL5GvNvdpqX9ggPk4sPxa7Vc=
github.com/riverqueue/river/rivertype v0.12.0/go.mod h1:3WRQEDlLKZky/vGwFcZC3uKjC+/8izE6ucHwCsuir98=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
Expand Down Expand Up @@ -159,6 +159,15 @@ github.com/theopenlane/newman v0.1.1 h1:pxGPRcy8kXQplfv4Sp1N3XUkWmx/scZvp7oj+y2l
github.com/theopenlane/newman v0.1.1/go.mod h1:A37pInKEYsdvUmjQzTDv7x5T4KhMxoFW105DA3XvH4Y=
github.com/theopenlane/utils v0.2.1 h1:T6VfvOQDcAXBa1NFVL4QCsCbHvVQkp6Tl4hGJVd7TwQ=
github.com/theopenlane/utils v0.2.1/go.mod h1:ydEtwhmEvkVt3KKmNqiQiSY5b3rKH7U4umZ3QbFDsxU=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94=
github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
Expand Down
40 changes: 16 additions & 24 deletions internal/river/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"syscall"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/rs/zerolog/log"

"github.com/theopenlane/riverboat/pkg/riverqueue"
)

const (
Expand All @@ -21,17 +21,6 @@ const (

// Start the river server with the given configuration
func Start(ctx context.Context, c Config) error {
// Create a new database connection pool
dbPool, err := pgxpool.New(ctx, c.DatabaseHost)
if err != nil {
log.Fatal().Err(err).Msg("failed to connect to database")
}

// Run migrations on startup
if err := runMigrations(ctx, dbPool); err != nil {
log.Fatal().Err(err).Msg("failed to run migrations")
}

// Create workers based on the configuration
worker, err := createWorkers(c.Workers)
if err != nil {
Expand All @@ -46,22 +35,25 @@ func Start(ctx context.Context, c Config) error {
log.Debug().Interface("queues", queues).Msg("queues created")

// create a new river client
client, err := river.NewClient(
riverpgxv5.New(dbPool),
&river.Config{
Workers: worker,
Queues: queues,
Logger: createLogger(c.Logger),
},
client, err := riverqueue.New(
ctx,
riverqueue.WithConnectionURI(c.DatabaseHost),
riverqueue.WithRunMigrations(true),
riverqueue.WithLogger(createLogger(c.Logger)),
riverqueue.WithWorkers(worker),
riverqueue.WithQueues(queues),
)
if err != nil {
log.Fatal().Err(err).Msg("failed to create river client")
}

// get the underlying river client
rc := client.GetRiverClient()

log.Info().Msg(startBlock)

// run the client
if err := client.Start(ctx); err != nil {
if err := rc.Start(ctx); err != nil {
log.Fatal().Err(err).Msg("failed to start river client")
}

Expand Down Expand Up @@ -91,7 +83,7 @@ func Start(ctx context.Context, c Config) error {
}
}()

err := client.Stop(softStopCtx)
err := rc.Stop(softStopCtx)

if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
panic(err)
Expand All @@ -110,14 +102,14 @@ func Start(ctx context.Context, c Config) error {
// always work. However, in the case of a bug where a job blocks despite
// being cancelled, it may be necessary to either ignore River's stop
// result (what's shown here) or have a supervisor kill the process.
err = client.StopAndCancel(hardStopCtx)
err = rc.StopAndCancel(hardStopCtx)
if err != nil && errors.Is(err, context.DeadlineExceeded) {
log.Info().Msg("hard stop timeout; ignoring stop procedure and exiting unsafely")
} else if err != nil {
log.Panic().Err(err).Msg("hard stop failed")
}
}()
<-client.Stopped()
<-rc.Stopped()

return nil
}
Expand Down
186 changes: 186 additions & 0 deletions pkg/riverqueue/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package riverqueue

import (
"context"
"fmt"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
"github.com/rs/zerolog/log"
)

// JobClient is an interface for the river client to insert jobs
// this interface is only used for inserting new jobs and will not contain any other methods
type JobClient interface {
// InsertMany inserts many jobs at once. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided
// by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
// InsertManyTx inserts many jobs at once. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided
// by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
InsertManyTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
// Insert inserts a new job with the provided args. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts,
// as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout.
Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
// InsertTx inserts a new job with the provided args on the given transaction. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts,
// as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout.
InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
// InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple,
// which takes job args along with an optional set of insert options, which override insert options provided by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
// The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
InsertManyFast(ctx context.Context, params []river.InsertManyParams) (int, error)
// InsertManyTx inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple,
// which takes job args along with an optional set of insert options, which override insert options provided by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
// The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error)
// JobCancel cancels the job with the given ID. If possible, the job is cancelled immediately and will not be retried.
// The provided context is used for the underlying Postgres update and can be used to cancel the operation or apply a timeout.
JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error)
// JobCancelTx cancels the job with the given ID within the specified transaction. This variant lets a caller cancel a job atomically alongside other database changes.
// A cancelled job doesn't take effect until the transaction commits, and if the transaction rolls back, so too is the cancelled job.
JobCancelTx(ctx context.Context, tx pgx.Tx, jobID int64) (*rivertype.JobRow, error)

// GetPool returns the underlying pgx pool
GetPool() *pgxpool.Pool
// TruncateRiverTables truncates River tables in the target database. This is for test cleanup and should obviously only be used in tests.
TruncateRiverTables(ctx context.Context) error
// GetRiverClient returns the underlying river client
// this can be used to interact directly with the river client for more advanced use cases (e.g. starting the river server)
// which are outside the scope of the insert-only client interface
GetRiverClient() *river.Client[pgx.Tx]
}

// Config settings for the river client
type Config struct {
// ConnectionURI is the connection URI for the database
ConnectionURI string `koanf:"connectionURI" json:"connectionURI" default:""`
// RunMigrations is a flag to determine if migrations should be run
RunMigrations bool `koanf:"runMigrations" json:"runMigrations" default:"false"`
// RiverConf is the river configuration
RiverConf river.Config `koanf:"riverConf" json:"riverConf"`
}

// Client is a river Client that implements the JobClient interface
type Client struct {
config Config

pool *pgxpool.Pool

// riverClient is the river client that is used to interact with the river server
// using the pgx driver
riverClient *river.Client[pgx.Tx]
}

// ensure the client implements the JobClient interface
var _ JobClient = &Client{}

// New creates a new river client with the options provided
func New(ctx context.Context, opts ...Option) (c *Client, err error) {
// Initialize the Client struct
c = &Client{}

// apply the options to the client
for _, opt := range opts {
opt(c)
}

if c.config.ConnectionURI == "" {
return nil, ErrConnectionURIRequired
}

// create a new river client with the given connection URI
c.pool, err = pgxpool.New(ctx, c.config.ConnectionURI)
if err != nil {
log.Error().Err(err).Msg("error creating job queue database connection")
return nil, err
}

// run migrations if the flag is set
if c.config.RunMigrations {
if err := RunMigrations(ctx, c.pool); err != nil {
log.Error().Err(err).Msg("error running migrations")
return nil, err
}
}

// create a new river client with the given connection URI
c.riverClient, err = river.NewClient(riverpgxv5.New(c.pool), &c.config.RiverConf)
if err != nil {
log.Error().Err(err).Msg("error creating river client")
return nil, err
}

return c, nil
}

// GetPool returns the underlying pgx pool
func (c *Client) GetPool() *pgxpool.Pool {
return c.pool
}

// GetRiverClient returns the underlying river client
func (c *Client) GetRiverClient() *river.Client[pgx.Tx] {
return c.riverClient
}

// Insert satisfies the JobClient interface
func (c *Client) Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) {
return c.riverClient.Insert(ctx, args, opts)
}

// InsertMany satisfies the JobClient interface
func (c *Client) InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) {
return c.riverClient.InsertMany(ctx, params)
}

// InsertManyTx satisfies the JobClient interface
func (c *Client) InsertManyTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) {
return c.riverClient.InsertManyTx(ctx, tx, params)
}

// InsertTx satisfies the JobClient interface
func (c *Client) InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) {
return c.riverClient.InsertTx(ctx, tx, args, opts)
}

// InsertManyFast satisfies the JobClient interface
func (c *Client) InsertManyFast(ctx context.Context, params []river.InsertManyParams) (int, error) {
return c.riverClient.InsertManyFast(ctx, params)
}

// InsertManyFastTx satisfies the JobClient interface
func (c *Client) InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error) {
return c.riverClient.InsertManyFastTx(ctx, tx, params)
}

// JobCancel satisfies the JobClient interface
func (c *Client) JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) {
return c.riverClient.JobCancel(ctx, jobID)
}

// JobCancelTx satisfies the JobClient interface
func (c *Client) JobCancelTx(ctx context.Context, tx pgx.Tx, jobID int64) (*rivertype.JobRow, error) {
return c.riverClient.JobCancelTx(ctx, tx, jobID)
}

// TruncateRiverTables truncates River tables in the target database. This is
// for test cleanup and should obviously only be used in tests.
func (c *Client) TruncateRiverTables(ctx context.Context) error {
pool := c.GetPool()

ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // nolint:mnd
defer cancel()

tables := []string{"river_job", "river_leader", "river_queue"}

for _, table := range tables {
if _, err := pool.Exec(ctx, fmt.Sprintf("TRUNCATE TABLE %s;", table)); err != nil {
return fmt.Errorf("error truncating %q: %w", table, err)
}
}

return nil
}
2 changes: 2 additions & 0 deletions pkg/riverqueue/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// package riverqueue is an insert-only wrapper for the river client
package riverqueue
5 changes: 5 additions & 0 deletions pkg/riverqueue/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package riverqueue

import "errors"

var ErrConnectionURIRequired = errors.New("connection URI is required to initialize the client")
Loading

0 comments on commit dcfc2b3

Please sign in to comment.