Skip to content

Commit

Permalink
Remove Chunked/Simple DataFrame distinction
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Hulette committed Jan 12, 2018
1 parent aa999f8 commit 2744c63
Showing 1 changed file with 61 additions and 88 deletions.
149 changes: 61 additions & 88 deletions js/src/dataframe/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,89 +6,33 @@ import { Predicate } from "./predicate"

export type NextFunc = (idx: number, cols: Vector[]) => void;

export abstract class DataFrame {
constructor(readonly lengths: Uint32Array) {}
public abstract columns: Vector<any>[];
public abstract getBatch(batch: number): Vector[];
public abstract scan(next: NextFunc): void;
public filter(predicate: Predicate): DataFrame {
return new FilteredDataFrame(this, predicate);
}

static from(table: Vector<any>): DataFrame {
// There are two types of Vectors we might want to make into
// a ChunkedDataFrame:
// 1) a StructVector of all VirtualVectors
// 2) a VirtualVector of all StructVectors
if (table instanceof StructVector) {
if (table.columns.every((col) => col instanceof VirtualVector)) {
// ChunkedDataFrame case (1)
return new ChunkedDataFrame(table.columns as VirtualVector<any>[]);
} else {
return new SimpleDataFrame(table.columns)
}
} else if (table instanceof VirtualVector &&
table.vectors.every((vec) => vec instanceof StructVector)) {
const structs = table.vectors as StructVector<any>[];
const rest: StructVector<any>[] = structs.slice(1);
const virtuals: VirtualVector<any>[] = structs[0].columns.map((vec, col_idx) => {
return vec.concat(...rest.map((vec) => vec.columns[col_idx]));
}) as VirtualVector<any>[];
// ChunkedDataFrame case (2)
return new ChunkedDataFrame(virtuals);
} else {
return new SimpleDataFrame([table]);
}
}

count(): number {
return this.lengths.reduce((acc, val) => acc + val);
}
}

class SimpleDataFrame extends DataFrame {
export class DataFrame {
readonly lengths: Uint32Array;
constructor(public columns: Vector<any>[]) {
super(new Uint32Array([0, columns[0].length]));
if (!this.columns.slice(1).every((v) => v.length === this.columns[0].length)) {
throw new Error("Attempted to create a DataFrame with un-aligned vectors");
}
}

public getBatch() {
return this.columns;
}

public scan(next: NextFunc) {
for (let idx = -1; ++idx < this.lengths[1];) {
next(idx, this.columns)
}
}

*[Symbol.iterator]() {
for (let idx = -1; ++idx < this.lengths[1];) {
yield idx;
}
}
}

class ChunkedDataFrame extends DataFrame {
public columns: Vector<any>[];
constructor(private virtuals: VirtualVector<any>[]) {
super(ChunkedDataFrame.getLengths(virtuals));
this.virtuals = virtuals;
constructor(readonly batches: Vector<any>[][]) {
// for each batch
this.lengths = new Uint32Array(batches.map((batch)=>{
// verify that every vector has the same length, and return that
// length
// throw an error if the lengths don't match
return batch.reduce((length, col) => {
if (col.length !== length)
throw new Error("Attempted to create a DataFrame with un-aligned vectors");
return length;
}, batch[0].length);
}));
}

getBatch(batch: number): Vector[] {
return this.virtuals.map((virt) => virt.vectors[batch]);
public filter(predicate: Predicate): DataFrame {
return new FilteredDataFrame(this, predicate);
}

scan(next: NextFunc) {
for (let batch = -1; ++batch < this.lengths.length;) {
const length = this.lengths[batch];

// load batches
const columns = this.getBatch(batch);
const columns = this.batches[batch];

// yield all indices
for (let idx = -1; ++idx < length;) {
Expand All @@ -97,12 +41,16 @@ class ChunkedDataFrame extends DataFrame {
}
}

count(): number {
return this.lengths.reduce((acc, val) => acc + val);
}

*[Symbol.iterator]() {
for (let batch = -1; ++batch < this.lengths.length;) {
const length = this.lengths[batch];

// load batches
this.columns = this.getBatch(batch);
this.columns = this.batches[batch];

// yield all indices
for (let idx = -1; ++idx < length;) {
Expand All @@ -111,34 +59,48 @@ class ChunkedDataFrame extends DataFrame {
}
}

private static getLengths(virtuals: VirtualVector<any>[]): Uint32Array {
if (!virtuals.slice(1).every((v) => v.aligned(virtuals[0]))) {
throw new Error("Attempted to create a DataFrame with un-aligned vectors");
static from(table: Vector<any>): DataFrame {
if (table instanceof StructVector) {
const columns = table.columns;
if (isAligned(columns)) {
// StructVector of aligned VirtualVectors
// break up VirtualVectors into batches
const batches = columns[0].vectors.map((_,i) => {
return columns.map((vec: VirtualVector<any>) => {
return vec.vectors[i];
});
});
return new DataFrame(batches);
} else {
return new DataFrame([columns]);
}
} else if (table instanceof VirtualVector &&
table.vectors.every((vec) => vec instanceof StructVector)) {
return new DataFrame(table.vectors.map((vec) => {
return (vec as StructVector<any>).columns;
}));
} else {
return new DataFrame([[table]]);
}
return new Uint32Array(virtuals[0].vectors.map((v)=>v.length));
}
}

class FilteredDataFrame extends DataFrame {
public columns: Vector<any>[];
constructor (readonly parent: DataFrame, private predicate: Predicate) {
super(parent.lengths);
super(parent.batches);
}

getBatch(batch: number): Vector[] {
return this.parent.getBatch(batch);
};

scan(next: NextFunc) {
// inlined version of this:
// this.parent.scan((idx, columns) => {
// if (this.predicate(idx, columns)) next(idx, columns);
// });
for (let batch = -1; ++batch < this.parent.lengths.length;) {
const length = this.parent.lengths[batch];
for (let batch = -1; ++batch < this.lengths.length;) {
const length = this.lengths[batch];

// load batches
const columns = this.parent.getBatch(batch);
const columns = this.batches[batch];
const predicate = this.predicate.bind(columns);

// yield all indices
Expand All @@ -156,11 +118,11 @@ class FilteredDataFrame extends DataFrame {
// });
// return sum;
let sum = 0;
for (let batch = -1; ++batch < this.parent.lengths.length;) {
const length = this.parent.lengths[batch];
for (let batch = -1; ++batch < this.lengths.length;) {
const length = this.lengths[batch];

// load batches
const columns = this.parent.getBatch(batch);
const columns = this.batches[batch];
const predicate = this.predicate.bind(columns);

// yield all indices
Expand All @@ -178,3 +140,14 @@ class FilteredDataFrame extends DataFrame {
);
}
}

function isAligned(columns: Vector[]): columns is VirtualVector<any>[] {
if (columns.every((col) => col instanceof VirtualVector)) {
const virtuals = columns as VirtualVector<any>[]

return virtuals.slice(1).every((col) => {
return col.aligned(virtuals[0]);
});
}
return false;
}

0 comments on commit 2744c63

Please sign in to comment.