diff --git a/config/jobservice/config.yaml b/config/jobservice/config.yaml index e9e39351ded..bc5a6133d59 100644 --- a/config/jobservice/config.yaml +++ b/config/jobservice/config.yaml @@ -9,9 +9,9 @@ databaseType: "postgres" databasePath: "/var/jobservice.db" # Connection details when using database type 'postgres' postgresConfig: - maxOpenConns: 50 - maxIdleConns: 10 - connMaxLifetime: 30m + poolMaxConns: 50 + poolMinConns: 10 + poolMaxConnLifetime: 30m connection: host: postgres port: 5432 diff --git a/go.mod b/go.mod index 200f666cf87..41bf4369602 100644 --- a/go.mod +++ b/go.mod @@ -85,6 +85,7 @@ require ( github.com/go-playground/validator/v10 v10.11.1 github.com/golang/mock v1.6.0 github.com/goreleaser/goreleaser v1.11.5 + github.com/jackc/pgx/v5 v5.3.1 github.com/jessevdk/go-flags v1.5.0 github.com/magefile/mage v1.14.0 github.com/openconfig/goyang v1.2.0 @@ -143,8 +144,9 @@ require ( github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.3.1 // indirect - github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle v1.3.0 // indirect + github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.0.0 // indirect @@ -190,7 +192,7 @@ require ( github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036 // indirect go.mongodb.org/mongo-driver v1.10.0 // indirect go.uber.org/atomic v1.9.0 // indirect - golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect + golang.org/x/crypto v0.6.0 // indirect golang.org/x/mod v0.7.0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/term v0.5.0 // indirect diff --git a/go.sum b/go.sum index e27a282aacf..2c3f70c64ae 100644 --- a/go.sum +++ b/go.sum @@ -504,8 +504,9 @@ github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.3.1 h1:nwj7qwf0S+Q7ISFfBndqeLwSwxs+4DPsbRFjECT1Y4Y= github.com/jackc/pgproto3/v2 v2.3.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= @@ -519,11 +520,15 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= github.com/jackc/pgx/v4 v4.17.2 h1:0Ut0rpeKwvIVbMQ1KbMBU4h6wxehBI535LK6Flheh8E= github.com/jackc/pgx/v4 v4.17.2/go.mod h1:lcxIZN44yMIrWI78a5CpucdD14hX0SBDbNRvjDBItsw= +github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU= +github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0= github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk= +github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -957,8 +962,9 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= diff --git a/internal/jobservice/application.go b/internal/jobservice/application.go index 1290f1c17b1..8824bf1d98f 100644 --- a/internal/jobservice/application.go +++ b/internal/jobservice/application.go @@ -2,19 +2,14 @@ package jobservice import ( "context" - "database/sql" "fmt" "net" - "os" - "path/filepath" "time" - _ "github.com/jackc/pgx/v4/stdlib" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "github.com/armadaproject/armada/internal/common/auth/authorization" - "github.com/armadaproject/armada/internal/common/database" grpcCommon "github.com/armadaproject/armada/internal/common/grpc" "github.com/armadaproject/armada/internal/common/logging" "github.com/armadaproject/armada/internal/jobservice/configuration" @@ -46,43 +41,12 @@ func (a *App) StartUp(ctx context.Context, config *configuration.JobServiceConfi []authorization.AuthService{&authorization.AnonymousAuthService{}}, ) - var db *sql.DB - - if config.DatabaseType == "postgres" { - var err error - log.Info("using postgres") - db, err = sql.Open("pgx", database.CreateConnectionString(config.PostgresConfig.Connection)) - if err != nil { - return err - } - db.SetMaxOpenConns(config.PostgresConfig.MaxOpenConns) - db.SetMaxIdleConns(config.PostgresConfig.MaxIdleConns) - db.SetConnMaxLifetime(config.PostgresConfig.ConnMaxLifetime) - - } else if config.DatabaseType == "sqlite" { - log.Info("using sqlite") - var err error - - dbDir := filepath.Dir(config.DatabasePath) - if _, err := os.Stat(dbDir); os.IsNotExist(err) { - if errMkDir := os.Mkdir(dbDir, 0o755); errMkDir != nil { - log.Fatalf("error: could not make directory at %s for sqlite db: %v", dbDir, errMkDir) - } - } - - db, err = sql.Open("sqlite", config.DatabasePath) - if err != nil { - log.Fatalf("error opening sqlite DB from %s %v", config.DatabasePath, err) - } - defer func() { - if err := db.Close(); err != nil { - log.Warnf("error closing database: %v", err) - } - }() + err, sqlJobRepo, dbCallbackFn := repository.NewSQLJobService(config, log) + if err != nil { + panic(err) } - - sqlJobRepo := repository.NewSQLJobService(config, db) - sqlJobRepo.Setup() + defer dbCallbackFn() + sqlJobRepo.Setup(ctx) jobService := server.NewJobService(config, sqlJobRepo) js.RegisterJobServiceServer(grpcServer, jobService) @@ -92,14 +56,14 @@ func (a *App) StartUp(ctx context.Context, config *configuration.JobServiceConfi } g.Go(func() error { - PurgeJobSets(log, config.PurgeJobSetTime, sqlJobRepo) + PurgeJobSets(ctx, log, config.PurgeJobSetTime, sqlJobRepo) return nil }) g.Go(func() error { ticker := time.NewTicker(10 * time.Second) for range ticker.C { - jobSets, err := sqlJobRepo.GetSubscribedJobSets() + jobSets, err := sqlJobRepo.GetSubscribedJobSets(ctx) log.Infof("job service has %d subscribed job sets", len(jobSets)) if err != nil { logging.WithStacktrace(log, err).Warn("error getting jobsets") @@ -136,26 +100,28 @@ func (a *App) StartUp(ctx context.Context, config *configuration.JobServiceConfi return nil } -func PurgeJobSets(log *log.Entry, purgeJobSetTime int64, sqlJobRepo *repository.SQLJobService) { +func PurgeJobSets(ctx context.Context, log *log.Entry, purgeJobSetTime int64, + sqlJobRepo repository.SQLJobService, +) { log.Info("duration config: ", purgeJobSetTime) ticker := time.NewTicker(time.Duration(purgeJobSetTime) * time.Second) for range ticker.C { - jobSets, err := sqlJobRepo.GetSubscribedJobSets() + jobSets, err := sqlJobRepo.GetSubscribedJobSets(ctx) if err != nil { logging.WithStacktrace(log, err).Warn("error getting jobsets") } for _, value := range jobSets { log.Infof("subscribed job sets : %s", value) - unsubscribe, err := sqlJobRepo.CheckToUnSubscribe(value.Queue, value.JobSet, purgeJobSetTime) + unsubscribe, err := sqlJobRepo.CheckToUnSubscribe(ctx, value.Queue, value.JobSet, purgeJobSetTime) if err != nil { log.WithError(err).Errorf("Unable to unsubscribe from queue/jobset %s/%s", value.Queue, value.JobSet) } if unsubscribe { - _, err := sqlJobRepo.CleanupJobSetAndJobs(value.Queue, value.JobSet) + _, err := sqlJobRepo.CleanupJobSetAndJobs(ctx, value.Queue, value.JobSet) if err != nil { logging.WithStacktrace(log, err).Warn("error cleaning up jobs") } - _, err = sqlJobRepo.UnsubscribeJobSet(value.Queue, value.JobSet) + _, err = sqlJobRepo.UnsubscribeJobSet(ctx, value.Queue, value.JobSet) if err != nil { log.WithError(err).Errorf("unable to delete queue/jobset %s/%s", value.Queue, value.JobSet) } diff --git a/internal/jobservice/configuration/types.go b/internal/jobservice/configuration/types.go index 84e24bc18fc..7a56a1f9c95 100644 --- a/internal/jobservice/configuration/types.go +++ b/internal/jobservice/configuration/types.go @@ -8,10 +8,10 @@ import ( ) type PostgresConfig struct { - MaxOpenConns int - MaxIdleConns int - ConnMaxLifetime time.Duration - Connection map[string]string + PoolMaxOpenConns int + PoolMaxIdleConns int + PoolMaxConnLifetime time.Duration + Connection map[string]string } type JobServiceConfiguration struct { diff --git a/internal/jobservice/events/client_moq.go b/internal/jobservice/events/client_moq.go index a6f1101bbc7..c292368f10a 100644 --- a/internal/jobservice/events/client_moq.go +++ b/internal/jobservice/events/client_moq.go @@ -16,25 +16,25 @@ var _ JobEventReader = &JobEventReaderMock{} // JobEventReaderMock is a mock implementation of JobEventReader. // -// func TestSomethingThatUsesJobEventReader(t *testing.T) { +// func TestSomethingThatUsesJobEventReader(t *testing.T) { // -// // make and configure a mocked JobEventReader -// mockedJobEventReader := &JobEventReaderMock{ -// CloseFunc: func() { -// panic("mock out the Close method") -// }, -// GetJobEventMessageFunc: func(ctx context.Context, jobReq *api.JobSetRequest) (*api.EventStreamMessage, error) { -// panic("mock out the GetJobEventMessage method") -// }, -// HealthFunc: func(ctx context.Context, empty *types.Empty) (*api.HealthCheckResponse, error) { -// panic("mock out the Health method") -// }, -// } +// // make and configure a mocked JobEventReader +// mockedJobEventReader := &JobEventReaderMock{ +// CloseFunc: func() { +// panic("mock out the Close method") +// }, +// GetJobEventMessageFunc: func(ctx context.Context, jobReq *api.JobSetRequest) (*api.EventStreamMessage, error) { +// panic("mock out the GetJobEventMessage method") +// }, +// HealthFunc: func(ctx context.Context, empty *types.Empty) (*api.HealthCheckResponse, error) { +// panic("mock out the Health method") +// }, +// } // -// // use mockedJobEventReader in code that requires JobEventReader -// // and then make assertions. +// // use mockedJobEventReader in code that requires JobEventReader +// // and then make assertions. // -// } +// } type JobEventReaderMock struct { // CloseFunc mocks the Close method. CloseFunc func() @@ -85,8 +85,7 @@ func (mock *JobEventReaderMock) Close() { // CloseCalls gets all the calls that were made to Close. // Check the length with: -// -// len(mockedJobEventReader.CloseCalls()) +// len(mockedJobEventReader.CloseCalls()) func (mock *JobEventReaderMock) CloseCalls() []struct { } { var calls []struct { @@ -117,8 +116,7 @@ func (mock *JobEventReaderMock) GetJobEventMessage(ctx context.Context, jobReq * // GetJobEventMessageCalls gets all the calls that were made to GetJobEventMessage. // Check the length with: -// -// len(mockedJobEventReader.GetJobEventMessageCalls()) +// len(mockedJobEventReader.GetJobEventMessageCalls()) func (mock *JobEventReaderMock) GetJobEventMessageCalls() []struct { Ctx context.Context JobReq *api.JobSetRequest @@ -153,8 +151,7 @@ func (mock *JobEventReaderMock) Health(ctx context.Context, empty *types.Empty) // HealthCalls gets all the calls that were made to Health. // Check the length with: -// -// len(mockedJobEventReader.HealthCalls()) +// len(mockedJobEventReader.HealthCalls()) func (mock *JobEventReaderMock) HealthCalls() []struct { Ctx context.Context Empty *types.Empty diff --git a/internal/jobservice/eventstojobs/eventstojobs.go b/internal/jobservice/eventstojobs/eventstojobs.go index 9e1bba19161..62ebffe3b14 100644 --- a/internal/jobservice/eventstojobs/eventstojobs.go +++ b/internal/jobservice/eventstojobs/eventstojobs.go @@ -57,7 +57,7 @@ func (eventToJobService *EventsToJobService) streamCommon(ctx context.Context, t case <-ctx.Done(): return nil case t := <-ticker.C: - jobSetFound, oldMessageId, err := eventToJobService.jobServiceRepository.IsJobSetSubscribed(eventToJobService.queue, eventToJobService.jobSetId) + jobSetFound, oldMessageId, err := eventToJobService.jobServiceRepository.IsJobSetSubscribed(ctx, eventToJobService.queue, eventToJobService.jobSetId) if err != nil { return errors.Errorf("unsubscribe jobsets: %v", err) } @@ -104,7 +104,7 @@ func (eventToJobService *EventsToJobService) streamCommon(ctx context.Context, t if err != nil { log.WithError(err).Error("could not obtain job set event message, retrying") settingSubscribeErr := eventToJobService.jobServiceRepository.SetSubscriptionError( - eventToJobService.queue, eventToJobService.jobSetId, err.Error(), fromMessageId) + ctx, eventToJobService.queue, eventToJobService.jobSetId, err.Error(), fromMessageId) if settingSubscribeErr != nil { log.WithError(settingSubscribeErr).Error("could not set error field in job set table") } @@ -112,7 +112,7 @@ func (eventToJobService *EventsToJobService) streamCommon(ctx context.Context, t continue } errClear := eventToJobService.jobServiceRepository.AddMessageIdAndClearSubscriptionError( - eventToJobService.queue, eventToJobService.jobSetId, fromMessageId) + ctx, eventToJobService.queue, eventToJobService.jobSetId, fromMessageId) if errClear != nil { log.WithError(errClear).Error("could not clear subscription error from job set table") } @@ -123,7 +123,7 @@ func (eventToJobService *EventsToJobService) streamCommon(ctx context.Context, t log.WithFields(requestFields).Infof("fromMessageId: %s JobId: %s State: %s", fromMessageId, currentJobId, jobStatus.GetState().String()) } jobStatus := repository.NewJobStatus(eventToJobService.queue, eventToJobService.jobSetId, currentJobId, *jobStatus) - err := eventToJobService.jobServiceRepository.UpdateJobServiceDb(jobStatus) + err := eventToJobService.jobServiceRepository.UpdateJobServiceDb(ctx, jobStatus) if err != nil { log.WithError(err).Error("could not update job status, retrying") time.Sleep(5 * time.Second) diff --git a/internal/jobservice/eventstojobs/eventstojobs_test.go b/internal/jobservice/eventstojobs/eventstojobs_test.go index 02d415321f4..db57019527e 100644 --- a/internal/jobservice/eventstojobs/eventstojobs_test.go +++ b/internal/jobservice/eventstojobs/eventstojobs_test.go @@ -17,7 +17,7 @@ func Test_SubscribeToJobSetId(t *testing.T) { tests := []struct { name string jobEventMessageFn func(context.Context, *api.JobSetRequest) (*api.EventStreamMessage, error) - isJobSetSubscribedFn func(string, string) (bool, string, error) + isJobSetSubscribedFn func(context.Context, string, string) (bool, string, error) ttlSecs int64 wantErr bool wantSubscriptionErr bool @@ -28,7 +28,7 @@ func Test_SubscribeToJobSetId(t *testing.T) { jobEventMessageFn: func(context.Context, *api.JobSetRequest) (*api.EventStreamMessage, error) { return &api.EventStreamMessage{Message: &api.EventMessage{}}, nil }, - isJobSetSubscribedFn: func(string, string) (bool, string, error) { + isJobSetSubscribedFn: func(context.Context, string, string) (bool, string, error) { return true, "", nil }, wantErr: true, @@ -39,7 +39,7 @@ func Test_SubscribeToJobSetId(t *testing.T) { jobEventMessageFn: func(context.Context, *api.JobSetRequest) (*api.EventStreamMessage, error) { return &api.EventStreamMessage{Message: &api.EventMessage{}}, errors.New("some error") }, - isJobSetSubscribedFn: func(string, string) (bool, string, error) { + isJobSetSubscribedFn: func(context.Context, string, string) (bool, string, error) { return true, "", nil }, wantErr: true, @@ -51,7 +51,7 @@ func Test_SubscribeToJobSetId(t *testing.T) { jobEventMessageFn: func(context.Context, *api.JobSetRequest) (*api.EventStreamMessage, error) { return &api.EventStreamMessage{Message: &api.EventMessage{}}, nil }, - isJobSetSubscribedFn: func(string, string) (bool, string, error) { + isJobSetSubscribedFn: func(context.Context, string, string) (bool, string, error) { return false, "", nil }, wantErr: false, @@ -67,10 +67,10 @@ func Test_SubscribeToJobSetId(t *testing.T) { mockJobRepo := repository.JobTableUpdaterMock{ IsJobSetSubscribedFunc: tt.isJobSetSubscribedFn, - SubscribeJobSetFunc: func(string, string, string) error { return nil }, - AddMessageIdAndClearSubscriptionErrorFunc: func(string, string, string) error { return nil }, - SetSubscriptionErrorFunc: func(string, string, string, string) error { return nil }, - UnsubscribeJobSetFunc: func(string, string) (int64, error) { return 0, nil }, + SubscribeJobSetFunc: func(context.Context, string, string, string) error { return nil }, + AddMessageIdAndClearSubscriptionErrorFunc: func(context.Context, string, string, string) error { return nil }, + SetSubscriptionErrorFunc: func(context.Context, string, string, string, string) error { return nil }, + UnsubscribeJobSetFunc: func(context.Context, string, string) (int64, error) { return 0, nil }, } service := eventstojobs.NewEventsToJobService( @@ -87,10 +87,10 @@ func Test_SubscribeToJobSetId(t *testing.T) { } if tt.wantSubscriptionErr { assert.True(t, len(mockJobRepo.SetSubscriptionErrorCalls()) > 0) - assert.Equal(t, 0, len(mockJobRepo.ClearSubscriptionErrorCalls())) + assert.Equal(t, 0, len(mockJobRepo.AddMessageIdAndClearSubscriptionErrorCalls())) } else { assert.Equal(t, 0, len(mockJobRepo.SetSubscriptionErrorCalls())) - assert.True(t, len(mockJobRepo.ClearSubscriptionErrorCalls()) > 0) + assert.True(t, len(mockJobRepo.AddMessageIdAndClearSubscriptionErrorCalls()) > 0) } }) } diff --git a/internal/jobservice/repository/postgres.go b/internal/jobservice/repository/postgres.go new file mode 100644 index 00000000000..4c129f7b7f1 --- /dev/null +++ b/internal/jobservice/repository/postgres.go @@ -0,0 +1,322 @@ +//go:generate moq -out sql_job_service_moq.go . JobTableUpdater +package repository + +import ( + "context" + "fmt" + "time" + + _ "modernc.org/sqlite" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/armadaproject/armada/internal/common/database" + "github.com/armadaproject/armada/internal/jobservice/configuration" + js "github.com/armadaproject/armada/pkg/api/jobservice" +) + +type JSRepoPostgres struct { + jobServiceConfig *configuration.JobServiceConfiguration + dbpool *pgxpool.Pool +} + +func NewJSRepoPostgres(cfg *configuration.JobServiceConfiguration, log *log.Entry) (error, *JSRepoPostgres, func()) { + poolCfg, err := pgxpool.ParseConfig(database.CreateConnectionString(cfg.PostgresConfig.Connection)) + if err != nil { + return errors.Wrap(err, "cannot parse Postgres connection config"), nil, func() {} + } + + pool, err := pgxpool.NewWithConfig(context.Background(), poolCfg) + if err != nil { + return errors.Wrap(err, "cannot create Postgres connection pool"), nil, func() {} + } + + return nil, &JSRepoPostgres{jobServiceConfig: cfg, dbpool: pool}, func() {} +} + +// Set up the DB for use, create tables +func (s *JSRepoPostgres) Setup(ctx context.Context) { + _, err := s.dbpool.Exec(ctx, "DROP TABLE IF EXISTS jobservice") + if err != nil { + panic(err) + } + _, err = s.dbpool.Exec(ctx, ` + CREATE TABLE jobservice ( + Queue TEXT, + JobSetId TEXT, + JobId TEXT, + JobResponseState TEXT, + JobResponseError TEXT, + Timestamp INTEGER, + PRIMARY KEY(JobId))`) + + if err != nil { + panic(err) + } + _, errIndex := s.dbpool.Exec(ctx, `CREATE INDEX idx_job_set_queue ON jobservice (Queue, JobSetId)`) + if errIndex != nil { + panic(errIndex) + } + _, err = s.dbpool.Exec(ctx, "DROP TABLE IF EXISTS jobsets") + if err != nil { + panic(err) + } + + _, err = s.dbpool.Exec(ctx, ` + CREATE TABLE jobsets ( + Queue TEXT, + JobSetId TEXT, + Timestamp INTEGER, + ConnectionError TEXT, + FromMessageId TEXT, + UNIQUE(Queue,JobSetId))`) + if err != nil { + panic(err) + } +} + +// Get the JobStatus given the jodId +func (s *JSRepoPostgres) GetJobStatus(ctx context.Context, jobId string) (*js.JobServiceResponse, error) { + sqlStmt := "SELECT Queue, JobSetId, JobResponseState, JobResponseError FROM jobservice WHERE JobId = $1" + + row := s.dbpool.QueryRow(ctx, sqlStmt, jobId) + var queue, jobSetId, jobState, jobError string + + err := row.Scan(&queue, &jobSetId, &jobState, &jobError) + + if err == pgx.ErrNoRows { + return &js.JobServiceResponse{State: js.JobServiceResponse_JOB_ID_NOT_FOUND}, nil + } else if err != nil { + return nil, err + } + + // indicate connnection error for jobset/queue subscription where present + connErr, err := s.GetSubscriptionError(ctx, queue, jobSetId) + if err != nil { + return nil, err + } + if connErr != "" { + return &js.JobServiceResponse{ + Error: connErr, + State: js.JobServiceResponse_CONNECTION_ERR, + }, nil + } + + jobJSRState, err := JobStateStrToJSRState(jobState) + if err != nil { + return nil, err + } + + return &js.JobServiceResponse{ + Error: jobError, + State: jobJSRState, + }, nil +} + +// Update database with JobTable. +func (s *JSRepoPostgres) UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error { + sqlStmt := `INSERT INTO jobservice (Queue, JobSetId, JobId, JobResponseState, JobResponseError, Timestamp) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (JobId) DO UPDATE SET + (Queue, JobSetId, JobResponseState, JobResponseError, Timestamp) = + (excluded.Queue, excluded.JobSetId, excluded.JobResponseState, excluded.JobResponseError, excluded.Timestamp)` + + _, errExec := s.dbpool.Exec(ctx, sqlStmt, jobTable.queue, jobTable.jobSetId, jobTable.jobId, + jobTable.jobResponse.State.String(), jobTable.jobResponse.Error, jobTable.timeStamp) + return errExec +} + +func (s *JSRepoPostgres) UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error { + subscribe, _, err := s.IsJobSetSubscribed(ctx, queue, jobSet) + if err != nil { + return err + } + if !subscribe { + return fmt.Errorf("queue %s jobSet %s is already unsubscribed", queue, jobSet) + } + + sqlStmt := `INSERT INTO jobsets (Queue, JobSetId, Timestamp, ConnectionError, FromMessageId) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (Queue, JobSetId) DO UPDATE SET + (Timestamp, ConnectionError, FromMessageId) = + (excluded.Timestamp, excluded.ConnectionError, excluded.FromMessageId)` + + _, jobSetErr := s.dbpool.Exec(ctx, sqlStmt, queue, jobSet, time.Now().Unix(), "", &fromMessageId) + if jobSetErr != nil { + return jobSetErr + } + return nil +} + +func (s *JSRepoPostgres) HealthCheck(ctx context.Context) (bool, error) { + row := s.dbpool.QueryRow(ctx, "SELECT 1") + var col int + err := row.Scan(&col) + if err == nil { + return true, nil + } else { + return false, fmt.Errorf("database health check failed: %v", err) + } +} + +// Check if JobSet is in our map. +func (s *JSRepoPostgres) IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error) { + sqlStmt := "SELECT Queue, JobSetId, FromMessageId FROM jobsets WHERE Queue = $1 AND JobSetId = $2" + row := s.dbpool.QueryRow(ctx, sqlStmt, queue, jobSet) + var queueScan, jobSetIdScan, fromMessageId string + + err := row.Scan(&queueScan, &jobSetIdScan, &fromMessageId) + + if err == pgx.ErrNoRows { + return false, "", nil + } else if err != nil { + return false, "", err + } + return true, fromMessageId, nil +} + +// Clear subscription error if present +func (s *JSRepoPostgres) AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string, + jobSet string, fromMessageId string, +) error { + return s.SetSubscriptionError(ctx, queue, jobSet, "", fromMessageId) +} + +// Set subscription error if present +func (s *JSRepoPostgres) SetSubscriptionError(ctx context.Context, queue string, jobSet string, + connErr string, fromMessageId string, +) error { + sqlStmt := `INSERT INTO jobsets (Queue, JobSetId, Timestamp, ConnectionError, FromMessageId) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (Queue, JobSetId) DO UPDATE SET + (Timestamp, ConnectionError, FromMessageId) = + (excluded.Timestamp, excluded.ConnectionError, excluded.FromMessageId)` + + subscribeTable := NewSubscribeTable(queue, jobSet) + _, jobSetErr := s.dbpool.Exec(ctx, sqlStmt, subscribeTable.queue, jobSet, subscribeTable.lastRequestTimeStamp, + connErr, fromMessageId) + if jobSetErr != nil { + return jobSetErr + } + return jobSetErr +} + +// Get subscription error if present +func (s *JSRepoPostgres) GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error) { + sqlStmt := "SELECT ConnectionError FROM jobsets WHERE Queue = $1 AND JobSetId = $2" + row := s.dbpool.QueryRow(ctx, sqlStmt, queue, jobSet) + var connError string + + err := row.Scan(&connError) + + if err == pgx.ErrNoRows { + return "", nil + } else if err != nil { + return "", err + } + return connError, nil +} + +// Mark our JobSet as being subscribed +// SubscribeTable contains Queue, JobSet and time when it was created. +func (s *JSRepoPostgres) SubscribeJobSet(ctx context.Context, queue string, jobSet string, + fromMessageId string, +) error { + sqlStmt := `INSERT INTO jobsets (Queue, JobSetId, Timestamp, ConnectionError, FromMessageId) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (Queue, JobSetId) DO UPDATE SET + (Timestamp, ConnectionError, FromMessageId) = + (excluded.Timestamp, excluded.ConnectionError, excluded.FromMessageId)` + + subscribeTable := NewSubscribeTable(queue, jobSet) + _, jobSetErr := s.dbpool.Exec(ctx, sqlStmt, subscribeTable.queue, subscribeTable.jobSet, + subscribeTable.lastRequestTimeStamp, "", fromMessageId) + return jobSetErr +} + +// UnSubscribe to JobSet and delete all the jobs in the database +func (s *JSRepoPostgres) CleanupJobSetAndJobs(ctx context.Context, queue string, jobSet string) (int64, error) { + _, errUnsubscribe := s.UnsubscribeJobSet(ctx, queue, jobSet) + if errUnsubscribe != nil { + return 0, errUnsubscribe + } + return s.DeleteJobsInJobSet(ctx, queue, jobSet) +} + +// Checks JobSet table to make determine if we should unsubscribe from JobSet +// configTimeWithoutUpdates is a configurable value that is read from the config +// We allow unsubscribing if the jobset hasn't been updated in configTime +// TODO implement this +func (s *JSRepoPostgres) CheckToUnSubscribe(ctx context.Context, queue string, jobSet string, + configTimeWithoutUpdates int64, +) (bool, error) { + jobSetFound, _, err := s.IsJobSetSubscribed(ctx, queue, jobSet) + if err != nil { + return false, nil + } + if !jobSetFound { + return false, nil + } + + sqlStmt := "SELECT Timestamp FROM jobsets WHERE Queue = $1 AND JobSetId = $2" + + row := s.dbpool.QueryRow(ctx, sqlStmt, queue, jobSet) + var timeStamp int + + timeErr := row.Scan(&timeStamp) + + if timeErr == pgx.ErrNoRows { + return false, nil + } else if err != nil { + return false, err + } + + currentTime := time.Now().Unix() + if (currentTime - configTimeWithoutUpdates) > int64(timeStamp) { + return true, nil + } + return false, nil +} + +func (s *JSRepoPostgres) UnsubscribeJobSet(ctx context.Context, queue, jobSet string) (int64, error) { + sqlStmt := "DELETE FROM jobsets WHERE Queue = $1 AND JobSetId = $2" + + result, err := s.dbpool.Exec(ctx, sqlStmt, queue, jobSet) + if err != nil { + return 0, err + } + return result.RowsAffected(), nil +} + +// Delete Jobs in the database +func (s *JSRepoPostgres) DeleteJobsInJobSet(ctx context.Context, queue string, jobSet string) (int64, error) { + sqlStmt := "DELETE FROM jobservice WHERE Queue = $1 AND JobSetId = $2" + + result, err := s.dbpool.Exec(ctx, sqlStmt, queue, jobSet) + if err != nil { + return 0, err + } + return result.RowsAffected(), nil +} + +func (s *JSRepoPostgres) GetSubscribedJobSets(ctx context.Context) ([]SubscribedTuple, error) { + rows, err := s.dbpool.Query(ctx, "SELECT Queue, JobSetId, FromMessageId FROM jobsets") + if err != nil { + return nil, err + } + defer rows.Close() + + var tuples []SubscribedTuple + + // Loop through rows, using Scan to assign column data to struct fields. + for rows.Next() { + var st SubscribedTuple + if err := rows.Scan(&st.Queue, &st.JobSet, &st.FromMessageId); err != nil { + return tuples, err + } + tuples = append(tuples, st) + } + if err = rows.Err(); err != nil { + return tuples, err + } + return tuples, nil +} diff --git a/internal/jobservice/repository/sql_job_service.go b/internal/jobservice/repository/sql_job_service.go index f83ee606dbe..f0d15e77c61 100644 --- a/internal/jobservice/repository/sql_job_service.go +++ b/internal/jobservice/repository/sql_job_service.go @@ -2,57 +2,56 @@ package repository import ( - "database/sql" + "context" + "errors" "fmt" - "sync" - "time" _ "modernc.org/sqlite" + log "github.com/sirupsen/logrus" + "github.com/armadaproject/armada/internal/jobservice/configuration" js "github.com/armadaproject/armada/pkg/api/jobservice" ) type JobTableUpdater interface { - SubscribeJobSet(queue string, jobSet string, fromMessageId string) error - IsJobSetSubscribed(queue string, jobSet string) (bool, string, error) - UpdateJobServiceDb(jobTable *JobStatus) error - UpdateJobSetDb(queue string, jobSet string, fromMessageId string) error - SetSubscriptionError(queue string, jobSet string, err string, fromMessageId string) error - GetSubscriptionError(queue string, jobSet string) (string, error) - AddMessageIdAndClearSubscriptionError(queue string, jobSet string, messageId string) error - UnsubscribeJobSet(queue string, jobSet string) (int64, error) + SubscribeJobSet(ctx context.Context, queue string, jobSet string, fromMessageId string) error + IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error) + UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error + UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error + SetSubscriptionError(ctx context.Context, queue string, jobSet string, err string, fromMessageId string) error + GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error) + AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string, jobSet string, messageId string) error + UnsubscribeJobSet(ctx context.Context, queue string, jobSet string) (int64, error) } // SQLJobService for persisting to DB. -type SQLJobService struct { - jobServiceConfig *configuration.JobServiceConfiguration - db *sql.DB - - lock sync.RWMutex -} - -func NewSQLJobService(config *configuration.JobServiceConfiguration, db *sql.DB) *SQLJobService { - return &SQLJobService{jobServiceConfig: config, db: db} -} - -// Call on a newly created SQLJobService object to setup the DB for use. -func (s *SQLJobService) Setup() { - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.useWAL() - } - - s.CreateTable() -} - -func (s *SQLJobService) useWAL() { - s.lock.Lock() - defer s.lock.Unlock() - - _, err := s.db.Exec("PRAGMA journal_mode=WAL") - if err != nil { - panic(err) - } +type SQLJobService interface { + AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string, jobSet string, fromMessageId string) error + CheckToUnSubscribe(ctx context.Context, queue string, jobSet string, configTimeWithoutUpdates int64) (bool, error) + CleanupJobSetAndJobs(ctx context.Context, queue string, jobSet string) (int64, error) + DeleteJobsInJobSet(ctx context.Context, queue string, jobSet string) (int64, error) + GetJobStatus(ctx context.Context, jobId string) (*js.JobServiceResponse, error) + GetSubscribedJobSets(ctx context.Context) ([]SubscribedTuple, error) + GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error) + HealthCheck(ctx context.Context) (bool, error) + IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error) + SetSubscriptionError(ctx context.Context, queue string, jobSet string, connErr string, fromMessageId string) error + Setup(ctx context.Context) + SubscribeJobSet(ctx context.Context, queue string, jobSet string, fromMessageId string) error + UnsubscribeJobSet(ctx context.Context, queue, jobSet string) (int64, error) + UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error + UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error +} + +func NewSQLJobService(cfg *configuration.JobServiceConfiguration, log *log.Entry) (error, SQLJobService, func()) { + if cfg.DatabaseType == "postgres" { + return NewJSRepoPostgres(cfg, log) + } else if cfg.DatabaseType == "sqlite" { + return NewJSRepoSQLite(cfg, log) + } + + return errors.New("database type must be either 'postgres' or 'sqlite'"), nil, func() {} } type SubscribedTuple struct { @@ -61,108 +60,7 @@ type SubscribedTuple struct { FromMessageId string } -// Create a Table from a hard-coded schema. -func (s *SQLJobService) CreateTable() { - var integerType string - if s.jobServiceConfig.DatabaseType == "sqlite" { - integerType = "INT" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - integerType = "INTEGER" - } - - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - _, err := s.db.Exec("DROP TABLE IF EXISTS jobservice") - if err != nil { - panic(err) - } - _, err = s.db.Exec(fmt.Sprintf(` - CREATE TABLE jobservice ( - Queue TEXT, - JobSetId TEXT, - JobId TEXT, - JobResponseState TEXT, - JobResponseError TEXT, - Timestamp %s, - PRIMARY KEY(JobId))`, integerType)) - - if err != nil { - panic(err) - } - _, errIndex := s.db.Exec(`CREATE INDEX idx_job_set_queue ON jobservice (Queue, JobSetId)`) - if errIndex != nil { - panic(errIndex) - } - _, err = s.db.Exec("DROP TABLE IF EXISTS jobsets") - if err != nil { - panic(err) - } - - _, err = s.db.Exec(fmt.Sprintf(` - CREATE TABLE jobsets ( - Queue TEXT, - JobSetId TEXT, - Timestamp %s, - ConnectionError TEXT, - FromMessageId TEXT, - UNIQUE(Queue,JobSetId))`, integerType)) - if err != nil { - panic(err) - } -} - -// Get the JobStatus given the jodId -func (s *SQLJobService) GetJobStatus(jobId string) (*js.JobServiceResponse, error) { - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - var sqlStmt string - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "SELECT Queue, JobSetId, JobResponseState, JobResponseError FROM jobservice WHERE JobId = ?" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = "SELECT Queue, JobSetId, JobResponseState, JobResponseError FROM jobservice WHERE JobId = $1" - } - - row := s.db.QueryRow(sqlStmt, jobId) - var queue, jobSetId, jobState, jobError string - - err := row.Scan(&queue, &jobSetId, &jobState, &jobError) - - if err == sql.ErrNoRows { - return &js.JobServiceResponse{State: js.JobServiceResponse_JOB_ID_NOT_FOUND}, nil - } else if err != nil { - return nil, err - } - - // indicate connnection error for jobset/queue subscription where present - connErr, err := s.GetSubscriptionError(queue, jobSetId) - if err != nil { - return nil, err - } - if connErr != "" { - return &js.JobServiceResponse{ - Error: connErr, - State: js.JobServiceResponse_CONNECTION_ERR, - }, nil - } - - jobJSRState, err := jobStateStrToJSRState(jobState) - if err != nil { - return nil, err - } - - return &js.JobServiceResponse{ - Error: jobError, - State: jobJSRState, - }, nil -} - -func jobStateStrToJSRState(jobState string) (js.JobServiceResponse_State, error) { +func JobStateStrToJSRState(jobState string) (js.JobServiceResponse_State, error) { switch jobState { case "SUBMITTED": return js.JobServiceResponse_SUBMITTED, nil @@ -181,322 +79,5 @@ func jobStateStrToJSRState(jobState string) (js.JobServiceResponse_State, error) } return js.JobServiceResponse_JOB_ID_NOT_FOUND, - fmt.Errorf("jobStateStrToJSRState: invalid job state string '%s'", jobState) -} - -// Update database with JobTable. -func (s *SQLJobService) UpdateJobServiceDb(jobTable *JobStatus) error { - if s.jobServiceConfig.DatabaseType == "sqlite" { - // SQLite only allows one write at a time. Therefore we must serialize - // writes in order to avoid SQL_BUSY errors. - s.lock.Lock() - defer s.lock.Unlock() - } - - var sqlStmt string - - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "INSERT OR REPLACE INTO jobservice VALUES (?, ?, ?, ?, ?, ?)" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = `INSERT INTO jobservice (Queue, JobSetId, JobId, JobResponseState, JobResponseError, Timestamp) - VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (JobId) DO UPDATE SET - (Queue, JobSetId, JobResponseState, JobResponseError, Timestamp) = - (excluded.Queue, excluded.JobSetId, excluded.JobResponseState, excluded.JobResponseError, excluded.Timestamp)` - } - - stmt, err := s.db.Prepare(sqlStmt) - if err != nil { - return err - } - defer stmt.Close() - _, errExec := stmt.Exec(jobTable.queue, jobTable.jobSetId, jobTable.jobId, - jobTable.jobResponse.State.String(), jobTable.jobResponse.Error, jobTable.timeStamp) - return errExec -} - -func (s *SQLJobService) UpdateJobSetDb(queue string, jobSet string, fromMessageId string) error { - subscribe, _, err := s.IsJobSetSubscribed(queue, jobSet) - if err != nil { - return err - } - if !subscribe { - return fmt.Errorf("queue %s jobSet %s is already unsubscribed", queue, jobSet) - } - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - var sqlStmt string - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = `INSERT INTO jobsets (Queue, JobSetId, Timestamp, ConnectionError, FromMessageId) - VALUES ($1, $2, $3, $4, $5) ON CONFLICT (Queue, JobSetId) DO UPDATE SET - (Timestamp, ConnectionError, FromMessageId) = - (excluded.Timestamp, excluded.ConnectionError, excluded.FromMessageId)` - } - - jobSetState, err := s.db.Prepare(sqlStmt) - if err != nil { - return err - } - defer jobSetState.Close() - _, jobSetErr := jobSetState.Exec(queue, jobSet, time.Now().Unix(), "", &fromMessageId) - if jobSetErr != nil { - return jobSetErr - } - return nil -} - -// Simple Health Check to Verify if SQLite is working. -func (s *SQLJobService) HealthCheck() (bool, error) { - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - row := s.db.QueryRow("SELECT 1") - var col int - err := row.Scan(&col) - if err == nil { - return true, nil - } else { - return false, fmt.Errorf("SQL health check failed: %v", err) - } -} - -// Check if JobSet is in our map. -func (s *SQLJobService) IsJobSetSubscribed(queue string, jobSet string) (bool, string, error) { - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - var sqlStmt string - - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "SELECT Queue, JobSetId, FromMessageId FROM jobsets WHERE Queue = ? AND JobSetId = ?" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = "SELECT Queue, JobSetId, FromMessageId FROM jobsets WHERE Queue = $1 AND JobSetId = $2" - } - row := s.db.QueryRow(sqlStmt, queue, jobSet) - var queueScan, jobSetIdScan, fromMessageId string - - err := row.Scan(&queueScan, &jobSetIdScan, &fromMessageId) - - if err == sql.ErrNoRows { - return false, "", nil - } else if err != nil { - return false, "", err - } - return true, fromMessageId, nil -} - -// Clear subscription error if present -func (s *SQLJobService) AddMessageIdAndClearSubscriptionError(queue string, jobSet string, fromMessageId string) error { - return s.SetSubscriptionError(queue, jobSet, "", fromMessageId) -} - -// Set subscription error if present -func (s *SQLJobService) SetSubscriptionError(queue string, jobSet string, connErr string, fromMessageId string) error { - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - var sqlStmt string - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = `INSERT INTO jobsets (Queue, JobSetId, Timestamp, ConnectionError, FromMessageId) - VALUES ($1, $2, $3, $4, $5) ON CONFLICT (Queue, JobSetId) DO UPDATE SET - (Timestamp, ConnectionError, FromMessageId) = - (excluded.Timestamp, excluded.ConnectionError, excluded.FromMessageId)` - } - - jobSetState, err := s.db.Prepare(sqlStmt) - if err != nil { - return err - } - defer jobSetState.Close() - subscribeTable := NewSubscribeTable(queue, jobSet) - _, jobSetErr := jobSetState.Exec(subscribeTable.queue, jobSet, subscribeTable.lastRequestTimeStamp, - connErr, fromMessageId) - if jobSetErr != nil { - return jobSetErr - } - return jobSetErr -} - -// Get subscription error if present -func (s *SQLJobService) GetSubscriptionError(queue string, jobSet string) (string, error) { - var sqlStmt string - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "SELECT ConnectionError FROM jobsets WHERE Queue = ? AND JobSetId = ?" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = "SELECT ConnectionError FROM jobsets WHERE Queue = $1 AND JobSetId = $2" - } - row := s.db.QueryRow(sqlStmt, queue, jobSet) - var connError string - - err := row.Scan(&connError) - - if err == sql.ErrNoRows { - return "", nil - } else if err != nil { - return "", err - } - return connError, nil -} - -// Mark our JobSet as being subscribed -// SubscribeTable contains Queue, JobSet and time when it was created. - -func (s *SQLJobService) SubscribeJobSet(queue string, jobSet string, fromMessageId string) error { - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - var sqlStmt string - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = `INSERT INTO jobsets (Queue, JobSetId, Timestamp, ConnectionError, FromMessageId) - VALUES ($1, $2, $3, $4, $5) ON CONFLICT (Queue, JobSetId) DO UPDATE SET - (Timestamp, ConnectionError, FromMessageId) = - (excluded.Timestamp, excluded.ConnectionError, excluded.FromMessageId)` - } - - jobSetState, err := s.db.Prepare(sqlStmt) - if err != nil { - return err - } - defer jobSetState.Close() - subscribeTable := NewSubscribeTable(queue, jobSet) - _, jobSetErr := jobSetState.Exec(subscribeTable.queue, subscribeTable.jobSet, - subscribeTable.lastRequestTimeStamp, "", fromMessageId) - return jobSetErr -} - -// UnSubscribe to JobSet and delete all the jobs in the database -func (s *SQLJobService) CleanupJobSetAndJobs(queue string, jobSet string) (int64, error) { - _, errUnsubscribe := s.UnsubscribeJobSet(queue, jobSet) - if errUnsubscribe != nil { - return 0, errUnsubscribe - } - return s.DeleteJobsInJobSet(queue, jobSet) -} - -// Checks JobSet table to make determine if we should unsubscribe from JobSet -// configTimeWithoutUpdates is a configurable value that is read from the config -// We allow unsubscribing if the jobset hasn't been updated in configTime -// TODO implement this -func (s *SQLJobService) CheckToUnSubscribe(queue string, jobSet string, configTimeWithoutUpdates int64) (bool, error) { - jobSetFound, _, err := s.IsJobSetSubscribed(queue, jobSet) - if err != nil { - return false, nil - } - if !jobSetFound { - return false, nil - } - - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - var sqlStmt string - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "SELECT Timestamp FROM jobsets WHERE Queue = ? AND JobSetId = ?" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = "SELECT Timestamp FROM jobsets WHERE Queue = $1 AND JobSetId = $2" - } - - row := s.db.QueryRow(sqlStmt, queue, jobSet) - var timeStamp int - - timeErr := row.Scan(&timeStamp) - - if timeErr == sql.ErrNoRows { - return false, nil - } else if err != nil { - return false, err - } - - currentTime := time.Now().Unix() - if (currentTime - configTimeWithoutUpdates) > int64(timeStamp) { - return true, nil - } - return false, nil -} - -func (s *SQLJobService) UnsubscribeJobSet(queue, jobSet string) (int64, error) { - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - var sqlStmt string - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "DELETE FROM jobsets WHERE Queue = ? AND JobSetId = ?" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = "DELETE FROM jobsets WHERE Queue = $1 AND JobSetId = $2" - } - - result, err := s.db.Exec(sqlStmt, queue, jobSet) - if err != nil { - return 0, err - } - return result.RowsAffected() -} - -// Delete Jobs in the database -func (s *SQLJobService) DeleteJobsInJobSet(queue string, jobSet string) (int64, error) { - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - var sqlStmt string - if s.jobServiceConfig.DatabaseType == "sqlite" { - sqlStmt = "DELETE FROM jobservice WHERE Queue = ? AND JobSetId = ?" - } else if s.jobServiceConfig.DatabaseType == "postgres" { - sqlStmt = "DELETE FROM jobservice WHERE Queue = $1 AND JobSetId = $2" - } - - result, err := s.db.Exec(sqlStmt, queue, jobSet) - if err != nil { - return 0, err - } - return result.RowsAffected() -} - -func (s *SQLJobService) GetSubscribedJobSets() ([]SubscribedTuple, error) { - if s.jobServiceConfig.DatabaseType == "sqlite" { - s.lock.Lock() - defer s.lock.Unlock() - } - - rows, err := s.db.Query("SELECT Queue, JobSetId, FromMessageId FROM jobsets") - if err != nil { - return nil, err - } - defer rows.Close() - - var tuples []SubscribedTuple - - // Loop through rows, using Scan to assign column data to struct fields. - for rows.Next() { - var st SubscribedTuple - if err := rows.Scan(&st.Queue, &st.JobSet, &st.FromMessageId); err != nil { - return tuples, err - } - tuples = append(tuples, st) - } - if err = rows.Err(); err != nil { - return tuples, err - } - return tuples, nil + fmt.Errorf("JobStateStrToJSRState: invalid job state string '%s'", jobState) } diff --git a/internal/jobservice/repository/sql_job_service_moq.go b/internal/jobservice/repository/sql_job_service_moq.go index c669f69a97b..1f81417520f 100644 --- a/internal/jobservice/repository/sql_job_service_moq.go +++ b/internal/jobservice/repository/sql_job_service_moq.go @@ -4,6 +4,7 @@ package repository import ( + "context" "sync" ) @@ -17,24 +18,30 @@ var _ JobTableUpdater = &JobTableUpdaterMock{} // // // make and configure a mocked JobTableUpdater // mockedJobTableUpdater := &JobTableUpdaterMock{ -// ClearSubscriptionErrorFunc: func(queue string, jobSet string) { -// panic("mock out the ClearSubscriptionError method") +// AddMessageIdAndClearSubscriptionErrorFunc: func(ctx context.Context, queue string, jobSet string, messageId string) error { +// panic("mock out the AddMessageIdAndClearSubscriptionError method") // }, -// GetSubscriptionErrorFunc: func(queue string, jobSet string) string { +// GetSubscriptionErrorFunc: func(ctx context.Context, queue string, jobSet string) (string, error) { // panic("mock out the GetSubscriptionError method") // }, -// IsJobSetSubscribedFunc: func(queue string, jobSet string) bool { +// IsJobSetSubscribedFunc: func(ctx context.Context, queue string, jobSet string) (bool, string, error) { // panic("mock out the IsJobSetSubscribed method") // }, -// SetSubscriptionErrorFunc: func(queue string, jobSet string, err string) { +// SetSubscriptionErrorFunc: func(ctx context.Context, queue string, jobSet string, err string, fromMessageId string) error { // panic("mock out the SetSubscriptionError method") // }, -// SubscribeJobSetFunc: func(queue string, jobSet string) { +// SubscribeJobSetFunc: func(ctx context.Context, queue string, jobSet string, fromMessageId string) error { // panic("mock out the SubscribeJobSet method") // }, -// UpdateJobServiceDbFunc: func(jobStatus *JobStatus) error { +// UnsubscribeJobSetFunc: func(ctx context.Context, queue string, jobSet string) (int64, error) { +// panic("mock out the UnsubscribeJobSet method") +// }, +// UpdateJobServiceDbFunc: func(ctx context.Context, jobTable *JobStatus) error { // panic("mock out the UpdateJobServiceDb method") // }, +// UpdateJobSetDbFunc: func(ctx context.Context, queue string, jobSet string, fromMessageId string) error { +// panic("mock out the UpdateJobSetDb method") +// }, // } // // // use mockedJobTableUpdater in code that requires JobTableUpdater @@ -42,43 +49,47 @@ var _ JobTableUpdater = &JobTableUpdaterMock{} // // } type JobTableUpdaterMock struct { - // ClearSubscriptionErrorFunc mocks the ClearSubscriptionError method. - AddMessageIdAndClearSubscriptionErrorFunc func(queue string, jobSet string, fromMessageId string) error + // AddMessageIdAndClearSubscriptionErrorFunc mocks the AddMessageIdAndClearSubscriptionError method. + AddMessageIdAndClearSubscriptionErrorFunc func(ctx context.Context, queue string, jobSet string, messageId string) error // GetSubscriptionErrorFunc mocks the GetSubscriptionError method. - GetSubscriptionErrorFunc func(queue string, jobSet string) (string, error) + GetSubscriptionErrorFunc func(ctx context.Context, queue string, jobSet string) (string, error) // IsJobSetSubscribedFunc mocks the IsJobSetSubscribed method. - IsJobSetSubscribedFunc func(queue string, jobSet string) (bool, string, error) + IsJobSetSubscribedFunc func(ctx context.Context, queue string, jobSet string) (bool, string, error) // SetSubscriptionErrorFunc mocks the SetSubscriptionError method. - SetSubscriptionErrorFunc func(queue string, jobSet string, err string, fromMessageId string) error + SetSubscriptionErrorFunc func(ctx context.Context, queue string, jobSet string, err string, fromMessageId string) error // SubscribeJobSetFunc mocks the SubscribeJobSet method. - SubscribeJobSetFunc func(queue string, jobSet string, fromMessageId string) error + SubscribeJobSetFunc func(ctx context.Context, queue string, jobSet string, fromMessageId string) error - // UpdateJobServiceDbFunc mocks the UpdateJobServiceDb method. - UpdateJobServiceDbFunc func(jobStatus *JobStatus) error + // UnsubscribeJobSetFunc mocks the UnsubscribeJobSet method. + UnsubscribeJobSetFunc func(ctx context.Context, queue string, jobSet string) (int64, error) - // Mock for Unsubscribe job set - UnsubscribeJobSetFunc func(queue, jobSet string) (int64, error) + // UpdateJobServiceDbFunc mocks the UpdateJobServiceDb method. + UpdateJobServiceDbFunc func(ctx context.Context, jobTable *JobStatus) error - // Mock for UpdateJobSetDb - UpdateJobSetDbFunc func(queue, jobSet, fromMessageId string) error + // UpdateJobSetDbFunc mocks the UpdateJobSetDb method. + UpdateJobSetDbFunc func(ctx context.Context, queue string, jobSet string, fromMessageId string) error // calls tracks calls to the methods. calls struct { - // ClearSubscriptionError holds details about calls to the ClearSubscriptionError method. + // AddMessageIdAndClearSubscriptionError holds details about calls to the AddMessageIdAndClearSubscriptionError method. AddMessageIdAndClearSubscriptionError []struct { + // Ctx is the ctx argument value. + Ctx context.Context // Queue is the queue argument value. Queue string // JobSet is the jobSet argument value. JobSet string - // FromMessageId - FromMessageId string + // MessageId is the messageId argument value. + MessageId string } // GetSubscriptionError holds details about calls to the GetSubscriptionError method. GetSubscriptionError []struct { + // Ctx is the ctx argument value. + Ctx context.Context // Queue is the queue argument value. Queue string // JobSet is the jobSet argument value. @@ -86,6 +97,8 @@ type JobTableUpdaterMock struct { } // IsJobSetSubscribed holds details about calls to the IsJobSetSubscribed method. IsJobSetSubscribed []struct { + // Ctx is the ctx argument value. + Ctx context.Context // Queue is the queue argument value. Queue string // JobSet is the jobSet argument value. @@ -93,112 +106,128 @@ type JobTableUpdaterMock struct { } // SetSubscriptionError holds details about calls to the SetSubscriptionError method. SetSubscriptionError []struct { + // Ctx is the ctx argument value. + Ctx context.Context // Queue is the queue argument value. Queue string // JobSet is the jobSet argument value. JobSet string // Err is the err argument value. Err string - // FromMessageId is where job set subscription should start from + // FromMessageId is the fromMessageId argument value. FromMessageId string } // SubscribeJobSet holds details about calls to the SubscribeJobSet method. SubscribeJobSet []struct { + // Ctx is the ctx argument value. + Ctx context.Context // Queue is the queue argument value. Queue string // JobSet is the jobSet argument value. JobSet string - // FromMessageId is where job set subscription should start from + // FromMessageId is the fromMessageId argument value. FromMessageId string } - // UpdateJobServiceDb holds details about calls to the UpdateJobServiceDb method. - UpdateJobServiceDb []struct { - // JobStatus is the jobStatus argument value. - JobStatus *JobStatus - } - // UpdateJobSetDb - UpdateJobSetDb []struct { + // UnsubscribeJobSet holds details about calls to the UnsubscribeJobSet method. + UnsubscribeJobSet []struct { + // Ctx is the ctx argument value. + Ctx context.Context // Queue is the queue argument value. Queue string // JobSet is the jobSet argument value. JobSet string - // FromMessageId is where job set subscription should start from - FromMessageId string } - // Unsubscribe Jobset - UnsubscribeJobSet []struct { + // UpdateJobServiceDb holds details about calls to the UpdateJobServiceDb method. + UpdateJobServiceDb []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // JobTable is the jobTable argument value. + JobTable *JobStatus + } + // UpdateJobSetDb holds details about calls to the UpdateJobSetDb method. + UpdateJobSetDb []struct { + // Ctx is the ctx argument value. + Ctx context.Context // Queue is the queue argument value. Queue string // JobSet is the jobSet argument value. JobSet string + // FromMessageId is the fromMessageId argument value. + FromMessageId string } } - lockClearSubscriptionError sync.RWMutex - lockGetSubscriptionError sync.RWMutex - lockIsJobSetSubscribed sync.RWMutex - lockUnsubscribeJobSet sync.RWMutex - lockSetSubscriptionError sync.RWMutex - lockSubscribeJobSet sync.RWMutex - lockUpdateJobServiceDb sync.RWMutex - lockUpdateJobSetDb sync.RWMutex + lockAddMessageIdAndClearSubscriptionError sync.RWMutex + lockGetSubscriptionError sync.RWMutex + lockIsJobSetSubscribed sync.RWMutex + lockSetSubscriptionError sync.RWMutex + lockSubscribeJobSet sync.RWMutex + lockUnsubscribeJobSet sync.RWMutex + lockUpdateJobServiceDb sync.RWMutex + lockUpdateJobSetDb sync.RWMutex } -// ClearSubscriptionError calls ClearSubscriptionErrorFunc. -func (mock *JobTableUpdaterMock) AddMessageIdAndClearSubscriptionError(queue string, jobSet string, fromMessageId string) error { +// AddMessageIdAndClearSubscriptionError calls AddMessageIdAndClearSubscriptionErrorFunc. +func (mock *JobTableUpdaterMock) AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string, jobSet string, messageId string) error { if mock.AddMessageIdAndClearSubscriptionErrorFunc == nil { panic("JobTableUpdaterMock.AddMessageIdAndClearSubscriptionErrorFunc: method is nil but JobTableUpdater.AddMessageIdAndClearSubscriptionError was just called") } callInfo := struct { - Queue string - JobSet string - FromMessageId string + Ctx context.Context + Queue string + JobSet string + MessageId string }{ - Queue: queue, - JobSet: jobSet, - FromMessageId: fromMessageId, + Ctx: ctx, + Queue: queue, + JobSet: jobSet, + MessageId: messageId, } - mock.lockClearSubscriptionError.Lock() + mock.lockAddMessageIdAndClearSubscriptionError.Lock() mock.calls.AddMessageIdAndClearSubscriptionError = append(mock.calls.AddMessageIdAndClearSubscriptionError, callInfo) - mock.lockClearSubscriptionError.Unlock() - return mock.AddMessageIdAndClearSubscriptionErrorFunc(queue, jobSet, fromMessageId) + mock.lockAddMessageIdAndClearSubscriptionError.Unlock() + return mock.AddMessageIdAndClearSubscriptionErrorFunc(ctx, queue, jobSet, messageId) } -// ClearSubscriptionErrorCalls gets all the calls that were made to ClearSubscriptionError. +// AddMessageIdAndClearSubscriptionErrorCalls gets all the calls that were made to AddMessageIdAndClearSubscriptionError. // Check the length with: // -// len(mockedJobTableUpdater.ClearSubscriptionErrorCalls()) -func (mock *JobTableUpdaterMock) ClearSubscriptionErrorCalls() []struct { - Queue string - JobSet string - FromMessageId string +// len(mockedJobTableUpdater.AddMessageIdAndClearSubscriptionErrorCalls()) +func (mock *JobTableUpdaterMock) AddMessageIdAndClearSubscriptionErrorCalls() []struct { + Ctx context.Context + Queue string + JobSet string + MessageId string } { var calls []struct { - Queue string - JobSet string - FromMessageId string + Ctx context.Context + Queue string + JobSet string + MessageId string } - mock.lockClearSubscriptionError.RLock() + mock.lockAddMessageIdAndClearSubscriptionError.RLock() calls = mock.calls.AddMessageIdAndClearSubscriptionError - mock.lockClearSubscriptionError.RUnlock() + mock.lockAddMessageIdAndClearSubscriptionError.RUnlock() return calls } // GetSubscriptionError calls GetSubscriptionErrorFunc. -func (mock *JobTableUpdaterMock) GetSubscriptionError(queue string, jobSet string) (string, error) { +func (mock *JobTableUpdaterMock) GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error) { if mock.GetSubscriptionErrorFunc == nil { panic("JobTableUpdaterMock.GetSubscriptionErrorFunc: method is nil but JobTableUpdater.GetSubscriptionError was just called") } callInfo := struct { + Ctx context.Context Queue string JobSet string }{ + Ctx: ctx, Queue: queue, JobSet: jobSet, } mock.lockGetSubscriptionError.Lock() mock.calls.GetSubscriptionError = append(mock.calls.GetSubscriptionError, callInfo) mock.lockGetSubscriptionError.Unlock() - return mock.GetSubscriptionErrorFunc(queue, jobSet) + return mock.GetSubscriptionErrorFunc(ctx, queue, jobSet) } // GetSubscriptionErrorCalls gets all the calls that were made to GetSubscriptionError. @@ -206,10 +235,12 @@ func (mock *JobTableUpdaterMock) GetSubscriptionError(queue string, jobSet strin // // len(mockedJobTableUpdater.GetSubscriptionErrorCalls()) func (mock *JobTableUpdaterMock) GetSubscriptionErrorCalls() []struct { + Ctx context.Context Queue string JobSet string } { var calls []struct { + Ctx context.Context Queue string JobSet string } @@ -220,21 +251,23 @@ func (mock *JobTableUpdaterMock) GetSubscriptionErrorCalls() []struct { } // IsJobSetSubscribed calls IsJobSetSubscribedFunc. -func (mock *JobTableUpdaterMock) IsJobSetSubscribed(queue string, jobSet string) (bool, string, error) { +func (mock *JobTableUpdaterMock) IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error) { if mock.IsJobSetSubscribedFunc == nil { panic("JobTableUpdaterMock.IsJobSetSubscribedFunc: method is nil but JobTableUpdater.IsJobSetSubscribed was just called") } callInfo := struct { + Ctx context.Context Queue string JobSet string }{ + Ctx: ctx, Queue: queue, JobSet: jobSet, } mock.lockIsJobSetSubscribed.Lock() mock.calls.IsJobSetSubscribed = append(mock.calls.IsJobSetSubscribed, callInfo) mock.lockIsJobSetSubscribed.Unlock() - return mock.IsJobSetSubscribedFunc(queue, jobSet) + return mock.IsJobSetSubscribedFunc(ctx, queue, jobSet) } // IsJobSetSubscribedCalls gets all the calls that were made to IsJobSetSubscribed. @@ -242,10 +275,12 @@ func (mock *JobTableUpdaterMock) IsJobSetSubscribed(queue string, jobSet string) // // len(mockedJobTableUpdater.IsJobSetSubscribedCalls()) func (mock *JobTableUpdaterMock) IsJobSetSubscribedCalls() []struct { + Ctx context.Context Queue string JobSet string } { var calls []struct { + Ctx context.Context Queue string JobSet string } @@ -256,16 +291,18 @@ func (mock *JobTableUpdaterMock) IsJobSetSubscribedCalls() []struct { } // SetSubscriptionError calls SetSubscriptionErrorFunc. -func (mock *JobTableUpdaterMock) SetSubscriptionError(queue string, jobSet string, err string, fromMessageId string) error { +func (mock *JobTableUpdaterMock) SetSubscriptionError(ctx context.Context, queue string, jobSet string, err string, fromMessageId string) error { if mock.SetSubscriptionErrorFunc == nil { panic("JobTableUpdaterMock.SetSubscriptionErrorFunc: method is nil but JobTableUpdater.SetSubscriptionError was just called") } callInfo := struct { + Ctx context.Context Queue string JobSet string Err string FromMessageId string }{ + Ctx: ctx, Queue: queue, JobSet: jobSet, Err: err, @@ -274,7 +311,7 @@ func (mock *JobTableUpdaterMock) SetSubscriptionError(queue string, jobSet strin mock.lockSetSubscriptionError.Lock() mock.calls.SetSubscriptionError = append(mock.calls.SetSubscriptionError, callInfo) mock.lockSetSubscriptionError.Unlock() - return mock.SetSubscriptionErrorFunc(queue, jobSet, err, fromMessageId) + return mock.SetSubscriptionErrorFunc(ctx, queue, jobSet, err, fromMessageId) } // SetSubscriptionErrorCalls gets all the calls that were made to SetSubscriptionError. @@ -282,12 +319,14 @@ func (mock *JobTableUpdaterMock) SetSubscriptionError(queue string, jobSet strin // // len(mockedJobTableUpdater.SetSubscriptionErrorCalls()) func (mock *JobTableUpdaterMock) SetSubscriptionErrorCalls() []struct { + Ctx context.Context Queue string JobSet string Err string FromMessageId string } { var calls []struct { + Ctx context.Context Queue string JobSet string Err string @@ -300,15 +339,17 @@ func (mock *JobTableUpdaterMock) SetSubscriptionErrorCalls() []struct { } // SubscribeJobSet calls SubscribeJobSetFunc. -func (mock *JobTableUpdaterMock) SubscribeJobSet(queue string, jobSet string, fromMessageId string) error { +func (mock *JobTableUpdaterMock) SubscribeJobSet(ctx context.Context, queue string, jobSet string, fromMessageId string) error { if mock.SubscribeJobSetFunc == nil { panic("JobTableUpdaterMock.SubscribeJobSetFunc: method is nil but JobTableUpdater.SubscribeJobSet was just called") } callInfo := struct { + Ctx context.Context Queue string JobSet string FromMessageId string }{ + Ctx: ctx, Queue: queue, JobSet: jobSet, FromMessageId: fromMessageId, @@ -316,7 +357,7 @@ func (mock *JobTableUpdaterMock) SubscribeJobSet(queue string, jobSet string, fr mock.lockSubscribeJobSet.Lock() mock.calls.SubscribeJobSet = append(mock.calls.SubscribeJobSet, callInfo) mock.lockSubscribeJobSet.Unlock() - return mock.SubscribeJobSetFunc(queue, jobSet, fromMessageId) + return mock.SubscribeJobSetFunc(ctx, queue, jobSet, fromMessageId) } // SubscribeJobSetCalls gets all the calls that were made to SubscribeJobSet. @@ -324,11 +365,13 @@ func (mock *JobTableUpdaterMock) SubscribeJobSet(queue string, jobSet string, fr // // len(mockedJobTableUpdater.SubscribeJobSetCalls()) func (mock *JobTableUpdaterMock) SubscribeJobSetCalls() []struct { + Ctx context.Context Queue string JobSet string FromMessageId string } { var calls []struct { + Ctx context.Context Queue string JobSet string FromMessageId string @@ -339,37 +382,62 @@ func (mock *JobTableUpdaterMock) SubscribeJobSetCalls() []struct { return calls } -func (mock *JobTableUpdaterMock) UnsubscribeJobSet(queue string, jobSet string) (int64, error) { +// UnsubscribeJobSet calls UnsubscribeJobSetFunc. +func (mock *JobTableUpdaterMock) UnsubscribeJobSet(ctx context.Context, queue string, jobSet string) (int64, error) { if mock.UnsubscribeJobSetFunc == nil { - panic("JobTableUpdaterMock.UnsubscribeJobSetFunc: method is nil but JobTableUpdater.UnsubscribeJobSetFunc was just called") + panic("JobTableUpdaterMock.UnsubscribeJobSetFunc: method is nil but JobTableUpdater.UnsubscribeJobSet was just called") } callInfo := struct { + Ctx context.Context Queue string JobSet string }{ + Ctx: ctx, Queue: queue, JobSet: jobSet, } mock.lockUnsubscribeJobSet.Lock() mock.calls.UnsubscribeJobSet = append(mock.calls.UnsubscribeJobSet, callInfo) mock.lockUnsubscribeJobSet.Unlock() - return mock.UnsubscribeJobSetFunc(queue, jobSet) + return mock.UnsubscribeJobSetFunc(ctx, queue, jobSet) +} + +// UnsubscribeJobSetCalls gets all the calls that were made to UnsubscribeJobSet. +// Check the length with: +// +// len(mockedJobTableUpdater.UnsubscribeJobSetCalls()) +func (mock *JobTableUpdaterMock) UnsubscribeJobSetCalls() []struct { + Ctx context.Context + Queue string + JobSet string +} { + var calls []struct { + Ctx context.Context + Queue string + JobSet string + } + mock.lockUnsubscribeJobSet.RLock() + calls = mock.calls.UnsubscribeJobSet + mock.lockUnsubscribeJobSet.RUnlock() + return calls } // UpdateJobServiceDb calls UpdateJobServiceDbFunc. -func (mock *JobTableUpdaterMock) UpdateJobServiceDb(jobStatus *JobStatus) error { +func (mock *JobTableUpdaterMock) UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error { if mock.UpdateJobServiceDbFunc == nil { panic("JobTableUpdaterMock.UpdateJobServiceDbFunc: method is nil but JobTableUpdater.UpdateJobServiceDb was just called") } callInfo := struct { - JobStatus *JobStatus + Ctx context.Context + JobTable *JobStatus }{ - JobStatus: jobStatus, + Ctx: ctx, + JobTable: jobTable, } mock.lockUpdateJobServiceDb.Lock() mock.calls.UpdateJobServiceDb = append(mock.calls.UpdateJobServiceDb, callInfo) mock.lockUpdateJobServiceDb.Unlock() - return mock.UpdateJobServiceDbFunc(jobStatus) + return mock.UpdateJobServiceDbFunc(ctx, jobTable) } // UpdateJobServiceDbCalls gets all the calls that were made to UpdateJobServiceDb. @@ -377,10 +445,12 @@ func (mock *JobTableUpdaterMock) UpdateJobServiceDb(jobStatus *JobStatus) error // // len(mockedJobTableUpdater.UpdateJobServiceDbCalls()) func (mock *JobTableUpdaterMock) UpdateJobServiceDbCalls() []struct { - JobStatus *JobStatus + Ctx context.Context + JobTable *JobStatus } { var calls []struct { - JobStatus *JobStatus + Ctx context.Context + JobTable *JobStatus } mock.lockUpdateJobServiceDb.RLock() calls = mock.calls.UpdateJobServiceDb @@ -388,36 +458,40 @@ func (mock *JobTableUpdaterMock) UpdateJobServiceDbCalls() []struct { return calls } -// UpdateJobServiceDb calls UpdateJobServiceDbFunc. -func (mock *JobTableUpdaterMock) UpdateJobSetDb(queue, jobSet, fromMessageId string) error { - if mock.UpdateJobServiceDbFunc == nil { - panic("JobTableUpdaterMock.UpdateJobServiceDbFunc: method is nil but JobTableUpdater.UpdateJobServiceDb was just called") +// UpdateJobSetDb calls UpdateJobSetDbFunc. +func (mock *JobTableUpdaterMock) UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error { + if mock.UpdateJobSetDbFunc == nil { + panic("JobTableUpdaterMock.UpdateJobSetDbFunc: method is nil but JobTableUpdater.UpdateJobSetDb was just called") } callInfo := struct { + Ctx context.Context Queue string JobSet string FromMessageId string }{ + Ctx: ctx, Queue: queue, JobSet: jobSet, FromMessageId: fromMessageId, } mock.lockUpdateJobSetDb.Lock() mock.calls.UpdateJobSetDb = append(mock.calls.UpdateJobSetDb, callInfo) - mock.lockUpdateJobServiceDb.Unlock() - return mock.UpdateJobSetDbFunc(queue, jobSet, fromMessageId) + mock.lockUpdateJobSetDb.Unlock() + return mock.UpdateJobSetDbFunc(ctx, queue, jobSet, fromMessageId) } -// UpdateJobServiceDbCalls gets all the calls that were made to UpdateJobServiceDb. +// UpdateJobSetDbCalls gets all the calls that were made to UpdateJobSetDb. // Check the length with: // -// len(mockedJobTableUpdater.UpdateJobServiceDbCalls()) +// len(mockedJobTableUpdater.UpdateJobSetDbCalls()) func (mock *JobTableUpdaterMock) UpdateJobSetDbCalls() []struct { + Ctx context.Context Queue string JobSet string FromMessageId string } { var calls []struct { + Ctx context.Context Queue string JobSet string FromMessageId string diff --git a/internal/jobservice/repository/sql_job_service_test.go b/internal/jobservice/repository/sql_job_service_test.go index 0a2e2522961..1314df7057b 100644 --- a/internal/jobservice/repository/sql_job_service_test.go +++ b/internal/jobservice/repository/sql_job_service_test.go @@ -1,101 +1,112 @@ package repository import ( - "database/sql" + "context" "fmt" "os" "sync" "testing" "time" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/armadaproject/armada/internal/common/database" "github.com/armadaproject/armada/internal/jobservice/configuration" "github.com/armadaproject/armada/pkg/api/jobservice" ) func TestConstructInMemoryDoesNotExist(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - responseExpected := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_JOB_ID_NOT_FOUND} + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + responseExpected := &jobservice.JobServiceResponse{ + State: jobservice.JobServiceResponse_JOB_ID_NOT_FOUND, + } jobStatus := NewJobStatus("test", "job-set-1", "job-id", *responseExpected) - err := r.UpdateJobServiceDb(jobStatus) + err := r.UpdateJobServiceDb(ctx, jobStatus) require.NoError(t, err) - resp, err := r.GetJobStatus("job-set-1") + resp, err := r.GetJobStatus(ctx, "job-set-1") assert.NoError(t, err) assert.Equal(t, resp, responseExpected) }) } func TestSubscriptionError(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - err := r.SubscribeJobSet("queue-1", "job-set-1", "") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + err := r.SubscribeJobSet(ctx, "queue-1", "job-set-1", "") require.NoError(t, err) - err = r.SetSubscriptionError("queue-1", "job-set-1", "conn-error", "test") + err = r.SetSubscriptionError(ctx, "queue-1", "job-set-1", "conn-error", "test") require.NoError(t, err) - conErr, subErr := r.GetSubscriptionError("queue-1", "job-set-1") + conErr, subErr := r.GetSubscriptionError(ctx, "queue-1", "job-set-1") require.NoError(t, subErr) assert.Equal(t, conErr, "conn-error") }) } func TestUpdateJobSetDb(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - err := r.SubscribeJobSet("test", "job-set-1", "test") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + err := r.SubscribeJobSet(ctx, "test", "job-set-1", "test") require.NoError(t, err) - err = r.UpdateJobSetDb("test", "job-set-1", "test") + err = r.UpdateJobSetDb(ctx, "test", "job-set-1", "test") require.NoError(t, err) }) } func TestConstructInMemoryServiceFailed(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() responseExpected := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_FAILED, Error: "TestFail"} jobStatus := NewJobStatus("test", "job-set-1", "job-id", *responseExpected) - err := r.UpdateJobServiceDb(jobStatus) + err := r.UpdateJobServiceDb(ctx, jobStatus) require.NoError(t, err) - resp, err := r.GetJobStatus("job-id") + resp, err := r.GetJobStatus(ctx, "job-id") require.NoError(t, err) require.Equal(t, resp, responseExpected) }) } func TestConstructInMemoryServiceNoJob(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - responseExpected := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_JOB_ID_NOT_FOUND} - resp, err := r.GetJobStatus("job-set-1") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + responseExpected := &jobservice.JobServiceResponse{ + State: jobservice.JobServiceResponse_JOB_ID_NOT_FOUND, + } + resp, err := r.GetJobStatus(ctx, "job-set-1") require.NoError(t, err) require.Equal(t, resp, responseExpected) }) } func TestIsJobSubscribed(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - resp, _, err := r.IsJobSetSubscribed("queue-1", "job-set-1") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + resp, _, err := r.IsJobSetSubscribed(ctx, "queue-1", "job-set-1") require.NoError(t, err) require.False(t, resp) - err = r.SubscribeJobSet("queue-1", "job-set-1", "") + err = r.SubscribeJobSet(ctx, "queue-1", "job-set-1", "") require.NoError(t, err) - resp2, _, err := r.IsJobSetSubscribed("queue-1", "job-set-1") + resp2, _, err := r.IsJobSetSubscribed(ctx, "queue-1", "job-set-1") require.NoError(t, err) require.True(t, resp2) - err = r.SubscribeJobSet("queue-1", "job-set-1", "") + err = r.SubscribeJobSet(ctx, "queue-1", "job-set-1", "") require.NoError(t, err) }) } func TestSubscribeList(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - err := r.SubscribeJobSet("queue", "job-set-1", "") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + err := r.SubscribeJobSet(ctx, "queue", "job-set-1", "") require.NoError(t, err) - err = r.SubscribeJobSet("queue", "job-set-2", "") + err = r.SubscribeJobSet(ctx, "queue", "job-set-2", "") require.NoError(t, err) - subscribeList, err := r.GetSubscribedJobSets() + subscribeList, err := r.GetSubscribedJobSets(ctx) require.NoError(t, err) for _, val := range subscribeList { if val.Queue == "queue" && val.JobSet == "job-set-1" { @@ -112,10 +123,11 @@ func TestSubscribeList(t *testing.T) { } func TestCleanupJobSetAndJobsIfNonExist(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - rowsAffected, err := r.CleanupJobSetAndJobs("queue", "job-set-1") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + rowsAffected, err := r.CleanupJobSetAndJobs(ctx, "queue", "job-set-1") require.NoError(t, err) - subscribe, _, err := r.IsJobSetSubscribed("queue", "job-set-1") + subscribe, _, err := r.IsJobSetSubscribed(ctx, "queue", "job-set-1") require.NoError(t, err) require.False(t, subscribe) require.Equal(t, rowsAffected, int64(0)) @@ -124,13 +136,14 @@ func TestCleanupJobSetAndJobsIfNonExist(t *testing.T) { } func TestCleanupJobSetAndJobsHappy(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - err := r.SubscribeJobSet("queue", "job-set-1", "") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + err := r.SubscribeJobSet(ctx, "queue", "job-set-1", "") require.NoError(t, err) - respHappy, _, _ := r.IsJobSetSubscribed("queue", "job-set-1") + respHappy, _, _ := r.IsJobSetSubscribed(ctx, "queue", "job-set-1") require.True(t, respHappy) - rowsAffected, err := r.CleanupJobSetAndJobs("queue", "job-set-1") - subscribe, _, _ := r.IsJobSetSubscribed("queue", "job-set-1") + rowsAffected, err := r.CleanupJobSetAndJobs(ctx, "queue", "job-set-1") + subscribe, _, _ := r.IsJobSetSubscribed(ctx, "queue", "job-set-1") require.False(t, subscribe) require.Equal(t, rowsAffected, int64(0)) require.NoError(t, err) @@ -138,49 +151,57 @@ func TestCleanupJobSetAndJobsHappy(t *testing.T) { } func TestDeleteJobsInJobSet(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - responseExpected1 := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_FAILED, Error: "TestFail"} + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + responseExpected1 := &jobservice.JobServiceResponse{ + State: jobservice.JobServiceResponse_FAILED, Error: "TestFail", + } jobStatus1 := NewJobStatus("test", "job-set-1", "job-id", *responseExpected1) - err := r.UpdateJobServiceDb(jobStatus1) + err := r.UpdateJobServiceDb(ctx, jobStatus1) require.NoError(t, err) - jobResponse1, _ := r.GetJobStatus("job-id") + jobResponse1, _ := r.GetJobStatus(ctx, "job-id") require.Equal(t, jobResponse1, responseExpected1) - err = r.SubscribeJobSet("test", "job-set-1", "") + err = r.SubscribeJobSet(ctx, "test", "job-set-1", "") require.NoError(t, err) - rows, err := r.DeleteJobsInJobSet("test", "job-set-1") + rows, err := r.DeleteJobsInJobSet(ctx, "test", "job-set-1") require.Equal(t, rows, int64(1)) require.NoError(t, err) - jobResponseDelete1, _ := r.GetJobStatus("job-id") - responseDoesNotExist := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_JOB_ID_NOT_FOUND} + jobResponseDelete1, _ := r.GetJobStatus(ctx, "job-id") + responseDoesNotExist := &jobservice.JobServiceResponse{ + State: jobservice.JobServiceResponse_JOB_ID_NOT_FOUND, + } require.Equal(t, jobResponseDelete1, responseDoesNotExist) - rowsEmpty, errEmpty := r.DeleteJobsInJobSet("test", "job-set-1") + rowsEmpty, errEmpty := r.DeleteJobsInJobSet(ctx, "test", "job-set-1") require.Equal(t, rowsEmpty, int64(0)) require.NoError(t, errEmpty) }) } func TestCheckToUnSubscribe(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - responseExpected1 := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_FAILED, Error: "TestFail"} + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + responseExpected1 := &jobservice.JobServiceResponse{ + State: jobservice.JobServiceResponse_FAILED, Error: "TestFail", + } jobStatus1 := NewJobStatus("test", "job-set-1", "job-id", *responseExpected1) - err := r.UpdateJobServiceDb(jobStatus1) + err := r.UpdateJobServiceDb(ctx, jobStatus1) require.NoError(t, err) - err = r.SubscribeJobSet("test", "job-set-1", "") + err = r.SubscribeJobSet(ctx, "test", "job-set-1", "") require.NoError(t, err) - subscribe, _, err := r.IsJobSetSubscribed("test", "job-set-1") + subscribe, _, err := r.IsJobSetSubscribed(ctx, "test", "job-set-1") require.NoError(t, err) assert.True(t, subscribe) - flag, errTrue := r.CheckToUnSubscribe("test", "job-set-1", 100000) + flag, errTrue := r.CheckToUnSubscribe(ctx, "test", "job-set-1", 100000) require.NoError(t, errTrue) - flagFalse, errFalse := r.CheckToUnSubscribe("test", "job-set-1", -1) + flagFalse, errFalse := r.CheckToUnSubscribe(ctx, "test", "job-set-1", -1) require.NoError(t, errFalse) assert.False(t, flag) assert.True(t, flagFalse) @@ -188,58 +209,67 @@ func TestCheckToUnSubscribe(t *testing.T) { } func TestCheckToUnSubscribeWithoutSubscribing(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - responseExpected1 := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_FAILED, Error: "TestFail"} - responseExpected2 := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_SUCCEEDED} + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + responseExpected1 := &jobservice.JobServiceResponse{ + State: jobservice.JobServiceResponse_FAILED, Error: "TestFail", + } + responseExpected2 := &jobservice.JobServiceResponse{ + State: jobservice.JobServiceResponse_SUCCEEDED, + } jobStatus1 := NewJobStatus("test", "job-set-1", "job-id", *responseExpected1) jobStatus2 := NewJobStatus("test", "job-set-2", "job-id-3", *responseExpected2) - err := r.UpdateJobServiceDb(jobStatus1) + err := r.UpdateJobServiceDb(ctx, jobStatus1) require.NoError(t, err) - err = r.UpdateJobServiceDb(jobStatus2) + err = r.UpdateJobServiceDb(ctx, jobStatus2) assert.NoError(t, err) - subscribe, _, err := r.IsJobSetSubscribed("test", "job-set-1") + subscribe, _, err := r.IsJobSetSubscribed(ctx, "test", "job-set-1") require.NoError(t, err) assert.False(t, subscribe) - flag, err := r.CheckToUnSubscribe("test", "job-set-1", 100000) + flag, err := r.CheckToUnSubscribe(ctx, "test", "job-set-1", 100000) require.NoError(t, err) assert.False(t, flag) }) } func TestUnsubscribe(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - err := r.SubscribeJobSet("test", "testjobset", "") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + err := r.SubscribeJobSet(ctx, "test", "testjobset", "") require.NoError(t, err) - numberOfJobSets, err := r.UnsubscribeJobSet("test", "testjobset") + numberOfJobSets, err := r.UnsubscribeJobSet(ctx, "test", "testjobset") require.NoError(t, err) assert.Equal(t, numberOfJobSets, int64(1)) - subscribe, _, err := r.IsJobSetSubscribed("test", "testjobset") + subscribe, _, err := r.IsJobSetSubscribed(ctx, "test", "testjobset") require.NoError(t, err) assert.False(t, subscribe) }) } func TestUpdateJobSetTime(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - err := r.SubscribeJobSet("test", "job-set-1", "") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + err := r.SubscribeJobSet(ctx, "test", "job-set-1", "") require.NoError(t, err) - err = r.UpdateJobSetDb("test", "job-set-1", "") + err = r.UpdateJobSetDb(ctx, "test", "job-set-1", "") require.NoError(t, err) }) } func TestUpdateJobSetTimeWithoutSubscribe(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - updateErr := r.UpdateJobSetDb("test", "job-set-1", "") + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() + updateErr := r.UpdateJobSetDb(ctx, "test", "job-set-1", "") assert.EqualError(t, updateErr, "queue test jobSet job-set-1 is already unsubscribed") }) } func TestGetJobStatusAllStates(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() responseFailed := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_FAILED, Error: "TestFail"} responseSuccess := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_SUCCEEDED} responseDuplicate := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_DUPLICATE_FOUND} @@ -256,28 +286,28 @@ func TestGetJobStatusAllStates(t *testing.T) { jobStatus6 := NewJobStatus("test", "job-set-1", "job-id-6", *responseCancelled) jobStatus7 := NewJobStatus("test", "job-set-1", "job-id-7", *responseDoesNotExist) - err := r.UpdateJobServiceDb(jobStatus1) + err := r.UpdateJobServiceDb(ctx, jobStatus1) require.NoError(t, err) - err = r.UpdateJobServiceDb(jobStatus2) + err = r.UpdateJobServiceDb(ctx, jobStatus2) require.NoError(t, err) - err = r.UpdateJobServiceDb(jobStatus3) + err = r.UpdateJobServiceDb(ctx, jobStatus3) require.NoError(t, err) - err = r.UpdateJobServiceDb(jobStatus4) + err = r.UpdateJobServiceDb(ctx, jobStatus4) require.NoError(t, err) - err = r.UpdateJobServiceDb(jobStatus5) + err = r.UpdateJobServiceDb(ctx, jobStatus5) require.NoError(t, err) - err = r.UpdateJobServiceDb(jobStatus6) + err = r.UpdateJobServiceDb(ctx, jobStatus6) require.NoError(t, err) - err = r.UpdateJobServiceDb(jobStatus7) + err = r.UpdateJobServiceDb(ctx, jobStatus7) require.NoError(t, err) - actualFailed, errFailed := r.GetJobStatus("job-id") - actualSuccess, errSuccess := r.GetJobStatus("job-id-2") - actualDuplicate, errDup := r.GetJobStatus("job-id-3") - actualRunning, errRunning := r.GetJobStatus("job-id-4") - actualSubmitted, errSubmitted := r.GetJobStatus("job-id-5") - actualCancelled, errCancel := r.GetJobStatus("job-id-6") - actualNotExist, errNotExist := r.GetJobStatus("job-id-7") + actualFailed, errFailed := r.GetJobStatus(ctx, "job-id") + actualSuccess, errSuccess := r.GetJobStatus(ctx, "job-id-2") + actualDuplicate, errDup := r.GetJobStatus(ctx, "job-id-3") + actualRunning, errRunning := r.GetJobStatus(ctx, "job-id-4") + actualSubmitted, errSubmitted := r.GetJobStatus(ctx, "job-id-5") + actualCancelled, errCancel := r.GetJobStatus(ctx, "job-id-6") + actualNotExist, errNotExist := r.GetJobStatus(ctx, "job-id-7") require.NoError(t, errFailed) require.Equal(t, responseFailed, actualFailed) @@ -297,63 +327,66 @@ func TestGetJobStatusAllStates(t *testing.T) { } func TestDeleteJobsBeforePersistingRaceError(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() responseSuccess := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_SUCCEEDED} noExist := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_JOB_ID_NOT_FOUND} var expectedNumberOfJobs int64 = 1 jobStatus1 := NewJobStatus("test-race", "job-set-race", "job-race", *responseSuccess) - err := r.UpdateJobServiceDb(jobStatus1) + err := r.UpdateJobServiceDb(ctx, jobStatus1) require.NoError(t, err) - err = r.SubscribeJobSet("test-race", "job-set-race", "") + err = r.SubscribeJobSet(ctx, "test-race", "job-set-race", "") require.NoError(t, err) - numberOfJobs, deleteErr := r.CleanupJobSetAndJobs("test-race", "job-set-race") + numberOfJobs, deleteErr := r.CleanupJobSetAndJobs(ctx, "test-race", "job-set-race") assert.Equal(t, expectedNumberOfJobs, numberOfJobs) require.NoError(t, deleteErr) - actualSuccess, actualError := r.GetJobStatus("job-race") + actualSuccess, actualError := r.GetJobStatus(ctx, "job-race") assert.Equal(t, actualSuccess, noExist) require.NoError(t, actualError) - sqlNoExist, sqlError := r.GetJobStatus("job-race") + sqlNoExist, sqlError := r.GetJobStatus(ctx, "job-race") assert.Equal(t, sqlNoExist, noExist) require.NoError(t, sqlError) }) } func TestGetJobStatusAfterPersisting(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() responseSuccess := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_SUCCEEDED} jobStatus1 := NewJobStatus("test", "job-set-1", "job-id", *responseSuccess) - err := r.UpdateJobServiceDb(jobStatus1) + err := r.UpdateJobServiceDb(ctx, jobStatus1) require.NoError(t, err) - actual, actualErr := r.GetJobStatus("job-id") + actual, actualErr := r.GetJobStatus(ctx, "job-id") assert.Nil(t, actualErr) assert.Equal(t, actual, responseSuccess) }) } func TestDuplicateIdDatabaseInsert(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() responseRunning := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_RUNNING} responseSuccess := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_SUCCEEDED} jobStatus1 := NewJobStatus("test", "job-set-1", "job-id", *responseRunning) - err := r.UpdateJobServiceDb(jobStatus1) + err := r.UpdateJobServiceDb(ctx, jobStatus1) require.NoError(t, err) - actualSql, actualErr := r.GetJobStatus("job-id") + actualSql, actualErr := r.GetJobStatus(ctx, "job-id") assert.Equal(t, actualSql, responseRunning) require.NoError(t, actualErr) jobStatus2 := NewJobStatus("test", "job-set-1", "job-id", *responseSuccess) - err = r.UpdateJobServiceDb(jobStatus2) + err = r.UpdateJobServiceDb(ctx, jobStatus2) require.NoError(t, err) - actualSuccessSql, actualSuccessErr := r.GetJobStatus("job-id") + actualSuccessSql, actualSuccessErr := r.GetJobStatus(ctx, "job-id") assert.Equal(t, actualSuccessSql, responseSuccess) require.NoError(t, actualSuccessErr) }) } func TestHealthCheck(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { - healthCheck, err := r.HealthCheck() + WithSqlServiceRepo(func(r SQLJobService) { + healthCheck, err := r.HealthCheck(context.Background()) assert.True(t, healthCheck) require.NoError(t, err) }) @@ -362,7 +395,8 @@ func TestHealthCheck(t *testing.T) { // This test will fail if sqlite writes are not serialised somehow due to // SQLITE_BUSY errors. func TestConcurrentJobStatusUpdating(t *testing.T) { - WithSqlServiceRepo(func(r *SQLJobService) { + WithSqlServiceRepo(func(r SQLJobService) { + ctx := context.Background() responseRunning := &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_RUNNING} concurrency := 10 @@ -380,9 +414,9 @@ func TestConcurrentJobStatusUpdating(t *testing.T) { jobStatus := NewJobStatus("test", "job-set-1", jobId, *responseRunning) startWg.Wait() - err := r.UpdateJobServiceDb(jobStatus) + err := r.UpdateJobServiceDb(ctx, jobStatus) assert.Nil(t, err) - actualSql, actualErr := r.GetJobStatus(jobId) + actualSql, actualErr := r.GetJobStatus(ctx, jobId) assert.Equal(t, actualSql, responseRunning) assert.Nil(t, actualErr) }(i) @@ -393,31 +427,20 @@ func TestConcurrentJobStatusUpdating(t *testing.T) { }) } -func WithSqlServiceRepo(action func(r *SQLJobService)) { - var db *sql.DB - var err error +func WithSqlServiceRepo(action func(r SQLJobService)) { + var repo SQLJobService config := &configuration.JobServiceConfiguration{} + log := log.WithField("JobService", "Startup") - // If JSDBTYPE is not specified in the environment, default to 'sqlite' - jsDatabase := "sqlite" - - if os.Getenv("JSDBTYPE") == "postgres" { - jsDatabase = "postgres" - } - - if jsDatabase == "sqlite" { + if os.Getenv("JSDBTYPE") == "sqlite" { config.DatabaseType = "sqlite" - - db, err = sql.Open("sqlite", "test.db") - if err != nil { - panic(err) - } - } else if jsDatabase == "postgres" { + config.DatabasePath = "test.db" + } else if os.Getenv("JSDBTYPE") == "postgres" { config.DatabaseType = "postgres" config.PostgresConfig = configuration.PostgresConfig{ - MaxOpenConns: 20, - MaxIdleConns: 5, - ConnMaxLifetime: 30 * time.Second, + PoolMaxOpenConns: 20, + PoolMaxIdleConns: 5, + PoolMaxConnLifetime: 30 * time.Second, Connection: map[string]string{ "host": "localhost", "port": "5432", @@ -427,22 +450,22 @@ func WithSqlServiceRepo(action func(r *SQLJobService)) { "sslmode": "disable", }, } + } - db, err = sql.Open("pgx", database.CreateConnectionString(config.PostgresConfig.Connection)) - if err != nil { - panic(err) - } - db.SetMaxOpenConns(config.PostgresConfig.MaxOpenConns) - db.SetMaxIdleConns(config.PostgresConfig.MaxIdleConns) - db.SetConnMaxLifetime(config.PostgresConfig.ConnMaxLifetime) + err, repo, dbCallbackFn := NewSQLJobService(config, log) + if err != nil { + panic(err) } + defer dbCallbackFn() - repo := NewSQLJobService(config, db) - repo.Setup() + repo.Setup(context.Background()) action(repo) - db.Close() if config.DatabaseType == "sqlite" { - os.Remove("test.db") + // Besides the base sqlite storage file (e.g. "test.db"), there + // are also two others to be removed ("test.db-shm", "test.db-wal") + for _, suffix := range []string{"", "-shm", "-wal"} { + os.Remove(config.DatabasePath + suffix) + } } } diff --git a/internal/jobservice/repository/sqlite.go b/internal/jobservice/repository/sqlite.go new file mode 100644 index 00000000000..298788c7064 --- /dev/null +++ b/internal/jobservice/repository/sqlite.go @@ -0,0 +1,379 @@ +//go:generate moq -out sql_job_service_moq.go . JobTableUpdater +package repository + +import ( + "context" + "database/sql" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + _ "modernc.org/sqlite" + + log "github.com/sirupsen/logrus" + + "github.com/armadaproject/armada/internal/jobservice/configuration" + js "github.com/armadaproject/armada/pkg/api/jobservice" +) + +// JSRepoSQLite for persisting to DB. +type JSRepoSQLite struct { + jobServiceConfig *configuration.JobServiceConfiguration + db *sql.DB + lock sync.RWMutex +} + +func NewJSRepoSQLite(config *configuration.JobServiceConfiguration, log *log.Entry) (error, *JSRepoSQLite, func()) { + var err error + + dbDir := filepath.Dir(config.DatabasePath) + if _, err := os.Stat(dbDir); os.IsNotExist(err) { + if errMkDir := os.Mkdir(dbDir, 0o755); errMkDir != nil { + errMsg := fmt.Sprintf("error: could not make directory at %s for sqlite db: %v", dbDir, errMkDir) + return errors.New(errMsg), nil, func() {} + } + } + + sqliteDb, err := sql.Open("sqlite", config.DatabasePath) + if err != nil { + errMsg := fmt.Sprintf("error opening sqlite DB from %s %v", config.DatabasePath, err) + return errors.New(errMsg), nil, func() {} + } + + return nil, &JSRepoSQLite{jobServiceConfig: config, db: sqliteDb}, func() { + if err := sqliteDb.Close(); err != nil { + log.Warnf("error closing database: %v", err) + } + } +} + +// Set up the DB for use, create tables +func (s *JSRepoSQLite) Setup(ctx context.Context) { + s.lock.Lock() + defer s.lock.Unlock() + + _, err := s.db.Exec("PRAGMA journal_mode=WAL") + if err != nil { + panic(err) + } + + _, err = s.db.Exec("DROP TABLE IF EXISTS jobservice") + if err != nil { + panic(err) + } + + _, err = s.db.Exec(` + CREATE TABLE jobservice ( + Queue TEXT, + JobSetId TEXT, + JobId TEXT, + JobResponseState TEXT, + JobResponseError TEXT, + Timestamp INT, + PRIMARY KEY(JobId))`) + + if err != nil { + panic(err) + } + + _, errIndex := s.db.Exec(`CREATE INDEX idx_job_set_queue ON jobservice (Queue, JobSetId)`) + if errIndex != nil { + panic(errIndex) + } + _, err = s.db.Exec("DROP TABLE IF EXISTS jobsets") + if err != nil { + panic(err) + } + + _, err = s.db.Exec(` + CREATE TABLE jobsets ( + Queue TEXT, + JobSetId TEXT, + Timestamp INT, + ConnectionError TEXT, + FromMessageId TEXT, + UNIQUE(Queue,JobSetId))`) + if err != nil { + panic(err) + } +} + +// Get the JobStatus given the jodId +func (s *JSRepoSQLite) GetJobStatus(ctx context.Context, jobId string) (*js.JobServiceResponse, error) { + s.lock.Lock() + defer s.lock.Unlock() + + var queue, jobSetId, jobState, jobError string + sqlStmt := "SELECT Queue, JobSetId, JobResponseState, JobResponseError FROM jobservice WHERE JobId = ?" + + row := s.db.QueryRow(sqlStmt, jobId) + err := row.Scan(&queue, &jobSetId, &jobState, &jobError) + + if err == sql.ErrNoRows { + return &js.JobServiceResponse{State: js.JobServiceResponse_JOB_ID_NOT_FOUND}, nil + } else if err != nil { + return nil, err + } + + // indicate connnection error for jobset/queue subscription where present + connErr, err := s.GetSubscriptionError(ctx, queue, jobSetId) + if err != nil { + return nil, err + } + if connErr != "" { + return &js.JobServiceResponse{ + Error: connErr, + State: js.JobServiceResponse_CONNECTION_ERR, + }, nil + } + + jobJSRState, err := JobStateStrToJSRState(jobState) + if err != nil { + return nil, err + } + + return &js.JobServiceResponse{Error: jobError, State: jobJSRState}, nil +} + +// Update database with JobTable. +func (s *JSRepoSQLite) UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error { + // SQLite only allows one write at a time. Therefore we must serialize + // writes in order to avoid SQL_BUSY errors. + s.lock.Lock() + defer s.lock.Unlock() + + sqlStmt := "INSERT OR REPLACE INTO jobservice VALUES (?, ?, ?, ?, ?, ?)" + stmt, err := s.db.Prepare(sqlStmt) + if err != nil { + return err + } + defer stmt.Close() + _, errExec := stmt.Exec(jobTable.queue, jobTable.jobSetId, jobTable.jobId, + jobTable.jobResponse.State.String(), jobTable.jobResponse.Error, jobTable.timeStamp) + return errExec +} + +func (s *JSRepoSQLite) UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error { + subscribe, _, err := s.IsJobSetSubscribed(ctx, queue, jobSet) + if err != nil { + return err + } + if !subscribe { + return fmt.Errorf("queue %s jobSet %s is already unsubscribed", queue, jobSet) + } + s.lock.Lock() + defer s.lock.Unlock() + + sqlStmt := "INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)" + + jobSetState, err := s.db.Prepare(sqlStmt) + if err != nil { + return err + } + defer jobSetState.Close() + _, jobSetErr := jobSetState.Exec(queue, jobSet, time.Now().Unix(), "", &fromMessageId) + if jobSetErr != nil { + return jobSetErr + } + return nil +} + +func (s *JSRepoSQLite) HealthCheck(ctx context.Context) (bool, error) { + s.lock.Lock() + defer s.lock.Unlock() + + row := s.db.QueryRow("SELECT 1") + var col int + err := row.Scan(&col) + if err == nil { + return true, nil + } else { + return false, fmt.Errorf("SQL health check failed: %v", err) + } +} + +// Check if JobSet is in our map. +func (s *JSRepoSQLite) IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error) { + s.lock.Lock() + defer s.lock.Unlock() + + sqlStmt := "SELECT Queue, JobSetId, FromMessageId FROM jobsets WHERE Queue = ? AND JobSetId = ?" + row := s.db.QueryRow(sqlStmt, queue, jobSet) + var queueScan, jobSetIdScan, fromMessageId string + + err := row.Scan(&queueScan, &jobSetIdScan, &fromMessageId) + + if err == sql.ErrNoRows { + return false, "", nil + } else if err != nil { + return false, "", err + } + return true, fromMessageId, nil +} + +// Clear subscription error if present +func (s *JSRepoSQLite) AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string, + jobSet string, fromMessageId string, +) error { + return s.SetSubscriptionError(ctx, queue, jobSet, "", fromMessageId) +} + +// Set subscription error if present +func (s *JSRepoSQLite) SetSubscriptionError(ctx context.Context, queue string, jobSet string, + connErr string, fromMessageId string, +) error { + s.lock.Lock() + defer s.lock.Unlock() + + sqlStmt := "INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)" + jobSetState, err := s.db.Prepare(sqlStmt) + if err != nil { + return err + } + defer jobSetState.Close() + + subscribeTable := NewSubscribeTable(queue, jobSet) + _, jobSetErr := jobSetState.Exec(subscribeTable.queue, jobSet, subscribeTable.lastRequestTimeStamp, + connErr, fromMessageId) + if jobSetErr != nil { + return jobSetErr + } + return jobSetErr +} + +// Get subscription error if present +func (s *JSRepoSQLite) GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error) { + sqlStmt := "SELECT ConnectionError FROM jobsets WHERE Queue = ? AND JobSetId = ?" + var connError string + + row := s.db.QueryRow(sqlStmt, queue, jobSet) + err := row.Scan(&connError) + + if err == sql.ErrNoRows { + return "", nil + } else if err != nil { + return "", err + } + return connError, nil +} + +// Mark our JobSet as being subscribed +// SubscribeTable contains Queue, JobSet and time when it was created. +func (s *JSRepoSQLite) SubscribeJobSet(ctx context.Context, queue string, jobSet string, fromMessageId string) error { + s.lock.Lock() + defer s.lock.Unlock() + + sqlStmt := "INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)" + + jobSetState, err := s.db.Prepare(sqlStmt) + if err != nil { + return err + } + defer jobSetState.Close() + subscribeTable := NewSubscribeTable(queue, jobSet) + _, jobSetErr := jobSetState.Exec(subscribeTable.queue, subscribeTable.jobSet, + subscribeTable.lastRequestTimeStamp, "", fromMessageId) + return jobSetErr +} + +// UnSubscribe to JobSet and delete all the jobs in the database +func (s *JSRepoSQLite) CleanupJobSetAndJobs(ctx context.Context, queue string, jobSet string) (int64, error) { + _, errUnsubscribe := s.UnsubscribeJobSet(ctx, queue, jobSet) + if errUnsubscribe != nil { + return 0, errUnsubscribe + } + return s.DeleteJobsInJobSet(ctx, queue, jobSet) +} + +// Checks JobSet table to make determine if we should unsubscribe from JobSet +// configTimeWithoutUpdates is a configurable value that is read from the config +// We allow unsubscribing if the jobset hasn't been updated in configTime +// TODO implement this +func (s *JSRepoSQLite) CheckToUnSubscribe(ctx context.Context, queue string, jobSet string, + configTimeWithoutUpdates int64, +) (bool, error) { + jobSetFound, _, err := s.IsJobSetSubscribed(ctx, queue, jobSet) + if err != nil { + return false, nil + } + if !jobSetFound { + return false, nil + } + + s.lock.Lock() + defer s.lock.Unlock() + + sqlStmt := "SELECT Timestamp FROM jobsets WHERE Queue = ? AND JobSetId = ?" + row := s.db.QueryRow(sqlStmt, queue, jobSet) + var timeStamp int + + timeErr := row.Scan(&timeStamp) + + if timeErr == sql.ErrNoRows { + return false, nil + } else if err != nil { + return false, err + } + + currentTime := time.Now().Unix() + if (currentTime - configTimeWithoutUpdates) > int64(timeStamp) { + return true, nil + } + return false, nil +} + +func (s *JSRepoSQLite) UnsubscribeJobSet(ctx context.Context, queue, jobSet string) (int64, error) { + s.lock.Lock() + defer s.lock.Unlock() + + sqlStmt := "DELETE FROM jobsets WHERE Queue = ? AND JobSetId = ?" + + result, err := s.db.Exec(sqlStmt, queue, jobSet) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +// Delete Jobs in the database +func (s *JSRepoSQLite) DeleteJobsInJobSet(ctx context.Context, queue string, jobSet string) (int64, error) { + s.lock.Lock() + defer s.lock.Unlock() + + sqlStmt := "DELETE FROM jobservice WHERE Queue = ? AND JobSetId = ?" + + result, err := s.db.Exec(sqlStmt, queue, jobSet) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +func (s *JSRepoSQLite) GetSubscribedJobSets(ctx context.Context) ([]SubscribedTuple, error) { + s.lock.Lock() + defer s.lock.Unlock() + + rows, err := s.db.Query("SELECT Queue, JobSetId, FromMessageId FROM jobsets") + if err != nil { + return nil, err + } + defer rows.Close() + + var tuples []SubscribedTuple + + // Loop through rows, using Scan to assign column data to struct fields. + for rows.Next() { + var st SubscribedTuple + if err := rows.Scan(&st.Queue, &st.JobSet, &st.FromMessageId); err != nil { + return tuples, err + } + tuples = append(tuples, st) + } + if err = rows.Err(); err != nil { + return tuples, err + } + return tuples, nil +} diff --git a/internal/jobservice/server/server.go b/internal/jobservice/server/server.go index dc47e0eb65b..adc8ee4875b 100644 --- a/internal/jobservice/server/server.go +++ b/internal/jobservice/server/server.go @@ -15,10 +15,10 @@ import ( type JobServiceServer struct { jobServiceConfig *configuration.JobServiceConfiguration - jobRepository *repository.SQLJobService + jobRepository repository.SQLJobService } -func NewJobService(config *configuration.JobServiceConfiguration, sqlService *repository.SQLJobService) *JobServiceServer { +func NewJobService(config *configuration.JobServiceConfiguration, sqlService repository.SQLJobService) *JobServiceServer { return &JobServiceServer{jobServiceConfig: config, jobRepository: sqlService} } @@ -29,21 +29,21 @@ func (s *JobServiceServer) GetJobStatus(ctx context.Context, opts *js.JobService "queue": opts.Queue, } - jobSetExists, fromMessageId, err := s.jobRepository.IsJobSetSubscribed(opts.Queue, opts.JobSetId) + jobSetExists, fromMessageId, err := s.jobRepository.IsJobSetSubscribed(ctx, opts.Queue, opts.JobSetId) if err != nil { log.Error("error checking if job is subscribed", err) } if !jobSetExists { - errsubscribe := s.jobRepository.SubscribeJobSet(opts.Queue, opts.JobSetId, fromMessageId) + errsubscribe := s.jobRepository.SubscribeJobSet(ctx, opts.Queue, opts.JobSetId, fromMessageId) if errsubscribe != nil { log.Error("unable to subscribe job set", err) } log.Infof("Subscribing %s-%s", opts.Queue, opts.JobSetId) } - if err := s.jobRepository.UpdateJobSetDb(opts.Queue, opts.JobSetId, fromMessageId); err != nil { + if err := s.jobRepository.UpdateJobSetDb(ctx, opts.Queue, opts.JobSetId, fromMessageId); err != nil { log.WithFields(requestFields).Warn(err) } - response, err := s.jobRepository.GetJobStatus(opts.JobId) + response, err := s.jobRepository.GetJobStatus(ctx, opts.JobId) if err != nil { log.WithFields(requestFields).Error(err) return nil, err diff --git a/magefiles/main.go b/magefiles/main.go index f0782a64832..37974d93f06 100644 --- a/magefiles/main.go +++ b/magefiles/main.go @@ -111,6 +111,7 @@ func BuildCICluster() { // run integration test func CiIntegrationTests() { mg.Deps(BuildCICluster) + time.Sleep(120 * time.Second) mg.Deps(ciRunTests) }