Skip to content

Commit

Permalink
[ML] File upload adding deployment initialization step (elastic#198446)
Browse files Browse the repository at this point in the history
Fixes elastic#196696

When adding a semantic text field, we now have an additional step in the
file uploading process which calls inference for the selected inference
endpoint.
The response of the inference call is ignored and a poll is started to
check to see of the model has been deployed by check to see if
`num_allocations > 0`
Any errors returned from the inference call will stop the upload, unless
they are timeout errors which are ignored.


https://github.com/user-attachments/assets/382ce565-3b4b-47a3-a081-d79c15aa462f
  • Loading branch information
jgowdyelastic authored and CAWilson94 committed Nov 18, 2024
1 parent 254397c commit babee9a
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,6 @@ export class FileDataVisualizerView extends Component {
fileName={fileName}
fileContents={fileContents}
data={data}
dataViewsContract={this.props.dataViewsContract}
dataStart={this.props.dataStart}
fileUpload={this.props.fileUpload}
getAdditionalLinks={this.props.getAdditionalLinks}
resultLinks={this.props.resultLinks}
capabilities={this.props.capabilities}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export interface Statuses {
createDataView: boolean;
createPipeline: boolean;
permissionCheckStatus: IMPORT_STATUS;
initializeDeployment: boolean;
initializeDeploymentStatus: IMPORT_STATUS;
}

export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
Expand All @@ -45,6 +47,8 @@ export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
uploadStatus,
createDataView,
createPipeline,
initializeDeployment,
initializeDeploymentStatus,
} = statuses;

let statusInfo = null;
Expand All @@ -58,27 +62,37 @@ export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
) {
completedStep = 0;
}

if (
readStatus === IMPORT_STATUS.COMPLETE &&
initializeDeployment === true &&
initializeDeploymentStatus === IMPORT_STATUS.INCOMPLETE
) {
completedStep = 1;
}

if (
readStatus === IMPORT_STATUS.COMPLETE &&
(initializeDeployment === false || initializeDeploymentStatus === IMPORT_STATUS.COMPLETE) &&
indexCreatedStatus === IMPORT_STATUS.INCOMPLETE &&
ingestPipelineCreatedStatus === IMPORT_STATUS.INCOMPLETE
) {
completedStep = 1;
completedStep = 2;
}
if (indexCreatedStatus === IMPORT_STATUS.COMPLETE) {
completedStep = 2;
completedStep = 3;
}
if (
ingestPipelineCreatedStatus === IMPORT_STATUS.COMPLETE ||
(createPipeline === false && indexCreatedStatus === IMPORT_STATUS.COMPLETE)
) {
completedStep = 3;
completedStep = 4;
}
if (uploadStatus === IMPORT_STATUS.COMPLETE) {
completedStep = 4;
completedStep = 5;
}
if (dataViewCreatedStatus === IMPORT_STATUS.COMPLETE) {
completedStep = 5;
completedStep = 6;
}

let processFileTitle = i18n.translate(
Expand All @@ -87,6 +101,12 @@ export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
defaultMessage: 'Process file',
}
);
let initializeDeploymentTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.initializeDeploymentTitle',
{
defaultMessage: 'Initialize model deployment',
}
);
let createIndexTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.createIndexTitle',
{
Expand Down Expand Up @@ -146,13 +166,43 @@ export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
</p>
);
}
if (completedStep >= 1) {
if (initializeDeployment) {
if (completedStep >= 1) {
processFileTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.fileProcessedTitle',
{
defaultMessage: 'File processed',
}
);
initializeDeploymentTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.initializingDeploymentTitle',
{
defaultMessage: 'Initializing model deployment',
}
);
statusInfo = (
<p>
<FormattedMessage
id="xpack.dataVisualizer.file.importProgress.processingImportedFileDescription"
defaultMessage="Initializing model deployment"
/>
</p>
);
}
}
if (completedStep >= 2) {
processFileTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.fileProcessedTitle',
{
defaultMessage: 'File processed',
}
);
initializeDeploymentTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.deploymentInitializedTitle',
{
defaultMessage: 'Model deployed',
}
);
createIndexTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.creatingIndexTitle',
{
Expand All @@ -162,7 +212,7 @@ export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
statusInfo =
createPipeline === true ? creatingIndexAndIngestPipelineStatus : creatingIndexStatus;
}
if (completedStep >= 2) {
if (completedStep >= 3) {
createIndexTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.indexCreatedTitle',
{
Expand All @@ -178,7 +228,7 @@ export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
statusInfo =
createPipeline === true ? creatingIndexAndIngestPipelineStatus : creatingIndexStatus;
}
if (completedStep >= 3) {
if (completedStep >= 4) {
createIngestPipelineTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.ingestPipelineCreatedTitle',
{
Expand All @@ -193,7 +243,7 @@ export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
);
statusInfo = <UploadFunctionProgress progress={uploadProgress} />;
}
if (completedStep >= 4) {
if (completedStep >= 5) {
uploadingDataTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.dataUploadedTitle',
{
Expand All @@ -219,7 +269,7 @@ export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
statusInfo = null;
}
}
if (completedStep >= 5) {
if (completedStep >= 6) {
createDataViewTitle = i18n.translate(
'xpack.dataVisualizer.file.importProgress.dataViewCreatedTitle',
{
Expand All @@ -239,78 +289,92 @@ export const ImportProgress: FC<{ statuses: Statuses }> = ({ statuses }) => {
: 'selected') as EuiStepStatus,
onClick: () => {},
},
{
title: createIndexTitle,
status: (indexCreatedStatus !== IMPORT_STATUS.INCOMPLETE // Show failure/completed states first
? indexCreatedStatus
];

if (initializeDeployment === true) {
steps.push({
title: initializeDeploymentTitle,
status: (initializeDeploymentStatus !== IMPORT_STATUS.INCOMPLETE // Show failure/completed states first
? initializeDeploymentStatus
: completedStep === 1 // Then show selected/incomplete states
? 'selected'
: 'incomplete') as EuiStepStatus,
onClick: () => {},
},
{
title: uploadingDataTitle,
status: (uploadStatus !== IMPORT_STATUS.INCOMPLETE // Show failure/completed states first
? uploadStatus
: completedStep === 3 // Then show selected/incomplete states
? 'selected'
: 'incomplete') as EuiStepStatus,
onClick: () => {},
},
];
});
}

steps.push({
title: createIndexTitle,
status: (indexCreatedStatus !== IMPORT_STATUS.INCOMPLETE // Show failure/completed states first
? indexCreatedStatus
: completedStep === 2 // Then show selected/incomplete states
? 'selected'
: 'incomplete') as EuiStepStatus,
onClick: () => {},
});

if (createPipeline === true) {
steps.splice(2, 0, {
steps.push({
title: createIngestPipelineTitle,
status: (ingestPipelineCreatedStatus !== IMPORT_STATUS.INCOMPLETE // Show failure/completed states first
? ingestPipelineCreatedStatus
: completedStep === 2 // Then show selected/incomplete states
: completedStep === 3 // Then show selected/incomplete states
? 'selected'
: 'incomplete') as EuiStepStatus,
onClick: () => {},
});
}

steps.push({
title: uploadingDataTitle,
status: (uploadStatus !== IMPORT_STATUS.INCOMPLETE // Show failure/completed states first
? uploadStatus
: completedStep === 4 // Then show selected/incomplete states
? 'selected'
: 'incomplete') as EuiStepStatus,
onClick: () => {},
});

if (createDataView === true) {
steps.push({
title: createDataViewTitle,
status: (dataViewCreatedStatus !== IMPORT_STATUS.INCOMPLETE // Show failure/completed states first
? dataViewCreatedStatus
: completedStep === 4 // Then show selected/incomplete states
: completedStep === 5 // Then show selected/incomplete states
? 'selected'
: 'incomplete') as EuiStepStatus,
onClick: () => {},
});
}

return (
<React.Fragment>
<>
<EuiStepsHorizontal steps={steps} style={{ backgroundColor: 'transparent' }} />
{statusInfo && (
<React.Fragment>
<>
<EuiSpacer size="m" />
{statusInfo}
</React.Fragment>
</>
)}
</React.Fragment>
</>
);
};

const UploadFunctionProgress: FC<{ progress: number }> = ({ progress }) => {
return (
<React.Fragment>
<>
<p>
<FormattedMessage
id="xpack.dataVisualizer.file.importProgress.uploadingDataDescription"
defaultMessage="Uploading data"
/>
</p>
{progress < 100 && (
<React.Fragment>
<>
<EuiSpacer size="s" />
<EuiProgress value={progress} max={100} color="primary" size="s" />
</React.Fragment>
</>
)}
</React.Fragment>
</>
);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { InferenceInferenceEndpointInfo } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { HttpSetup } from '@kbn/core/public';

const POLL_INTERVAL = 5; // seconds

export class AutoDeploy {
private inferError: Error | null = null;
constructor(private readonly http: HttpSetup, private readonly inferenceId: string) {}

public async deploy() {
this.inferError = null;
if (await this.isDeployed()) {
return;
}

this.infer().catch((e) => {
// ignore timeout errors
// The deployment may take a long time
// we'll know when it's ready from polling the inference endpoints
// looking for num_allocations
const status = e.response?.status;
if (status === 408 || status === 504 || status === 502) {
return;
}
this.inferError = e;
});
await this.pollIsDeployed();
}

private async infer() {
return this.http.fetch<InferenceInferenceEndpointInfo[]>(
`/internal/data_visualizer/inference/${this.inferenceId}`,
{
method: 'POST',
version: '1',
body: JSON.stringify({ input: '' }),
}
);
}

private async isDeployed() {
const inferenceEndpoints = await this.http.fetch<InferenceInferenceEndpointInfo[]>(
'/internal/data_visualizer/inference_endpoints',
{
method: 'GET',
version: '1',
}
);
return inferenceEndpoints.some((endpoint) => {
return (
endpoint.inference_id === this.inferenceId && endpoint.service_settings.num_allocations > 0
);
});
}

private async pollIsDeployed() {
while (true) {
if (this.inferError !== null) {
throw this.inferError;
}
const isDeployed = await this.isDeployed();
if (isDeployed) {
// break out of the loop once we have a successful deployment
return;
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL * 1000));
}
}
}
Loading

0 comments on commit babee9a

Please sign in to comment.