Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Firmware install with server acquire/release #220

Open
wants to merge 4 commits into
base: fw-inband-devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/metal-toolbox/fleetdb v1.18.6
github.com/metal-toolbox/rivets v1.0.6
github.com/metal-toolbox/rivets v1.0.9
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.36.0
github.com/pkg/errors v0.9.1
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,12 @@ github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxU
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/metal-toolbox/fleetdb v1.18.6 h1:VxTs4T2zKh9KbjPNo18Z/D6OmPqArPK2PNQ8w/VmVz4=
github.com/metal-toolbox/fleetdb v1.18.6/go.mod h1:QoNpDNVXxt7YqrxRIWkWp1hR3LJ2YXMnYavXwefOUnQ=
github.com/metal-toolbox/rivets v1.0.6 h1:WUFC/Lay3qVOUiswI2Tadz5Zil9gZerupuIPzln9PcE=
github.com/metal-toolbox/rivets v1.0.6/go.mod h1:JBbPEDevQkQmNHNGi4zalTjqTTMs0/0/xCtx1EKe10c=
github.com/metal-toolbox/rivets v1.0.7 h1:Tly0k9/HrlnXVaPUjUBNGUhEPq6LOBeWU7O4OqmLgOc=
github.com/metal-toolbox/rivets v1.0.7/go.mod h1:FZ47HrHgezYyY/H7z4Emw9JaGxeFmBJCXGKqy5YW6ag=
github.com/metal-toolbox/rivets v1.0.8 h1:i7E0DLqNUHP8MGhQuR+03YInk/j/K8vhRtjdlPUkAcU=
github.com/metal-toolbox/rivets v1.0.8/go.mod h1:FZ47HrHgezYyY/H7z4Emw9JaGxeFmBJCXGKqy5YW6ag=
github.com/metal-toolbox/rivets v1.0.9 h1:zQxk5CWay3GCYvpIYemo6QYc0EdwlAw1LKpXMot5k00=
github.com/metal-toolbox/rivets v1.0.9/go.mod h1:FZ47HrHgezYyY/H7z4Emw9JaGxeFmBJCXGKqy5YW6ag=
github.com/microsoft/go-mssqldb v0.17.0/go.mod h1:OkoNGhGEs8EZqchVTtochlXruEhEOaO4S0d2sB5aeGQ=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func (o *Orchestrator) queueFollowingCondition(ctx context.Context, evt *v1types
if active != nil && active.State == rctypes.Pending {
byt := active.MustBytes()
subject := fmt.Sprintf("%s.servers.%s", o.facility, active.Kind)
err := o.streamBroker.Publish(ctx, subject, byt)
err := o.streamBroker.Publish(ctx, subject, byt, false)
if err != nil {
o.logger.WithError(err).WithFields(logrus.Fields{
"condition.id": active.ID,
Expand Down
52 changes: 45 additions & 7 deletions pkg/api/v1/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/metal-toolbox/conditionorc/internal/fleetdb"
"github.com/metal-toolbox/conditionorc/internal/model"
"github.com/metal-toolbox/conditionorc/internal/server"

"github.com/metal-toolbox/conditionorc/internal/store"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -287,7 +288,13 @@ func TestConditionCreate(t *testing.T) {
tester.fleetDB.On("GetServer", mock.Anything, mock.Anything).Return(&model.Server{FacilityCode: "facility"}, nil).Once()

if tc.expectPublish {
tester.stream.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
tester.stream.On(
"Publish",
mock.Anything,
mock.Anything,
mock.Anything,
false,
).Return(nil).Once()
}

got, err := tester.client.ServerConditionCreate(context.TODO(), serverID, rctypes.FirmwareInstall, tc.payload)
Expand Down Expand Up @@ -325,7 +332,16 @@ func TestFirmwareInstall(t *testing.T) {
AssetID: serverID,
},
mockStore: func(r *store.MockRepository) {
r.On("CreateMultiple", mock.Anything, serverID, mock.Anything, mock.Anything).
r.On(
"CreateMultiple",
mock.Anything,
serverID,
mock.Anything, // I haven't figured a way pass in a variable list of conditions to this r.On mock method
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).
Return(nil).Once()
},
expectResponse: func() *v1types.ServerResponse {
Expand All @@ -343,7 +359,16 @@ func TestFirmwareInstall(t *testing.T) {
AssetID: serverID,
},
mockStore: func(r *store.MockRepository) {
r.On("CreateMultiple", mock.Anything, serverID, mock.Anything, mock.Anything).
r.On(
"CreateMultiple",
mock.Anything,
serverID,
mock.Anything, // I haven't figured a way pass in a variable list of conditions to this r.On mock method
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).
Return(fmt.Errorf("%w:%s", store.ErrActiveCondition, "pound sand")).Once()
},
expectResponse: func() *v1types.ServerResponse {
Expand All @@ -367,7 +392,13 @@ func TestFirmwareInstall(t *testing.T) {

tester.fleetDB.On("GetServer", mock.Anything, mock.Anything).Return(&model.Server{FacilityCode: "facility"}, nil).Times(1)
if tc.expectPublish {
tester.stream.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(1)
tester.stream.On(
"Publish",
mock.Anything,
mock.Anything,
mock.Anything,
false,
).Return(nil).Times(1)
}

got, err := tester.client.ServerFirmwareInstall(context.TODO(), tc.payload)
Expand Down Expand Up @@ -517,7 +548,13 @@ func TestServerEnroll(t *testing.T) {
}

if tc.expectPublish {
tester.stream.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(1)
tester.stream.On(
"Publish",
mock.Anything,
mock.Anything,
mock.Anything,
false,
).Return(nil).Times(1)
}

defer func() {
Expand Down Expand Up @@ -592,8 +629,9 @@ func TestServerEnrollEmptyUUID(t *testing.T) {
tester.stream.On("Publish",
mock.Anything,
mock.Anything,
mock.Anything).
Return(nil)
mock.Anything,
false,
).Return(nil)

got, err := tester.client.ServerEnroll(context.TODO(), "", v1types.ConditionCreate{Parameters: validParams.MustJSON()})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/v1/events/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewHandler(repository store.Repository, stream events.Stream, logger *logru
return &Handler{repository: repository, stream: stream, logger: logger}
}

// UpdateCondition sanity checks the incoming condition update, merges it and applies the result to serverservice
// UpdateCondition sanity checks the incoming condition update, merges it and applies the result to the condition record.
func (h *Handler) UpdateCondition(ctx context.Context, updEvt *v1types.ConditionUpdateEvent) error {
_, span := otel.Tracer(pkgName).Start(ctx, "events.UpdateCondition")
defer span.End()
Expand Down
102 changes: 68 additions & 34 deletions pkg/api/v1/routes/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,27 +313,8 @@ func (r *Routes) firmwareInstall(c *gin.Context) (int, *v1types.ServerResponse)
}
}

createTime := time.Now()

fwCondition := &rctypes.Condition{
Kind: rctypes.FirmwareInstall,
Version: rctypes.ConditionStructVersion,
Parameters: fw.MustJSON(),
State: rctypes.Pending,
FailOnCheckpointError: true,
CreatedAt: createTime,
}

invCondition := &rctypes.Condition{
Kind: rctypes.Inventory,
Version: rctypes.ConditionStructVersion,
Parameters: rctypes.MustDefaultInventoryJSON(serverID),
State: rctypes.Pending,
FailOnCheckpointError: true,
CreatedAt: createTime,
}

if err = r.repository.CreateMultiple(otelCtx, serverID, fwCondition, invCondition); err != nil {
serverConditions := r.firmwareInstallComposite(serverID, fw)
if err = r.repository.CreateMultiple(otelCtx, serverID, serverConditions.Conditions...); err != nil {
if errors.Is(err, store.ErrActiveCondition) {
return http.StatusConflict, &v1types.ServerResponse{
Message: err.Error(),
Expand All @@ -345,12 +326,13 @@ func (r *Routes) firmwareInstall(c *gin.Context) (int, *v1types.ServerResponse)
}
}

if err = r.publishCondition(otelCtx, serverID, facilityCode, fwCondition); err != nil {
r.logger.WithError(err).Warn("publishing firmware-install condition")
// mark firmwareInstall as failed
fwCondition.State = rctypes.Failed
fwCondition.Status = failedPublishStatus
if markErr := r.repository.Update(otelCtx, serverID, fwCondition); markErr != nil {
if err = r.publishCondition(otelCtx, serverID, facilityCode, serverConditions.Conditions[0], false); err != nil {
r.logger.WithField("kind", serverConditions.Conditions[0].Kind).WithError(err).Warn("error publishing condition")
// mark first condition as failed
serverConditions.Conditions[0].State = rctypes.Failed
serverConditions.Conditions[0].Status = failedPublishStatus

if markErr := r.repository.Update(otelCtx, serverID, serverConditions.Conditions[0]); markErr != nil {
// an operator is going to have to sort this out
r.logger.WithError(err).Warn("marking unpublished condition failed")
}
Expand All @@ -366,11 +348,62 @@ func (r *Routes) firmwareInstall(c *gin.Context) (int, *v1types.ServerResponse)
return http.StatusOK, &v1types.ServerResponse{
Message: "firmware install scheduled",
Records: &v1types.ConditionsResponse{
ServerID: serverID,
State: rctypes.Pending,
Conditions: []*rctypes.Condition{
fwCondition,
invCondition,
ServerID: serverID,
State: rctypes.Pending,
Conditions: serverConditions.Conditions,
},
}
}

func (r *Routes) firmwareInstallComposite(serverID uuid.UUID, fwtp rctypes.FirmwareInstallTaskParameters) *rctypes.ServerConditions {
createTime := time.Now()
return &rctypes.ServerConditions{
ServerID: serverID,
Conditions: []*rctypes.Condition{
{
Kind: rctypes.BrokerAcquireServer,
Version: rctypes.ConditionStructVersion,
Parameters: rctypes.NewBrokerTaskParameters(
serverID,
rctypes.AcquireServer,
rctypes.PurposeFirmwareInstall,
"Marked for firmware install",
).MustMarshal(),
State: rctypes.Pending,
CreatedAt: createTime,
},
{
Kind: rctypes.FirmwareInstallInband,
Version: rctypes.ConditionStructVersion,
Parameters: fwtp.MustJSON(),
State: rctypes.Pending,
CreatedAt: createTime,
},
{
Kind: rctypes.FirmwareInstall,
Version: rctypes.ConditionStructVersion,
Parameters: fwtp.MustJSON(),
State: rctypes.Pending,
CreatedAt: createTime,
},
{
Kind: rctypes.Inventory,
Version: rctypes.ConditionStructVersion,
Parameters: rctypes.MustDefaultInventoryJSON(serverID),
State: rctypes.Pending,
CreatedAt: createTime,
},
{
Kind: rctypes.BrokerReleaseServer,
Version: rctypes.ConditionStructVersion,
Parameters: rctypes.NewBrokerTaskParameters(
serverID,
rctypes.ReleaseServer,
"",
"Firmware install process completed",
).MustMarshal(),
State: rctypes.Pending,
CreatedAt: createTime,
},
},
}
Expand All @@ -388,7 +421,7 @@ func (r *Routes) conditionCreate(otelCtx context.Context, newCondition *rctypes.
}

// publish the condition and in case of publish failure - revert.
err = r.publishCondition(otelCtx, serverID, facilityCode, newCondition)
err = r.publishCondition(otelCtx, serverID, facilityCode, newCondition, false)
if err != nil {
r.logger.WithError(err).Warn("condition create failed to publish")

Expand Down Expand Up @@ -453,7 +486,7 @@ func RegisterSpanEvent(span trace.Span, serverID, conditionID, conditionKind, ev
))
}

func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facilityCode string, publishCondition *rctypes.Condition) error {
func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facilityCode string, publishCondition *rctypes.Condition, rollupSubject bool) error {
errPublish := errors.New("error publishing condition")

otelCtx, span := otel.Tracer(pkgName).Start(
Expand Down Expand Up @@ -485,6 +518,7 @@ func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facil
otelCtx,
subjectSuffix,
byt,
rollupSubject,
); err != nil {
return errors.Wrap(errPublish, err.Error())
}
Expand Down
50 changes: 36 additions & 14 deletions pkg/api/v1/routes/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,13 @@ func TestAddServer(t *testing.T) {
Once()
},
mockStream: func(r *eventsm.MockStream) {
r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", mockFacilityCode, rctypes.Inventory), mock.Anything).
Return(nil).
Once()
r.On(
"Publish",
mock.Anything,
fmt.Sprintf("%s.servers.%s", mockFacilityCode, rctypes.Inventory),
mock.Anything,
false,
).Return(nil).Once()
},
request: func(t *testing.T) *http.Request {
payload, err := json.Marshal(&v1types.ConditionCreate{Parameters: validParams.MustJSON()})
Expand Down Expand Up @@ -289,9 +293,13 @@ func TestAddServer(t *testing.T) {
Once()
},
mockStream: func(r *eventsm.MockStream) {
r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", mockFacilityCode, rctypes.Inventory), mock.Anything).
Return(nil).
Once()
r.On(
"Publish",
mock.Anything,
fmt.Sprintf("%s.servers.%s", mockFacilityCode, rctypes.Inventory),
mock.Anything,
false,
).Return(nil).Once()
},
request: func(t *testing.T) *http.Request {
payload, err := json.Marshal(&v1types.ConditionCreate{Parameters: validParams.MustJSON()})
Expand Down Expand Up @@ -606,7 +614,7 @@ func TestAddServerRollback(t *testing.T) {
}

if tc.mockStreamErr.calledTime > 0 {
stream.On("Publish", mock.Anything, mock.Anything, mock.Anything).
stream.On("Publish", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(tc.mockStreamErr.err).
Times(tc.mockStreamErr.calledTime)
}
Expand Down Expand Up @@ -754,9 +762,13 @@ func TestServerConditionCreate(t *testing.T) {
Once()
},
mockStream: func(r *eventsm.MockStream) {
r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall), mock.Anything).
Return(nil).
Once()
r.On(
"Publish",
mock.Anything,
fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall),
mock.Anything,
false,
).Return(nil).Once()
},
request: func(t *testing.T) *http.Request {
payload, err := json.Marshal(&v1types.ConditionCreate{Parameters: []byte(`{"some param": "1"}`)})
Expand Down Expand Up @@ -803,9 +815,13 @@ func TestServerConditionCreate(t *testing.T) {
Once()
},
mockStream: func(r *eventsm.MockStream) {
r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall), mock.Anything).
Return(nil).
Once()
r.On(
"Publish",
mock.Anything,
fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall),
mock.Anything,
false,
).Return(nil).Once()
},
request: func(t *testing.T) *http.Request {
fault := rctypes.Fault{Panic: true, DelayDuration: "10s", FailAt: "foobar"}
Expand Down Expand Up @@ -890,7 +906,13 @@ func TestServerConditionCreate(t *testing.T) {
Once()
},
mockStream: func(r *eventsm.MockStream) {
r.On("Publish", mock.Anything, fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall), mock.Anything).
r.On(
"Publish",
mock.Anything,
fmt.Sprintf("%s.servers.%s", facilityCode, rctypes.FirmwareInstall),
mock.Anything,
false,
).
Return(errors.New("gremlins in the pipes")).
Once()
},
Expand Down
Loading