Skip to content

Commit

Permalink
feat(clients): cleanup after replaceAllObjects failure [skip-bc] (#3824)
Browse files Browse the repository at this point in the history
Co-authored-by: Thomas Raffray <[email protected]>
  • Loading branch information
millotp and Fluf22 authored Dec 31, 2024
1 parent 5553caf commit 37223c9
Show file tree
Hide file tree
Showing 22 changed files with 595 additions and 386 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -494,37 +494,46 @@ public async Task<ReplaceAllObjectsResponse> ReplaceAllObjectsAsync<T>(string in
var rnd = new Random();
var tmpIndexName = $"{indexName}_tmp_{rnd.Next(100)}";

var copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);
try
{
var copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);

var batchResponse = await ChunkedBatchAsync(tmpIndexName, objects, Action.AddObject, true, batchSize,
options, cancellationToken).ConfigureAwait(false);
var batchResponse = await ChunkedBatchAsync(tmpIndexName, objects, Action.AddObject, true, batchSize,
options, cancellationToken).ConfigureAwait(false);

await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

var moveResponse = await OperationIndexAsync(tmpIndexName,
new OperationIndexParams(OperationType.Move, indexName), options, cancellationToken)
.ConfigureAwait(false);
var moveResponse = await OperationIndexAsync(tmpIndexName,
new OperationIndexParams(OperationType.Move, indexName), options, cancellationToken)
.ConfigureAwait(false);

await WaitForTaskAsync(tmpIndexName, moveResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, moveResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

return new ReplaceAllObjectsResponse
return new ReplaceAllObjectsResponse
{
CopyOperationResponse = copyResponse,
MoveOperationResponse = moveResponse,
BatchResponses = batchResponse
};
}
catch (Exception ex)
{
CopyOperationResponse = copyResponse,
MoveOperationResponse = moveResponse,
BatchResponses = batchResponse
};
await DeleteIndexAsync(tmpIndexName, cancellationToken: cancellationToken).ConfigureAwait(false);

throw ex;
}
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,46 +472,52 @@ public suspend fun SearchClient.replaceAllObjects(
): ReplaceAllObjectsResponse {
val tmpIndexName = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"

var copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
try {
var copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)

val batchResponses = this.chunkedBatch(
indexName = tmpIndexName,
objects = objects,
action = Action.AddObject,
waitForTask = true,
batchSize = batchSize,
requestOptions = requestOptions,
)
val batchResponses = this.chunkedBatch(
indexName = tmpIndexName,
objects = objects,
action = Action.AddObject,
waitForTask = true,
batchSize = batchSize,
requestOptions = requestOptions,
)

waitForTask(indexName = tmpIndexName, taskID = copy.taskID)
waitForTask(indexName = tmpIndexName, taskID = copy.taskID)

copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
waitForTask(indexName = tmpIndexName, taskID = copy.taskID)
copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
waitForTask(indexName = tmpIndexName, taskID = copy.taskID)

val move = operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions,
)
waitForTask(indexName = tmpIndexName, taskID = move.taskID)
val move = operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions,
)
waitForTask(indexName = tmpIndexName, taskID = move.taskID)

return ReplaceAllObjectsResponse(copy, batchResponses, move)
} catch (e: Exception) {
deleteIndex(tmpIndexName)

return ReplaceAllObjectsResponse(copy, batchResponses, move)
throw e
}
}

/**
Expand Down Expand Up @@ -542,6 +548,13 @@ public fun securedApiKeyRemainingValidity(apiKey: String): Duration {
return validUntil - Clock.System.now()
}

/**
* Checks that an index exists.
*
* @param indexName The name of the index to check.
* @return true if the index exists, false otherwise.
* @throws AlgoliaApiException if an error occurs during the request.
*/
public suspend fun SearchClient.indexExists(indexName: String): Boolean {
try {
getSettings(indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,50 +366,58 @@ package object extension {
)(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"

for {
copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
try {
for {
copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)

batchResponses <- chunkedBatch(
indexName = tmpIndexName,
objects = objects,
action = Action.AddObject,
waitForTasks = true,
batchSize = batchSize,
requestOptions = requestOptions
)
batchResponses <- chunkedBatch(
indexName = tmpIndexName,
objects = objects,
action = Action.AddObject,
waitForTasks = true,
batchSize = batchSize,
requestOptions = requestOptions
)

_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)
copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

move <- client.operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions
move <- client.operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = move.taskID, requestOptions = requestOptions)
} yield ReplaceAllObjectsResponse(
copyOperationResponse = copy,
batchResponses = batchResponses,
moveOperationResponse = move
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = move.taskID, requestOptions = requestOptions)
} yield ReplaceAllObjectsResponse(
copyOperationResponse = copy,
batchResponses = batchResponses,
moveOperationResponse = move
)
} catch {
case e : Throwable => {
client.deleteIndex(tmpIndexName)

throw e
}
}
}

/** Check if an index exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,51 +559,57 @@ public extension SearchClient {
) async throws -> ReplaceAllObjectsResponse {
let tmpIndexName = "\(indexName)_tmp_\(Int.random(in: 1_000_000 ..< 10_000_000))"

var copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)
do {
var copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)

let batchResponses = try await self.chunkedBatch(
indexName: tmpIndexName,
objects: objects,
waitForTasks: true,
batchSize: batchSize,
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: copyOperationResponse.taskID)
let batchResponses = try await self.chunkedBatch(
indexName: tmpIndexName,
objects: objects,
waitForTasks: true,
batchSize: batchSize,
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: copyOperationResponse.taskID)

copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: copyOperationResponse.taskID)

let moveOperationResponse = try await self.operationIndex(
indexName: tmpIndexName,
operationIndexParams: OperationIndexParams(
operation: .move,
destination: indexName
),
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: moveOperationResponse.taskID)
copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: copyOperationResponse.taskID)

return ReplaceAllObjectsResponse(
copyOperationResponse: copyOperationResponse,
batchResponses: batchResponses,
moveOperationResponse: moveOperationResponse
)
let moveOperationResponse = try await self.operationIndex(
indexName: tmpIndexName,
operationIndexParams: OperationIndexParams(
operation: .move,
destination: indexName
),
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: moveOperationResponse.taskID)

return ReplaceAllObjectsResponse(
copyOperationResponse: copyOperationResponse,
batchResponses: batchResponses,
moveOperationResponse: moveOperationResponse
)
} catch {
_ = try? await self.deleteIndex(indexName: tmpIndexName)

throw error
}
}

/// Generate a secured API key
Expand Down
2 changes: 2 additions & 0 deletions scripts/cts/runCts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { printBenchmarkReport } from './testServer/benchmark.js';
import { assertChunkWrapperValid } from './testServer/chunkWrapper.js';
import { startTestServer } from './testServer/index.js';
import { assertValidReplaceAllObjects } from './testServer/replaceAllObjects.js';
import { assertValidReplaceAllObjectsFailed } from './testServer/replaceAllObjectsFailed.js';
import { assertValidTimeouts } from './testServer/timeout.js';
import { assertValidWaitForApiKey } from './testServer/waitFor.js';

Expand Down Expand Up @@ -152,6 +153,7 @@ export async function runCts(
assertValidTimeouts(languages.length);
assertChunkWrapperValid(languages.length - skip('dart') - skip('scala'));
assertValidReplaceAllObjects(languages.length - skip('dart') - skip('scala'));
assertValidReplaceAllObjectsFailed(languages.length - skip('dart') - skip('scala'));
assertValidWaitForApiKey(languages.length - skip('dart') - skip('scala'));
}
if (withBenchmarkServer) {
Expand Down
Loading

0 comments on commit 37223c9

Please sign in to comment.