Skip to content

Commit

Permalink
Add integration test coverage for SO migrations against serverless ES (
Browse files Browse the repository at this point in the history
…#164959)

## Summary

~~Blocked by #162673~~

Add some initial integration test coverage for SO migrations when
running against serverless Elasticsearch:
- our migration actions test suite 
- some of the zdt algo migration suites 

The actions test suite was adapted to skip, when run against serverless,
the tests that are not supposed to be run (or passing) in that
environment
  • Loading branch information
pgayvallet authored Sep 11, 2023
1 parent 2268bfb commit 03b7cd5
Show file tree
Hide file tree
Showing 21 changed files with 2,664 additions and 2,460 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const migrationSchema = schema.object({
* The delay that the migrator will wait for, in seconds, when updating the
* index mapping's meta to let the other nodes pickup the changes.
*/
metaPickupSyncDelaySec: schema.number({ min: 1, defaultValue: 120 }),
metaPickupSyncDelaySec: schema.number({ min: 1, defaultValue: 5 }),
/**
* The document migration phase will be run from instances with any of the specified roles.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,35 +73,22 @@ describe('cloneIndex', () => {
`);
});

it('calls client.indices.clone with the correct parameter for serverless ES', async () => {
it('resolve left with operation_not_supported for serverless ES', async () => {
const statelessCapabilities = elasticsearchServiceMock.createCapabilities({ serverless: true });
const task = cloneIndex({
client,
source: 'my_source_index',
target: 'my_target_index',
esCapabilities: statelessCapabilities,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(client.indices.clone.mock.calls[0][0]).toMatchInlineSnapshot(`
const result = await task();
expect(result).toMatchInlineSnapshot(`
Object {
"index": "my_source_index",
"settings": Object {
"index": Object {
"blocks.write": false,
"mapping": Object {
"total_fields": Object {
"limit": 1500,
},
},
},
"_tag": "Left",
"left": Object {
"operationName": "clone",
"type": "operation_not_supported",
},
"target": "my_target_index",
"timeout": "60s",
"wait_for_active_shards": "all",
}
`);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import type { IndexNotFound, AcknowledgeResponse } from '.';
import type { IndexNotFound, AcknowledgeResponse, OperationNotSupported } from '.';
import { type IndexNotGreenTimeout, waitForIndexStatus } from './wait_for_index_status';
import {
DEFAULT_TIMEOUT,
Expand Down Expand Up @@ -58,25 +58,41 @@ export const cloneIndex = ({
target,
timeout = DEFAULT_TIMEOUT,
}: CloneIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound | IndexNotGreenTimeout | ClusterShardLimitExceeded,
| RetryableEsClientError
| IndexNotFound
| IndexNotGreenTimeout
| ClusterShardLimitExceeded
| OperationNotSupported,
CloneIndexResponse
> => {
const cloneTask: TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound | ClusterShardLimitExceeded,
RetryableEsClientError | IndexNotFound | ClusterShardLimitExceeded | OperationNotSupported,
AcknowledgeResponse
> = () => {
const indexSettings = {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
// Increase the fields limit beyond the default of 1000
mapping: {
total_fields: { limit: 1500 },
},
// settings not being supported on serverless ES
...(esCapabilities.serverless
? {}
: {
// clone is not supported on serverless
if (esCapabilities.serverless) {
return Promise.resolve(
Either.left({
type: 'operation_not_supported' as const,
operationName: 'clone',
})
);
}

return client.indices
.clone({
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
settings: {
index: {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
// Increase the fields limit beyond the default of 1000
mapping: {
total_fields: { limit: 1500 },
},
// The rest of the index settings should have already been applied
// to the source index and will be copied to the clone target. But
// we repeat it here for explicitness.
Expand All @@ -88,16 +104,7 @@ export const cloneIndex = ({
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
}),
};

return client.indices
.clone({
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
settings: {
index: indexSettings,
},
},
timeout,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ describe('isClusterShardLimitExceeded', () => {
})
).toEqual(true);
});
it('returns true with illegal_argument_exception and reason is maximum normal shards open', () => {
    // Serverless ES surfaces the shard-limit error as illegal_argument_exception
    // (traditional ES uses validation_exception for the same condition), so the
    // matcher must accept this error type as well.
    const errorCause = {
      type: 'illegal_argument_exception',
      reason:
        'Validation Failed: 1: this action would add [2] shards, but this cluster currently has [3]/[1] maximum normal shards open;',
    };
    expect(isClusterShardLimitExceeded(errorCause)).toEqual(true);
  });
it('returns false for validation_exception with another reason', () => {
expect(
isClusterShardLimitExceeded({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ export const isIndexNotFoundException = (errorCause?: estypes.ErrorCause): boole
};

export const isClusterShardLimitExceeded = (errorCause?: estypes.ErrorCause): boolean => {
// traditional ES: validation_exception. serverless ES: illegal_argument_exception
return (
errorCause?.type === 'validation_exception' &&
(errorCause?.type === 'validation_exception' ||
errorCause?.type === 'illegal_argument_exception') &&
errorCause?.reason?.match(
/this action would add .* shards, but this cluster currently has .* maximum normal shards open/
) !== null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ export interface IndexNotFound {
index: string;
}

/**
 * Error result returned by a migration action when the requested operation is
 * not available in the current Elasticsearch environment (e.g. `clone` is not
 * supported when running against serverless ES).
 */
export interface OperationNotSupported {
  /** Discriminant tag used by the model's `isTypeof` left-branch handling. */
  type: 'operation_not_supported';
  /** Name of the unsupported action (e.g. 'clone'), included in the FATAL reason. */
  operationName: string;
}

/**
 * Failure payload reported while waiting on a reindex task; `cause` carries the
 * underlying Elasticsearch error type and human-readable reason.
 */
export interface WaitForReindexTaskFailure {
  readonly cause: { type: string; reason: string };
}
Expand Down Expand Up @@ -179,6 +184,7 @@ export interface ActionErrorTypeMap {
synchronization_failed: SynchronizationFailed;
actual_mappings_incomplete: ActualMappingsIncomplete;
compared_mappings_changed: ComparedMappingsChanged;
operation_not_supported: OperationNotSupported;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,12 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
controlState: 'FATAL',
reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`,
};
} else if (isTypeof(left, 'operation_not_supported')) {
return {
...stateP,
controlState: 'FATAL',
reason: `Action failed due to unsupported operation: ${left.operationName}`,
};
} else {
throwBadResponse(stateP, left);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Client, HttpConnection } from '@elastic/elasticsearch';
import { Cluster } from '@kbn/es';
import { REPO_ROOT } from '@kbn/repo-info';
import { ToolingLog } from '@kbn/tooling-log';
import { esTestConfig } from '@kbn/test';
import { CliArgs } from '@kbn/config';
import { createRoot, type TestElasticsearchUtils, type TestKibanaUtils } from './create_root';

Expand All @@ -25,6 +26,8 @@ export interface TestServerlessUtils {
startKibana: (abortSignal?: AbortSignal) => Promise<TestServerlessKibanaUtils>;
}

const ES_BASE_PATH_DIR = Path.join(REPO_ROOT, '.es/es_test_serverless');

/**
* See docs in {@link TestUtils}. This function provides the same utilities but
* configured for serverless.
Expand All @@ -36,9 +39,11 @@ export function createTestServerlessInstances({
}: {
adjustTimeout: (timeout: number) => void;
}): TestServerlessUtils {
adjustTimeout?.(150_000);

const esUtils = createServerlessES();
const kbUtils = createServerlessKibana();
adjustTimeout?.(120_000);

return {
startES: async () => {
const { stop, getClient } = await esUtils.start();
Expand All @@ -63,26 +68,29 @@ export function createTestServerlessInstances({
};
}

const ES_BASE_PATH_DIR = Path.join(REPO_ROOT, '.es/es_test_serverless');
function createServerlessES() {
const log = new ToolingLog({
level: 'info',
writeTo: process.stdout,
});
const es = new Cluster({ log });
const esPort = esTestConfig.getPort();
return {
es,
start: async () => {
await es.runServerless({
basePath: ES_BASE_PATH_DIR,
port: esPort,
teardown: true,
background: true,
clean: true,
kill: true,
waitForReady: true,
});
const client = getServerlessESClient({ port: esPort });

return {
getClient: getServerlessESClient,
getClient: () => client,
stop: async () => {
await es.stop();
},
Expand All @@ -91,10 +99,9 @@ function createServerlessES() {
};
}

const getServerlessESClient = () => {
const getServerlessESClient = ({ port }: { port: number }) => {
return new Client({
// node ports not configurable from
node: 'http://localhost:9200',
node: `http://localhost:${port}`,
Connection: HttpConnection,
});
};
Expand All @@ -108,6 +115,9 @@ const getServerlessDefault = () => {
strictClientVersionCheck: false,
},
},
elasticsearch: {
hosts: [`http://localhost:${esTestConfig.getPort()}`],
},
migrations: {
algorithm: 'zdt',
zdt: {
Expand All @@ -134,6 +144,7 @@ const getServerlessDefault = () => {
},
};
};

function createServerlessKibana(settings = {}, cliArgs: Partial<CliArgs> = {}) {
return createRoot(defaultsDeep(settings, getServerlessDefault()), {
...cliArgs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import {
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { getCapabilitiesFromClient } from '@kbn/core-elasticsearch-server-internal';

// skipped because test serverless ES nodes are currently using static ports
// causing parallel jest runners to fail for obvious port conflicts reasons.
describe.skip('ES capabilities for serverless ES', () => {
describe('ES capabilities for serverless ES', () => {
let serverlessES: TestServerlessESUtils;
let client: ElasticsearchClient;

Expand Down
Loading

0 comments on commit 03b7cd5

Please sign in to comment.