Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integration test coverage for SO migrations against serverless ES #164959

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const migrationSchema = schema.object({
* The delay that the migrator will wait for, in seconds, when updating the
* index mapping's meta to let the other nodes pickup the changes.
*/
metaPickupSyncDelaySec: schema.number({ min: 1, defaultValue: 120 }),
metaPickupSyncDelaySec: schema.number({ min: 1, defaultValue: 5 }),
/**
* The document migration phase will be run from instances with any of the specified roles.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,35 +73,22 @@ describe('cloneIndex', () => {
`);
});

it('calls client.indices.clone with the correct parameter for serverless ES', async () => {
it('resolve left with operation_not_supported for serverless ES', async () => {
const statelessCapabilities = elasticsearchServiceMock.createCapabilities({ serverless: true });
const task = cloneIndex({
client,
source: 'my_source_index',
target: 'my_target_index',
esCapabilities: statelessCapabilities,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(client.indices.clone.mock.calls[0][0]).toMatchInlineSnapshot(`
const result = await task();
expect(result).toMatchInlineSnapshot(`
Object {
"index": "my_source_index",
"settings": Object {
"index": Object {
"blocks.write": false,
"mapping": Object {
"total_fields": Object {
"limit": 1500,
},
},
},
"_tag": "Left",
"left": Object {
"operationName": "clone",
"type": "operation_not_supported",
},
"target": "my_target_index",
"timeout": "60s",
"wait_for_active_shards": "all",
}
`);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import type { IndexNotFound, AcknowledgeResponse } from '.';
import type { IndexNotFound, AcknowledgeResponse, OperationNotSupported } from '.';
import { type IndexNotGreenTimeout, waitForIndexStatus } from './wait_for_index_status';
import {
DEFAULT_TIMEOUT,
Expand Down Expand Up @@ -58,25 +58,41 @@ export const cloneIndex = ({
target,
timeout = DEFAULT_TIMEOUT,
}: CloneIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound | IndexNotGreenTimeout | ClusterShardLimitExceeded,
| RetryableEsClientError
| IndexNotFound
| IndexNotGreenTimeout
| ClusterShardLimitExceeded
| OperationNotSupported,
CloneIndexResponse
> => {
const cloneTask: TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound | ClusterShardLimitExceeded,
RetryableEsClientError | IndexNotFound | ClusterShardLimitExceeded | OperationNotSupported,
AcknowledgeResponse
> = () => {
const indexSettings = {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
// Increase the fields limit beyond the default of 1000
mapping: {
total_fields: { limit: 1500 },
},
// settings not being supported on serverless ES
...(esCapabilities.serverless
? {}
: {
// clone is not supported on serverless
if (esCapabilities.serverless) {
return Promise.resolve(
Either.left({
type: 'operation_not_supported' as const,
operationName: 'clone',
})
);
}

return client.indices
.clone({
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
settings: {
index: {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
// Increase the fields limit beyond the default of 1000
mapping: {
total_fields: { limit: 1500 },
},
// The rest of the index settings should have already been applied
// to the source index and will be copied to the clone target. But
// we repeat it here for explicitness.
Expand All @@ -88,16 +104,7 @@ export const cloneIndex = ({
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
}),
};

return client.indices
.clone({
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
settings: {
index: indexSettings,
},
},
timeout,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ describe('isClusterShardLimitExceeded', () => {
})
).toEqual(true);
});
it('returns true with illegal_argument_exception and reason is maximum normal shards open', () => {
expect(
isClusterShardLimitExceeded({
type: 'illegal_argument_exception',
reason:
'Validation Failed: 1: this action would add [2] shards, but this cluster currently has [3]/[1] maximum normal shards open;',
})
).toEqual(true);
});
it('returns false for validation_exception with another reason', () => {
expect(
isClusterShardLimitExceeded({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ export const isIndexNotFoundException = (errorCause?: estypes.ErrorCause): boole
};

export const isClusterShardLimitExceeded = (errorCause?: estypes.ErrorCause): boolean => {
// traditional ES: validation_exception. serverless ES: illegal_argument_exception
return (
errorCause?.type === 'validation_exception' &&
(errorCause?.type === 'validation_exception' ||
errorCause?.type === 'illegal_argument_exception') &&
errorCause?.reason?.match(
/this action would add .* shards, but this cluster currently has .* maximum normal shards open/
) !== null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ export interface IndexNotFound {
index: string;
}

export interface OperationNotSupported {
type: 'operation_not_supported';
operationName: string;
}

export interface WaitForReindexTaskFailure {
readonly cause: { type: string; reason: string };
}
Expand Down Expand Up @@ -179,6 +184,7 @@ export interface ActionErrorTypeMap {
synchronization_failed: SynchronizationFailed;
actual_mappings_incomplete: ActualMappingsIncomplete;
compared_mappings_changed: ComparedMappingsChanged;
operation_not_supported: OperationNotSupported;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,12 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
controlState: 'FATAL',
reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`,
};
} else if (isTypeof(left, 'operation_not_supported')) {
return {
...stateP,
controlState: 'FATAL',
reason: `Action failed due to unsupported operation: ${left.operationName}`,
};
} else {
throwBadResponse(stateP, left);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Client, HttpConnection } from '@elastic/elasticsearch';
import { Cluster } from '@kbn/es';
import { REPO_ROOT } from '@kbn/repo-info';
import { ToolingLog } from '@kbn/tooling-log';
import { esTestConfig } from '@kbn/test';
import { CliArgs } from '@kbn/config';
import { createRoot, type TestElasticsearchUtils, type TestKibanaUtils } from './create_root';

Expand All @@ -25,6 +26,8 @@ export interface TestServerlessUtils {
startKibana: (abortSignal?: AbortSignal) => Promise<TestServerlessKibanaUtils>;
}

const ES_BASE_PATH_DIR = Path.join(REPO_ROOT, '.es/es_test_serverless');

/**
* See docs in {@link TestUtils}. This function provides the same utilities but
* configured for serverless.
Expand All @@ -36,9 +39,11 @@ export function createTestServerlessInstances({
}: {
adjustTimeout: (timeout: number) => void;
}): TestServerlessUtils {
adjustTimeout?.(150_000);

const esUtils = createServerlessES();
const kbUtils = createServerlessKibana();
adjustTimeout?.(120_000);

return {
startES: async () => {
const { stop, getClient } = await esUtils.start();
Expand All @@ -63,26 +68,29 @@ export function createTestServerlessInstances({
};
}

const ES_BASE_PATH_DIR = Path.join(REPO_ROOT, '.es/es_test_serverless');
function createServerlessES() {
const log = new ToolingLog({
level: 'info',
writeTo: process.stdout,
});
const es = new Cluster({ log });
const esPort = esTestConfig.getPort();
return {
es,
start: async () => {
await es.runServerless({
basePath: ES_BASE_PATH_DIR,
port: esPort,
teardown: true,
background: true,
clean: true,
kill: true,
waitForReady: true,
});
const client = getServerlessESClient({ port: esPort });

return {
getClient: getServerlessESClient,
getClient: () => client,
stop: async () => {
await es.stop();
},
Expand All @@ -91,10 +99,9 @@ function createServerlessES() {
};
}

const getServerlessESClient = () => {
const getServerlessESClient = ({ port }: { port: number }) => {
return new Client({
// node ports not configurable from
node: 'http://localhost:9200',
node: `http://localhost:${port}`,
Connection: HttpConnection,
});
};
Expand All @@ -108,6 +115,9 @@ const getServerlessDefault = () => {
strictClientVersionCheck: false,
},
},
elasticsearch: {
hosts: [`http://localhost:${esTestConfig.getPort()}`],
},
migrations: {
algorithm: 'zdt',
zdt: {
Expand All @@ -134,6 +144,7 @@ const getServerlessDefault = () => {
},
};
};

function createServerlessKibana(settings = {}, cliArgs: Partial<CliArgs> = {}) {
return createRoot(defaultsDeep(settings, getServerlessDefault()), {
...cliArgs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import {
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { getCapabilitiesFromClient } from '@kbn/core-elasticsearch-server-internal';

// skipped because test serverless ES nodes are currently using static ports
// causing parallel jest runners to fail for obvious port conflicts reasons.
describe.skip('ES capabilities for serverless ES', () => {
describe('ES capabilities for serverless ES', () => {
let serverlessES: TestServerlessESUtils;
let client: ElasticsearchClient;

Expand Down
Loading