diff --git a/README.md b/README.md index 75fea3a..b714768 100644 --- a/README.md +++ b/README.md @@ -296,6 +296,39 @@ let maybeStringFuture: EventLoopFuture = fdb.withTransaction(on: myEven Thus your block of code will be gently retried until transaction is successfully committed. +### Writing to Versionstamped Keys + +As a special type of atomic operation, values can be written to special keys that are guaranteed to be unique. These keys make use of an incomplete versionstamp within their tuples, which will be completed by the underlying cluster when data is written. The Versionstamp that was used within a transaction can then be retrieved so it can be referenced elsewhere. + +An incomplete versionstamp can be created and added to tuples using the `FDB.Versionstamp()` initializer. The `userData` field is optional, and serves to further order keys if multiple are written within the same transaction. + +Within a transaction's block, the `set(versionstampedKey: key, value: value)` method can be used to write to keys with incomplete versionstamps. This method will search the key for an incomplete versionstamp, and if one is found, will flag it to be replaced by a complete versionstamp once it's written to the cluster. If an incomplete versionstamp was not found, a `FDB.Error.missingIncompleteVersionstamp` error will be thrown. + +If you need the complete versionstamp that was used within the key, you can call `getVersionstamp()` before the transaction is committed. Note that this method must be called within the same transaction that a versionstamped key was written in, otherwise it won't know which versionstamp to return. Also note that this versionstamp does not include any user data that was associated with it, since it will be the same versionstamp no matter how many versionstamped keys were written. + +```swift +let keyWithVersionstampPlaceholder = self.subspace[FDB.Versionstamp(userData: 42)]["anotherKey"] +let valueToWrite: String = "Hello, World!" + +let versionstampedKeyFuture: EventLoopFuture = fdb.withTransaction(on: self.eventLoop) { transaction in + transaction + .set(versionstampedKey: keyWithVersionstampPlaceholder, value: Bytes(valueToWrite.utf8)) + .flatMap { _ in + return transaction.getVersionstamp(commit: true) + } +} + +// When the versionstamp eventually resolves… +versionstampedKeyFuture.whenSuccess { versionstamp in + var versionstamp = versionstamp + versionstamp.userData = 42 + + let actualKey = self.subspace[versionstamp]["anotherKey"] + + // … return it to user, save it as a reference to another entry, etc… +} +``` + ### Complete example ```swift diff --git a/Sources/FDB/AnyFDBTransaction.swift b/Sources/FDB/AnyFDBTransaction.swift index 1eea890..d06820f 100644 --- a/Sources/FDB/AnyFDBTransaction.swift +++ b/Sources/FDB/AnyFDBTransaction.swift @@ -68,14 +68,15 @@ public protocol AnyFDBTransaction { /// an incomplete version stamp, this method will throw an error. The actual version stamp used /// may be retrieved by calling `getVersionstamp()` on the transaction. /// + /// If a version stamp cannot be found, this will return a failed future with FDB.Error.missingIncompleteVersionstamp. + /// /// - parameters: /// - versionstampedKey: FDB key containing an incomplete Versionstamp /// - value: Bytes value /// - commit: Whether to commit this transaction after action or not /// - /// - Throws: Throws FDB.Error.missingIncompleteVersionstamp if a version stamp cannot be found /// - returns: EventLoopFuture with future Transaction (`self`) value - func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool) throws -> EventLoopFuture + func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool) -> EventLoopFuture /// Returns bytes value for given key (or `nil` if no key) /// @@ -291,11 +292,19 @@ public protocol AnyFDBTransaction { /// - returns: EventLoopFuture with future Int64 value func getReadVersion() -> EventLoopFuture - /// Returns versionstamp which was used by any versionstamp operations in this transaction + /// Returns a future for the versionstamp which was used by any versionstamp operations + /// in this transaction. Unlike the synchronous version of the same name, this method + /// does _not_ commit by default. /// /// - returns: EventLoopFuture with future FDB.Versionstamp value func getVersionstamp() -> EventLoopFuture + /// Returns a future for the versionstamp which was used by any versionstamp operations + /// in this transaction, and optionally commit the transaction right afterwards + /// + /// - returns: EventLoopFuture with future FDB.Versionstamp value + func getVersionstamp(commit shouldCommit: Bool) -> EventLoopFuture + /// Sync methods /// Commits current transaction @@ -466,7 +475,9 @@ public protocol AnyFDBTransaction { /// - returns: Read version as Int64 func getReadVersion() throws -> Int64 - /// Returns versionstamp which was used by any versionstamp operations in this transaction + /// Commits transaction and returns versionstamp which was used by any versionstamp operations + /// in this transaction. Note that this method must commit the transaction in order to wait for the + /// versionstamp to become available. /// /// This function will block current thread during execution /// @@ -694,15 +705,16 @@ public extension AnyFDBTransaction { /// an incomplete version stamp, this method will throw an error. The actual version stamp used /// may be retrieved by calling `getVersionstamp()` on the transaction. /// + /// If a version stamp cannot be found, this will return a failed future with FDB.Error.missingIncompleteVersionstamp. + /// /// - parameters: /// - versionstampedKey: FDB key containing an incomplete Versionstamp /// - value: Bytes value /// - commit: Whether to commit this transaction after action or not /// - /// - Throws: Throws FDB.Error.missingIncompleteVersionstamp if a version stamp cannot be found /// - returns: EventLoopFuture with future Transaction (`self`) value - func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool = false) throws -> EventLoopFuture { - return try self.set(versionstampedKey: versionstampedKey, value: value, commit: commit) + func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool = false) -> EventLoopFuture { + return self.set(versionstampedKey: versionstampedKey, value: value, commit: commit) } /// Returns bytes value for given key (or `nil` if no key) @@ -996,4 +1008,25 @@ public extension AnyFDBTransaction { ) -> EventLoopFuture { return self.atomic(op, key: key, value: value, commit: commit) } + + /// Returns a future for the versionstamp which was used by any versionstamp operations + /// in this transaction. Unlike the synchronous version of the same name, this method + /// does _not_ commit by default. + /// + /// - returns: EventLoopFuture with future FDB.Versionstamp value + func getVersionstamp() -> EventLoopFuture<(FDB.Versionstamp, AnyFDBTransaction)> { + return self + .getVersionstamp() + .map { ($0, self) } + } + + /// Returns a future for the versionstamp which was used by any versionstamp operations + /// in this transaction, and optionally commit the transaction right afterwards + /// + /// - returns: EventLoopFuture with future FDB.Versionstamp value + func getVersionstamp(commit shouldCommit: Bool) -> EventLoopFuture<(FDB.Versionstamp, AnyFDBTransaction)> { + return self + .getVersionstamp(commit: shouldCommit) + .map { ($0, self) } + } } diff --git a/Sources/FDB/Future/Future+Bytes.swift b/Sources/FDB/Future/Future+Bytes.swift index ee7ffb0..be8a8a0 100644 --- a/Sources/FDB/Future/Future+Bytes.swift +++ b/Sources/FDB/Future/Future+Bytes.swift @@ -48,7 +48,7 @@ extension FDB.Future { /// Parses key bytes result from current future /// /// Warning: this should be only called if future is in resolved state - func parseKeyBytes() throws -> Bytes { + internal func parseKeyBytes() throws -> Bytes { var readKey: UnsafePointer! var readKeyLength: Int32 = 0 try fdb_future_get_key(self.pointer, &readKey, &readKeyLength).orThrow() diff --git a/Sources/FDB/Transaction/Transaction+NIO.swift b/Sources/FDB/Transaction/Transaction+NIO.swift index e742d7a..435bbc6 100644 --- a/Sources/FDB/Transaction/Transaction+NIO.swift +++ b/Sources/FDB/Transaction/Transaction+NIO.swift @@ -60,12 +60,21 @@ public extension FDB.Transaction { return future } - func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool) throws -> EventLoopFuture { - var serializedKey = versionstampedKey.asFDBKey() - let offset = try FDB.Tuple.offsetOfFirstIncompleteVersionstamp(from: serializedKey) - serializedKey.append(contentsOf: getBytes(offset.littleEndian)) - - return self.atomic(.setVersionstampedKey, key: serializedKey, value: value, commit: commit) + func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool) -> EventLoopFuture { + guard let eventLoop = self.eventLoop else { + self.log("[set versionstampedKey] No event loop", level: .error) + return FDB.dummyEventLoop.makeFailedFuture(FDB.Error.noEventLoopProvided) + } + + do { + var serializedKey = versionstampedKey.asFDBKey() + let offset = try FDB.Tuple.offsetOfFirstIncompleteVersionstamp(from: serializedKey) + serializedKey.append(contentsOf: getBytes(offset.littleEndian)) + + return self.atomic(.setVersionstampedKey, key: serializedKey, value: value, commit: commit) + } catch { + return eventLoop.makeFailedFuture(error) + } } func get( @@ -363,8 +372,20 @@ public extension FDB.Transaction { return promise.futureResult } - /// Returns versionstamp which was used by any versionstamp operations in this transaction + /// Returns a future for the versionstamp which was used by any versionstamp operations + /// in this transaction. Unlike the synchronous version of the same name, this method + /// does _not_ commit by default. + /// + /// - returns: EventLoopFuture with future FDB.Versionstamp value func getVersionstamp() -> EventLoopFuture { + return getVersionstamp(commit: false) + } + + /// Returns a future for the versionstamp which was used by any versionstamp operations + /// in this transaction, and optionally commit the transaction right afterwards + /// + /// - returns: EventLoopFuture with future FDB.Versionstamp value + func getVersionstamp(commit shouldCommit: Bool) -> EventLoopFuture { guard let eventLoop = self.eventLoop else { self.log("[getVersionstamp] No event loop", level: .error) return FDB.dummyEventLoop.makeFailedFuture(FDB.Error.noEventLoopProvided) @@ -394,7 +415,11 @@ public extension FDB.Transaction { } catch { promise.fail(error) } - - return promise.futureResult + + if shouldCommit { + return self.commit().flatMap { promise.futureResult } + } else { + return promise.futureResult + } } } diff --git a/Sources/FDB/Transaction/Transaction+Sync.swift b/Sources/FDB/Transaction/Transaction+Sync.swift index 128e05d..40ac536 100644 --- a/Sources/FDB/Transaction/Transaction+Sync.swift +++ b/Sources/FDB/Transaction/Transaction+Sync.swift @@ -140,6 +140,18 @@ public extension FDB.Transaction { } func getVersionstamp() throws -> FDB.Versionstamp { - return try self.getVersionstamp().wait() + let future: FDB.Future = self.getVersionstamp() + try self.commitSync() + let bytes = try future.waitAndCheck().parseKeyBytes() + + guard bytes.count == 10 else { + self.log("[getVersionstamp] Bytes that do not represent a versionstamp were returned: \(String(describing: bytes))", level: .error) + throw FDB.Error.invalidVersionstamp + } + + let transactionCommitVersion = try! UInt64(bigEndian: Bytes(bytes[0..<8]).cast()) + let batchNumber = try! UInt16(bigEndian: Bytes(bytes[8..<10]).cast()) + + return FDB.Versionstamp(transactionCommitVersion: transactionCommitVersion, batchNumber: batchNumber) } } diff --git a/Sources/FDB/Versionstamp.swift b/Sources/FDB/Versionstamp.swift index a6503a3..71f8bbb 100644 --- a/Sources/FDB/Versionstamp.swift +++ b/Sources/FDB/Versionstamp.swift @@ -23,11 +23,11 @@ public extension FDB { /// Initializes a new incomplete Versionstamp suitable for passing to an atomic operation with the .setVersionstampedKey option. If the user data is specified, a 96-bit versionstamp will be encoded, otherwise an 80-bit verstion stamp without the user data will be encoded. /// - Parameter userData: Extra ordering information to order writes within a single transaction, thereby providing a global order for all versions. public init(userData: UInt16? = nil) { - self.init(transactionCommitVersion: 0, batchNumber: 0, userData: userData) + self.init(transactionCommitVersion: 0xFFFFFFFFFFFFFFFF, batchNumber: 0xFFFF, userData: userData) } public var isComplete: Bool { - transactionCommitVersion != 0 || batchNumber != 0 + transactionCommitVersion != 0xFFFFFFFFFFFFFFFF as UInt64 || batchNumber != 0xFFFF } } } diff --git a/Tests/FDBTests/FDBTests.swift b/Tests/FDBTests/FDBTests.swift index 18e7f31..6db5613 100644 --- a/Tests/FDBTests/FDBTests.swift +++ b/Tests/FDBTests/FDBTests.swift @@ -105,24 +105,126 @@ class FDBTests: XCTestCase { func testSetVersionstampedKey() throws { let fdb = FDBTests.fdb! let subspace = FDBTests.subspace.subspace("atomic_versionstamp") - let key = subspace[FDB.Versionstamp(userData: 42)]["mykey"] - let value: String = "hello!" + do { // Test synchronous variations + let value: String = "basic sync value" + let nonVersionstampedKey = subspace[FDB.Versionstamp(transactionCommitVersion: 1, batchNumber: 2)]["aSyncKey"] + + let versionStamp: FDB.Versionstamp = try fdb.withTransaction { transaction in + XCTAssertNoThrow(try transaction.set(versionstampedKey: subspace[FDB.Versionstamp()]["aSyncKey"], value: Bytes(value.utf8)) as Void) + XCTAssertThrowsError(try transaction.set(versionstampedKey: nonVersionstampedKey, value: getBytes(value.utf8)) as Void) { error in + switch error { + case FDB.Error.missingIncompleteVersionstamp: break + default: XCTFail("Invalid error returned: \(error)") + } + } + + return try transaction.getVersionstamp() + } + + XCTAssertNil(versionStamp.userData) + + let result = try fdb.get(key: subspace[versionStamp]["aSyncKey"]) + XCTAssertEqual(String(bytes: result ?? [], encoding: .utf8), value) + } - let tr = try self.begin().wait() - XCTAssertNoThrow(try tr.set(versionstampedKey: key, value: getBytes(value.utf8)) as Void) - let future: EventLoopFuture = tr.getVersionstamp() - XCTAssertNoThrow(try tr.commitSync()) + do { // Test synchronous variations, with userData and multiple writes + let valueA: String = "advanced sync value A" + let valueB: String = "advanced sync value B" + + var versionStamp: FDB.Versionstamp = try fdb.withTransaction { transaction in + try transaction.set(versionstampedKey: subspace[FDB.Versionstamp(userData: 1)]["aSyncKey"], value: Bytes(valueA.utf8)) as Void + try transaction.set(versionstampedKey: subspace[FDB.Versionstamp(userData: 2)]["aSyncKey"], value: Bytes(valueB.utf8)) as Void + return try transaction.getVersionstamp() + } + + XCTAssertNil(versionStamp.userData) + + versionStamp.userData = 1 + let resultA = try fdb.get(key: subspace[versionStamp]["aSyncKey"]) + XCTAssertEqual(String(bytes: resultA ?? [], encoding: .utf8), valueA) + + versionStamp.userData = 2 + let resultB = try fdb.get(key: subspace[versionStamp]["aSyncKey"]) + XCTAssertEqual(String(bytes: resultB ?? [], encoding: .utf8), valueB) + } - var versionstamp = try future.wait() - XCTAssertNil(versionstamp.userData) - versionstamp.userData = 42 + do { // Test asynchronous variations + let value: String = "basic async value" + + let result: String? = try fdb.withTransaction(on: self.eventLoop) { transaction in + transaction + .set(versionstampedKey: subspace[FDB.Versionstamp()]["anAsyncKey"], value: Bytes(value.utf8)) + .flatMap { _ -> EventLoopFuture in + let versionstamp = transaction.getVersionstamp() as EventLoopFuture + return transaction.commit().flatMap { versionstamp } + } + }.flatMap { versionstamp in + XCTAssertNil(versionstamp.userData) + + return fdb.withTransaction(on: self.eventLoop) { transaction in + transaction + .get(key: subspace[versionstamp]["anAsyncKey"], commit: true) + .map { bytes, _ in + return String(bytes: bytes ?? [], encoding: .utf8) + } + } + }.wait() + + XCTAssertEqual(result, value) + } - let updatedKey = subspace[versionstamp]["mykey"] - - let result = try fdb.get(key: updatedKey) - XCTAssertNotNil(result) - try XCTAssertEqual(result!.cast() as String, value) + do { // Test asynchronous variations with invalid versionstamp + let value: String = "invalid async value" + let nonVersionstampedKey = subspace[FDB.Versionstamp(transactionCommitVersion: 1, batchNumber: 2)]["aSyncKey"] + + let result: EventLoopFuture = fdb.withTransaction(on: self.eventLoop) { transaction in + transaction + .set(versionstampedKey: subspace[nonVersionstampedKey]["anAsyncKey"], value: Bytes(value.utf8)) + .flatMap { _ -> EventLoopFuture in + XCTFail("We should never get to this point") + let versionstamp = transaction.getVersionstamp() as EventLoopFuture + return transaction.commit().flatMap { versionstamp } + } + }.flatMap { _ in + XCTFail("We should never get to this point") + + return self.eventLoop.makeSucceededFuture(()) + } + + XCTAssertThrowsError(try result.wait()) { error in + switch error { + case FDB.Error.missingIncompleteVersionstamp: break + default: XCTFail("Invalid error returned: \(error)") + } + } + } + + do { // Test asynchronous variations, with userData and committing inline + let value: String = "advanced async value" + + let result: String? = try fdb.withTransaction(on: self.eventLoop) { transaction in + transaction + .set(versionstampedKey: subspace[FDB.Versionstamp(userData: 42)]["anAsyncKey"], value: Bytes(value.utf8)) + .flatMap { _ in + return transaction.getVersionstamp(commit: true) + } + }.flatMap { versionstamp in + var versionstamp = versionstamp + XCTAssertNil(versionstamp.userData) + versionstamp.userData = 42 + + return fdb.withTransaction(on: self.eventLoop) { transaction in + transaction + .get(key: subspace[versionstamp]["anAsyncKey"], commit: true) + .map { bytes, _ in + return String(bytes: bytes ?? [], encoding: .utf8) + } + } + }.wait() + + XCTAssertEqual(result, value) + } } func testClear() throws { diff --git a/Tests/FDBTests/TupleTests.swift b/Tests/FDBTests/TupleTests.swift index f91677e..994803e 100644 --- a/Tests/FDBTests/TupleTests.swift +++ b/Tests/FDBTests/TupleTests.swift @@ -247,10 +247,10 @@ class TupleTests: XCTestCase { (FDB.Versionstamp(transactionCommitVersion: 42, batchNumber: 196, userData: nil), [50, 00, 00, 00, 00, 00, 00, 00, 42, 00, 196]), (FDB.Versionstamp(transactionCommitVersion: 42, batchNumber: 196, userData: 0), [51, 00, 00, 00, 00, 00, 00, 00, 42, 00, 196, 00, 00]), (FDB.Versionstamp(transactionCommitVersion: 42, batchNumber: 196, userData: 24), [51, 00, 00, 00, 00, 00, 00, 00, 42, 00, 196, 00, 24]), - (FDB.Versionstamp(), [50, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00]), - (FDB.Versionstamp(userData: nil), [50, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00]), - (FDB.Versionstamp(userData: 0), [51, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00]), - (FDB.Versionstamp(userData: 24), [51, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 24]), + (FDB.Versionstamp(), [50, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]), + (FDB.Versionstamp(userData: nil), [50, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]), + (FDB.Versionstamp(userData: 0), [51, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 00, 00]), + (FDB.Versionstamp(userData: 24), [51, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 00, 24]), ] for (input, expectedBytes) in cases {