Skip to content

Commit

Permalink
br: add tags to snapshots and restored volumes (pingcap#43933) (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 22, 2023
1 parent 3bc75f7 commit dabf92c
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 53 deletions.
60 changes: 30 additions & 30 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1048,8 +1048,8 @@ def go_deps():
name = "com_github_evanphx_json_patch",
build_file_proto_mode = "disable",
importpath = "github.com/evanphx/json-patch",
sum = "h1:K1MDoo4AZ4wU0GIU/fPmtZg7VpzLjCxu+UwBD1FvwOc=",
version = "v4.1.0+incompatible",
sum = "h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=",
version = "v4.12.0+incompatible",
)
go_repository(
name = "com_github_facebookgo_clock",
Expand Down Expand Up @@ -1347,15 +1347,15 @@ def go_deps():
name = "com_github_go_openapi_jsonpointer",
build_file_proto_mode = "disable",
importpath = "github.com/go-openapi/jsonpointer",
sum = "h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=",
version = "v0.19.5",
sum = "h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=",
version = "v0.19.6",
)
go_repository(
name = "com_github_go_openapi_jsonreference",
build_file_proto_mode = "disable",
importpath = "github.com/go-openapi/jsonreference",
sum = "h1:UBIxjkht+AWIgYzCDSv2GN+E/togfwXUJFRTWhl2Jjs=",
version = "v0.19.6",
sum = "h1:FBLnyygC4/IZZr893oiomc9XaghoveYTrLC1F86HID8=",
version = "v0.20.1",
)
go_repository(
name = "com_github_go_openapi_spec",
Expand All @@ -1368,8 +1368,8 @@ def go_deps():
name = "com_github_go_openapi_swag",
build_file_proto_mode = "disable",
importpath = "github.com/go-openapi/swag",
sum = "h1:D2NRCBzS9/pEY3gP9Nl8aDqGUcPFrwG2p+CNFrLyrCM=",
version = "v0.19.15",
sum = "h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=",
version = "v0.22.3",
)
go_repository(
name = "com_github_go_playground_locales",
Expand Down Expand Up @@ -1756,8 +1756,8 @@ def go_deps():
name = "com_github_google_gofuzz",
build_file_proto_mode = "disable_global",
importpath = "github.com/google/gofuzz",
sum = "h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=",
version = "v1.0.0",
sum = "h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=",
version = "v1.1.0",
)
go_repository(
name = "com_github_google_licensecheck",
Expand Down Expand Up @@ -2759,8 +2759,8 @@ def go_deps():
name = "com_github_mailru_easyjson",
build_file_proto_mode = "disable",
importpath = "github.com/mailru/easyjson",
sum = "h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=",
version = "v0.7.6",
sum = "h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=",
version = "v0.7.7",
)

go_repository(
Expand Down Expand Up @@ -3217,15 +3217,15 @@ def go_deps():
name = "com_github_onsi_ginkgo_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/onsi/ginkgo/v2",
sum = "h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs=",
version = "v2.4.0",
sum = "h1:zie5Ly042PD3bsCvsSOPvRnFwyo3rKe64TJlD6nu0mk=",
version = "v2.9.1",
)
go_repository(
name = "com_github_onsi_gomega",
build_file_proto_mode = "disable_global",
importpath = "github.com/onsi/gomega",
sum = "h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=",
version = "v1.23.0",
sum = "h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E=",
version = "v1.27.4",
)
go_repository(
name = "com_github_openpeedeep_depguard",
Expand Down Expand Up @@ -3653,8 +3653,8 @@ def go_deps():
name = "com_github_rogpeppe_go_internal",
build_file_proto_mode = "disable_global",
importpath = "github.com/rogpeppe/go-internal",
sum = "h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=",
version = "v1.9.0",
sum = "h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=",
version = "v1.10.0",
)
go_repository(
name = "com_github_rs_cors",
Expand Down Expand Up @@ -5716,15 +5716,15 @@ def go_deps():
name = "io_k8s_api",
build_file_proto_mode = "disable",
importpath = "k8s.io/api",
sum = "h1:aBGgKJUM9Hk/3AE8WaZIApnTxG35kbuQba2w+SXqezo=",
version = "v0.0.0-20190409021203-6e4e0e4f393b",
sum = "h1:+H17AJpUMvl+clT+BPnKf0E3ksMAzoBBg7CntpSuADo=",
version = "v0.27.2",
)
go_repository(
name = "io_k8s_apimachinery",
build_file_proto_mode = "disable",
importpath = "k8s.io/apimachinery",
sum = "h1:Jmdtdt1ZnoGfWWIIik61Z7nKYgO3J+swQJtPYsP9wHA=",
version = "v0.0.0-20190404173353-6a84e37a896d",
sum = "h1:vBjGaKKieaIreI+oQwELalVG4d8f3YAMNpWLzDXkxeg=",
version = "v0.27.2",
)
go_repository(
name = "io_k8s_client_go",
Expand All @@ -5744,23 +5744,23 @@ def go_deps():
name = "io_k8s_klog_v2",
build_file_proto_mode = "disable",
importpath = "k8s.io/klog/v2",
sum = "h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=",
version = "v2.80.1",
sum = "h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=",
version = "v2.90.1",
)

go_repository(
name = "io_k8s_kube_openapi",
build_file_proto_mode = "disable",
importpath = "k8s.io/kube-openapi",
sum = "h1:tHgpQvrWaYfrnC8G4N0Oszw5HHCsZxKilDi2R7HuCSM=",
version = "v0.0.0-20180629012420-d83b052f768a",
sum = "h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg=",
version = "v0.0.0-20230501164219-8b0f38b5fd1f",
)
go_repository(
name = "io_k8s_sigs_json",
build_file_proto_mode = "disable",
importpath = "sigs.k8s.io/json",
sum = "h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=",
version = "v0.0.0-20220713155537-f223a00ba0e2",
sum = "h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=",
version = "v0.0.0-20221116044647-bc3834ca7abd",
)
go_repository(
name = "io_k8s_sigs_structured_merge_diff_v4",
Expand All @@ -5781,8 +5781,8 @@ def go_deps():
name = "io_k8s_utils",
build_file_proto_mode = "disable",
importpath = "k8s.io/utils",
sum = "h1:8r+l4bNWjRlsFYlQJnKJ2p7s1YQPj4XyXiJVqDHRx7c=",
version = "v0.0.0-20190308190857-21c4ce38f2a7",
sum = "h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY=",
version = "v0.0.0-20230209194617-a36077c30491",
)
go_repository(
name = "io_moul_zapgorm2",
Expand Down
145 changes: 131 additions & 14 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ import (
"golang.org/x/sync/errgroup"
)

// Keys used when tagging EBS snapshots and the volumes restored from them.
const (
// AnnPodNameKey is looked up in a PVC's labels; its value is recorded as the
// source TiKV name of the volume's snapshot.
AnnPodNameKey string = "tidb.pingcap.com/pod-name"
// AnnTemporaryVolumeID is the PV annotation whose value is used as the
// volume ID of that PV when building the PV-name -> volume-ID map.
AnnTemporaryVolumeID string = "temporary/volume-id"
// EC2K8SClusterNameKey is the EC2 instance tag key whose value is taken as
// the k8s cluster name.
EC2K8SClusterNameKey string = "aws:eks:cluster-name"

// The Source* keys are the tag keys written onto each created snapshot.
// When a volume is restored from a snapshot, their values are read back
// from the snapshot and copied onto the new volume under a "snapshot/" prefix.
SourcePvcNameKey string = "source/pvcName"
SourceVolumeIdKey string = "source/VolumeId"
SourceTikvNameKey string = "source/TikvName"
SourceNamespaceKey string = "source/Namespace"
SourceContextKey string = "source/context"
)

type EC2Session struct {
ec2 ec2iface.EC2API
// aws operation concurrency
Expand All @@ -31,6 +43,14 @@ type EC2Session struct {

// VolumeAZs is a string-to-string map returned as the second result of
// CreateSnapshots.
// NOTE(review): presumably volume ID -> availability zone; confirm against the
// code that populates it.
type VolumeAZs map[string]string

// SnapshotTags holds the per-volume values that are written as tags onto the
// snapshot taken of that volume.
type SnapshotTags struct {
// sourcePVCName is the name of the PVC bound to the volume.
sourcePVCName string
// sourceTiKVName is the value of the PVC label AnnPodNameKey.
sourceTiKVName string
// sourceNameSpace is the namespace of the PVC.
sourceNameSpace string
}

// VolumeSnapshotTags maps a volume ID to the SnapshotTags to apply to the
// snapshot created from that volume.
type VolumeSnapshotTags map[string]SnapshotTags

func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
// aws-sdk has builtin exponential backoff retry mechanism, see:
// https://github.com/aws/aws-sdk-go/blob/db4388e8b9b19d34dcde76c492b17607cd5651e2/aws/client/default_retryer.go#L12-L16
Expand All @@ -47,20 +67,66 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil
}

// GenerateVolumeSnapshotTags builds, for every PVC recorded in backupInfo, the
// tag values to attach to the snapshot of the volume backing that PVC.
//
// pvVolumeMap maps a PV name to a volume ID. Each PVC is resolved through its
// Spec.VolumeName; the result is keyed by the resolved volume ID and carries
// the PVC name, the value of the PVC label AnnPodNameKey, and the PVC namespace.
//
// If a PVC's volume name has no (or an empty) entry in pvVolumeMap, an error is
// returned together with the entries collected up to that point.
func GenerateVolumeSnapshotTags(backupInfo *config.EBSBasedBRMeta, pvVolumeMap map[string]string) (VolumeSnapshotTags, error) {
	tagsByVolume := make(VolumeSnapshotTags)
	claims := backupInfo.KubernetesMeta.PVCs
	for idx := range claims {
		claim := claims[idx]
		pvName := claim.Spec.VolumeName

		// A missing key and an empty value are treated alike: no usable volume ID.
		volumeID := pvVolumeMap[pvName]
		if len(volumeID) == 0 {
			return tagsByVolume, errors.Errorf("No matching pv is found with name of [%s]", pvName)
		}

		tagsByVolume[volumeID] = SnapshotTags{
			sourcePVCName:   claim.GetName(),
			sourceTiKVName:  claim.GetLabels()[AnnPodNameKey],
			sourceNameSpace: claim.GetNamespace(),
		}
	}
	return tagsByVolume, nil
}

// CreateSnapshots contains the main steps that control snapshotting of the data volumes.
func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[string]string, VolumeAZs, error) {
snapIDMap := make(map[string]string)
var volumeIDs []*string

var mutex sync.Mutex
eg, _ := errgroup.WithContext(context.Background())
fillResult := func(createOutput *ec2.CreateSnapshotsOutput) {

pvVolumeMap := make(map[string]string)
for j := range backupInfo.KubernetesMeta.PVs {
pv := backupInfo.KubernetesMeta.PVs[j]
pvVolumeMap[pv.GetName()] = pv.GetAnnotations()[AnnTemporaryVolumeID]
}

vst, err := GenerateVolumeSnapshotTags(backupInfo, pvVolumeMap)
if err != nil {
return snapIDMap, nil, errors.Trace(err)
}
taggingAndFillResult := func(createOutput *ec2.CreateSnapshotsOutput, vst VolumeSnapshotTags, k8sClusterName *string) error {
mutex.Lock()
defer mutex.Unlock()
for j := range createOutput.Snapshots {
snapshot := createOutput.Snapshots[j]
snapIDMap[aws.StringValue(snapshot.VolumeId)] = aws.StringValue(snapshot.SnapshotId)

createTagInput := &ec2.CreateTagsInput{
Resources: []*string{
snapshot.SnapshotId,
},
Tags: []*ec2.Tag{
ec2Tag(SourcePvcNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourcePVCName),
ec2Tag(SourceVolumeIdKey, aws.StringValue(snapshot.VolumeId)),
ec2Tag(SourceTikvNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourceTiKVName),
ec2Tag(SourceNamespaceKey, vst[aws.StringValue(snapshot.VolumeId)].sourceNameSpace),
ec2Tag(SourceContextKey, aws.StringValue(k8sClusterName)),
},
}
_, err := e.ec2.CreateTags(createTagInput)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

workerPool := utils.NewWorkerPool(e.concurrency, "create snapshots")
Expand Down Expand Up @@ -93,6 +159,18 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str
if err != nil {
return snapIDMap, nil, errors.Trace(err)
}

// retrieve the k8s cluster name from EC2 instance tags
var k8sClusterName *string

for j := range resp1.Reservations[0].Instances[0].Tags {
tag := resp1.Reservations[0].Instances[0].Tags[j]
if aws.StringValue(tag.Key) == EC2K8SClusterNameKey {
k8sClusterName = tag.Value
break
}
}

for j := range resp1.Reservations[0].Instances[0].BlockDeviceMappings {
device := resp1.Reservations[0].Instances[0].BlockDeviceMappings[j]
// skip root volume
Expand Down Expand Up @@ -120,18 +198,19 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str
instanceSpecification := ec2.InstanceSpecification{}
createSnapshotInput := ec2.CreateSnapshotsInput{}

instanceSpecification.SetInstanceId(*ec2InstanceId)
instanceSpecification.SetExcludeBootVolume(true)
instanceSpecification.SetExcludeDataVolumeIds(excludedVolumeIDs)
instanceSpecification.SetInstanceId(aws.StringValue(ec2InstanceId)).SetExcludeBootVolume(true).SetExcludeDataVolumeIds(excludedVolumeIDs)

createSnapshotInput.SetCopyTagsFromSource("volume")
createSnapshotInput.SetInstanceSpecification(&instanceSpecification)

resp, err := e.ec2.CreateSnapshots(&createSnapshotInput)
if err != nil {
return errors.Trace(err)
}
fillResult(resp)
err = taggingAndFillResult(resp, vst, k8sClusterName)
if err != nil {
return errors.Trace(err)
}

return nil
})
}
Expand Down Expand Up @@ -263,14 +342,6 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) {
func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType string, iops, throughput int64, targetAZ string) (map[string]string, error) {
template := ec2.CreateVolumeInput{
VolumeType: &volumeType,
TagSpecifications: []*ec2.TagSpecification{
{
ResourceType: aws.String(ec2.ResourceTypeVolume),
Tags: []*ec2.Tag{
ec2Tag("TiDBCluster-BR", "new"),
},
},
},
}
if iops > 0 {
template.SetIops(iops)
Expand All @@ -287,6 +358,17 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
defer mutex.Unlock()
newVolumeIDMap[oldVol.ID] = *newVol.VolumeId
}

fetchTagValue := func(tags []*ec2.Tag, key string) string {
for i := range tags {
tag := tags[i]
if aws.StringValue(tag.Key) == key {
return aws.StringValue(tag.Value)
}
}
return ""
}

workerPool := utils.NewWorkerPool(e.concurrency, "create volume")
for i := range meta.TiKVComponent.Stores {
store := meta.TiKVComponent.Stores[i]
Expand All @@ -295,12 +377,47 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
workerPool.ApplyOnErrorGroup(eg, func() error {
log.Debug("create volume from snapshot", zap.Any("volume", oldVol))
req := template

req.SetSnapshotId(oldVol.SnapshotID)

// set target AZ
if targetAZ == "" {
req.SetAvailabilityZone(oldVol.VolumeAZ)
} else {
req.SetAvailabilityZone(targetAZ)
}

// Copy the tags of interest from the snapshot to the restored volume
tags := []*ec2.Tag{
ec2Tag("TiDBCluster-BR", "new"),
ec2Tag("ebs.csi.aws.com/cluster", "true"),
}
snapshotIds := make([]*string, 0)

snapshotIds = append(snapshotIds, &oldVol.SnapshotID)
resp, err := e.ec2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{SnapshotIds: snapshotIds})
if err != nil {
return errors.Trace(err)
}
if len(resp.Snapshots) <= 0 {
return errors.Errorf("specified snapshot [%s] is not found", oldVol.SnapshotID)
}

snapshotTags := resp.Snapshots[0].Tags
tags = append(tags, ec2Tag("snapshot/createdFromSnapshotId", oldVol.SnapshotID),
ec2Tag("snapshot/"+SourcePvcNameKey, fetchTagValue(snapshotTags, SourcePvcNameKey)),
ec2Tag("snapshot/"+SourceVolumeIdKey, fetchTagValue(snapshotTags, SourceVolumeIdKey)),
ec2Tag("snapshot/"+SourceTikvNameKey, fetchTagValue(snapshotTags, SourceTikvNameKey)),
ec2Tag("snapshot/"+SourceNamespaceKey, fetchTagValue(snapshotTags, SourceNamespaceKey)),
ec2Tag("snapshot/"+SourceContextKey, fetchTagValue(snapshotTags, SourceContextKey)))

req.SetTagSpecifications([]*ec2.TagSpecification{
{
ResourceType: aws.String(ec2.ResourceTypeVolume),
Tags: tags,
},
})

newVol, err := e.ec2.CreateVolume(&req)
if err != nil {
return errors.Trace(err)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//br/pkg/storage",
"@com_github_masterminds_semver//:semver",
"@com_github_pingcap_errors//:errors",
"@io_k8s_api//core/v1:core",
],
)

Expand Down
Loading

0 comments on commit dabf92c

Please sign in to comment.