diff --git a/go.mod b/go.mod index 766b18bc..ad1c22c6 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/metal-toolbox/fleetdb v1.18.6 - github.com/metal-toolbox/rivets v1.0.6 + github.com/metal-toolbox/rivets v1.0.9 github.com/nats-io/nats-server/v2 v2.10.12 github.com/nats-io/nats.go v1.36.0 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 37ec8d9f..442cb9a1 100644 --- a/go.sum +++ b/go.sum @@ -540,8 +540,12 @@ github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxU github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/metal-toolbox/fleetdb v1.18.6 h1:VxTs4T2zKh9KbjPNo18Z/D6OmPqArPK2PNQ8w/VmVz4= github.com/metal-toolbox/fleetdb v1.18.6/go.mod h1:QoNpDNVXxt7YqrxRIWkWp1hR3LJ2YXMnYavXwefOUnQ= -github.com/metal-toolbox/rivets v1.0.6 h1:WUFC/Lay3qVOUiswI2Tadz5Zil9gZerupuIPzln9PcE= -github.com/metal-toolbox/rivets v1.0.6/go.mod h1:JBbPEDevQkQmNHNGi4zalTjqTTMs0/0/xCtx1EKe10c= +github.com/metal-toolbox/rivets v1.0.7 h1:Tly0k9/HrlnXVaPUjUBNGUhEPq6LOBeWU7O4OqmLgOc= +github.com/metal-toolbox/rivets v1.0.7/go.mod h1:FZ47HrHgezYyY/H7z4Emw9JaGxeFmBJCXGKqy5YW6ag= +github.com/metal-toolbox/rivets v1.0.8 h1:i7E0DLqNUHP8MGhQuR+03YInk/j/K8vhRtjdlPUkAcU= +github.com/metal-toolbox/rivets v1.0.8/go.mod h1:FZ47HrHgezYyY/H7z4Emw9JaGxeFmBJCXGKqy5YW6ag= +github.com/metal-toolbox/rivets v1.0.9 h1:zQxk5CWay3GCYvpIYemo6QYc0EdwlAw1LKpXMot5k00= +github.com/metal-toolbox/rivets v1.0.9/go.mod h1:FZ47HrHgezYyY/H7z4Emw9JaGxeFmBJCXGKqy5YW6ag= github.com/microsoft/go-mssqldb v0.17.0/go.mod h1:OkoNGhGEs8EZqchVTtochlXruEhEOaO4S0d2sB5aeGQ= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= diff --git a/internal/orchestrator/updates.go b/internal/orchestrator/updates.go index 
073091d8..867c6cdd 100644 --- a/internal/orchestrator/updates.go +++ b/internal/orchestrator/updates.go @@ -541,7 +541,7 @@ func (o *Orchestrator) queueFollowingCondition(ctx context.Context, evt *v1types if active != nil && active.State == rctypes.Pending { byt := active.MustBytes() subject := fmt.Sprintf("%s.servers.%s", o.facility, active.Kind) - err := o.streamBroker.Publish(ctx, subject, byt) + err := o.streamBroker.Publish(ctx, subject, byt, false) if err != nil { o.logger.WithError(err).WithFields(logrus.Fields{ "condition.id": active.ID, diff --git a/pkg/api/v1/client/client_test.go b/pkg/api/v1/client/client_test.go index 54454466..a755246f 100644 --- a/pkg/api/v1/client/client_test.go +++ b/pkg/api/v1/client/client_test.go @@ -14,6 +14,7 @@ import ( "github.com/metal-toolbox/conditionorc/internal/fleetdb" "github.com/metal-toolbox/conditionorc/internal/model" "github.com/metal-toolbox/conditionorc/internal/server" + "github.com/metal-toolbox/conditionorc/internal/store" "github.com/sirupsen/logrus" "github.com/stretchr/testify/mock" @@ -287,7 +288,13 @@ func TestConditionCreate(t *testing.T) { tester.fleetDB.On("GetServer", mock.Anything, mock.Anything).Return(&model.Server{FacilityCode: "facility"}, nil).Once() if tc.expectPublish { - tester.stream.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + tester.stream.On( + "Publish", + mock.Anything, + mock.Anything, + mock.Anything, + false, + ).Return(nil).Once() } got, err := tester.client.ServerConditionCreate(context.TODO(), serverID, rctypes.FirmwareInstall, tc.payload) @@ -325,7 +332,16 @@ func TestFirmwareInstall(t *testing.T) { AssetID: serverID, }, mockStore: func(r *store.MockRepository) { - r.On("CreateMultiple", mock.Anything, serverID, mock.Anything, mock.Anything). 
+ r.On( + "CreateMultiple", + mock.Anything, + serverID, + mock.Anything, // I haven't figured out a way to pass a variable list of conditions to this r.On mock method + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ). Return(nil).Once() }, expectResponse: func() *v1types.ServerResponse { @@ -343,7 +359,16 @@ func TestFirmwareInstall(t *testing.T) { AssetID: serverID, }, mockStore: func(r *store.MockRepository) { - r.On("CreateMultiple", mock.Anything, serverID, mock.Anything, mock.Anything). + r.On( + "CreateMultiple", + mock.Anything, + serverID, + mock.Anything, // I haven't figured out a way to pass a variable list of conditions to this r.On mock method + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ). Return(fmt.Errorf("%w:%s", store.ErrActiveCondition, "pound sand")).Once() }, expectResponse: func() *v1types.ServerResponse { @@ -367,7 +392,13 @@ func TestFirmwareInstall(t *testing.T) { tester.fleetDB.On("GetServer", mock.Anything, mock.Anything).Return(&model.Server{FacilityCode: "facility"}, nil).Times(1) if tc.expectPublish { - tester.stream.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(1) + tester.stream.On( + "Publish", + mock.Anything, + mock.Anything, + mock.Anything, + false, + ).Return(nil).Times(1) } got, err := tester.client.ServerFirmwareInstall(context.TODO(), tc.payload) @@ -517,7 +548,13 @@ func TestServerEnroll(t *testing.T) { } if tc.expectPublish { - tester.stream.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(1) + tester.stream.On( + "Publish", + mock.Anything, + mock.Anything, + mock.Anything, + false, + ).Return(nil).Times(1) } defer func() { @@ -592,8 +629,9 @@ func TestServerEnrollEmptyUUID(t *testing.T) { tester.stream.On("Publish", mock.Anything, mock.Anything, - mock.Anything). 
- Return(nil) + mock.Anything, + false, + ).Return(nil) got, err := tester.client.ServerEnroll(context.TODO(), "", v1types.ConditionCreate{Parameters: validParams.MustJSON()}) if err != nil { diff --git a/pkg/api/v1/events/handlers.go b/pkg/api/v1/events/handlers.go index 58b7eb9b..1b4bcc10 100644 --- a/pkg/api/v1/events/handlers.go +++ b/pkg/api/v1/events/handlers.go @@ -38,7 +38,7 @@ func NewHandler(repository store.Repository, stream events.Stream, logger *logru return &Handler{repository: repository, stream: stream, logger: logger} } -// UpdateCondition sanity checks the incoming condition update, merges it and applies the result to serverservice +// UpdateCondition sanity checks the incoming condition update, merges it and applies the result to the condition record. func (h *Handler) UpdateCondition(ctx context.Context, updEvt *v1types.ConditionUpdateEvent) error { _, span := otel.Tracer(pkgName).Start(ctx, "events.UpdateCondition") defer span.End() diff --git a/pkg/api/v1/routes/handlers.go b/pkg/api/v1/routes/handlers.go index 2d8b2194..106e70bc 100644 --- a/pkg/api/v1/routes/handlers.go +++ b/pkg/api/v1/routes/handlers.go @@ -313,27 +313,8 @@ func (r *Routes) firmwareInstall(c *gin.Context) (int, *v1types.ServerResponse) } } - createTime := time.Now() - - fwCondition := &rctypes.Condition{ - Kind: rctypes.FirmwareInstall, - Version: rctypes.ConditionStructVersion, - Parameters: fw.MustJSON(), - State: rctypes.Pending, - FailOnCheckpointError: true, - CreatedAt: createTime, - } - - invCondition := &rctypes.Condition{ - Kind: rctypes.Inventory, - Version: rctypes.ConditionStructVersion, - Parameters: rctypes.MustDefaultInventoryJSON(serverID), - State: rctypes.Pending, - FailOnCheckpointError: true, - CreatedAt: createTime, - } - - if err = r.repository.CreateMultiple(otelCtx, serverID, fwCondition, invCondition); err != nil { + serverConditions := r.firmwareInstallComposite(serverID, fw) + if err = r.repository.CreateMultiple(otelCtx, serverID, 
serverConditions.Conditions...); err != nil { if errors.Is(err, store.ErrActiveCondition) { return http.StatusConflict, &v1types.ServerResponse{ Message: err.Error(), @@ -345,12 +326,13 @@ func (r *Routes) firmwareInstall(c *gin.Context) (int, *v1types.ServerResponse) } } - if err = r.publishCondition(otelCtx, serverID, facilityCode, fwCondition); err != nil { - r.logger.WithError(err).Warn("publishing firmware-install condition") - // mark firmwareInstall as failed - fwCondition.State = rctypes.Failed - fwCondition.Status = failedPublishStatus - if markErr := r.repository.Update(otelCtx, serverID, fwCondition); markErr != nil { + if err = r.publishCondition(otelCtx, serverID, facilityCode, serverConditions.Conditions[0], false); err != nil { + r.logger.WithField("kind", serverConditions.Conditions[0].Kind).WithError(err).Warn("error publishing condition") + // mark first condition as failed + serverConditions.Conditions[0].State = rctypes.Failed + serverConditions.Conditions[0].Status = failedPublishStatus + + if markErr := r.repository.Update(otelCtx, serverID, serverConditions.Conditions[0]); markErr != nil { // an operator is going to have to sort this out r.logger.WithError(err).Warn("marking unpublished condition failed") } @@ -366,11 +348,62 @@ func (r *Routes) firmwareInstall(c *gin.Context) (int, *v1types.ServerResponse) return http.StatusOK, &v1types.ServerResponse{ Message: "firmware install scheduled", Records: &v1types.ConditionsResponse{ - ServerID: serverID, - State: rctypes.Pending, - Conditions: []*rctypes.Condition{ - fwCondition, - invCondition, + ServerID: serverID, + State: rctypes.Pending, + Conditions: serverConditions.Conditions, + }, + } +} + +func (r *Routes) firmwareInstallComposite(serverID uuid.UUID, fwtp rctypes.FirmwareInstallTaskParameters) *rctypes.ServerConditions { + createTime := time.Now() + return &rctypes.ServerConditions{ + ServerID: serverID, + Conditions: []*rctypes.Condition{ + { + Kind: rctypes.BrokerAcquireServer, + 
Version: rctypes.ConditionStructVersion, + Parameters: rctypes.NewBrokerTaskParameters( + serverID, + rctypes.AcquireServer, + rctypes.PurposeFirmwareInstall, + "Marked for firmware install", + ).MustMarshal(), + State: rctypes.Pending, + CreatedAt: createTime, + }, + { + Kind: rctypes.FirmwareInstallInband, + Version: rctypes.ConditionStructVersion, + Parameters: fwtp.MustJSON(), + State: rctypes.Pending, + CreatedAt: createTime, + }, + { + Kind: rctypes.FirmwareInstall, + Version: rctypes.ConditionStructVersion, + Parameters: fwtp.MustJSON(), + State: rctypes.Pending, + CreatedAt: createTime, + }, + { + Kind: rctypes.Inventory, + Version: rctypes.ConditionStructVersion, + Parameters: rctypes.MustDefaultInventoryJSON(serverID), + State: rctypes.Pending, + CreatedAt: createTime, + }, + { + Kind: rctypes.BrokerReleaseServer, + Version: rctypes.ConditionStructVersion, + Parameters: rctypes.NewBrokerTaskParameters( + serverID, + rctypes.ReleaseServer, + "", + "Firmware install process completed", + ).MustMarshal(), + State: rctypes.Pending, + CreatedAt: createTime, }, }, } @@ -388,7 +421,7 @@ func (r *Routes) conditionCreate(otelCtx context.Context, newCondition *rctypes. } // publish the condition and in case of publish failure - revert. 
- err = r.publishCondition(otelCtx, serverID, facilityCode, newCondition) + err = r.publishCondition(otelCtx, serverID, facilityCode, newCondition, false) if err != nil { r.logger.WithError(err).Warn("condition create failed to publish") @@ -453,7 +486,7 @@ func RegisterSpanEvent(span trace.Span, serverID, conditionID, conditionKind, ev )) } -func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facilityCode string, publishCondition *rctypes.Condition) error { +func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facilityCode string, publishCondition *rctypes.Condition, rollupSubject bool) error { errPublish := errors.New("error publishing condition") otelCtx, span := otel.Tracer(pkgName).Start( @@ -485,6 +518,7 @@ func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facil otelCtx, subjectSuffix, byt, + rollupSubject, ); err != nil { return errors.Wrap(errPublish, err.Error()) } diff --git a/pkg/api/v1/routes/handlers_test.go b/pkg/api/v1/routes/handlers_test.go index 4a6b2bbe..81e8c8c4 100644 --- a/pkg/api/v1/routes/handlers_test.go +++ b/pkg/api/v1/routes/handlers_test.go @@ -151,9 +151,13 @@ func TestAddServer(t *testing.T) { Once() }, mockStream: func(r *eventsm.MockStream) { - r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", mockFacilityCode, rctypes.Inventory), mock.Anything). - Return(nil). - Once() + r.On( + "Publish", + mock.Anything, + fmt.Sprintf("%s.servers.%s", mockFacilityCode, rctypes.Inventory), + mock.Anything, + false, + ).Return(nil).Once() }, request: func(t *testing.T) *http.Request { payload, err := json.Marshal(&v1types.ConditionCreate{Parameters: validParams.MustJSON()}) @@ -289,9 +293,13 @@ func TestAddServer(t *testing.T) { Once() }, mockStream: func(r *eventsm.MockStream) { - r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", mockFacilityCode, rctypes.Inventory), mock.Anything). - Return(nil). 
- Once() + r.On( + "Publish", + mock.Anything, + fmt.Sprintf("%s.servers.%s", mockFacilityCode, rctypes.Inventory), + mock.Anything, + false, + ).Return(nil).Once() }, request: func(t *testing.T) *http.Request { payload, err := json.Marshal(&v1types.ConditionCreate{Parameters: validParams.MustJSON()}) @@ -606,7 +614,7 @@ func TestAddServerRollback(t *testing.T) { } if tc.mockStreamErr.calledTime > 0 { - stream.On("Publish", mock.Anything, mock.Anything, mock.Anything). + stream.On("Publish", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(tc.mockStreamErr.err). Times(tc.mockStreamErr.calledTime) } @@ -754,9 +762,13 @@ func TestServerConditionCreate(t *testing.T) { Once() }, mockStream: func(r *eventsm.MockStream) { - r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall), mock.Anything). - Return(nil). - Once() + r.On( + "Publish", + mock.Anything, + fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall), + mock.Anything, + false, + ).Return(nil).Once() }, request: func(t *testing.T) *http.Request { payload, err := json.Marshal(&v1types.ConditionCreate{Parameters: []byte(`{"some param": "1"}`)}) @@ -803,9 +815,13 @@ func TestServerConditionCreate(t *testing.T) { Once() }, mockStream: func(r *eventsm.MockStream) { - r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall), mock.Anything). - Return(nil). - Once() + r.On( + "Publish", + mock.Anything, + fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall), + mock.Anything, + false, + ).Return(nil).Once() }, request: func(t *testing.T) *http.Request { fault := rctypes.Fault{Panic: true, DelayDuration: "10s", FailAt: "foobar"} @@ -890,7 +906,13 @@ func TestServerConditionCreate(t *testing.T) { Once() }, mockStream: func(r *eventsm.MockStream) { - r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall), mock.Anything). 
+ r.On( + "Publish", + mock.Anything, + fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall), + mock.Anything, + false, + ). Return(errors.New("gremlins in the pipes")). Once() },