Skip to content

Commit

Permalink
Migrate es-archiver to typescript (#56008)
Browse files Browse the repository at this point in the history
* migrate lib/archives and lib/docs

* migrate lib/indices

* migrate end of /lib

* migrate /actions

* migrate es_archiver

* migrate cli

* migrate tests

* use proper log stub

* add Record typing

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
pgayvallet and elasticmachine authored Feb 3, 2020
1 parent fe86a86 commit 38dc1cb
Show file tree
Hide file tree
Showing 46 changed files with 576 additions and 393 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async function getSettingsFromFile(log: ToolingLog, path: string, settingOverrid
return transformDeprecations(settingsWithDefaults, logDeprecation);
}

export async function readConfigFile(log: ToolingLog, path: string, settingOverrides: any) {
export async function readConfigFile(log: ToolingLog, path: string, settingOverrides: any = {}) {
return new Config({
settings: await getSettingsFromFile(log, path, settingOverrides),
primary: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,23 @@ import Fs from 'fs';
import { createGunzip, createGzip, Z_BEST_COMPRESSION } from 'zlib';
import { promisify } from 'util';
import globby from 'globby';
import { ToolingLog } from '@kbn/dev-utils';

import { createPromiseFromStreams } from '../../legacy/utils';

const unlinkAsync = promisify(Fs.unlink);

export async function editAction({ prefix, dataDir, log, handler }) {
export async function editAction({
prefix,
dataDir,
log,
handler,
}: {
prefix: string;
dataDir: string;
log: ToolingLog;
handler: () => Promise<any>;
}) {
const archives = (
await globby('**/*.gz', {
cwd: prefix ? resolve(dataDir, prefix) : dataDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/

import { Client } from 'elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';

import { migrateKibanaIndex, deleteKibanaIndices, createStats } from '../lib';

export async function emptyKibanaIndexAction({ client, log, kbnClient }) {
export async function emptyKibanaIndexAction({
client,
log,
kbnClient,
}: {
client: Client;
log: ToolingLog;
kbnClient: KbnClient;
}) {
const stats = createStats('emptyKibanaIndex', log);
const kibanaPluginIds = await kbnClient.plugins.getEnabledIds();

await deleteKibanaIndices({ client, stats });
await migrateKibanaIndex({ client, log, stats, kibanaPluginIds });
await deleteKibanaIndices({ client, stats, log });
await migrateKibanaIndex({ client, log, kibanaPluginIds });
return stats;
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable } from 'stream';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { Client } from 'elasticsearch';

import { createPromiseFromStreams, concatStreamProviders } from '../../legacy/utils';

Expand All @@ -38,12 +41,26 @@ import {
// pipe a series of streams into each other so that data and errors
// flow from the first stream to the last. Errors from the last stream
// are not listened for
const pipeline = (...streams) =>
const pipeline = (...streams: Readable[]) =>
streams.reduce((source, dest) =>
source.once('error', error => dest.emit('error', error)).pipe(dest)
source.once('error', error => dest.emit('error', error)).pipe(dest as any)
);

export async function loadAction({ name, skipExisting, client, dataDir, log, kbnClient }) {
export async function loadAction({
name,
skipExisting,
client,
dataDir,
log,
kbnClient,
}: {
name: string;
skipExisting: boolean;
client: Client;
dataDir: string;
log: ToolingLog;
kbnClient: KbnClient;
}) {
const inputDir = resolve(dataDir, name);
const stats = createStats(name, log);
const files = prioritizeMappings(await readDirectory(inputDir));
Expand All @@ -64,20 +81,20 @@ export async function loadAction({ name, skipExisting, client, dataDir, log, kbn
{ objectMode: true }
);

const progress = new Progress('load progress');
const progress = new Progress();
progress.activate(log);

await createPromiseFromStreams([
recordStream,
createCreateIndexStream({ client, stats, skipExisting, log, kibanaPluginIds }),
createCreateIndexStream({ client, stats, skipExisting, log }),
createIndexDocRecordsStream(client, stats, progress),
]);

progress.deactivate();
const result = stats.toJSON();

for (const [index, { docs }] of Object.entries(result)) {
if (!docs && docs.indexed > 0) {
if (docs && docs.indexed > 0) {
log.info('[%s] Indexed %d docs into %j', name, docs.indexed, index);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
*/

import { resolve, dirname, relative } from 'path';

import { stat, rename, createReadStream, createWriteStream } from 'fs';

import { Readable, Writable } from 'stream';
import { fromNode } from 'bluebird';
import { ToolingLog } from '@kbn/dev-utils';

import { createPromiseFromStreams } from '../../legacy/utils';

import {
prioritizeMappings,
readDirectory,
Expand All @@ -33,12 +32,20 @@ import {
createFormatArchiveStreams,
} from '../lib';

async function isDirectory(path) {
async function isDirectory(path: string): Promise<boolean> {
const stats = await fromNode(cb => stat(path, cb));
return stats.isDirectory();
}

export async function rebuildAllAction({ dataDir, log, rootDir = dataDir }) {
export async function rebuildAllAction({
dataDir,
log,
rootDir = dataDir,
}: {
dataDir: string;
log: ToolingLog;
rootDir?: string;
}) {
const childNames = prioritizeMappings(await readDirectory(dataDir));
for (const childName of childNames) {
const childPath = resolve(dataDir, childName);
Expand All @@ -58,11 +65,11 @@ export async function rebuildAllAction({ dataDir, log, rootDir = dataDir }) {
const tempFile = childPath + (gzip ? '.rebuilding.gz' : '.rebuilding');

await createPromiseFromStreams([
createReadStream(childPath),
createReadStream(childPath) as Readable,
...createParseArchiveStreams({ gzip }),
...createFormatArchiveStreams({ gzip }),
createWriteStream(tempFile),
]);
] as [Readable, ...Writable[]]);

await fromNode(cb => rename(tempFile, childPath, cb));
log.info(`${archiveName} Rebuilt ${childName}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import { resolve } from 'path';
import { createWriteStream, mkdirSync } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';

import { createListStream, createPromiseFromStreams } from '../../legacy/utils';

import {
createStats,
createGenerateIndexRecordsStream,
Expand All @@ -30,7 +32,21 @@ import {
Progress,
} from '../lib';

export async function saveAction({ name, indices, client, dataDir, log, raw }) {
export async function saveAction({
name,
indices,
client,
dataDir,
log,
raw,
}: {
name: string;
indices: string | string[];
client: Client;
dataDir: string;
log: ToolingLog;
raw: boolean;
}) {
const outputDir = resolve(dataDir, name);
const stats = createStats(name, log);

Expand All @@ -48,15 +64,15 @@ export async function saveAction({ name, indices, client, dataDir, log, raw }) {
createGenerateIndexRecordsStream(client, stats),
...createFormatArchiveStreams(),
createWriteStream(resolve(outputDir, 'mappings.json')),
]),
] as [Readable, ...Writable[]]),

// export all documents from matching indexes into data.json.gz
createPromiseFromStreams([
createListStream(indices),
createGenerateDocRecordsStream(client, stats, progress),
...createFormatArchiveStreams({ gzip: !raw }),
createWriteStream(resolve(outputDir, `data.json${raw ? '' : '.gz'}`)),
]),
] as [Readable, ...Writable[]]),
]);

progress.deactivate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';

import { createPromiseFromStreams } from '../../legacy/utils';

import {
isGzip,
createStats,
Expand All @@ -32,7 +34,19 @@ import {
createDeleteIndexStream,
} from '../lib';

export async function unloadAction({ name, client, dataDir, log, kbnClient }) {
export async function unloadAction({
name,
client,
dataDir,
log,
kbnClient,
}: {
name: string;
client: Client;
dataDir: string;
log: ToolingLog;
kbnClient: KbnClient;
}) {
const inputDir = resolve(dataDir, name);
const stats = createStats(name, log);
const kibanaPluginIds = await kbnClient.plugins.getEnabledIds();
Expand All @@ -42,11 +56,11 @@ export async function unloadAction({ name, client, dataDir, log, kbnClient }) {
log.info('[%s] Unloading indices from %j', name, filename);

await createPromiseFromStreams([
createReadStream(resolve(inputDir, filename)),
createReadStream(resolve(inputDir, filename)) as Readable,
...createParseArchiveStreams({ gzip: isGzip(filename) }),
createFilterRecordsStream('index'),
createDeleteIndexStream(client, stats, log, kibanaPluginIds),
]);
] as [Readable, ...Writable[]]);
}

return stats.toJSON();
Expand Down
22 changes: 12 additions & 10 deletions src/es_archiver/cli.js → src/es_archiver/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

/*************************************************************
/** ***********************************************************
*
* Run `node scripts/es_archiver --help` for usage information
*
Expand All @@ -27,17 +27,17 @@ import { resolve } from 'path';
import { readFileSync } from 'fs';
import { format as formatUrl } from 'url';
import readline from 'readline';

import { Command } from 'commander';
import * as legacyElasticsearch from 'elasticsearch';

import { EsArchiver } from './es_archiver';
import { ToolingLog } from '@kbn/dev-utils';
import { readConfigFile } from '@kbn/test';

import { EsArchiver } from './es_archiver';

const cmd = new Command('node scripts/es_archiver');

const resolveConfigPath = v => resolve(process.cwd(), v);
const resolveConfigPath = (v: string) => resolve(process.cwd(), v);
const defaultConfigPath = resolveConfigPath('test/functional/config.js');

cmd
Expand All @@ -56,6 +56,7 @@ cmd
defaultConfigPath
)
.on('--help', () => {
// eslint-disable-next-line no-console
console.log(readFileSync(resolve(__dirname, './cli_help.txt'), 'utf8'));
});

Expand Down Expand Up @@ -95,10 +96,10 @@ cmd
output: process.stdout,
});

await new Promise(resolve => {
await new Promise(resolveInput => {
rl.question(`Press enter when you're done`, () => {
rl.close();
resolve();
resolveInput();
});
});
})
Expand All @@ -112,12 +113,12 @@ cmd

cmd.parse(process.argv);

const missingCommand = cmd.args.every(a => !(a instanceof Command));
const missingCommand = cmd.args.every(a => !((a as any) instanceof Command));
if (missingCommand) {
execute();
}

async function execute(fn) {
async function execute(fn?: (esArchiver: EsArchiver, command: Command) => void): Promise<void> {
try {
const log = new ToolingLog({
level: cmd.verbose ? 'debug' : 'info',
Expand All @@ -134,7 +135,7 @@ async function execute(fn) {

// log and count all validation errors
let errorCount = 0;
const error = msg => {
const error = (msg: string) => {
errorCount++;
log.error(msg);
};
Expand Down Expand Up @@ -170,11 +171,12 @@ async function execute(fn) {
dataDir: resolve(cmd.dir),
kibanaUrl: cmd.kibanaUrl,
});
await fn(esArchiver, cmd);
await fn!(esArchiver, cmd);
} finally {
await client.close();
}
} catch (err) {
// eslint-disable-next-line no-console
console.log('FATAL ERROR', err.stack);
}
}
Loading

0 comments on commit 38dc1cb

Please sign in to comment.