Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate mongo driver #20

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions backend/backend.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
package backend

import "time"
import (
"context"
"time"
)

//go:generate counterfeiter -o ../fakes/fakebackend/fake_masterlock.go . MasterLock

type MasterLock interface {
// Achieve a lock to become the master. If lock is successful, the provided
// MasterInfo will be filled out and recorded. The MasterInfo passed in will be filled
// out with the remaining details.
Lock(info *MasterInfo) error
Lock(ctx context.Context, info *MasterInfo) error

// Release the lock to relinquish the master role. This will not succeed if the
// provided masterID does not match the ID of the current master.
UnLock(masterID string) error
UnLock(ctx context.Context, masterID string) error

// Write a heartbeat to ensure that the master role is not lost.
// If successful, the last heartbeat time is written to the passed MasterInfo
WriteHeartbeat(info *MasterInfo) error
WriteHeartbeat(ctx context.Context, info *MasterInfo) error

// Get the current master status. Provides the MasterInfo of the current master.
Status() (*MasterInfo, error)
Status(ctx context.Context) (*MasterInfo, error)
}

type MasterInfo struct {
Expand Down
40 changes: 16 additions & 24 deletions backend/mongo/masterlock.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package mongo

import (
"context"
"fmt"
"time"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
mgo "github.com/qiniu/qmgo"
"go.mongodb.org/mongo-driver/bson"

"github.com/InVisionApp/go-master/backend"
)
Expand Down Expand Up @@ -36,7 +37,7 @@ func (m *MongoMasterInfo) toMasterInfo() *backend.MasterInfo {
// Achieve a lock to become the master. If lock is successful, the provided
// MasterInfo will be filled out and recorded. The MasterInfo passed in will be filled
// out with the remaining details.
func (m *MongoBackend) Lock(info *backend.MasterInfo) error {
func (m *MongoBackend) Lock(ctx context.Context, info *backend.MasterInfo) error {
// get the heartbeat first to see if there is one before inserting
oldMMI := &MongoMasterInfo{}
t := time.Now()
Expand All @@ -48,12 +49,12 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error {
LastHeartbeat: t,
}

err := m.lock.Collection().FindId(MasterInfoID).One(oldMMI)
err := m.lock.Collection().Find(ctx, bson.M{"_id": MasterInfoID}).One(oldMMI)
// an error has occurred and it is not a NotFound
if err != nil {
if err == mgo.ErrNotFound {
if err == mgo.ErrNoSuchDocuments {
// perform an insert
if err := m.lock.Collection().Insert(mmi); err != nil {
if _, err := m.lock.Collection().InsertOne(ctx, mmi); err != nil {
return fmt.Errorf("unable to insert initial lock: %v", err)
}

Expand All @@ -62,9 +63,6 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error {

err = fmt.Errorf("failed to fetch current master info: %v", err)

m.log.Debug("attempting to refresh sessions in case of db issues")
m.refresh()

return err
}

Expand All @@ -76,11 +74,11 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error {
query := bson.M{"master_id": oldMMI.MasterID, "last_heartbeat": oldMMI.LastHeartbeat}

change := mgo.Change{
Update: mmi,
Update: bson.M{"$set": mmi},
ReturnNew: true,
}

if _, err := m.lock.Collection().Find(query).Apply(change, mmi); err != nil {
if err := m.lock.Collection().Find(ctx, query).Apply(change, mmi); err != nil {
err = fmt.Errorf("unable to complete findModify: %v", err)
return err
}
Expand All @@ -89,19 +87,13 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error {

}

// Force refresh all sessions (bypassing SmartCollection's auto-refresh)
// TODO: add auto-refreshing functionality across the board (GetNextJob, CompleteJob, etc.)
func (m *MongoBackend) refresh() {
m.lock.Collection().Database.Session.Refresh()
}

// Release the lock to relinquish the master role. This will not succeed if the
// provided masterID does not match the ID of the current master.
func (m *MongoBackend) UnLock(masterID string) error {
func (m *MongoBackend) UnLock(ctx context.Context, masterID string) error {
query := bson.M{"master_id": masterID}

if err := m.lock.Collection().Remove(query); err != nil {
if err == mgo.ErrNotFound { // not found is ok, already gone
if err := m.lock.Collection().Remove(ctx, query); err != nil {
if err == mgo.ErrNoSuchDocuments { // not found is ok, already gone
return nil
}

Expand All @@ -113,7 +105,7 @@ func (m *MongoBackend) UnLock(masterID string) error {

// Write a heartbeat to ensure that the master role is not lost.
// If successful, the last heartbeat time is written to the passed MasterInfo
func (m *MongoBackend) WriteHeartbeat(info *backend.MasterInfo) error {
func (m *MongoBackend) WriteHeartbeat(ctx context.Context, info *backend.MasterInfo) error {
query := bson.M{"master_id": info.MasterID}

lastHeartbeat := time.Now()
Expand All @@ -124,7 +116,7 @@ func (m *MongoBackend) WriteHeartbeat(info *backend.MasterInfo) error {
},
}

if err := m.lock.coll.Update(query, change); err != nil {
if err := m.lock.coll.UpdateOne(ctx, query, change); err != nil {
return fmt.Errorf("Unable to complete heartbeat update: %v", err)
}

Expand All @@ -134,9 +126,9 @@ func (m *MongoBackend) WriteHeartbeat(info *backend.MasterInfo) error {
}

// Get the current master status. Provides the MasterInfo of the current master.
func (m *MongoBackend) Status() (*backend.MasterInfo, error) {
func (m *MongoBackend) Status(ctx context.Context) (*backend.MasterInfo, error) {
mi := &backend.MasterInfo{}
if err := m.lock.Collection().FindId(MasterInfoID).One(mi); err != nil {
if err := m.lock.Collection().Find(ctx, bson.M{"_id": MasterInfoID}).One(mi); err != nil {
return nil, fmt.Errorf("failed to fetch master info: %v", err)
}

Expand Down
Loading