Skip to content

Commit

Permalink
Use connection pooling for Postgres in Job Service database (BATCH-17…
Browse files Browse the repository at this point in the history
…0) (#2378)

* Start a Postgres container for tests-e2e-airflow Make target.

* Revert "Start a Postgres container for tests-e2e-airflow Make target."

This reverts commit b9b406c.

* Use pgx connection pooling on Postgres (BATCH-170)

Use the pgx.Pool facility to use connection pooling for the
Job Service database when configured to use Postgres for the
JS service. Split out the Postgresql and SQLite implementations
and mask behind a common SQLJobService interface definition.
Currently, all unit tests function correctly when using Postgres.

* Get Postgres and SQLite configs working in unit tests

* Add new context param to various func invocations.

* Also remove the test.db-shm and test.db-wal test files for SQLite

* golangci-lint fixes

* Add short sleep for CI Integration tests

To avoid the "no executors available" error, when containers
are not fully up.

* Try a 2-minute sleep before running e2e tests after cluster build.

* Remove debugging comment.

* Add error object to NewSQLJobService() constructor

Improve error-handling when constructing new database connection,
do not just log errors, but pass them up to callers. Render a
useful error message if the database type is not valid. Thanks
to @suprjinx for PR suggestions.
  • Loading branch information
richscott authored Apr 18, 2023
1 parent cd4d2f3 commit a08830b
Show file tree
Hide file tree
Showing 15 changed files with 1,134 additions and 783 deletions.
6 changes: 3 additions & 3 deletions config/jobservice/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ databaseType: "postgres"
databasePath: "/var/jobservice.db"
# Connection details when using database type 'postgres'
postgresConfig:
maxOpenConns: 50
maxIdleConns: 10
connMaxLifetime: 30m
poolMaxConns: 50
poolMinConns: 10
poolMaxConnLifetime: 30m
connection:
host: postgres
port: 5432
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ require (
github.com/go-playground/validator/v10 v10.11.1
github.com/golang/mock v1.6.0
github.com/goreleaser/goreleaser v1.11.5
github.com/jackc/pgx/v5 v5.3.1
github.com/jessevdk/go-flags v1.5.0
github.com/magefile/mage v1.14.0
github.com/openconfig/goyang v1.2.0
Expand Down Expand Up @@ -143,8 +144,9 @@ require (
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
Expand Down Expand Up @@ -190,7 +192,7 @@ require (
github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036 // indirect
go.mongodb.org/mongo-driver v1.10.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,9 @@ github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX
github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgproto3/v2 v2.3.1 h1:nwj7qwf0S+Q7ISFfBndqeLwSwxs+4DPsbRFjECT1Y4Y=
github.com/jackc/pgproto3/v2 v2.3.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg=
github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc=
github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw=
Expand All @@ -519,11 +520,15 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ
github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
github.com/jackc/pgx/v4 v4.17.2 h1:0Ut0rpeKwvIVbMQ1KbMBU4h6wxehBI535LK6Flheh8E=
github.com/jackc/pgx/v4 v4.17.2/go.mod h1:lcxIZN44yMIrWI78a5CpucdD14hX0SBDbNRvjDBItsw=
github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU=
github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0=
github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk=
github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
Expand Down Expand Up @@ -957,8 +962,9 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down
62 changes: 14 additions & 48 deletions internal/jobservice/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,14 @@ package jobservice

import (
"context"
"database/sql"
"fmt"
"net"
"os"
"path/filepath"
"time"

_ "github.com/jackc/pgx/v4/stdlib"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/armadaproject/armada/internal/common/auth/authorization"
"github.com/armadaproject/armada/internal/common/database"
grpcCommon "github.com/armadaproject/armada/internal/common/grpc"
"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/jobservice/configuration"
Expand Down Expand Up @@ -46,43 +41,12 @@ func (a *App) StartUp(ctx context.Context, config *configuration.JobServiceConfi
[]authorization.AuthService{&authorization.AnonymousAuthService{}},
)

var db *sql.DB

if config.DatabaseType == "postgres" {
var err error
log.Info("using postgres")
db, err = sql.Open("pgx", database.CreateConnectionString(config.PostgresConfig.Connection))
if err != nil {
return err
}
db.SetMaxOpenConns(config.PostgresConfig.MaxOpenConns)
db.SetMaxIdleConns(config.PostgresConfig.MaxIdleConns)
db.SetConnMaxLifetime(config.PostgresConfig.ConnMaxLifetime)

} else if config.DatabaseType == "sqlite" {
log.Info("using sqlite")
var err error

dbDir := filepath.Dir(config.DatabasePath)
if _, err := os.Stat(dbDir); os.IsNotExist(err) {
if errMkDir := os.Mkdir(dbDir, 0o755); errMkDir != nil {
log.Fatalf("error: could not make directory at %s for sqlite db: %v", dbDir, errMkDir)
}
}

db, err = sql.Open("sqlite", config.DatabasePath)
if err != nil {
log.Fatalf("error opening sqlite DB from %s %v", config.DatabasePath, err)
}
defer func() {
if err := db.Close(); err != nil {
log.Warnf("error closing database: %v", err)
}
}()
err, sqlJobRepo, dbCallbackFn := repository.NewSQLJobService(config, log)
if err != nil {
panic(err)
}

sqlJobRepo := repository.NewSQLJobService(config, db)
sqlJobRepo.Setup()
defer dbCallbackFn()
sqlJobRepo.Setup(ctx)
jobService := server.NewJobService(config, sqlJobRepo)
js.RegisterJobServiceServer(grpcServer, jobService)

Expand All @@ -92,14 +56,14 @@ func (a *App) StartUp(ctx context.Context, config *configuration.JobServiceConfi
}

g.Go(func() error {
PurgeJobSets(log, config.PurgeJobSetTime, sqlJobRepo)
PurgeJobSets(ctx, log, config.PurgeJobSetTime, sqlJobRepo)
return nil
})
g.Go(func() error {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {

jobSets, err := sqlJobRepo.GetSubscribedJobSets()
jobSets, err := sqlJobRepo.GetSubscribedJobSets(ctx)
log.Infof("job service has %d subscribed job sets", len(jobSets))
if err != nil {
logging.WithStacktrace(log, err).Warn("error getting jobsets")
Expand Down Expand Up @@ -136,26 +100,28 @@ func (a *App) StartUp(ctx context.Context, config *configuration.JobServiceConfi
return nil
}

func PurgeJobSets(log *log.Entry, purgeJobSetTime int64, sqlJobRepo *repository.SQLJobService) {
func PurgeJobSets(ctx context.Context, log *log.Entry, purgeJobSetTime int64,
sqlJobRepo repository.SQLJobService,
) {
log.Info("duration config: ", purgeJobSetTime)
ticker := time.NewTicker(time.Duration(purgeJobSetTime) * time.Second)
for range ticker.C {
jobSets, err := sqlJobRepo.GetSubscribedJobSets()
jobSets, err := sqlJobRepo.GetSubscribedJobSets(ctx)
if err != nil {
logging.WithStacktrace(log, err).Warn("error getting jobsets")
}
for _, value := range jobSets {
log.Infof("subscribed job sets : %s", value)
unsubscribe, err := sqlJobRepo.CheckToUnSubscribe(value.Queue, value.JobSet, purgeJobSetTime)
unsubscribe, err := sqlJobRepo.CheckToUnSubscribe(ctx, value.Queue, value.JobSet, purgeJobSetTime)
if err != nil {
log.WithError(err).Errorf("Unable to unsubscribe from queue/jobset %s/%s", value.Queue, value.JobSet)
}
if unsubscribe {
_, err := sqlJobRepo.CleanupJobSetAndJobs(value.Queue, value.JobSet)
_, err := sqlJobRepo.CleanupJobSetAndJobs(ctx, value.Queue, value.JobSet)
if err != nil {
logging.WithStacktrace(log, err).Warn("error cleaning up jobs")
}
_, err = sqlJobRepo.UnsubscribeJobSet(value.Queue, value.JobSet)
_, err = sqlJobRepo.UnsubscribeJobSet(ctx, value.Queue, value.JobSet)
if err != nil {
log.WithError(err).Errorf("unable to delete queue/jobset %s/%s", value.Queue, value.JobSet)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/jobservice/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
)

type PostgresConfig struct {
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
Connection map[string]string
PoolMaxOpenConns int
PoolMaxIdleConns int
PoolMaxConnLifetime time.Duration
Connection map[string]string
}

type JobServiceConfiguration struct {
Expand Down
41 changes: 19 additions & 22 deletions internal/jobservice/events/client_moq.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions internal/jobservice/eventstojobs/eventstojobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (eventToJobService *EventsToJobService) streamCommon(ctx context.Context, t
case <-ctx.Done():
return nil
case t := <-ticker.C:
jobSetFound, oldMessageId, err := eventToJobService.jobServiceRepository.IsJobSetSubscribed(eventToJobService.queue, eventToJobService.jobSetId)
jobSetFound, oldMessageId, err := eventToJobService.jobServiceRepository.IsJobSetSubscribed(ctx, eventToJobService.queue, eventToJobService.jobSetId)
if err != nil {
return errors.Errorf("unsubscribe jobsets: %v", err)
}
Expand Down Expand Up @@ -104,15 +104,15 @@ func (eventToJobService *EventsToJobService) streamCommon(ctx context.Context, t
if err != nil {
log.WithError(err).Error("could not obtain job set event message, retrying")
settingSubscribeErr := eventToJobService.jobServiceRepository.SetSubscriptionError(
eventToJobService.queue, eventToJobService.jobSetId, err.Error(), fromMessageId)
ctx, eventToJobService.queue, eventToJobService.jobSetId, err.Error(), fromMessageId)
if settingSubscribeErr != nil {
log.WithError(settingSubscribeErr).Error("could not set error field in job set table")
}
time.Sleep(5 * time.Second)
continue
}
errClear := eventToJobService.jobServiceRepository.AddMessageIdAndClearSubscriptionError(
eventToJobService.queue, eventToJobService.jobSetId, fromMessageId)
ctx, eventToJobService.queue, eventToJobService.jobSetId, fromMessageId)
if errClear != nil {
log.WithError(errClear).Error("could not clear subscription error from job set table")
}
Expand All @@ -123,7 +123,7 @@ func (eventToJobService *EventsToJobService) streamCommon(ctx context.Context, t
log.WithFields(requestFields).Infof("fromMessageId: %s JobId: %s State: %s", fromMessageId, currentJobId, jobStatus.GetState().String())
}
jobStatus := repository.NewJobStatus(eventToJobService.queue, eventToJobService.jobSetId, currentJobId, *jobStatus)
err := eventToJobService.jobServiceRepository.UpdateJobServiceDb(jobStatus)
err := eventToJobService.jobServiceRepository.UpdateJobServiceDb(ctx, jobStatus)
if err != nil {
log.WithError(err).Error("could not update job status, retrying")
time.Sleep(5 * time.Second)
Expand Down
20 changes: 10 additions & 10 deletions internal/jobservice/eventstojobs/eventstojobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Test_SubscribeToJobSetId(t *testing.T) {
tests := []struct {
name string
jobEventMessageFn func(context.Context, *api.JobSetRequest) (*api.EventStreamMessage, error)
isJobSetSubscribedFn func(string, string) (bool, string, error)
isJobSetSubscribedFn func(context.Context, string, string) (bool, string, error)
ttlSecs int64
wantErr bool
wantSubscriptionErr bool
Expand All @@ -28,7 +28,7 @@ func Test_SubscribeToJobSetId(t *testing.T) {
jobEventMessageFn: func(context.Context, *api.JobSetRequest) (*api.EventStreamMessage, error) {
return &api.EventStreamMessage{Message: &api.EventMessage{}}, nil
},
isJobSetSubscribedFn: func(string, string) (bool, string, error) {
isJobSetSubscribedFn: func(context.Context, string, string) (bool, string, error) {
return true, "", nil
},
wantErr: true,
Expand All @@ -39,7 +39,7 @@ func Test_SubscribeToJobSetId(t *testing.T) {
jobEventMessageFn: func(context.Context, *api.JobSetRequest) (*api.EventStreamMessage, error) {
return &api.EventStreamMessage{Message: &api.EventMessage{}}, errors.New("some error")
},
isJobSetSubscribedFn: func(string, string) (bool, string, error) {
isJobSetSubscribedFn: func(context.Context, string, string) (bool, string, error) {
return true, "", nil
},
wantErr: true,
Expand All @@ -51,7 +51,7 @@ func Test_SubscribeToJobSetId(t *testing.T) {
jobEventMessageFn: func(context.Context, *api.JobSetRequest) (*api.EventStreamMessage, error) {
return &api.EventStreamMessage{Message: &api.EventMessage{}}, nil
},
isJobSetSubscribedFn: func(string, string) (bool, string, error) {
isJobSetSubscribedFn: func(context.Context, string, string) (bool, string, error) {
return false, "", nil
},
wantErr: false,
Expand All @@ -67,10 +67,10 @@ func Test_SubscribeToJobSetId(t *testing.T) {

mockJobRepo := repository.JobTableUpdaterMock{
IsJobSetSubscribedFunc: tt.isJobSetSubscribedFn,
SubscribeJobSetFunc: func(string, string, string) error { return nil },
AddMessageIdAndClearSubscriptionErrorFunc: func(string, string, string) error { return nil },
SetSubscriptionErrorFunc: func(string, string, string, string) error { return nil },
UnsubscribeJobSetFunc: func(string, string) (int64, error) { return 0, nil },
SubscribeJobSetFunc: func(context.Context, string, string, string) error { return nil },
AddMessageIdAndClearSubscriptionErrorFunc: func(context.Context, string, string, string) error { return nil },
SetSubscriptionErrorFunc: func(context.Context, string, string, string, string) error { return nil },
UnsubscribeJobSetFunc: func(context.Context, string, string) (int64, error) { return 0, nil },
}

service := eventstojobs.NewEventsToJobService(
Expand All @@ -87,10 +87,10 @@ func Test_SubscribeToJobSetId(t *testing.T) {
}
if tt.wantSubscriptionErr {
assert.True(t, len(mockJobRepo.SetSubscriptionErrorCalls()) > 0)
assert.Equal(t, 0, len(mockJobRepo.ClearSubscriptionErrorCalls()))
assert.Equal(t, 0, len(mockJobRepo.AddMessageIdAndClearSubscriptionErrorCalls()))
} else {
assert.Equal(t, 0, len(mockJobRepo.SetSubscriptionErrorCalls()))
assert.True(t, len(mockJobRepo.ClearSubscriptionErrorCalls()) > 0)
assert.True(t, len(mockJobRepo.AddMessageIdAndClearSubscriptionErrorCalls()) > 0)
}
})
}
Expand Down
Loading

0 comments on commit a08830b

Please sign in to comment.