Merge pull request #8061 from Lyndon-Li/data-mover-ms-restore-1
Data mover micro service restore
Lyndon-Li authored Aug 1, 2024
2 parents 7811b9f + 86e5480 commit 54bd7ce
Showing 7 changed files with 1,046 additions and 11 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8061-Lyndon-Li
@@ -0,0 +1 @@
Data mover micro service restore according to design #7576
193 changes: 186 additions & 7 deletions pkg/cmd/cli/datamover/restore.go
@@ -14,16 +14,37 @@ limitations under the License.
package datamover

import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"

ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type dataMoverRestoreConfig struct {
@@ -52,7 +73,10 @@ func NewRestoreCommand(f client.Factory) *cobra.Command {
logger.Infof("Starting Velero data-mover restore %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())

f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
s, err := newdataMoverRestore(logger, f, config)
if err != nil {
exitWithMessage(logger, false, "Failed to create data mover restore, %v", err)
}

s.run()
},
@@ -74,19 +98,174 @@ }
}

type dataMoverRestore struct {
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
client ctlclient.Client
cache ctlcache.Cache
namespace string
nodeName string
config dataMoverRestoreConfig
kubeClient kubernetes.Interface
dataPathMgr *datapath.Manager
}

func newdataMoverRestore(logger logrus.FieldLogger, factory client.Factory, config dataMoverRestoreConfig) (*dataMoverRestore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())

clientConfig, err := factory.ClientConfig()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client config")
}

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

scheme := runtime.NewScheme()
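// Register the Velero v1, Velero v2alpha1 and core v1 API types so the client and cache can decode them.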
if err := velerov1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v1 scheme")
}

if err := velerov2alpha1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v2alpha1 scheme")
}

if err := v1.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add core v1 scheme")
}

nodeName := os.Getenv("NODE_NAME")

// use a field selector to filter to only pods scheduled on this node.
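// Also restrict cached DataDownloads to the Velero install namespace.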
cacheOption := ctlcache.Options{
Scheme: scheme,
ByObject: map[ctlclient.Object]ctlcache.ByObject{
&v1.Pod{}: {
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
},
&velerov2alpha1api.DataDownload{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
},
}

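// Build a direct (non-cached) client for API reads and writes, plus a separate cache that backs the filtered informers.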
cli, err := ctlclient.New(clientConfig, ctlclient.Options{
Scheme: scheme,
})
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client")
}

cache, err := ctlcache.New(clientConfig, cacheOption)
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client cache")
}

s := &dataMoverRestore{
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
client: cli,
cache: cache,
config: config,
namespace: factory.Namespace(),
nodeName: nodeName,
}

s.kubeClient, err = factory.KubeClient()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create kube client")
}

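// Limit the data path manager to a single concurrent data path; this process serves one DataDownload.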
s.dataPathMgr = datapath.NewManager(1)

return s, nil
}

var funcCreateDataPathRestore = (*dataMoverRestore).createDataPathService
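// The indirection above lets unit tests substitute a fake data path service.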

func (s *dataMoverRestore) run() {
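// Cancel the root context when a termination signal is received.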
signals.CancelOnShutdown(s.cancelFunc, s.logger)
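// Start the filtered informer cache in the background for the life of the process.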
go func() {
if err := s.cache.Start(s.ctx); err != nil {
s.logger.WithError(err).Warn("error starting cache")
}
}()

// TODO: call s.runDataPath()
time.Sleep(time.Duration(1<<63 - 1))
}

func (s *dataMoverRestore) runDataPath() {
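// Initialize the data path service, run it to completion, then exit the process with the result.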
s.logger.Infof("Starting micro service in node %s for dd %s", s.nodeName, s.config.ddName)

dpService, err := funcCreateDataPathRestore(s)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to create data path service for DataDownload %s: %v", s.config.ddName, err)
return
}

s.logger.Infof("Starting data path service %s", s.config.ddName)

err = dpService.Init()
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to init data path service for DataDownload %s: %v", s.config.ddName, err)
return
}

result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err)
return
}

s.logger.WithField("dd", s.config.ddName).Info("Data path service completed")

dpService.Shutdown()

s.logger.WithField("dd", s.config.ddName).Info("Data path service is shut down")

s.cancelFunc()

funcExitWithMessage(s.logger, true, result)
}

func (s *dataMoverRestore) createDataPathService() (dataPathService, error) {
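// Assemble the credential getter, DataDownload informer, repository ensurer and data path manager that back the restore micro service.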
credentialFileStore, err := funcNewCredentialFileStore(
s.client,
s.namespace,
defaultCredentialsDirectory,
filesystem.NewFileSystem(),
)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential file store")
}

credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential secret store")
}

credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}

duInformer, err := s.cache.GetInformer(s.ctx, &velerov2alpha1api.DataDownload{})
if err != nil {
return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
}

repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)

return datamover.NewRestoreMicroService(s.ctx, s.client, s.kubeClient, s.config.ddName, s.namespace, s.nodeName, datapath.AccessPoint{
ByPath: s.config.volumePath,
VolMode: uploader.PersistentVolumeMode(s.config.volumeMode),
}, s.dataPathMgr, repoEnsurer, credGetter, duInformer, s.logger), nil
}