-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(lib): implement executeOperationV2
Fixes NODE-1896
- Loading branch information
Showing
4 changed files
with
242 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
'use strict'; | ||
|
||
const MongoError = require('mongodb-core').MongoError; | ||
const Aspect = require('./operation').Aspect; | ||
const OperationBase = require('./operation').OperationBase; | ||
|
||
/** | ||
* Executes the given operation with provided arguments. | ||
* | ||
* This method reduces large amounts of duplication in the entire codebase by providing | ||
* a single point for determining whether callbacks or promises should be used. Additionally | ||
* it allows for a single point of entry to provide features such as implicit sessions, which | ||
* are required by the Driver Sessions specification in the event that a ClientSession is | ||
* not provided | ||
* | ||
* @param {object} topology The topology to execute this operation on | ||
* @param {Operation} operation The operation to execute | ||
* @param {function} callback The command result callback | ||
*/ | ||
function executeOperationV2(topology, operation, callback) { | ||
if (topology == null) { | ||
throw new TypeError('This method requires a valid topology instance'); | ||
} | ||
|
||
if (!(operation instanceof OperationBase)) { | ||
throw new TypeError('This method requires a valid operation instance'); | ||
} | ||
|
||
const Promise = topology.s.promiseLibrary; | ||
|
||
// The driver sessions spec mandates that we implicitly create sessions for operations | ||
// that are not explicitly provided with a session. | ||
let session, owner; | ||
if (!operation.hasAspect(Aspect.SKIP_SESSION) && topology.hasSessionSupport()) { | ||
if (operation.session == null) { | ||
owner = Symbol(); | ||
session = topology.startSession({ owner }); | ||
operation.session = session; | ||
} else if (operation.session.hasEnded) { | ||
throw new MongoError('Use of expired sessions is not permitted'); | ||
} | ||
} | ||
|
||
const makeExecuteCallback = (resolve, reject) => | ||
function executeCallback(err, result) { | ||
if (session && session.owner === owner) { | ||
session.endSession(() => { | ||
if (operation.session === session) { | ||
operation.clearSession(); | ||
} | ||
if (err) return reject(err); | ||
resolve(result); | ||
}); | ||
} else { | ||
if (err) return reject(err); | ||
resolve(result); | ||
} | ||
}; | ||
|
||
// Execute using callback | ||
if (typeof callback === 'function') { | ||
const handler = makeExecuteCallback( | ||
result => callback(null, result), | ||
err => callback(err, null) | ||
); | ||
|
||
try { | ||
return operation.execute(handler); | ||
} catch (e) { | ||
handler(e); | ||
throw e; | ||
} | ||
} | ||
|
||
return new Promise(function(resolve, reject) { | ||
const handler = makeExecuteCallback(resolve, reject); | ||
|
||
try { | ||
return operation.execute(handler); | ||
} catch (e) { | ||
handler(e); | ||
} | ||
}); | ||
} | ||
|
||
module.exports = executeOperationV2; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
'use strict'; | ||
|
||
const applyRetryableWrites = require('../utils').applyRetryableWrites; | ||
const applyWriteConcern = require('../utils').applyWriteConcern; | ||
const handleCallback = require('../utils').handleCallback; | ||
const MongoError = require('mongodb-core').MongoError; | ||
const OperationBase = require('./operation').OperationBase; | ||
const toError = require('../utils').toError; | ||
|
||
class InsertOneOperation extends OperationBase { | ||
constructor(collection, doc, options) { | ||
super(options); | ||
|
||
this.collection = collection; | ||
this.doc = doc; | ||
} | ||
|
||
execute(callback) { | ||
const coll = this.collection; | ||
const doc = this.doc; | ||
const options = this.options; | ||
|
||
if (Array.isArray(doc)) { | ||
return callback( | ||
MongoError.create({ message: 'doc parameter must be an object', driver: true }) | ||
); | ||
} | ||
|
||
insertDocuments(coll, [doc], options, (err, r) => { | ||
if (callback == null) return; | ||
if (err && callback) return callback(err); | ||
// Workaround for pre 2.6 servers | ||
if (r == null) return callback(null, { result: { ok: 1 } }); | ||
// Add values to top level to ensure crud spec compatibility | ||
r.insertedCount = r.result.n; | ||
r.insertedId = doc._id; | ||
if (callback) callback(null, r); | ||
}); | ||
} | ||
} | ||
|
||
function insertDocuments(coll, docs, options, callback) { | ||
if (typeof options === 'function') (callback = options), (options = {}); | ||
options = options || {}; | ||
// Ensure we are operating on an array op docs | ||
docs = Array.isArray(docs) ? docs : [docs]; | ||
|
||
// Final options for retryable writes and write concern | ||
let finalOptions = Object.assign({}, options); | ||
finalOptions = applyRetryableWrites(finalOptions, coll.s.db); | ||
finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options); | ||
|
||
// If keep going set unordered | ||
if (finalOptions.keepGoing === true) finalOptions.ordered = false; | ||
finalOptions.serializeFunctions = options.serializeFunctions || coll.s.serializeFunctions; | ||
|
||
docs = prepareDocs(coll, docs, options); | ||
|
||
// File inserts | ||
coll.s.topology.insert(coll.s.namespace, docs, finalOptions, (err, result) => { | ||
if (callback == null) return; | ||
if (err) return handleCallback(callback, err); | ||
if (result == null) return handleCallback(callback, null, null); | ||
if (result.result.code) return handleCallback(callback, toError(result.result)); | ||
if (result.result.writeErrors) | ||
return handleCallback(callback, toError(result.result.writeErrors[0])); | ||
// Add docs to the list | ||
result.ops = docs; | ||
// Return the results | ||
handleCallback(callback, null, result); | ||
}); | ||
} | ||
|
||
function prepareDocs(coll, docs, options) { | ||
const forceServerObjectId = | ||
typeof options.forceServerObjectId === 'boolean' | ||
? options.forceServerObjectId | ||
: coll.s.db.options.forceServerObjectId; | ||
|
||
// no need to modify the docs if server sets the ObjectId | ||
if (forceServerObjectId === true) { | ||
return docs; | ||
} | ||
|
||
return docs.map(doc => { | ||
if (forceServerObjectId !== true && doc._id == null) { | ||
doc._id = coll.s.pkFactory.createPk(); | ||
} | ||
|
||
return doc; | ||
}); | ||
} | ||
|
||
module.exports = InsertOneOperation; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
'use strict'; | ||
|
||
const Aspect = { | ||
SKIP_SESSION: Symbol('SKIP_SESSION') | ||
}; | ||
|
||
/** | ||
* This class acts as a parent class for any operation and is responsible for setting this.options, | ||
* as well as setting and getting a session. | ||
* Additionally, this class implements `hasAspect`, which determines whether an operation has | ||
* a specific aspect, including `SKIP_SESSION` and other aspects to encode retryability | ||
* and other functionality. | ||
*/ | ||
class OperationBase { | ||
constructor(options) { | ||
this.options = options || {}; | ||
} | ||
|
||
hasAspect(aspect) { | ||
if (this.constructor.aspects == null) { | ||
return false; | ||
} | ||
return this.constructor.aspects.has(aspect); | ||
} | ||
|
||
set session(session) { | ||
Object.assign(this.options, { session }); | ||
} | ||
|
||
get session() { | ||
return this.options.session; | ||
} | ||
|
||
clearSession() { | ||
delete this.options.session; | ||
} | ||
|
||
execute() { | ||
throw new TypeError('`execute` must be implemented for OperationBase subclasses'); | ||
} | ||
} | ||
|
||
function defineAspects(operation, aspects) { | ||
aspects = new Set(aspects); | ||
Object.defineProperty(operation, 'aspects', { | ||
value: aspects, | ||
writable: false | ||
}); | ||
return aspects; | ||
} | ||
|
||
module.exports = { | ||
Aspect, | ||
defineAspects, | ||
OperationBase | ||
}; |