Skip to content

Commit

Permalink
GODRIVER-1958 Rename ServerID to ServiceID (mongodb#632)
Browse files Browse the repository at this point in the history
  • Loading branch information
Divjot Arora authored and Mohammad Fahim Abrar committed Mar 17, 2022
1 parent 5016d90 commit e898358
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 76 deletions.
12 changes: 8 additions & 4 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ type CommandStartedEvent struct {
CommandName string
RequestID int64
ConnectionID string
ServerID *primitive.ObjectID // Only set for servers that are deployed behind a load balancer.
// ServiceID contains the ID of the server to which the command was sent if it is running behind a load balancer.
// Otherwise, it is unset.
ServiceID *primitive.ObjectID
}

// CommandFinishedEvent represents a generic command finishing.
Expand All @@ -31,7 +33,9 @@ type CommandFinishedEvent struct {
CommandName string
RequestID int64
ConnectionID string
ServerID *primitive.ObjectID // Only set for servers that are deployed behind a load balancer.
// ServiceID contains the ID of the server to which the command was sent if it is running behind a load balancer.
// Otherwise, it is unset.
ServiceID *primitive.ObjectID
}

// CommandSucceededEvent represents an event generated when a command's execution succeeds.
Expand Down Expand Up @@ -89,9 +93,9 @@ type PoolEvent struct {
ConnectionID uint64 `json:"connectionId"`
PoolOptions *MonitorPoolOptions `json:"options"`
Reason string `json:"reason"`
// ServerID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load balanced deployment.
ServerID *primitive.ObjectID `json:"serverId"`
ServiceID *primitive.ObjectID `json:"serviceId"`
}

// PoolMonitor is a function that allows the user to gain access to events occurring in the pool
Expand Down
10 changes: 5 additions & 5 deletions mongo/description/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Server struct {
Passives []string
Primary address.Address
ReadOnly bool
ServerID *primitive.ObjectID // Only set for servers that are deployed behind a load balancer.
ServiceID *primitive.ObjectID // Only set for servers that are deployed behind a load balancer.
SessionTimeoutMinutes uint32
SetName string
SetVersion uint32
Expand Down Expand Up @@ -228,12 +228,12 @@ func NewServer(addr address.Address, response bson.Raw) Server {
desc.LastError = fmt.Errorf("expected 'secondary' to be a boolean but it's a BSON %s", element.Value().Type)
return desc
}
case "serverId":
case "serviceId":
oid, ok := element.Value().ObjectIDOK()
if !ok {
desc.LastError = fmt.Errorf("expected 'serverId' to be an ObjectId but it's a BSON %s", element.Value().Type)
desc.LastError = fmt.Errorf("expected 'serviceId' to be an ObjectId but it's a BSON %s", element.Value().Type)
}
desc.ServerID = &oid
desc.ServiceID = &oid
case "setName":
desc.SetName, ok = element.Value().StringValueOK()
if !ok {
Expand Down Expand Up @@ -338,7 +338,7 @@ func (s Server) DataBearing() bool {

// LoadBalanced returns true if the server is a load balancer or is behind a load balancer.
func (s Server) LoadBalanced() bool {
return s.Kind == LoadBalancer || s.ServerID != nil
return s.Kind == LoadBalancer || s.ServiceID != nil
}

// String implements the Stringer interface
Expand Down
12 changes: 6 additions & 6 deletions x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type startedInformation struct {
documentSequenceIncluded bool
connID string
redacted bool
serverID *primitive.ObjectID
serviceID *primitive.ObjectID
}

// finishedInformation keeps track of all of the information necessary for monitoring success and failure events.
Expand All @@ -83,7 +83,7 @@ type finishedInformation struct {
connID string
startTime time.Time
redacted bool
serverID *primitive.ObjectID
serviceID *primitive.ObjectID
}

// ResponseInfo contains the context required to parse a server response.
Expand Down Expand Up @@ -385,7 +385,7 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
startedInfo.connID = conn.ID()
startedInfo.cmdName = op.getCommandName(startedInfo.cmd)
startedInfo.redacted = op.redactCommand(startedInfo.cmdName, startedInfo.cmd)
startedInfo.serverID = conn.Description().ServerID
startedInfo.serviceID = conn.Description().ServiceID
op.publishStartedEvent(ctx, startedInfo)

// get the moreToCome flag information before we compress
Expand All @@ -405,7 +405,7 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
startTime: time.Now(),
connID: startedInfo.connID,
redacted: startedInfo.redacted,
serverID: startedInfo.serverID,
serviceID: startedInfo.serviceID,
}

// roundtrip using either the full roundTripper or a special one for when the moreToCome
Expand Down Expand Up @@ -1475,7 +1475,7 @@ func (op Operation) publishStartedEvent(ctx context.Context, info startedInforma
CommandName: info.cmdName,
RequestID: int64(info.requestID),
ConnectionID: info.connID,
ServerID: info.serverID,
ServiceID: info.serviceID,
}
op.CommandMonitor.Started(ctx, started)
}
Expand All @@ -1502,7 +1502,7 @@ func (op Operation) publishFinishedEvent(ctx context.Context, info finishedInfor
RequestID: int64(info.requestID),
ConnectionID: info.connID,
DurationNanos: durationNanos,
ServerID: info.serverID,
ServiceID: info.serviceID,
}

if success {
Expand Down
10 changes: 5 additions & 5 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,15 @@ func (c *connection) processInitializationError(err error) {

c.connectErr = ConnectionError{Wrapped: err, init: true}
if c.config.errorHandlingCallback != nil {
c.config.errorHandlingCallback(c.connectErr, c.generation, c.desc.ServerID)
c.config.errorHandlingCallback(c.connectErr, c.generation, c.desc.ServiceID)
}
}

// setGenerationNumber sets the connection's generation number if a callback has been provided to do so in connection
// configuration.
func (c *connection) setGenerationNumber() {
if c.config.getGenerationFn != nil {
c.generation = c.config.getGenerationFn(c.desc.ServerID)
c.generation = c.config.getGenerationFn(c.desc.ServiceID)
}
}

Expand Down Expand Up @@ -231,14 +231,14 @@ func (c *connection) connect(ctx context.Context) {
c.desc = handshakeInfo.Description
c.isMasterRTT = time.Since(handshakeStartTime)

// If the application has indicated that the cluster is load balanced, ensure the server has included serverId
// If the application has indicated that the cluster is load balanced, ensure the server has included serviceId
// in its handshake response to signal that it knows it's behind an LB as well.
if c.config.loadBalanced && c.desc.ServerID == nil {
if c.config.loadBalanced && c.desc.ServiceID == nil {
err = errLoadBalancedStateMismatch
}
}
if err == nil {
// For load-balanced connections, the generation number depends on the server ID, which isn't known until the
// For load-balanced connections, the generation number depends on the service ID, which isn't known until the
// initial MongoDB handshake is done. To account for this, we don't attempt to set the connection's generation
// number unless GetHandshakeInformation succeeds.
if c.config.loadBalanced {
Expand Down
4 changes: 2 additions & 2 deletions x/mongo/driver/topology/connection_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ var DefaultDialer Dialer = &net.Dialer{}
// initialization. Implementations must be goroutine safe.
type Handshaker = driver.Handshaker

// generationNumberFn is a callback type used by a connection to fetch its generation number given its server ID.
type generationNumberFn func(serverID *primitive.ObjectID) uint64
// generationNumberFn is a callback type used by a connection to fetch its generation number given its service ID.
type generationNumberFn func(serviceID *primitive.ObjectID) uint64

type connectionConfig struct {
appName string
Expand Down
18 changes: 9 additions & 9 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) (*pool, error) {

// stale checks if a given connection's generation is below the generation of the pool
func (p *pool) stale(c *connection) bool {
return c == nil || p.generation.stale(c.desc.ServerID, c.generation)
return c == nil || p.generation.stale(c.desc.ServiceID, c.generation)
}

// connect puts the pool into the connected state, allowing it to be used and will allow items to begin being processed from the wait queue
Expand Down Expand Up @@ -511,8 +511,8 @@ func (p *pool) closeConnection(c *connection) error {
return nil
}

func (p *pool) getGenerationForNewConnection(serverID *primitive.ObjectID) uint64 {
return p.generation.addConnection(serverID)
func (p *pool) getGenerationForNewConnection(serviceID *primitive.ObjectID) uint64 {
return p.generation.addConnection(serviceID)
}

// removeConnection removes a connection from the pool.
Expand All @@ -532,7 +532,7 @@ func (p *pool) removeConnection(c *connection, reason string) error {
// Only update the generation numbers map if the connection has retrieved its generation number. Otherwise, we'd
// decrement the count for the generation even though it had never been incremented.
if c.hasGenerationNumber() {
p.generation.removeConnection(c.desc.ServerID)
p.generation.removeConnection(c.desc.ServiceID)
}

if publishEvent && p.monitor != nil {
Expand Down Expand Up @@ -579,13 +579,13 @@ func (p *pool) put(c *connection) error {
}

// clear clears the pool by incrementing the generation
func (p *pool) clear(serverID *primitive.ObjectID) {
func (p *pool) clear(serviceID *primitive.ObjectID) {
if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.PoolCleared,
Address: p.address.String(),
ServerID: serverID,
Type: event.PoolCleared,
Address: p.address.String(),
ServiceID: serviceID,
})
}
p.generation.clear(serverID)
p.generation.clear(serviceID)
}
50 changes: 25 additions & 25 deletions x/mongo/driver/topology/pool_generation_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type generationStats struct {
numConns uint64
}

// poolGenerationMap tracks the version for each server ID present in a pool. For deployments that are not behind a load
// balancer, there is only one server ID: primitive.NilObjectID. For load-balanced deployments, each server behind the
// load balancer will have a unique server ID.
// poolGenerationMap tracks the version for each service ID present in a pool. For deployments that are not behind a
// load balancer, there is only one service ID: primitive.NilObjectID. For load-balanced deployments, each server behind
// the load balancer will have a unique service ID.
type poolGenerationMap struct {
// state must be accessed using the atomic package.
state int32
Expand All @@ -47,85 +47,85 @@ func (p *poolGenerationMap) disconnect() {
atomic.StoreInt32(&p.state, disconnected)
}

// addConnection increments the connection count for the generation associated with the given server ID and returns the
// addConnection increments the connection count for the generation associated with the given service ID and returns the
// generation number for the connection.
func (p *poolGenerationMap) addConnection(serverIDPtr *primitive.ObjectID) uint64 {
serverID := getServerID(serverIDPtr)
func (p *poolGenerationMap) addConnection(serviceIDPtr *primitive.ObjectID) uint64 {
serviceID := getServiceID(serviceIDPtr)
p.Lock()
defer p.Unlock()

stats, ok := p.generationMap[serverID]
stats, ok := p.generationMap[serviceID]
if ok {
// If the serverID is already being tracked, we only need to increment the connection count.
// If the serviceID is already being tracked, we only need to increment the connection count.
stats.numConns++
return stats.generation
}

// If the serverID is untracked, create a new entry with a starting generation number of 0.
// If the serviceID is untracked, create a new entry with a starting generation number of 0.
stats = &generationStats{
numConns: 1,
}
p.generationMap[serverID] = stats
p.generationMap[serviceID] = stats
return 0
}

func (p *poolGenerationMap) removeConnection(serverIDPtr *primitive.ObjectID) {
serverID := getServerID(serverIDPtr)
func (p *poolGenerationMap) removeConnection(serviceIDPtr *primitive.ObjectID) {
serviceID := getServiceID(serviceIDPtr)
p.Lock()
defer p.Unlock()

stats, ok := p.generationMap[serverID]
stats, ok := p.generationMap[serviceID]
if !ok {
return
}

// If the serverID is being tracked, decrement the connection count and delete this serverID to prevent the map
// If the serviceID is being tracked, decrement the connection count and delete this serviceID to prevent the map
// from growing unboundedly. This case would happen if a server behind a load-balancer was permanently removed
// and its connections were pruned after a network error or idle timeout.
stats.numConns--
if stats.numConns == 0 {
delete(p.generationMap, serverID)
delete(p.generationMap, serviceID)
}
}

func (p *poolGenerationMap) clear(serverIDPtr *primitive.ObjectID) {
serverID := getServerID(serverIDPtr)
func (p *poolGenerationMap) clear(serviceIDPtr *primitive.ObjectID) {
serviceID := getServiceID(serviceIDPtr)
p.Lock()
defer p.Unlock()

if stats, ok := p.generationMap[serverID]; ok {
if stats, ok := p.generationMap[serviceID]; ok {
stats.generation++
}
}

func (p *poolGenerationMap) stale(serverIDPtr *primitive.ObjectID, knownGeneration uint64) bool {
func (p *poolGenerationMap) stale(serviceIDPtr *primitive.ObjectID, knownGeneration uint64) bool {
// If the map has been disconnected, all connections should be considered stale to ensure that they're closed.
if atomic.LoadInt32(&p.state) == disconnected {
return true
}

serverID := getServerID(serverIDPtr)
serviceID := getServiceID(serviceIDPtr)
p.Lock()
defer p.Unlock()

if stats, ok := p.generationMap[serverID]; ok {
if stats, ok := p.generationMap[serviceID]; ok {
return knownGeneration < stats.generation
}
return false
}

func (p *poolGenerationMap) getGeneration(serverIDPtr *primitive.ObjectID) uint64 {
serverID := getServerID(serverIDPtr)
func (p *poolGenerationMap) getGeneration(serviceIDPtr *primitive.ObjectID) uint64 {
serviceID := getServiceID(serviceIDPtr)
p.Lock()
defer p.Unlock()

if stats, ok := p.generationMap[serverID]; ok {
if stats, ok := p.generationMap[serviceID]; ok {
return stats.generation
}
return 0
}

func getServerID(oid *primitive.ObjectID) primitive.ObjectID {
func getServiceID(oid *primitive.ObjectID) primitive.ObjectID {
if oid == nil {
return primitive.NilObjectID
}
Expand Down
Loading

0 comments on commit e898358

Please sign in to comment.