Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync secondary before lock #1945

Merged
merged 6 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 35 additions & 146 deletions pkg/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic

if parameters.ConfigServer != "" {
// sharded cluster. so disable the balancer first. then perform the 'usual' tasks.
primary, secondary, err := getPrimaryNSecondaryMember(parameters.ConfigServer)
primary, secondary, secondaryMembers, err := getPrimaryNSecondaryMember(parameters.ConfigServer)
if err != nil {
return nil, err
}
Expand All @@ -464,8 +464,19 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic
backupHost = secondary
}

err = lockConfigServer(parameters.ConfigServer, secondary)
// Check if secondary is already locked before locking it.
// If yes, unlock it and sync with primary
for _, secondary := range secondaryMembers {
if err := checkIfSecondaryLockedAndSync(secondary); err != nil {
return nil, err
}
}

if err := setupConfigServer(parameters.ConfigServer, secondary); err != nil {
return nil, err
}

err = lockSecondaryMember(secondary)
cleanupFuncs = append(cleanupFuncs, func() error {
// even if error occurs, try to unlock the server
return unlockSecondaryMember(secondary)
Expand All @@ -479,7 +490,7 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic

for key, host := range parameters.ReplicaSets {
// do the task
primary, secondary, err := getPrimaryNSecondaryMember(host)
primary, secondary, secondaryMembers, err := getPrimaryNSecondaryMember(host)
if err != nil {
klog.Errorf("error while getting primary and secondary member of %v. error: %v", host, err)
return nil, err
Expand All @@ -492,6 +503,14 @@ func (opt *mongoOptions) backupMongoDB(targetRef api_v1beta1.TargetRef) (*restic
backupHost = secondary
}

// Check if secondary is already locked before locking it.
// If yes, unlock it and sync with primary
for _, secondary := range secondaryMembers {
if err := checkIfSecondaryLockedAndSync(secondary); err != nil {
return nil, err
}
}

err = lockSecondaryMember(secondary)
cleanupFuncs = append(cleanupFuncs, func() error {
// even if error occurs, try to unlock the server
Expand Down Expand Up @@ -573,11 +592,10 @@ func getSSLUser(path string) (string, error) {
return strings.TrimSpace(user), nil
}

func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, err error) {
func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, secondaryMembers []string, err error) {
klog.Infoln("finding primary and secondary instances of", mongoDSN)
v := make(map[string]interface{})

// stop balancer
args := append([]interface{}{
"config",
"--host", mongoDSN,
Expand All @@ -586,33 +604,35 @@ func getPrimaryNSecondaryMember(mongoDSN string) (primary, secondary string, err
}, mongoCreds...)
// even --quiet doesn't skip replicaset PrimaryConnection log. so take tha last line. issue tracker: https://jira.mongodb.org/browse/SERVER-27159
if err := sh.Command(MongoCMD, args...).Command("/usr/bin/tail", "-1").UnmarshalJSON(&v); err != nil {
return "", "", err
return "", "", secondaryMembers, err
}

primary, ok := v["primary"].(string)
if !ok || primary == "" {
return "", "", fmt.Errorf("unable to get primary instance using rs.isMaster(). got response: %v", v)
return "", "", secondaryMembers, fmt.Errorf("unable to get primary instance using rs.isMaster(). got response: %v", v)
}

hosts, ok := v["hosts"].([]interface{})
if !ok {
return "", "", fmt.Errorf("unable to get hosts using rs.isMaster(). got response: %v", v)
return "", "", secondaryMembers, fmt.Errorf("unable to get hosts using rs.isMaster(). got response: %v", v)
}

for _, host := range hosts {
secHost, ok := host.(string)
if !ok || secHost == "" {
curHost, ok := host.(string)

if !ok || curHost == "" {
err = fmt.Errorf("unable to get secondary instance using rs.isMaster(). got response: %v", v)
continue
}

if secHost != primary {
klog.Infof("Primary %s & Secondary %s found for mongoDSN %s \n", primary, secHost, mongoDSN)
return primary, secHost, nil
if curHost != primary {
secondaryMembers = append(secondaryMembers, curHost)
}
}
if len(secondaryMembers) > 0 {
return primary, secondaryMembers[0], secondaryMembers, err
}

return primary, "", err
return primary, "", secondaryMembers, err
}

// run from mongos instance
Expand Down Expand Up @@ -699,134 +719,3 @@ func enableBalancer(mongosHost string) error {
klog.Info("Balancer successfully re-enabled.")
return nil
}

func lockConfigServer(configSVRDSN, secondaryHost string) error {
klog.Infoln("Attempting to lock configserver", configSVRDSN)

if secondaryHost == "" {
klog.Warningln("locking configserver is skipped. secondary host is empty")
return nil
}
v := make(map[string]interface{})
// findAndModify BackupControlDocument. skip single quote inside single quote: https://stackoverflow.com/a/28786747/4628962
args := append([]interface{}{
"config",
"--host", configSVRDSN,
"--quiet",
"--eval", "db.BackupControl.findAndModify({query: { _id: 'BackupControlDocument' }, update: { $inc: { counter : 1 } }, new: true, upsert: true, writeConcern: { w: 'majority', wtimeout: 15000 }});",
}, mongoCreds...)

output, err := sh.Command(MongoCMD, args...).Output()
if err != nil {
klog.Errorf("Error while running findAndModify to lock configServer : %s ; output : %s \n", err.Error(), output)
return err
}

err = json.Unmarshal(output, &v)
if err != nil {
klog.Errorf("Unmarshal error while running findAndModify to lock configServer : %s \n", err.Error())
return err
}
val, ok := v["counter"].(float64)
if !ok || int(val) == 0 {
return fmt.Errorf("unable to modify BackupControlDocument. got response: %v", v)
}
val2 := float64(0)
timer := 0 // wait approximately 5 minutes.
for timer < 60 && (int(val2) == 0 || int(val) != int(val2)) {
timer++
// find backupDocument from secondary configServer
args = append([]interface{}{
"config",
"--host", secondaryHost,
"--quiet",
"--eval", "rs.secondaryOk(); db.BackupControl.find({ '_id' : 'BackupControlDocument' }).readConcern('majority');",
}, mongoCreds...)

if err := sh.Command(MongoCMD, args...).UnmarshalJSON(&v); err != nil {
return err
}

val2, ok = v["counter"].(float64)
if !ok {
return fmt.Errorf("unable to get BackupControlDocument. got response: %v", v)
}
if int(val) != int(val2) {
klog.V(5).Infof("BackupDocument counter in secondary is not same. Expected %v, but got %v. Full response: %v", val, val2, v)
time.Sleep(time.Second * 5)
}
}
if timer >= 60 {
return fmt.Errorf("timeout while waiting for BackupDocument counter in secondary to be same as primary. Expected %v, but got %v. Full response: %v", val, val2, v)
}
// lock secondary
return lockSecondaryMember(secondaryHost)
}

func lockSecondaryMember(mongohost string) error {
klog.Infoln("Attempting to lock secondary member", mongohost)
if mongohost == "" {
klog.Warningln("locking secondary member is skipped. secondary host is empty")
return nil
}
v := make(map[string]interface{})

// lock file
args := append([]interface{}{
"config",
"--host", mongohost,
"--quiet",
"--eval", "JSON.stringify(db.fsyncLock())",
}, mongoCreds...)

output, err := sh.Command(MongoCMD, args...).Output()
if err != nil {
klog.Errorf("Error while running fsyncLock on secondary : %s ; output : %s \n", err.Error(), output)
return err
}

err = json.Unmarshal(output, &v)
if err != nil {
klog.Errorf("Unmarshal error while running fsyncLock on secondary : %s \n", err.Error())
return err
}

if val, ok := v["ok"].(float64); !ok || int(val) != 1 {
return fmt.Errorf("unable to lock the secondary host. got response: %v", v)
}
klog.Infof("secondary %s locked.", mongohost)
return nil
}

func unlockSecondaryMember(mongohost string) error {
klog.Infoln("Attempting to unlock secondary member", mongohost)
if mongohost == "" {
klog.Warningln("skipped unlocking secondary member. secondary host is empty")
return nil
}
v := make(map[string]interface{})

// unlock file
args := append([]interface{}{
"config",
"--host", mongohost,
"--quiet",
"--eval", "JSON.stringify(db.fsyncUnlock())",
}, mongoCreds...)

output, err := sh.Command(MongoCMD, args...).Output()
if err != nil {
klog.Errorf("Error while running fsyncUnlock on secondary : %s ; output : %s \n", err.Error(), output)
return err
}
err = json.Unmarshal(output, &v)
if err != nil {
klog.Errorf("Unmarshal error while running fsyncUnlock on secondary : %s \n", err.Error())
return err
}
if val, ok := v["ok"].(float64); !ok || int(val) != 1 {
return fmt.Errorf("unable to lock the secondary host. got response: %v", v)
}
klog.Infof("secondary %s unlocked.", mongohost)
return nil
}
Loading
Loading