Skip to content

Commit

Permalink
Model transformer: model reconciliation for agent upgrades (#3878)
Browse files Browse the repository at this point in the history
  • Loading branch information
Realmonia authored Sep 7, 2023
1 parent 06b7c01 commit d985fb1
Show file tree
Hide file tree
Showing 10 changed files with 751 additions and 5 deletions.
19 changes: 16 additions & 3 deletions agent/data/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (

"github.com/aws/amazon-ecs-agent/agent/api/container"
"github.com/aws/amazon-ecs-agent/agent/api/task"
"github.com/aws/amazon-ecs-agent/agent/data/transformationfunctions"
"github.com/aws/amazon-ecs-agent/agent/engine/image"
"github.com/aws/amazon-ecs-agent/ecs-agent/modeltransformer"
"github.com/aws/amazon-ecs-agent/ecs-agent/netlib/model/networkinterface"
bolt "go.etcd.io/bbolt"
)
Expand All @@ -33,6 +35,7 @@ const (
imagesBucketName = "images"
eniAttachmentsBucketName = "eniattachments"
metadataBucketName = "metadata"
emptyAgentVersionMsg = "No version info available in boltDB. Either this is a fresh instance, or we were using state file to persist data. Transformer not applicable."
)

var (
Expand Down Expand Up @@ -94,7 +97,8 @@ type Client interface {

// client implements the Client interface using boltdb as the backing data store.
type client struct {
db *bolt.DB
db *bolt.DB
transformer *modeltransformer.Transformer
}

// New returns a data client that implements the Client interface with boltdb.
Expand All @@ -115,7 +119,8 @@ func NewWithSetup(dataDir string) (Client, error) {
return setup(dataDir)
}

// setup initiates the boltdb client and makes sure the buckets we use are created.
// setup initiates the boltdb client and makes sure the buckets we use and transformer are created, and
// registers transformation functions to transformer.
func setup(dataDir string) (*client, error) {
db, err := bolt.Open(filepath.Join(dataDir, dbName), dbMode, nil)
err = db.Update(func(tx *bolt.Tx) error {
Expand All @@ -128,11 +133,19 @@ func setup(dataDir string) (*client, error) {

return nil
})

// create transformer
transformer := modeltransformer.NewTransformer()

// registering task transformation functions
transformationfunctions.RegisterTaskTransformationFunctions(transformer)

if err != nil {
return nil, err
}
return &client{
db: db,
db: db,
transformer: transformer,
}, nil
}

Expand Down
6 changes: 5 additions & 1 deletion agent/data/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"path/filepath"
"testing"

"github.com/aws/amazon-ecs-agent/ecs-agent/modeltransformer"

"github.com/stretchr/testify/require"
bolt "go.etcd.io/bbolt"
)
Expand All @@ -28,6 +30,7 @@ func newTestClient(t *testing.T) Client {
testDir := t.TempDir()

testDB, err := bolt.Open(filepath.Join(testDir, dbName), dbMode, nil)
transformer := modeltransformer.NewTransformer()
require.NoError(t, err)
require.NoError(t, testDB.Update(func(tx *bolt.Tx) error {
for _, b := range buckets {
Expand All @@ -40,7 +43,8 @@ func newTestClient(t *testing.T) Client {
return nil
}))
testClient := &client{
db: testDB,
db: testDB,
transformer: transformer,
}

t.Cleanup(func() {
Expand Down
115 changes: 115 additions & 0 deletions agent/data/models/task_models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

//lint:file-ignore U1000 Ignore unused fields as some of them are only used by Fargate

package models

import (
"sync"
"time"

apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
"github.com/aws/amazon-ecs-agent/agent/api/serviceconnect"
"github.com/aws/amazon-ecs-agent/agent/api/task"
apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status"
resourcetype "github.com/aws/amazon-ecs-agent/agent/taskresource/types"
nlappmesh "github.com/aws/amazon-ecs-agent/ecs-agent/netlib/model/appmesh"
)

// Task_1_0_0 is "the original model" before model transformer is created.
type Task_1_0_0 struct {
Arn string
id string
Overrides task.TaskOverrides `json:"-"`
Family string
Version string
ServiceName string
Containers []*apicontainer.Container
Associations []task.Association `json:"associations"`
ResourcesMapUnsafe resourcetype.ResourcesMap `json:"resources"`
Volumes []task.TaskVolume `json:"volumes"`
CPU float64 `json:"Cpu,omitempty"`
Memory int64 `json:"Memory,omitempty"`
DesiredStatusUnsafe apitaskstatus.TaskStatus `json:"DesiredStatus"`
KnownStatusUnsafe apitaskstatus.TaskStatus `json:"KnownStatus"`
KnownStatusTimeUnsafe time.Time `json:"KnownTime"`
PullStartedAtUnsafe time.Time `json:"PullStartedAt"`
PullStoppedAtUnsafe time.Time `json:"PullStoppedAt"`
ExecutionStoppedAtUnsafe time.Time `json:"ExecutionStoppedAt"`
SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"`
ExecutionCredentialsID string `json:"executionCredentialsID"`
credentialsID string
credentialsRelativeURIUnsafe string
ENIs task.TaskENIs `json:"ENI"`
AppMesh *nlappmesh.AppMesh
MemoryCPULimitsEnabled bool `json:"MemoryCPULimitsEnabled,omitempty"`
PlatformFields task.PlatformFields `json:"PlatformFields,omitempty"`
terminalReason string
terminalReasonOnce sync.Once
PIDMode string `json:"PidMode,omitempty"`
IPCMode string `json:"IpcMode,omitempty"`
NvidiaRuntime string `json:"NvidiaRuntime,omitempty"`
LocalIPAddressUnsafe string `json:"LocalIPAddress,omitempty"`
LaunchType string `json:"LaunchType,omitempty"`
lock sync.RWMutex
setIdOnce sync.Once
ServiceConnectConfig *serviceconnect.Config `json:"ServiceConnectConfig,omitempty"`
ServiceConnectConnectionDrainingUnsafe bool `json:"ServiceConnectConnectionDraining,omitempty"`
NetworkMode string `json:"NetworkMode,omitempty"`
IsInternal bool `json:"IsInternal,omitempty"`
}

// Task_1_x_0 is an example new model with breaking change. Latest Task_1_x_0 should be the same as current Task model.
// TODO: update this model when introducing first actual transformation function
type Task_1_x_0 struct {
Arn string
id string
Overrides task.TaskOverrides `json:"-"`
Family string
Version string
ServiceName string
Containers []*apicontainer.Container
Associations []task.Association `json:"associations"`
ResourcesMapUnsafe resourcetype.ResourcesMap `json:"resources"`
Volumes []task.TaskVolume `json:"volumes"`
CPU float64 `json:"Cpu,omitempty"`
Memory int64 `json:"Memory,omitempty"`
DesiredStatusUnsafe apitaskstatus.TaskStatus `json:"DesiredStatus"`
KnownStatusUnsafe apitaskstatus.TaskStatus `json:"KnownStatus"`
KnownStatusTimeUnsafe time.Time `json:"KnownTime"`
PullStartedAtUnsafe time.Time `json:"PullStartedAt"`
PullStoppedAtUnsafe time.Time `json:"PullStoppedAt"`
ExecutionStoppedAtUnsafe time.Time `json:"ExecutionStoppedAt"`
SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"`
ExecutionCredentialsID string `json:"executionCredentialsID"`
credentialsID string
credentialsRelativeURIUnsafe string
NetworkInterfaces task.TaskENIs `json:"NetworkInterfaces"`
AppMesh *nlappmesh.AppMesh
MemoryCPULimitsEnabled bool `json:"MemoryCPULimitsEnabled,omitempty"`
PlatformFields task.PlatformFields `json:"PlatformFields,omitempty"`
terminalReason string
terminalReasonOnce sync.Once
PIDMode string `json:"PidMode,omitempty"`
IPCMode string `json:"IpcMode,omitempty"`
NvidiaRuntime string `json:"NvidiaRuntime,omitempty"`
LocalIPAddressUnsafe string `json:"LocalIPAddress,omitempty"`
LaunchType string `json:"LaunchType,omitempty"`
lock sync.RWMutex
setIdOnce sync.Once
ServiceConnectConfig *serviceconnect.Config `json:"ServiceConnectConfig,omitempty"`
ServiceConnectConnectionDrainingUnsafe bool `json:"ServiceConnectConnectionDraining,omitempty"`
NetworkMode string `json:"NetworkMode,omitempty"`
IsInternal bool `json:"IsInternal,omitempty"`
}
16 changes: 15 additions & 1 deletion agent/data/task_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

apitask "github.com/aws/amazon-ecs-agent/agent/api/task"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/aws/amazon-ecs-agent/agent/version"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"

"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -50,7 +52,19 @@ func (c *client) GetTasks() ([]*apitask.Task, error) {
bucket := tx.Bucket([]byte(tasksBucketName))
return walk(bucket, func(id string, data []byte) error {
task := apitask.Task{}
if err := json.Unmarshal(data, &task); err != nil {
// transform the model before loading it to agent state. this is a noop for now.
agentVersionInDB, err := c.GetMetadata(AgentVersionKey)
if err != nil {
logger.Info(emptyAgentVersionMsg)
} else {
if c.transformer.IsUpgrade(version.Version, agentVersionInDB) {
data, err = c.transformer.TransformTask(agentVersionInDB, data)
if err != nil {
return err
}
}
}
if err = json.Unmarshal(data, &task); err != nil {
return err
}
tasks = append(tasks, &task)
Expand Down
69 changes: 69 additions & 0 deletions agent/data/transformationfunctions/tasktf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package transformationfunctions

import (
"encoding/json"
"fmt"

"github.com/aws/amazon-ecs-agent/agent/data/models"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/modeltransformer"
)

// RegisterTaskTransformationFunctions calls all registerTaskTransformationFunctions<x_y_z> in ascending order.
// (from lower threshold version to higher threshold version) thresholdVersion is the version we introduce a breaking change in.
// All versions below threshold version need to go through that specific transformation function
func RegisterTaskTransformationFunctions(t *modeltransformer.Transformer) {
registerTaskTransformationFunction1_x_0(t)
}

// registerTaskTransformationFunction1_x_0 is a template RegisterTaskTransformation function.
// It registers the transformation functions that translate the task model from models.Task_1_0_0 to models.Task_1_x_0
// Future addition to transformation functions should follow the same pattern. This current performs noop
// TODO: edit this function when introducing first actual transformation function, and add unit test
func registerTaskTransformationFunction1_x_0(t *modeltransformer.Transformer) {
thresholdVersion := "1.0.0" // this assures it never actually gets executed
t.AddTaskTransformationFunctions(thresholdVersion, func(dataIn []byte) ([]byte, error) {
logger.Info(fmt.Sprintf("Executing transformation function with threshold %s.", thresholdVersion))
oldModel := models.Task_1_0_0{}
newModel := models.Task_1_x_0{}
var intermediate map[string]interface{}

// Load json to old model (so that we can capture some fields before it is deleted)
err := json.Unmarshal(dataIn, &oldModel)
if err != nil {
return nil, err
}

// Load json to intermediate model to process
err = json.Unmarshal(dataIn, &intermediate)
if err != nil {
return nil, err
}

// Actual process to process
delete(intermediate, "ENIs")
modifiedJSON, err := json.Marshal(intermediate)
if err != nil {
return nil, err
}
err = json.Unmarshal(modifiedJSON, &newModel)
newModel.NetworkInterfaces = oldModel.ENIs
dataOut, err := json.Marshal(&newModel)
logger.Info(fmt.Sprintf("Transform associated with version %s finished.", thresholdVersion))
return dataOut, err
})
logger.Info(fmt.Sprintf("Registered transformation function with threshold %s.", thresholdVersion))
}
35 changes: 35 additions & 0 deletions agent/data/transformationfunctions/tasktf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//go:build unit
// +build unit

// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package transformationfunctions

import (
"testing"

"github.com/aws/amazon-ecs-agent/ecs-agent/modeltransformer"

"github.com/stretchr/testify/assert"
)

const (
expectedTaskTransformationChainLength = 1
)

func TestRegisterTaskTransformationFunctions(t *testing.T) {
transformer := modeltransformer.NewTransformer()
RegisterTaskTransformationFunctions(transformer)
assert.Equal(t, expectedTaskTransformationChainLength, transformer.GetNumberOfTransformationFunctions("Task"))
}
Loading

0 comments on commit d985fb1

Please sign in to comment.