Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into CBG-3504
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin committed Oct 13, 2023
2 parents 806e46f + 6537f43 commit 85764a8
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 38 deletions.
45 changes: 24 additions & 21 deletions db/import_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,38 +127,47 @@ func (il *importListener) StartImportFeed(dbContext *DatabaseContext) (err error

// ProcessFeedEvent is invoked for each mutate or delete event seen on the server's mutation feed. It may be
// executed concurrently for multiple events from different vbuckets. Filters out
// internal documents based on key, then checks sync metadata to determine whether document needs to be imported
// internal documents based on key, then checks sync metadata to determine whether document needs to be imported.
// Returns true if the checkpoints should be persisted.
func (il *importListener) ProcessFeedEvent(event sgbucket.FeedEvent) (shouldPersistCheckpoint bool) {

ctx := il.loggingCtx
// Ignore non-mutation/deletion events
if event.Opcode != sgbucket.FeedOpMutation && event.Opcode != sgbucket.FeedOpDeletion {
return true
}
key := string(event.Key)
docID := string(event.Key)

collection, ok := il.collections[event.CollectionID]
if !ok {
base.WarnfCtx(ctx, "Received import event for unrecognised collection 0x%x", event.CollectionID)
return true
}
ctx = base.CollectionLogCtx(ctx, collection.Name)

// Ignore internal documents
if strings.HasPrefix(key, base.SyncDocPrefix) {
if strings.HasPrefix(docID, base.SyncDocPrefix) {
// Ignore all DCP checkpoints no matter config group ID
return !strings.HasPrefix(key, base.DCPCheckpointRootPrefix)
return !strings.HasPrefix(docID, base.DCPCheckpointRootPrefix)
}

// If this is a delete and there are no xattrs (no existing SG revision), we shouldn't import
if event.Opcode == sgbucket.FeedOpDeletion && len(event.Value) == 0 {
base.DebugfCtx(il.loggingCtx, base.KeyImport, "Ignoring delete mutation for %s - no existing Sync Gateway metadata.", base.UD(event.Key))
base.DebugfCtx(ctx, base.KeyImport, "Ignoring delete mutation for %s - no existing Sync Gateway metadata.", base.UD(docID))
return true
}

// If this is a binary document we can ignore, but update checkpoint to avoid reprocessing upon restart
if event.DataType == base.MemcachedDataTypeRaw {
base.InfofCtx(il.loggingCtx, base.KeyImport, "Ignoring binary mutation event for %s.", base.UD(event.Key))
base.InfofCtx(ctx, base.KeyImport, "Ignoring binary mutation event for %s.", base.UD(docID))
return true
}

il.ImportFeedEvent(event)
il.ImportFeedEvent(ctx, &collection, event)
return true
}

func (il *importListener) ImportFeedEvent(event sgbucket.FeedEvent) {
func (il *importListener) ImportFeedEvent(ctx context.Context, collection *DatabaseCollectionWithUser, event sgbucket.FeedEvent) {
var importAttempt bool
startTime := time.Now()
defer func() {
Expand All @@ -171,19 +180,13 @@ func (il *importListener) ImportFeedEvent(event sgbucket.FeedEvent) {
stat := CalculateComputeStat(int64(bytes), functionTime)
il.dbStats.ImportProcessCompute.Add(stat)
}()
// Unmarshal the doc metadata (if present) to determine if this mutation requires import.
collectionCtx, ok := il.collections[event.CollectionID]
if !ok {
base.WarnfCtx(il.loggingCtx, "Received import event for unrecognised collection 0x%x", event.CollectionID)
return
}

syncData, rawBody, rawXattr, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(event.Value, event.DataType, collectionCtx.userXattrKey(), false)
syncData, rawBody, rawXattr, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(event.Value, event.DataType, collection.userXattrKey(), false)
if err != nil {
if err == base.ErrEmptyMetadata {
base.WarnfCtx(il.loggingCtx, "Unexpected empty metadata when processing feed event. docid: %s opcode: %v datatype:%v", base.UD(event.Key), event.Opcode, event.DataType)
base.WarnfCtx(ctx, "Unexpected empty metadata when processing feed event. docid: %s opcode: %v datatype:%v", base.UD(event.Key), event.Opcode, event.DataType)
} else {
base.WarnfCtx(il.loggingCtx, "Found sync metadata, but unable to unmarshal for feed document %q. Will not be imported. Error: %v", base.UD(event.Key), err)
base.WarnfCtx(ctx, "Found sync metadata, but unable to unmarshal for feed document %q. Will not be imported. Error: %v", base.UD(event.Key), err)
}
il.importStats.ImportErrorCount.Add(1)
return
Expand All @@ -210,19 +213,19 @@ func (il *importListener) ImportFeedEvent(event sgbucket.FeedEvent) {
// last attempt to exit processing if the importListener has been closed before attempting to write to the bucket
select {
case <-il.terminator:
base.InfofCtx(il.loggingCtx, base.KeyImport, "Aborting import for doc %q - importListener.terminator was closed", base.UD(docID))
base.InfofCtx(ctx, base.KeyImport, "Aborting import for doc %q - importListener.terminator was closed", base.UD(docID))
return
default:
}

_, err := collectionCtx.ImportDocRaw(il.loggingCtx, docID, rawBody, rawXattr, rawUserXattr, isDelete, event.Cas, &event.Expiry, ImportFromFeed)
_, err := collection.ImportDocRaw(ctx, docID, rawBody, rawXattr, rawUserXattr, isDelete, event.Cas, &event.Expiry, ImportFromFeed)
if err != nil {
if err == base.ErrImportCasFailure {
base.DebugfCtx(il.loggingCtx, base.KeyImport, "Not importing mutation - document %s has been subsequently updated and will be imported based on that mutation.", base.UD(docID))
base.DebugfCtx(ctx, base.KeyImport, "Not importing mutation - document %s has been subsequently updated and will be imported based on that mutation.", base.UD(docID))
} else if err == base.ErrImportCancelledFilter {
// No logging required - filter info already logged during importDoc
} else {
base.DebugfCtx(il.loggingCtx, base.KeyImport, "Did not import doc %q - external update will not be accessible via Sync Gateway. Reason: %v", base.UD(docID), err)
base.DebugfCtx(ctx, base.KeyImport, "Did not import doc %q - external update will not be accessible via Sync Gateway. Reason: %v", base.UD(docID), err)
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ require (
github.com/samuel/go-metrics v0.0.0-20150819231912-7ccf3e0e1fb1
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.8.4
golang.org/x/crypto v0.13.0
golang.org/x/crypto v0.14.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/net v0.15.0
golang.org/x/oauth2 v0.12.0
golang.org/x/net v0.17.0
golang.org/x/oauth2 v0.13.0
gopkg.in/square/go-jose.v2 v2.6.0
)

Expand Down Expand Up @@ -72,10 +72,10 @@ require (
github.com/stretchr/objx v0.5.0 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/sourcemap.v1 v1.0.5 // indirect
Expand Down
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6Wezm
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
Expand Down Expand Up @@ -195,21 +195,20 @@ github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/oauth2 v0.12.0 h1:smVPGxink+n1ZI5pkQa8y6fZT0RW0MgCO5bFpepy4B4=
golang.org/x/oauth2 v0.12.0/go.mod h1:A74bZ3aGXgCY0qaIC9Ahg6Lglin4AMAco8cIv9baba4=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY=
golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -225,15 +224,16 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
Expand All @@ -245,8 +245,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
Expand Down

0 comments on commit 85764a8

Please sign in to comment.