Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data mover micro service restore #8061

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/8061-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Data mover micro service restore according to design #7576
193 changes: 186 additions & 7 deletions pkg/cmd/cli/datamover/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,37 @@
package datamover

import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"

ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type dataMoverRestoreConfig struct {
Expand Down Expand Up @@ -52,7 +73,10 @@
logger.Infof("Starting Velero data-mover restore %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())

f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
s := newdataMoverRestore(logger, config)
s, err := newdataMoverRestore(logger, f, config)
if err != nil {
exitWithMessage(logger, false, "Failed to create data mover restore, %v", err)

Check warning on line 78 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L76-L78

Added lines #L76 - L78 were not covered by tests
}

s.run()
},
Expand All @@ -74,19 +98,174 @@
}

type dataMoverRestore struct {
logger logrus.FieldLogger
config dataMoverRestoreConfig
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
client ctlclient.Client
cache ctlcache.Cache
namespace string
nodeName string
config dataMoverRestoreConfig
kubeClient kubernetes.Interface
dataPathMgr *datapath.Manager
}

func newdataMoverRestore(logger logrus.FieldLogger, config dataMoverRestoreConfig) *dataMoverRestore {
func newdataMoverRestore(logger logrus.FieldLogger, factory client.Factory, config dataMoverRestoreConfig) (*dataMoverRestore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())

Check warning on line 114 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L113-L114

Added lines #L113 - L114 were not covered by tests

clientConfig, err := factory.ClientConfig()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client config")

Check warning on line 119 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L116-L119

Added lines #L116 - L119 were not covered by tests
}

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

Check warning on line 122 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L122

Added line #L122 was not covered by tests

scheme := runtime.NewScheme()
if err := velerov1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v1 scheme")

Check warning on line 127 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L124-L127

Added lines #L124 - L127 were not covered by tests
}

if err := velerov2alpha1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v2alpha1 scheme")

Check warning on line 132 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L130-L132

Added lines #L130 - L132 were not covered by tests
}

if err := v1.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add core v1 scheme")

Check warning on line 137 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L135-L137

Added lines #L135 - L137 were not covered by tests
}

nodeName := os.Getenv("NODE_NAME")

Check warning on line 140 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L140

Added line #L140 was not covered by tests

// use a field selector to filter to only pods scheduled on this node.
cacheOption := ctlcache.Options{
Scheme: scheme,
ByObject: map[ctlclient.Object]ctlcache.ByObject{
&v1.Pod{}: {
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
},
&velerov2alpha1api.DataDownload{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
},

Check warning on line 152 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L143-L152

Added lines #L143 - L152 were not covered by tests
}

cli, err := ctlclient.New(clientConfig, ctlclient.Options{
Scheme: scheme,
})
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client")

Check warning on line 160 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L155-L160

Added lines #L155 - L160 were not covered by tests
}

cache, err := ctlcache.New(clientConfig, cacheOption)
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client cache")

Check warning on line 166 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L163-L166

Added lines #L163 - L166 were not covered by tests
}

s := &dataMoverRestore{
logger: logger,
config: config,
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
client: cli,
cache: cache,
config: config,
namespace: factory.Namespace(),
nodeName: nodeName,

Check warning on line 177 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L170-L177

Added lines #L170 - L177 were not covered by tests
}

s.kubeClient, err = factory.KubeClient()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create kube client")

Check warning on line 183 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L180-L183

Added lines #L180 - L183 were not covered by tests
}

return s
s.dataPathMgr = datapath.NewManager(1)

Check warning on line 186 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L186

Added line #L186 was not covered by tests

return s, nil

Check warning on line 188 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L188

Added line #L188 was not covered by tests
}

// funcCreateDataPathRestore holds createDataPathService as a method expression;
// runDataPath creates its data path service by calling through this variable
// rather than invoking the method directly.
// NOTE(review): presumably a seam so tests can substitute a fake factory — confirm.
var funcCreateDataPathRestore = (*dataMoverRestore).createDataPathService

// run passes s.cancelFunc to signals.CancelOnShutdown, starts the
// controller-runtime cache in a background goroutine using s.ctx, and then
// blocks the calling goroutine in a sleep.
// NOTE(review): presumably CancelOnShutdown cancels s.ctx on a termination
// signal, which would also stop the cache — confirm against the signals package.
func (s *dataMoverRestore) run() {
signals.CancelOnShutdown(s.cancelFunc, s.logger)
go func() {
// A cache start failure is only logged as a warning; nothing else reacts to it here.
if err := s.cache.Start(s.ctx); err != nil {
s.logger.WithError(err).Warn("error starting cache")

Check warning on line 197 in pkg/cmd/cli/datamover/restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/datamover/restore.go#L194-L197

Added lines #L194 - L197 were not covered by tests
}
}()

// TODOOO: call s.runDataPath()
// Placeholder until runDataPath is wired in: 1<<63 - 1 is the largest value a
// time.Duration (int64 nanoseconds) can hold, so this sleep effectively never returns.
time.Sleep(time.Duration(1<<63 - 1))
}

// runDataPath drives the data path service for the configured DataDownload
// (s.config.ddName) from creation to exit.
//
// It creates the service through funcCreateDataPathRestore, initializes it,
// and runs it with s.ctx. Every outcome is reported through
// funcExitWithMessage: failures with succeeded=false and a formatted message,
// success with succeeded=true and the result returned by the data path.
// s.cancelFunc is always called before reporting.
//
// Once Init has succeeded the service is shut down on both the success path
// and the run-failure path, so a failed run does not skip Shutdown.
func (s *dataMoverRestore) runDataPath() {
	s.logger.Infof("Starting micro service in node %s for dd %s", s.nodeName, s.config.ddName)

	dpService, err := funcCreateDataPathRestore(s)
	if err != nil {
		s.cancelFunc()
		funcExitWithMessage(s.logger, false, "Failed to create data path service for DataDownload %s: %v", s.config.ddName, err)
		return
	}

	s.logger.Infof("Starting data path service %s", s.config.ddName)

	if err := dpService.Init(); err != nil {
		s.cancelFunc()
		funcExitWithMessage(s.logger, false, "Failed to init data path service for DataDownload %s: %v", s.config.ddName, err)
		return
	}

	result, err := dpService.RunCancelableDataPath(s.ctx)
	if err != nil {
		// Init succeeded, so pair it with Shutdown before exiting; previously
		// this path exited with the service still initialized.
		dpService.Shutdown()
		s.cancelFunc()
		funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err)
		return
	}

	s.logger.WithField("dd", s.config.ddName).Info("Data path service completed")

	dpService.Shutdown()

	s.logger.WithField("dd", s.config.ddName).Info("Data path service is shut down")

	s.cancelFunc()

	// The return statements above matter: funcExitWithMessage is a variable
	// and may return to the caller instead of terminating the process.
	funcExitWithMessage(s.logger, true, result)
}

// createDataPathService assembles the restore micro service for the configured
// DataDownload. It builds a file-backed and a secret-backed credential store,
// combines them into a CredentialGetter, obtains the DataDownload informer
// from s.cache, creates a repository ensurer, and passes all of that plus the
// volume access point (path and mode from s.config) to
// datamover.NewRestoreMicroService.
//
// An error is returned, wrapped with context, when either credential store or
// the informer cannot be obtained.
func (s *dataMoverRestore) createDataPathService() (dataPathService, error) {
	fileStore, err := funcNewCredentialFileStore(s.client, s.namespace, defaultCredentialsDirectory, filesystem.NewFileSystem())
	if err != nil {
		return nil, errors.Wrap(err, "error to create credential file store")
	}

	secretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
	if err != nil {
		return nil, errors.Wrap(err, "error to create credential secret store")
	}

	getter := &credentials.CredentialGetter{
		FromFile:   fileStore,
		FromSecret: secretStore,
	}

	ddInformer, err := s.cache.GetInformer(s.ctx, &velerov2alpha1api.DataDownload{})
	if err != nil {
		return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
	}

	ensurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)

	// Where and how the target volume is exposed to this process.
	accessPoint := datapath.AccessPoint{
		ByPath:  s.config.volumePath,
		VolMode: uploader.PersistentVolumeMode(s.config.volumeMode),
	}

	svc := datamover.NewRestoreMicroService(
		s.ctx,
		s.client,
		s.kubeClient,
		s.config.ddName,
		s.namespace,
		s.nodeName,
		accessPoint,
		s.dataPathMgr,
		ensurer,
		getter,
		ddInformer,
		s.logger,
	)

	return svc, nil
}
Loading
Loading