Skip to content

Commit

Permalink
Create new function monomorphicBulkWriteWithFallback
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuhao Yang committed Sep 19, 2023
1 parent 2d3d6db commit 8509f1f
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,44 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
return EventLoopFuture.andAllSucceed(futures, on: self.eventLoop)
}

func monomorphicBulkWriteWithFallback<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) -> EventLoopFuture<Void> {
// fall back to singel operation if the write entry exceeds the statement length limitation
var nonBulkWriteEntries: [WriteEntry<AttributesType, ItemType>] = []
let bulkWriteEntries: [WriteEntry<AttributesType, ItemType>]
do {
bulkWriteEntries = try entries.compactMap { entry in
do {
try self.validateEntry(entry: entry)
return entry
} catch SmokeDynamoDBError.statementLengthExceeded {
nonBulkWriteEntries.append(entry)
return nil
}
}
} catch {
let promise = self.eventLoop.makePromise(of: Void.self)
promise.fail(error)
return promise.futureResult
}

var futures = nonBulkWriteEntries.map { nonBulkWriteEntry -> EventLoopFuture<Void> in
switch nonBulkWriteEntry {
case .update(new: let new, existing: let existing):
return self.updateItem(newItem: new, existingItem: existing)
case .insert(new: let new):
return self.insertItem(new)
case .deleteAtKey(key: let key):
return self.deleteItem(forKey: key)
case .deleteItem(existing: let existing):
return self.deleteItem(existingItem: existing)
}
}

futures.append(self.monomorphicBulkWrite(bulkWriteEntries))

return EventLoopFuture.andAllSucceed(futures, on: self.eventLoop)
}

func writeChunkedItemsWithoutThrowing<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>])
-> EventLoopFuture<Set<BatchStatementErrorCodeEnum>> {
// if there are no items, there is nothing to update
Expand Down Expand Up @@ -487,6 +525,30 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
}
}

func monomorphicBulkWriteWithFallback<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws {
// fall back to singel operation if the write entry exceeds the statement length limitation
var bulkWriteEntries: [WriteEntry<AttributesType, ItemType>] = []
try await entries.concurrentForEach { entry in
do {
try self.validateEntry(entry: entry)
bulkWriteEntries.append(entry)
} catch SmokeDynamoDBError.statementLengthExceeded {
switch entry {
case .update(new: let new, existing: let existing):
try await self.updateItem(newItem: new, existingItem: existing)
case .insert(new: let new):
try await self.insertItem(new)
case .deleteAtKey(key: let key):
try await self.deleteItem(forKey: key)
case .deleteItem(existing: let existing):
try await self.deleteItem(existingItem: existing)
}
}
}

return try await monomorphicBulkWrite(bulkWriteEntries)
}

func writeChunkedItemsWithoutThrowing<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws
-> Set<BatchStatementErrorCodeEnum> {
// if there are no items, there is nothing to update
Expand Down
20 changes: 14 additions & 6 deletions Sources/SmokeDynamoDB/DynamoDBCompositePrimaryKeyTable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public protocol DynamoDBCompositePrimaryKeyTable {
*/
func monomorphicBulkWrite<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) -> EventLoopFuture<Void>

func monomorphicBulkWriteWithFallback<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) -> EventLoopFuture<Void>

func monomorphicBulkWriteWithoutThrowing<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>])
-> EventLoopFuture<Set<BatchStatementErrorCodeEnum>>

Expand Down Expand Up @@ -345,6 +347,8 @@ public protocol DynamoDBCompositePrimaryKeyTable {
* Provides the ability to bulk write database rows
*/
func monomorphicBulkWrite<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws

func monomorphicBulkWriteWithFallback<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws

func monomorphicBulkWriteWithoutThrowing<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws
-> Set<BatchStatementErrorCodeEnum>
Expand Down Expand Up @@ -523,15 +527,15 @@ public extension DynamoDBCompositePrimaryKeyTable {
// For async/await APIs, simply delegate to the EventLoopFuture implementation until support is dropped for Swift <5.5
public extension DynamoDBCompositePrimaryKeyTable {
#if (os(Linux) && compiler(>=5.5)) || (!os(Linux) && compiler(>=5.5.2)) && canImport(_Concurrency)

func insertItem<AttributesType, ItemType>(_ item: TypedDatabaseItem<AttributesType, ItemType>) async throws {
return try await insertItem(item).get()
}

func clobberItem<AttributesType, ItemType>(_ item: TypedDatabaseItem<AttributesType, ItemType>) async throws {
return try await clobberItem(item).get()
}

func updateItem<AttributesType, ItemType>(newItem: TypedDatabaseItem<AttributesType, ItemType>,
existingItem: TypedDatabaseItem<AttributesType, ItemType>) async throws {
return try await updateItem(newItem: newItem, existingItem: existingItem).get()
Expand All @@ -541,9 +545,8 @@ public extension DynamoDBCompositePrimaryKeyTable {
fatalError("Not implemented")
}

func transactWrite<WriteEntryType: PolymorphicWriteEntry,
TransactionConstraintEntryType: PolymorphicTransactionConstraintEntry>(
_ entries: [WriteEntryType], constraints: [TransactionConstraintEntryType]) async throws {
func transactWrite<WriteEntryType: PolymorphicWriteEntry, TransactionConstraintEntryType: PolymorphicTransactionConstraintEntry>(
_ entries: [WriteEntryType], constraints: [TransactionConstraintEntryType]) async throws {
fatalError("Not implemented")
}

Expand All @@ -554,6 +557,11 @@ public extension DynamoDBCompositePrimaryKeyTable {
func monomorphicBulkWrite<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws {
return try await monomorphicBulkWrite(entries).get()
}

func monomorphicBulkWriteWithFallback<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws {
return try await monomorphicBulkWriteWithFallback(entries).get()
}

func monomorphicBulkWriteWithoutThrowing<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws
-> Set<BatchStatementErrorCodeEnum>{
return try await monomorphicBulkWriteWithoutThrowing(entries).get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,45 @@ public class InMemoryDynamoDBCompositePrimaryKeyTable: DynamoDBCompositePrimaryK
-> EventLoopFuture<Void> {
return storeWrapper.monomorphicBulkWrite(entries, eventLoop: self.eventLoop)
}

public func monomorphicBulkWriteWithFallback<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>])
-> EventLoopFuture<Void> {
// fall back to singel operation if the write entry exceeds the statement length limitation
var nonBulkWriteEntries: [WriteEntry<AttributesType, ItemType>] = []
var bulkWriteEntries: [WriteEntry<AttributesType, ItemType>] = []
do {
bulkWriteEntries = try entries.compactMap { entry in
do {
try self.validateEntry(entry: entry)
return entry
} catch SmokeDynamoDBError.statementLengthExceeded {
nonBulkWriteEntries.append(entry)
return nil
}
}
} catch {
let promise = eventLoop.makePromise(of: Void.self)
promise.fail(error)
return promise.futureResult
}

var futures = nonBulkWriteEntries.map { nonBulkWriteEntry -> EventLoopFuture<Void> in
switch nonBulkWriteEntry {
case .update(new: let new, existing: let existing):
return self.updateItem(newItem: new, existingItem: existing)
case .insert(new: let new):
return self.insertItem(new)
case .deleteAtKey(key: let key):
return self.deleteItem(forKey: key)
case .deleteItem(existing: let existing):
return self.deleteItem(existingItem: existing)
}
}

futures.append(self.monomorphicBulkWrite(bulkWriteEntries))

return EventLoopFuture.andAllSucceed(futures, on: eventLoop)
}

public func monomorphicBulkWriteWithoutThrowing<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) -> EventLoopFuture<Set<BatchStatementErrorCodeEnum>> {
return storeWrapper.monomorphicBulkWriteWithoutThrowing(entries, eventLoop: eventLoop)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ internal class InMemoryDynamoDBCompositePrimaryKeyTableStore {
}

extension InMemoryDynamoDBCompositePrimaryKeyTableStore {

func insertItem<AttributesType, ItemType>(_ item: TypedDatabaseItem<AttributesType, ItemType>,
eventLoop: EventLoop) -> EventLoopFuture<Void> {
let promise = eventLoop.makePromise(of: Void.self)
Expand Down Expand Up @@ -348,7 +347,7 @@ extension InMemoryDynamoDBCompositePrimaryKeyTableStore {

return EventLoopFuture.andAllSucceed(futures, on: eventLoop)
}

public func monomorphicBulkWriteWithoutThrowing<AttributesType, ItemType>(
_ entries: [WriteEntry<AttributesType, ItemType>],
eventLoop: EventLoop) -> EventLoopFuture<Set<BatchStatementErrorCodeEnum>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,44 @@ public struct InMemoryDynamoDBCompositePrimaryKeyTableWithIndex<GSILogic: Dynamo
return EventLoopFuture.andAllSucceed(futures, on: self.eventLoop)
}

public func monomorphicBulkWriteWithFallback<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) -> EventLoopFuture<Void> {
// fall back to singel operation if the write entry exceeds the statement length limitation
var nonBulkWriteEntries: [WriteEntry<AttributesType, ItemType>] = []
var bulkWriteEntries: [WriteEntry<AttributesType, ItemType>] = []
do {
bulkWriteEntries = try entries.compactMap { entry in
do {
try self.validateEntry(entry: entry)
return entry
} catch SmokeDynamoDBError.statementLengthExceeded {
nonBulkWriteEntries.append(entry)
return nil
}
}
} catch {
let promise = self.eventLoop.makePromise(of: Void.self)
promise.fail(error)
return promise.futureResult
}

var futures = nonBulkWriteEntries.map { nonBulkWriteEntry -> EventLoopFuture<Void> in
switch nonBulkWriteEntry {
case .update(new: let new, existing: let existing):
return self.updateItem(newItem: new, existingItem: existing)
case .insert(new: let new):
return self.insertItem(new)
case .deleteAtKey(key: let key):
return self.deleteItem(forKey: key)
case .deleteItem(existing: let existing):
return self.deleteItem(existingItem: existing)
}
}

futures.append(self.monomorphicBulkWrite(bulkWriteEntries))

return EventLoopFuture.andAllSucceed(futures, on: self.eventLoop)
}

public func monomorphicBulkWriteWithoutThrowing<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>])
-> EventLoopFuture<Set<BatchStatementErrorCodeEnum>> {
let futures = entries.map { entry -> EventLoopFuture<BatchStatementErrorCodeEnum?> in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,45 @@ public class SimulateConcurrencyDynamoDBCompositePrimaryKeyTable: DynamoDBCompos
return EventLoopFuture.andAllSucceed(futures, on: self.eventLoop)
}

public func monomorphicBulkWriteWithFallback<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>])
-> EventLoopFuture<Void> {
// fall back to singel operation if the write entry exceeds the statement length limitation
var nonBulkWriteEntries: [WriteEntry<AttributesType, ItemType>] = []
var bulkWriteEntries: [WriteEntry<AttributesType, ItemType>] = []
do {
bulkWriteEntries = try entries.compactMap { entry in
do {
try self.validateEntry(entry: entry)
return entry
} catch SmokeDynamoDBError.statementLengthExceeded {
nonBulkWriteEntries.append(entry)
return nil
}
}
} catch {
let promise = eventLoop.makePromise(of: Void.self)
promise.fail(error)
return promise.futureResult
}

var futures = nonBulkWriteEntries.map { nonBulkWriteEntry -> EventLoopFuture<Void> in
switch nonBulkWriteEntry {
case .update(new: let new, existing: let existing):
return self.updateItem(newItem: new, existingItem: existing)
case .insert(new: let new):
return self.insertItem(new)
case .deleteAtKey(key: let key):
return self.deleteItem(forKey: key)
case .deleteItem(existing: let existing):
return self.deleteItem(existingItem: existing)
}
}

futures.append(self.monomorphicBulkWrite(bulkWriteEntries))

return EventLoopFuture.andAllSucceed(futures, on: self.eventLoop)
}

public func monomorphicBulkWriteWithoutThrowing<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>])
-> EventLoopFuture<Set<BatchStatementErrorCodeEnum>>
where AttributesType : PrimaryKeyAttributes, ItemType : Decodable, ItemType : Encodable {
Expand Down
Loading

0 comments on commit 8509f1f

Please sign in to comment.