Skip to content

Commit

Permalink
Merge pull request #63 from dimitribouniol/incomplete-versionstamps
Browse files Browse the repository at this point in the history
Updated the meaning of an incomplete versionstamp to match existing bindings
  • Loading branch information
kirilltitov authored Aug 31, 2020
2 parents ad25991 + 22c9eab commit 670880c
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 38 deletions.
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,39 @@ let maybeStringFuture: EventLoopFuture<String?> = fdb.withTransaction(on: myEven

Thus your block of code will be gently retried until transaction is successfully committed.

### Writing to Versionstamped Keys

As a special type of atomic operation, values can be written to special keys that are guaranteed to be unique. These keys make use of an incomplete versionstamp within their tuples, which will be completed by the underlying cluster when data is written. The Versionstamp that was used within a transaction can then be retrieved so it can be referenced elsewhere.

An incomplete versionstamp can be created and added to tuples using the `FDB.Versionstamp()` initializer. The `userData` field is optional, and serves to further order keys if multiple are written within the same transaction.

Within a transaction's block, the `set(versionstampedKey: key, value: value)` method can be used to write to keys with incomplete versionstamps. This method will search the key for an incomplete versionstamp, and if one is found, will flag it to be replaced by a complete versionstamp once it's written to the cluster. If an incomplete versionstamp was not found, a `FDB.Error.missingIncompleteVersionstamp` error will be thrown.

If you need the complete versionstamp that was used within the key, you can call `getVersionstamp()` before the transaction is committed. Note that this method must be called within the same transaction that a versionstamped key was written in, otherwise it won't know which versionstamp to return. Also note that this versionstamp does not include any user data that was associated with it, since it will be the same versionstamp no matter how many versionstamped keys were written.

```swift
let keyWithVersionstampPlaceholder = self.subspace[FDB.Versionstamp(userData: 42)]["anotherKey"]
let valueToWrite: String = "Hello, World!"

let versionstampedKeyFuture: EventLoopFuture<FDB.Versionstamp> = fdb.withTransaction(on: self.eventLoop) { transaction in
transaction
.set(versionstampedKey: keyWithVersionstampPlaceholder, value: Bytes(valueToWrite.utf8))
.flatMap { _ in
return transaction.getVersionstamp(commit: true)
}
}

// When the versionstamp eventually resolves…
versionstampedKeyFuture.whenSuccess { versionstamp in
var versionstamp = versionstamp
versionstamp.userData = 42

let actualKey = self.subspace[versionstamp]["anotherKey"]

// … return it to user, save it as a reference to another entry, etc…
}
```

### Complete example

```swift
Expand Down
47 changes: 40 additions & 7 deletions Sources/FDB/AnyFDBTransaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ public protocol AnyFDBTransaction {
/// an incomplete version stamp, this method will throw an error. The actual version stamp used
/// may be retrieved by calling `getVersionstamp()` on the transaction.
///
/// If a version stamp cannot be found, this will return a failed future with FDB.Error.missingIncompleteVersionstamp.
///
/// - parameters:
/// - versionstampedKey: FDB key containing an incomplete Versionstamp
/// - value: Bytes value
/// - commit: Whether to commit this transaction after action or not
///
/// - Throws: Throws FDB.Error.missingIncompleteVersionstamp if a version stamp cannot be found
/// - returns: EventLoopFuture with future Transaction (`self`) value
func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool) throws -> EventLoopFuture<AnyFDBTransaction>
func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool) -> EventLoopFuture<AnyFDBTransaction>

/// Returns bytes value for given key (or `nil` if no key)
///
Expand Down Expand Up @@ -291,11 +292,19 @@ public protocol AnyFDBTransaction {
/// - returns: EventLoopFuture with future Int64 value
func getReadVersion() -> EventLoopFuture<Int64>

/// Returns versionstamp which was used by any versionstamp operations in this transaction
/// Returns a future for the versionstamp which was used by any versionstamp operations
/// in this transaction. Unlike the synchronous version of the same name, this method
/// does _not_ commit by default.
///
/// - returns: EventLoopFuture with future FDB.Versionstamp value
func getVersionstamp() -> EventLoopFuture<FDB.Versionstamp>

/// Returns a future for the versionstamp which was used by any versionstamp operations
/// in this transaction, and optionally commit the transaction right afterwards
///
/// - returns: EventLoopFuture with future FDB.Versionstamp value
func getVersionstamp(commit shouldCommit: Bool) -> EventLoopFuture<FDB.Versionstamp>

/// Sync methods

/// Commits current transaction
Expand Down Expand Up @@ -466,7 +475,9 @@ public protocol AnyFDBTransaction {
/// - returns: Read version as Int64
func getReadVersion() throws -> Int64

/// Returns versionstamp which was used by any versionstamp operations in this transaction
/// Commits transaction and returns versionstamp which was used by any versionstamp operations
/// in this transaction. Note that this method must commit the transaction in order to wait for the
/// versionstamp to become available.
///
/// This function will block current thread during execution
///
Expand Down Expand Up @@ -694,15 +705,16 @@ public extension AnyFDBTransaction {
/// an incomplete version stamp, this method will throw an error. The actual version stamp used
/// may be retrieved by calling `getVersionstamp()` on the transaction.
///
/// If a version stamp cannot be found, this will return a failed future with FDB.Error.missingIncompleteVersionstamp.
///
/// - parameters:
/// - versionstampedKey: FDB key containing an incomplete Versionstamp
/// - value: Bytes value
/// - commit: Whether to commit this transaction after action or not
///
/// - Throws: Throws FDB.Error.missingIncompleteVersionstamp if a version stamp cannot be found
/// - returns: EventLoopFuture with future Transaction (`self`) value
func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool = false) throws -> EventLoopFuture<AnyFDBTransaction> {
return try self.set(versionstampedKey: versionstampedKey, value: value, commit: commit)
func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool = false) -> EventLoopFuture<AnyFDBTransaction> {
return self.set(versionstampedKey: versionstampedKey, value: value, commit: commit)
}

/// Returns bytes value for given key (or `nil` if no key)
Expand Down Expand Up @@ -996,4 +1008,25 @@ public extension AnyFDBTransaction {
) -> EventLoopFuture<AnyFDBTransaction> {
return self.atomic(op, key: key, value: value, commit: commit)
}

/// Returns a future for the versionstamp which was used by any versionstamp operations
/// in this transaction. Unlike the synchronous version of the same name, this method
/// does _not_ commit by default.
///
/// - returns: EventLoopFuture with future FDB.Versionstamp value
func getVersionstamp() -> EventLoopFuture<(FDB.Versionstamp, AnyFDBTransaction)> {
return self
.getVersionstamp()
.map { ($0, self) }
}

/// Returns a future for the versionstamp which was used by any versionstamp operations
/// in this transaction, and optionally commit the transaction right afterwards
///
/// - returns: EventLoopFuture with future FDB.Versionstamp value
func getVersionstamp(commit shouldCommit: Bool) -> EventLoopFuture<(FDB.Versionstamp, AnyFDBTransaction)> {
return self
.getVersionstamp(commit: shouldCommit)
.map { ($0, self) }
}
}
2 changes: 1 addition & 1 deletion Sources/FDB/Future/Future+Bytes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ extension FDB.Future {
/// Parses key bytes result from current future
///
/// Warning: this should be only called if future is in resolved state
func parseKeyBytes() throws -> Bytes {
internal func parseKeyBytes() throws -> Bytes {
var readKey: UnsafePointer<Byte>!
var readKeyLength: Int32 = 0
try fdb_future_get_key(self.pointer, &readKey, &readKeyLength).orThrow()
Expand Down
43 changes: 34 additions & 9 deletions Sources/FDB/Transaction/Transaction+NIO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,21 @@ public extension FDB.Transaction {
return future
}

func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool) throws -> EventLoopFuture<AnyFDBTransaction> {
var serializedKey = versionstampedKey.asFDBKey()
let offset = try FDB.Tuple.offsetOfFirstIncompleteVersionstamp(from: serializedKey)
serializedKey.append(contentsOf: getBytes(offset.littleEndian))

return self.atomic(.setVersionstampedKey, key: serializedKey, value: value, commit: commit)
func set(versionstampedKey: AnyFDBKey, value: Bytes, commit: Bool) -> EventLoopFuture<AnyFDBTransaction> {
guard let eventLoop = self.eventLoop else {
self.log("[set versionstampedKey] No event loop", level: .error)
return FDB.dummyEventLoop.makeFailedFuture(FDB.Error.noEventLoopProvided)
}

do {
var serializedKey = versionstampedKey.asFDBKey()
let offset = try FDB.Tuple.offsetOfFirstIncompleteVersionstamp(from: serializedKey)
serializedKey.append(contentsOf: getBytes(offset.littleEndian))

return self.atomic(.setVersionstampedKey, key: serializedKey, value: value, commit: commit)
} catch {
return eventLoop.makeFailedFuture(error)
}
}

func get(
Expand Down Expand Up @@ -363,8 +372,20 @@ public extension FDB.Transaction {
return promise.futureResult
}

/// Returns versionstamp which was used by any versionstamp operations in this transaction
/// Returns a future for the versionstamp which was used by any versionstamp operations
/// in this transaction. Unlike the synchronous version of the same name, this method
/// does _not_ commit by default.
///
/// - returns: EventLoopFuture with future FDB.Versionstamp value
func getVersionstamp() -> EventLoopFuture<FDB.Versionstamp> {
return getVersionstamp(commit: false)
}

/// Returns a future for the versionstamp which was used by any versionstamp operations
/// in this transaction, and optionally commit the transaction right afterwards
///
/// - returns: EventLoopFuture with future FDB.Versionstamp value
func getVersionstamp(commit shouldCommit: Bool) -> EventLoopFuture<FDB.Versionstamp> {
guard let eventLoop = self.eventLoop else {
self.log("[getVersionstamp] No event loop", level: .error)
return FDB.dummyEventLoop.makeFailedFuture(FDB.Error.noEventLoopProvided)
Expand Down Expand Up @@ -394,7 +415,11 @@ public extension FDB.Transaction {
} catch {
promise.fail(error)
}

return promise.futureResult

if shouldCommit {
return self.commit().flatMap { promise.futureResult }
} else {
return promise.futureResult
}
}
}
14 changes: 13 additions & 1 deletion Sources/FDB/Transaction/Transaction+Sync.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ public extension FDB.Transaction {
}

func getVersionstamp() throws -> FDB.Versionstamp {
return try self.getVersionstamp().wait()
let future: FDB.Future = self.getVersionstamp()
try self.commitSync()
let bytes = try future.waitAndCheck().parseKeyBytes()

guard bytes.count == 10 else {
self.log("[getVersionstamp] Bytes that do not represent a versionstamp were returned: \(String(describing: bytes))", level: .error)
throw FDB.Error.invalidVersionstamp
}

let transactionCommitVersion = try! UInt64(bigEndian: Bytes(bytes[0..<8]).cast())
let batchNumber = try! UInt16(bigEndian: Bytes(bytes[8..<10]).cast())

return FDB.Versionstamp(transactionCommitVersion: transactionCommitVersion, batchNumber: batchNumber)
}
}
4 changes: 2 additions & 2 deletions Sources/FDB/Versionstamp.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ public extension FDB {
/// Initializes a new incomplete Versionstamp suitable for passing to an atomic operation with the .setVersionstampedKey option. If the user data is specified, a 96-bit versionstamp will be encoded, otherwise an 80-bit verstion stamp without the user data will be encoded.
/// - Parameter userData: Extra ordering information to order writes within a single transaction, thereby providing a global order for all versions.
public init(userData: UInt16? = nil) {
self.init(transactionCommitVersion: 0, batchNumber: 0, userData: userData)
self.init(transactionCommitVersion: 0xFFFFFFFFFFFFFFFF, batchNumber: 0xFFFF, userData: userData)
}

public var isComplete: Bool {
transactionCommitVersion != 0 || batchNumber != 0
transactionCommitVersion != 0xFFFFFFFFFFFFFFFF as UInt64 || batchNumber != 0xFFFF
}
}
}
Expand Down
130 changes: 116 additions & 14 deletions Tests/FDBTests/FDBTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,126 @@ class FDBTests: XCTestCase {
func testSetVersionstampedKey() throws {
let fdb = FDBTests.fdb!
let subspace = FDBTests.subspace.subspace("atomic_versionstamp")
let key = subspace[FDB.Versionstamp(userData: 42)]["mykey"]

let value: String = "hello!"
do { // Test synchronous variations
let value: String = "basic sync value"
let nonVersionstampedKey = subspace[FDB.Versionstamp(transactionCommitVersion: 1, batchNumber: 2)]["aSyncKey"]

let versionStamp: FDB.Versionstamp = try fdb.withTransaction { transaction in
XCTAssertNoThrow(try transaction.set(versionstampedKey: subspace[FDB.Versionstamp()]["aSyncKey"], value: Bytes(value.utf8)) as Void)
XCTAssertThrowsError(try transaction.set(versionstampedKey: nonVersionstampedKey, value: getBytes(value.utf8)) as Void) { error in
switch error {
case FDB.Error.missingIncompleteVersionstamp: break
default: XCTFail("Invalid error returned: \(error)")
}
}

return try transaction.getVersionstamp()
}

XCTAssertNil(versionStamp.userData)

let result = try fdb.get(key: subspace[versionStamp]["aSyncKey"])
XCTAssertEqual(String(bytes: result ?? [], encoding: .utf8), value)
}

let tr = try self.begin().wait()
XCTAssertNoThrow(try tr.set(versionstampedKey: key, value: getBytes(value.utf8)) as Void)
let future: EventLoopFuture<FDB.Versionstamp> = tr.getVersionstamp()
XCTAssertNoThrow(try tr.commitSync())
do { // Test synchronous variations, with userData and multiple writes
let valueA: String = "advanced sync value A"
let valueB: String = "advanced sync value B"

var versionStamp: FDB.Versionstamp = try fdb.withTransaction { transaction in
try transaction.set(versionstampedKey: subspace[FDB.Versionstamp(userData: 1)]["aSyncKey"], value: Bytes(valueA.utf8)) as Void
try transaction.set(versionstampedKey: subspace[FDB.Versionstamp(userData: 2)]["aSyncKey"], value: Bytes(valueB.utf8)) as Void
return try transaction.getVersionstamp()
}

XCTAssertNil(versionStamp.userData)

versionStamp.userData = 1
let resultA = try fdb.get(key: subspace[versionStamp]["aSyncKey"])
XCTAssertEqual(String(bytes: resultA ?? [], encoding: .utf8), valueA)

versionStamp.userData = 2
let resultB = try fdb.get(key: subspace[versionStamp]["aSyncKey"])
XCTAssertEqual(String(bytes: resultB ?? [], encoding: .utf8), valueB)
}

var versionstamp = try future.wait()
XCTAssertNil(versionstamp.userData)
versionstamp.userData = 42
do { // Test asynchronous variations
let value: String = "basic async value"

let result: String? = try fdb.withTransaction(on: self.eventLoop) { transaction in
transaction
.set(versionstampedKey: subspace[FDB.Versionstamp()]["anAsyncKey"], value: Bytes(value.utf8))
.flatMap { _ -> EventLoopFuture<FDB.Versionstamp> in
let versionstamp = transaction.getVersionstamp() as EventLoopFuture<FDB.Versionstamp>
return transaction.commit().flatMap { versionstamp }
}
}.flatMap { versionstamp in
XCTAssertNil(versionstamp.userData)

return fdb.withTransaction(on: self.eventLoop) { transaction in
transaction
.get(key: subspace[versionstamp]["anAsyncKey"], commit: true)
.map { bytes, _ in
return String(bytes: bytes ?? [], encoding: .utf8)
}
}
}.wait()

XCTAssertEqual(result, value)
}

let updatedKey = subspace[versionstamp]["mykey"]

let result = try fdb.get(key: updatedKey)
XCTAssertNotNil(result)
try XCTAssertEqual(result!.cast() as String, value)
do { // Test asynchronous variations with invalid versionstamp
let value: String = "invalid async value"
let nonVersionstampedKey = subspace[FDB.Versionstamp(transactionCommitVersion: 1, batchNumber: 2)]["aSyncKey"]

let result: EventLoopFuture<Void> = fdb.withTransaction(on: self.eventLoop) { transaction in
transaction
.set(versionstampedKey: subspace[nonVersionstampedKey]["anAsyncKey"], value: Bytes(value.utf8))
.flatMap { _ -> EventLoopFuture<FDB.Versionstamp> in
XCTFail("We should never get to this point")
let versionstamp = transaction.getVersionstamp() as EventLoopFuture<FDB.Versionstamp>
return transaction.commit().flatMap { versionstamp }
}
}.flatMap { _ in
XCTFail("We should never get to this point")

return self.eventLoop.makeSucceededFuture(())
}

XCTAssertThrowsError(try result.wait()) { error in
switch error {
case FDB.Error.missingIncompleteVersionstamp: break
default: XCTFail("Invalid error returned: \(error)")
}
}
}

do { // Test asynchronous variations, with userData and committing inline
let value: String = "advanced async value"

let result: String? = try fdb.withTransaction(on: self.eventLoop) { transaction in
transaction
.set(versionstampedKey: subspace[FDB.Versionstamp(userData: 42)]["anAsyncKey"], value: Bytes(value.utf8))
.flatMap { _ in
return transaction.getVersionstamp(commit: true)
}
}.flatMap { versionstamp in
var versionstamp = versionstamp
XCTAssertNil(versionstamp.userData)
versionstamp.userData = 42

return fdb.withTransaction(on: self.eventLoop) { transaction in
transaction
.get(key: subspace[versionstamp]["anAsyncKey"], commit: true)
.map { bytes, _ in
return String(bytes: bytes ?? [], encoding: .utf8)
}
}
}.wait()

XCTAssertEqual(result, value)
}
}

func testClear() throws {
Expand Down
Loading

0 comments on commit 670880c

Please sign in to comment.