Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the time cost to go into training #410

Merged
merged 26 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
772dd5a
feat: add GlobalDataProcess plugin
WenheLI Jul 28, 2020
9132604
fix: fix lint issue
WenheLI Jul 28, 2020
00539da
fix: fix lint issue & rename datasetProcess
WenheLI Jul 28, 2020
523a24a
Merge branch 'add/global-process' of https://github.com/alibaba/pipco…
WenheLI Jul 28, 2020
f72eae9
add
WenheLI Jul 28, 2020
ec6a3b5
fix: add inline comments
WenheLI Jul 28, 2020
50dff0b
Merge branch 'master' of https://github.com/alibaba/pipcook into add/…
rickycao-qy Jul 29, 2020
fc941ef
Merge branch 'master' of https://github.com/alibaba/pipcook into add/…
rickycao-qy Jul 30, 2020
5998960
run pipeline
rickycao-qy Jul 30, 2020
6224db4
Merge branch 'master' of https://github.com/alibaba/pipcook into add/…
rickycao-qy Jul 30, 2020
250f6d0
Merge branch 'master' of https://github.com/alibaba/pipcook into add/…
rickycao-qy Aug 1, 2020
5b6165b
save
rickycao-qy Aug 2, 2020
40385c8
Merge branch 'master' of https://github.com/alibaba/pipcook into add/…
rickycao-qy Aug 2, 2020
7b3fbc8
change plugin accordingly
rickycao-qy Aug 5, 2020
a8ab4b8
Merge branch 'master' of https://github.com/alibaba/pipcook into add/…
rickycao-qy Aug 5, 2020
e5ab252
delete uncessary log
rickycao-qy Aug 5, 2020
6c9676d
fix lint
rickycao-qy Aug 5, 2020
c7c8db5
fix
rickycao-qy Aug 6, 2020
1775ae2
change way how image-class tensorflow owrks
rickycao-qy Aug 6, 2020
aa7803a
change resize to reasonable size
rickycao-qy Aug 7, 2020
a529104
Merge branch 'master' of https://github.com/alibaba/pipcook into add/…
rickycao-qy Aug 7, 2020
e3ee6c7
fix
rickycao-qy Aug 7, 2020
960b1c6
move datasetprocess to default
rickycao-qy Aug 8, 2020
48a2be7
test dataloader
rickycao-qy Aug 10, 2020
2641855
add eof
rickycao-qy Aug 10, 2020
c960e1b
fix api for python tf
rickycao-qy Aug 10, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 11 additions & 35 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions packages/cli/assets/pluginPackage/src/dataProcess.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { DataProcessType, ArgsType, UniDataset } from '@pipcook/pipcook-core'
import { DataProcessType, ArgsType, Sample, Metadata } from '@pipcook/pipcook-core'

const templateDataProcess: DataProcessType = async (data: UniDataset, args: ArgsType): Promise<UniDataset> => {
return {} as UniDataset;
const templateDataProcess: DataProcessType = async (data: Sample, metadata: Metadata, args: ArgsType): Promise<void> => {
}

export default templateDataProcess;
7 changes: 7 additions & 0 deletions packages/cli/assets/pluginPackage/src/datasetProcess.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { DatasetProcessType, ArgsType, UniDataset } from '@pipcook/pipcook-core'

const templateDatasetProcess: DatasetProcessType = async (dataset: UniDataset, args: ArgsType): Promise<void> => {
// Do some process to the whole dataset
};

export default templateDatasetProcess;
2 changes: 2 additions & 0 deletions packages/core/src/constants/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { PluginTypeI } from '../types/plugins';
export const DATACOLLECT: PluginTypeI = 'dataCollect';
export const DATAACCESS: PluginTypeI = 'dataAccess';
export const DATAPROCESS: PluginTypeI = 'dataProcess';
export const DATASETPROCESS: PluginTypeI = 'datasetProcess';
export const MODELLOAD: PluginTypeI = 'modelLoad';
export const MODELDEFINE: PluginTypeI = 'modelDefine';
export const MODELTRAIN: PluginTypeI = 'modelTrain';
Expand All @@ -11,6 +12,7 @@ export const PLUGINS: PluginTypeI[] = [
DATACOLLECT,
DATAACCESS,
DATAPROCESS,
DATASETPROCESS,
MODELLOAD,
MODELDEFINE,
MODELTRAIN,
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export {
DataCollectType,
DataAccessType,
DataProcessType,
DatasetProcessType,
ModelLoadType,
ModelDefineType,
ModelTrainType,
Expand Down
89 changes: 84 additions & 5 deletions packages/core/src/types/data/common.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import events from 'events';

import { generateId } from '../../utils/public';
import { Statistic } from '../other';

/**
Expand Down Expand Up @@ -44,11 +47,87 @@ export interface Sample {
/**
* The data loader to used to load dataset.
*/
export interface DataLoader {
len: () => Promise<number>;
getItem: (id: number) => Promise<Sample>;
next?: () => Promise<Sample>;
nextBatch?: (batchSize: number) => Promise<Array<Sample>>;
export abstract class DataLoader{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
export abstract class DataLoader{
export abstract class DataLoader {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And a unit tests for this class is required :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lint problem has been fixed. I will add a unit test ASAP

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit test has been added. Please have a check pls :)

private event = new events.EventEmitter();
private fetchIndex = 0;
private id = generateId();
public processIndex = -1;

/**
* Data-access plugin developer needs to implement these three abstract function
* which is to notify the length of data, how to get and set the specific index of data
*/
abstract async len(): Promise<number>;
abstract async getItem(id: number): Promise<Sample>;
abstract async setItem(id: number, sample: Sample): Promise<void>;

notifyProcess(): void {
this.event.emit(this.id);
}

/**
* iterate over dataset. Get next single sample
* Override Forbidden
*/
async next(): Promise<Sample> {
// reset index of data fetched to beginning when it reaches the end
if (this.fetchIndex >= await this.len()) {
this.fetchIndex = 0;
}

// if the data fetched has already been processed, return it
if (this.fetchIndex < this.processIndex || this.processIndex === -1) {
return this.getItem(this.fetchIndex++);
}

// if data fetched not already processed, wait util this is finished
return new Promise((resolve) => {
this.event.on(this.id, async () => {
if (this.fetchIndex < this.processIndex) {
const data = await this.getItem(this.fetchIndex++);
this.event.removeAllListeners(this.id);
resolve(data);
}
});
});
}

/**
* iterate over dataset. Get next batch of data
* Override Forbidden
*/
async nextBatch(batchSize: number): Promise<Sample[]> {
const dataLen = await this.len();

if (this.fetchIndex >= dataLen) {
this.fetchIndex = 0;
}

if (this.fetchIndex + batchSize >= dataLen) {
batchSize = dataLen - this.fetchIndex - 1;
}

if (this.fetchIndex + batchSize < this.processIndex) {
const result = [];
for (let i = this.fetchIndex; i < this.fetchIndex + batchSize; i++) {
result.push(this.getItem(i));
}
return Promise.all(result);
}

return new Promise((resolve) => {
this.event.on(this.id, async () => {
if (this.fetchIndex + batchSize < this.processIndex) {
const result = [];
for (let i = this.fetchIndex; i < this.fetchIndex + batchSize; i++) {
result.push(this.getItem(i));
}
this.event.removeAllListeners(this.id);
resolve(await Promise.all(result));
}
});
});
}
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/types/data/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ export interface CsvMetadata extends Metadata {
};
}

export interface CsvDataLoader extends DataLoader {
getItem: (id: number) => Promise<CsvSample>;
export abstract class CsvDataLoader extends DataLoader {
abstract getItem(id: number): Promise<CsvSample>;
}

export interface CsvDataset extends UniDataset {
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/types/data/image.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ export interface ImageSample {
label: ImageLabel;
}

export interface ImageDataLoader extends DataLoader {
getItem: (id: number) => Promise<ImageSample>;
export abstract class ImageDataLoader extends DataLoader {
abstract getItem(id: number): Promise<ImageSample>;
}

export interface ImageDataset extends UniDataset {
Expand Down
30 changes: 27 additions & 3 deletions packages/core/src/types/plugins.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import { UniDataset, Sample, Metadata } from './data/common';
import { UniModel } from './model';
import { EvaluateResult } from './other';
Expand All @@ -9,7 +8,7 @@ interface InsertParams {
dataDir: string;
}

export type PluginTypeI = 'dataCollect' | 'dataAccess' | 'dataProcess' | 'modelLoad' | 'modelDefine' |'modelTrain' | 'modelEvaluate';
export type PluginTypeI = 'dataCollect' | 'dataAccess' | 'dataProcess' | 'datasetProcess' | 'modelLoad' | 'modelDefine' | 'modelTrain' | 'modelEvaluate';

/**
* The base type which represents the `Record` from pipeline config file.
Expand Down Expand Up @@ -124,7 +123,32 @@ export interface DataAccessType extends PipcookPlugin {
* @param args The arguments from pipeline config file.
*/
export interface DataProcessType extends PipcookPlugin {
(data: Sample, metadata: Metadata, args: ArgsType): Promise<void>;
(data: Sample, metadata: Metadata, args: ArgsType): Promise<Sample>;
}

/**
* Similar to `DataProcessType`, but this type targets on the whole dataset rather than a sample.
* This plugin will be convenient when it comes to process data that requires information from the whole dataset. I.E. corpus construction, average data among the dataset ...
*
* @example
*
* ```js
* const getCorpus = async (dataset: UniDataset, metadata: Metadata, args?: ArgsType): Promise<void> => {
* const corpus: Set<string> = new Set();
* for (const data of dataset) {
* for (const word of (data.data.split(" "))) {
* corpus.add(word);
* }
* }
* metadata.corpus = corpus;
* }
* ```
*
* @param dataset The dataset of which you loaded in `dataCollect` & `dataAccess`
* @param args The arguments from pipeline config file.
*/
export interface DatasetProcessType extends PipcookPlugin {
(dataset: UniDataset, args: ArgsType): Promise<void>;
}

/**
Expand Down
29 changes: 20 additions & 9 deletions packages/costa/src/client/entry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,28 @@ async function emitStart(message: PluginMessage): Promise<void> {
if (pkg.pipcook.category === 'dataProcess') {
// in "dataProcess" plugin, we need to do process them in one by one.
const [ dataset, args ] = pluginArgs.map(deserializeArg) as [ UniDataset, any ];
const loaders = [ dataset.trainLoader, dataset.validationLoader, dataset.testLoader ]
[ dataset.trainLoader, dataset.validationLoader, dataset.testLoader ]
.filter((loader: DataLoader) => loader != null)
.map(async (loader: DataLoader) => {
const len = await loader.len();
// FIXME(Yorkie): in parallel?
for (let i = 0; i < len; i++) {
const sample = await loader.getItem(i);
await fn(sample, dataset.metadata, args);
}
.forEach(async (loader: DataLoader) => {
process.nextTick(async () => {
const len = await loader.len();
loader.processIndex = 0;
for (let i = 0; i < len; i++) {
let sample = await loader.getItem(i);
sample = await fn(sample, dataset.metadata, args);
await loader.setItem(i, sample);
loader.processIndex = i + 1;
loader.notifyProcess();
}
});
});
await Promise.all(loaders);
recv(PluginOperator.WRITE);
return;
}

if (pkg.pipcook.category === 'datasetProcess') {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we move the datasetProcess to default handler?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, have remove this

const [ dataset, args ] = pluginArgs.map(deserializeArg) as [ UniDataset, any ];
await fn(dataset, args);
recv(PluginOperator.WRITE);
return;
}
Expand Down
8 changes: 8 additions & 0 deletions packages/daemon/src/model/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ export class PipelineModel extends Model {
readonly dataAccessParams: string;
readonly dataProcess: string;
readonly dataProcessParams: string;
readonly datasetProcess: string;
readonly datasetProcessParams: string;
readonly modelDefine: string;
readonly modelDefineParams: string;
readonly modelLoad: string;
Expand Down Expand Up @@ -65,6 +67,12 @@ export default async function model(context: IApplicationContext): Promise<Pipel
dataProcessParams: {
type: STRING
},
datasetProcess: {
type: STRING
},
datasetProcessParams: {
type: STRING
},
modelDefine: {
type: STRING
},
Expand Down
8 changes: 8 additions & 0 deletions packages/daemon/src/service/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ interface GenerateOptions {
modelPath: string;
modelPlugin: PluginPackage;
dataProcess?: PluginPackage;
datasetProcess?: PluginPackage;
pipeline: PipelineModel;
workingDir: string;
template: string;
Expand Down Expand Up @@ -213,6 +214,12 @@ export class PipelineService {
dataDir
}));

let datasetProcess: PluginPackage;
if (plugins.datasetProcess) {
datasetProcess = plugins.datasetProcess.plugin;
await runnable.start(plugins.datasetProcess.plugin, dataset, getParams(plugins.datasetProcess.params));
}

let dataProcess: PluginPackage;
if (plugins.dataProcess) {
dataProcess = plugins.dataProcess.plugin;
Expand Down Expand Up @@ -261,6 +268,7 @@ export class PipelineService {
modelPath,
modelPlugin,
dataProcess,
datasetProcess,
pipeline,
workingDir: runnable.workingDir,
template: 'node' // set node by default
Expand Down
Loading