Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add tags to snapshots and restored volumes (#43933) #44064

Merged
56 changes: 28 additions & 28 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1048,8 +1048,8 @@ def go_deps():
name = "com_github_evanphx_json_patch",
build_file_proto_mode = "disable",
importpath = "github.com/evanphx/json-patch",
sum = "h1:K1MDoo4AZ4wU0GIU/fPmtZg7VpzLjCxu+UwBD1FvwOc=",
version = "v4.1.0+incompatible",
sum = "h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=",
version = "v4.12.0+incompatible",
)
go_repository(
name = "com_github_facebookgo_clock",
Expand Down Expand Up @@ -1347,15 +1347,15 @@ def go_deps():
name = "com_github_go_openapi_jsonpointer",
build_file_proto_mode = "disable",
importpath = "github.com/go-openapi/jsonpointer",
sum = "h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=",
version = "v0.19.5",
sum = "h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=",
version = "v0.19.6",
)
go_repository(
name = "com_github_go_openapi_jsonreference",
build_file_proto_mode = "disable",
importpath = "github.com/go-openapi/jsonreference",
sum = "h1:UBIxjkht+AWIgYzCDSv2GN+E/togfwXUJFRTWhl2Jjs=",
version = "v0.19.6",
sum = "h1:FBLnyygC4/IZZr893oiomc9XaghoveYTrLC1F86HID8=",
version = "v0.20.1",
)
go_repository(
name = "com_github_go_openapi_spec",
Expand All @@ -1368,8 +1368,8 @@ def go_deps():
name = "com_github_go_openapi_swag",
build_file_proto_mode = "disable",
importpath = "github.com/go-openapi/swag",
sum = "h1:D2NRCBzS9/pEY3gP9Nl8aDqGUcPFrwG2p+CNFrLyrCM=",
version = "v0.19.15",
sum = "h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=",
version = "v0.22.3",
)
go_repository(
name = "com_github_go_playground_locales",
Expand Down Expand Up @@ -1756,8 +1756,8 @@ def go_deps():
name = "com_github_google_gofuzz",
build_file_proto_mode = "disable_global",
importpath = "github.com/google/gofuzz",
sum = "h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=",
version = "v1.0.0",
sum = "h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=",
version = "v1.1.0",
)
go_repository(
name = "com_github_google_licensecheck",
Expand Down Expand Up @@ -2759,8 +2759,8 @@ def go_deps():
name = "com_github_mailru_easyjson",
build_file_proto_mode = "disable",
importpath = "github.com/mailru/easyjson",
sum = "h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=",
version = "v0.7.6",
sum = "h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=",
version = "v0.7.7",
)

go_repository(
Expand Down Expand Up @@ -3217,15 +3217,15 @@ def go_deps():
name = "com_github_onsi_ginkgo_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/onsi/ginkgo/v2",
sum = "h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs=",
version = "v2.4.0",
sum = "h1:zie5Ly042PD3bsCvsSOPvRnFwyo3rKe64TJlD6nu0mk=",
version = "v2.9.1",
)
go_repository(
name = "com_github_onsi_gomega",
build_file_proto_mode = "disable_global",
importpath = "github.com/onsi/gomega",
sum = "h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=",
version = "v1.23.0",
sum = "h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E=",
version = "v1.27.4",
)
go_repository(
name = "com_github_openpeedeep_depguard",
Expand Down Expand Up @@ -5716,15 +5716,15 @@ def go_deps():
name = "io_k8s_api",
build_file_proto_mode = "disable",
importpath = "k8s.io/api",
sum = "h1:aBGgKJUM9Hk/3AE8WaZIApnTxG35kbuQba2w+SXqezo=",
version = "v0.0.0-20190409021203-6e4e0e4f393b",
sum = "h1:+H17AJpUMvl+clT+BPnKf0E3ksMAzoBBg7CntpSuADo=",
version = "v0.27.2",
)
go_repository(
name = "io_k8s_apimachinery",
build_file_proto_mode = "disable",
importpath = "k8s.io/apimachinery",
sum = "h1:Jmdtdt1ZnoGfWWIIik61Z7nKYgO3J+swQJtPYsP9wHA=",
version = "v0.0.0-20190404173353-6a84e37a896d",
sum = "h1:vBjGaKKieaIreI+oQwELalVG4d8f3YAMNpWLzDXkxeg=",
version = "v0.27.2",
)
go_repository(
name = "io_k8s_client_go",
Expand All @@ -5744,23 +5744,23 @@ def go_deps():
name = "io_k8s_klog_v2",
build_file_proto_mode = "disable",
importpath = "k8s.io/klog/v2",
sum = "h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=",
version = "v2.80.1",
sum = "h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=",
version = "v2.90.1",
)

go_repository(
name = "io_k8s_kube_openapi",
build_file_proto_mode = "disable",
importpath = "k8s.io/kube-openapi",
sum = "h1:tHgpQvrWaYfrnC8G4N0Oszw5HHCsZxKilDi2R7HuCSM=",
version = "v0.0.0-20180629012420-d83b052f768a",
sum = "h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg=",
version = "v0.0.0-20230501164219-8b0f38b5fd1f",
)
go_repository(
name = "io_k8s_sigs_json",
build_file_proto_mode = "disable",
importpath = "sigs.k8s.io/json",
sum = "h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=",
version = "v0.0.0-20220713155537-f223a00ba0e2",
sum = "h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=",
version = "v0.0.0-20221116044647-bc3834ca7abd",
)
go_repository(
name = "io_k8s_sigs_structured_merge_diff_v4",
Expand All @@ -5781,8 +5781,8 @@ def go_deps():
name = "io_k8s_utils",
build_file_proto_mode = "disable",
importpath = "k8s.io/utils",
sum = "h1:8r+l4bNWjRlsFYlQJnKJ2p7s1YQPj4XyXiJVqDHRx7c=",
version = "v0.0.0-20190308190857-21c4ce38f2a7",
sum = "h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY=",
version = "v0.0.0-20230209194617-a36077c30491",
)
go_repository(
name = "io_moul_zapgorm2",
Expand Down
145 changes: 131 additions & 14 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ import (
"golang.org/x/sync/errgroup"
)

const (
AnnPodNameKey string = "tidb.pingcap.com/pod-name"
AnnTemporaryVolumeID string = "temporary/volume-id"
EC2K8SClusterNameKey string = "aws:eks:cluster-name"

SourcePvcNameKey string = "source/pvcName"
SourceVolumeIdKey string = "source/VolumeId"
SourceTikvNameKey string = "source/TikvName"
SourceNamespaceKey string = "source/Namespace"
SourceContextKey string = "source/context"
)

type EC2Session struct {
ec2 ec2iface.EC2API
// aws operation concurrency
Expand All @@ -31,6 +43,14 @@ type EC2Session struct {

type VolumeAZs map[string]string

type SnapshotTags struct {
sourcePVCName string
sourceTiKVName string
sourceNameSpace string
}

type VolumeSnapshotTags map[string]SnapshotTags

func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
// aws-sdk has builtin exponential backoff retry mechanism, see:
// https://github.com/aws/aws-sdk-go/blob/db4388e8b9b19d34dcde76c492b17607cd5651e2/aws/client/default_retryer.go#L12-L16
Expand All @@ -47,20 +67,66 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil
}

func GenerateVolumeSnapshotTags(backupInfo *config.EBSBasedBRMeta, pvVolumeMap map[string]string) (VolumeSnapshotTags, error) {
vst := make(VolumeSnapshotTags)
for j := range backupInfo.KubernetesMeta.PVCs {
pvc := backupInfo.KubernetesMeta.PVCs[j]
volID := pvVolumeMap[pvc.Spec.VolumeName]
if volID == "" {
return vst, errors.Errorf("No matching pv is found with name of [%s]", pvc.Spec.VolumeName)
}
vst[volID] = SnapshotTags{
pvc.GetName(),
pvc.GetLabels()[AnnPodNameKey],
pvc.GetNamespace(),
}
}
return vst, nil
}

// CreateSnapshots is the mainly steps to control the data volume snapshots.
func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[string]string, VolumeAZs, error) {
snapIDMap := make(map[string]string)
var volumeIDs []*string

var mutex sync.Mutex
eg, _ := errgroup.WithContext(context.Background())
fillResult := func(createOutput *ec2.CreateSnapshotsOutput) {

pvVolumeMap := make(map[string]string)
for j := range backupInfo.KubernetesMeta.PVs {
pv := backupInfo.KubernetesMeta.PVs[j]
pvVolumeMap[pv.GetName()] = pv.GetAnnotations()[AnnTemporaryVolumeID]
}

vst, err := GenerateVolumeSnapshotTags(backupInfo, pvVolumeMap)
if err != nil {
return snapIDMap, nil, errors.Trace(err)
}
taggingAndFillResult := func(createOutput *ec2.CreateSnapshotsOutput, vst VolumeSnapshotTags, k8sClusterName *string) error {
mutex.Lock()
defer mutex.Unlock()
for j := range createOutput.Snapshots {
snapshot := createOutput.Snapshots[j]
snapIDMap[aws.StringValue(snapshot.VolumeId)] = aws.StringValue(snapshot.SnapshotId)

createTagInput := &ec2.CreateTagsInput{
Resources: []*string{
snapshot.SnapshotId,
},
Tags: []*ec2.Tag{
ec2Tag(SourcePvcNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourcePVCName),
ec2Tag(SourceVolumeIdKey, aws.StringValue(snapshot.VolumeId)),
ec2Tag(SourceTikvNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourceTiKVName),
ec2Tag(SourceNamespaceKey, vst[aws.StringValue(snapshot.VolumeId)].sourceNameSpace),
ec2Tag(SourceContextKey, aws.StringValue(k8sClusterName)),
},
}
_, err := e.ec2.CreateTags(createTagInput)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

workerPool := utils.NewWorkerPool(e.concurrency, "create snapshots")
Expand Down Expand Up @@ -93,6 +159,18 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str
if err != nil {
return snapIDMap, nil, errors.Trace(err)
}

// retrieve the k8s cluster name from EC2 instance tags
var k8sClusterName *string

for j := range resp1.Reservations[0].Instances[0].Tags {
tag := resp1.Reservations[0].Instances[0].Tags[j]
if aws.StringValue(tag.Key) == EC2K8SClusterNameKey {
k8sClusterName = tag.Value
break
}
}

for j := range resp1.Reservations[0].Instances[0].BlockDeviceMappings {
device := resp1.Reservations[0].Instances[0].BlockDeviceMappings[j]
// skip root volume
Expand Down Expand Up @@ -120,18 +198,19 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str
instanceSpecification := ec2.InstanceSpecification{}
createSnapshotInput := ec2.CreateSnapshotsInput{}

instanceSpecification.SetInstanceId(*ec2InstanceId)
instanceSpecification.SetExcludeBootVolume(true)
instanceSpecification.SetExcludeDataVolumeIds(excludedVolumeIDs)
instanceSpecification.SetInstanceId(aws.StringValue(ec2InstanceId)).SetExcludeBootVolume(true).SetExcludeDataVolumeIds(excludedVolumeIDs)

createSnapshotInput.SetCopyTagsFromSource("volume")
createSnapshotInput.SetInstanceSpecification(&instanceSpecification)

resp, err := e.ec2.CreateSnapshots(&createSnapshotInput)
if err != nil {
return errors.Trace(err)
}
fillResult(resp)
err = taggingAndFillResult(resp, vst, k8sClusterName)
if err != nil {
return errors.Trace(err)
}

return nil
})
}
Expand Down Expand Up @@ -263,14 +342,6 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) {
func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType string, iops, throughput int64, targetAZ string) (map[string]string, error) {
template := ec2.CreateVolumeInput{
VolumeType: &volumeType,
TagSpecifications: []*ec2.TagSpecification{
{
ResourceType: aws.String(ec2.ResourceTypeVolume),
Tags: []*ec2.Tag{
ec2Tag("TiDBCluster-BR", "new"),
},
},
},
}
if iops > 0 {
template.SetIops(iops)
Expand All @@ -287,6 +358,17 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
defer mutex.Unlock()
newVolumeIDMap[oldVol.ID] = *newVol.VolumeId
}

fetchTagValue := func(tags []*ec2.Tag, key string) string {
for i := range tags {
tag := tags[i]
if aws.StringValue(tag.Key) == key {
return aws.StringValue(tag.Value)
}
}
return ""
}

workerPool := utils.NewWorkerPool(e.concurrency, "create volume")
for i := range meta.TiKVComponent.Stores {
store := meta.TiKVComponent.Stores[i]
Expand All @@ -295,12 +377,47 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
workerPool.ApplyOnErrorGroup(eg, func() error {
log.Debug("create volume from snapshot", zap.Any("volume", oldVol))
req := template

req.SetSnapshotId(oldVol.SnapshotID)

// set target AZ
if targetAZ == "" {
req.SetAvailabilityZone(oldVol.VolumeAZ)
} else {
req.SetAvailabilityZone(targetAZ)
}

// Copy interested tags of snapshots to the restored volume
tags := []*ec2.Tag{
ec2Tag("TiDBCluster-BR", "new"),
ec2Tag("ebs.csi.aws.com/cluster", "true"),
}
snapshotIds := make([]*string, 0)

snapshotIds = append(snapshotIds, &oldVol.SnapshotID)
resp, err := e.ec2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{SnapshotIds: snapshotIds})
if err != nil {
return errors.Trace(err)
}
if len(resp.Snapshots) <= 0 {
return errors.Errorf("specified snapshot [%s] is not found", oldVol.SnapshotID)
}

snapshotTags := resp.Snapshots[0].Tags
tags = append(tags, ec2Tag("snapshot/createdFromSnapshotId", oldVol.SnapshotID),
ec2Tag("snapshot/"+SourcePvcNameKey, fetchTagValue(snapshotTags, SourcePvcNameKey)),
ec2Tag("snapshot/"+SourceVolumeIdKey, fetchTagValue(snapshotTags, SourceVolumeIdKey)),
ec2Tag("snapshot/"+SourceTikvNameKey, fetchTagValue(snapshotTags, SourceTikvNameKey)),
ec2Tag("snapshot/"+SourceNamespaceKey, fetchTagValue(snapshotTags, SourceNamespaceKey)),
ec2Tag("snapshot/"+SourceContextKey, fetchTagValue(snapshotTags, SourceContextKey)))

req.SetTagSpecifications([]*ec2.TagSpecification{
{
ResourceType: aws.String(ec2.ResourceTypeVolume),
Tags: tags,
},
})

newVol, err := e.ec2.CreateVolume(&req)
if err != nil {
return errors.Trace(err)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//br/pkg/storage",
"@com_github_masterminds_semver//:semver",
"@com_github_pingcap_errors//:errors",
"@io_k8s_api//core/v1:core",
],
)

Expand Down
Loading