Skip to content

Commit

Permalink
Merge pull request #182 from jpmorganchase/merge-cpp
Browse files Browse the repository at this point in the history
Merge from fork
  • Loading branch information
texodus authored Aug 8, 2018
2 parents bd88a12 + 599c101 commit 91b5563
Show file tree
Hide file tree
Showing 23 changed files with 290 additions and 97 deletions.
17 changes: 2 additions & 15 deletions packages/perspective/src/cpp/context_grouped_pkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -947,15 +947,11 @@ t_ctx_grouped_pkey::notify(const t_table& flattened)
// aggregates should be presized to be same size
// as agg_indices
void
t_ctx_grouped_pkey::get_aggregates(t_uindex nidx,
t_ctx_grouped_pkey::get_aggregates_for_sorting(t_uindex nidx,
const t_idxvec& agg_indices,
t_tscalvec& aggregates,
t_ctx2 * ) const
{

const t_str& grouping_label_col =
m_config.get_grouping_label_column();

for (t_uindex idx = 0, loop_end = agg_indices.size();
idx < loop_end;
++idx)
Expand All @@ -964,16 +960,7 @@ t_ctx_grouped_pkey::get_aggregates(t_uindex nidx,

if (which_agg < 0)
{
if (m_has_label)
{
t_tscalvec pkeys;
auto iters = m_tree->get_pkeys_for_leaf(nidx);
aggregates[idx].set(m_state->get_value(
iters.first->m_pkey, grouping_label_col));
} else
{
aggregates[idx].set(m_tree->get_value(nidx));
}
aggregates[idx].set(m_tree->get_sortby_value(nidx));
}
else
{
Expand Down
107 changes: 106 additions & 1 deletion packages/perspective/src/cpp/context_two.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ t_ctx2::get_ctraversal_indices() const
}

t_tscalvec
t_ctx2::get_data(t_tvidx start_row,
t_ctx2::get_data_old_path(t_tvidx start_row,
t_tvidx end_row,
t_tvidx start_col,
t_tvidx end_col) const
Expand Down Expand Up @@ -343,6 +343,111 @@ t_ctx2::get_data(t_tvidx start_row,
return retval;
}

// Returns the scalars for the window [start_row, end_row) x [start_col, end_col)
// after the requested extents have been passed through
// sanitize_get_data_extents.
//
// The result is row-major with a stride of (m_ecol - m_scol): the cell at
// (ridx, cidx) is stored at (ridx - m_srow) * stride + (cidx - m_scol).
//   * Column 0, when it is part of the window, holds the row tree's value for
//     the tree node that the row traversal maps ridx to.
//   * Every other column holds the aggregate extracted for the cell that
//     resolve_cells() reports, or a "none" scalar when the cell does not
//     resolve to a node (m_idx < 0) or the extracted aggregate is not valid.
//
// An empty window (no rows or no columns) yields an empty vector.
t_tscalvec
t_ctx2::get_data(t_tvidx start_row,
                 t_tvidx end_row,
                 t_tvidx start_col,
                 t_tvidx end_col) const
{
    // Back-out switch: flip to false to route every call through the
    // previous implementation.
    static bool const enable_getdata_fix = true;

    if (!enable_getdata_fix)
        return get_data_old_path(start_row, end_row, start_col, end_col);

    auto ext = sanitize_get_data_extents(
        *this, start_row, end_row, start_col, end_col);

    t_index nrows = ext.m_erow - ext.m_srow;
    t_index stride = ext.m_ecol - ext.m_scol;

    // Nothing to return for an empty window. Without this guard a window with
    // rows but zero columns starting at column 0 would allocate an empty
    // result and then write the row-header value into retval[0].
    // NOTE(review): assumes sanitize_get_data_extents can hand back
    // m_ecol == m_scol -- confirm against its definition.
    if (nrows <= 0 || stride <= 0)
        return t_tscalvec();

    // One (row, col) pair per cell, in the same row-major order as retval so
    // that cells_info can be indexed with the retval insert index.
    t_uidxpvec cells;
    cells.reserve(nrows * stride);
    for (t_index ridx = ext.m_srow; ridx < ext.m_erow; ++ridx)
    {
        for (t_index cidx = ext.m_scol; cidx < ext.m_ecol; ++cidx)
        {
            cells.push_back(t_idxpair(ridx, cidx));
        }
    }

    auto cells_info = resolve_cells(cells);

    t_tscalvec retval(nrows * stride);

    t_tscalar empty = mknone();

    // (tree index, aggregate index) -> aggregate column of that tree.
    typedef std::pair<t_uindex, t_uindex> t_aggpair;
    std::map<t_aggpair, const t_column*> aggmap;

    for (t_uindex treeidx = 0, tree_loop_end = m_trees.size();
         treeidx < tree_loop_end;
         ++treeidx)
    {
        auto aggtable = m_trees[treeidx]->get_aggtable();
        // Bind by reference: the schema is only read here, so there is no
        // need to copy it once per tree.
        const t_schema& aggschema = aggtable->get_schema();

        for (t_uindex aggidx = 0,
                      agg_loop_end = m_config.get_num_aggregates();
             aggidx < agg_loop_end;
             ++aggidx)
        {
            const t_str& aggname = aggschema.m_columns[aggidx];

            aggmap[t_aggpair(treeidx, aggidx)] =
                aggtable->get_const_column(aggname).get();
        }
    }

    const t_aggspecvec& aggspecs = m_config.get_aggregates();

    for (t_index ridx = ext.m_srow; ridx < ext.m_erow; ++ridx)
    {
        const t_index row_offset = (ridx - ext.m_srow) * stride;

        if (ext.m_scol == 0)
        {
            // Column 0 is the row header, taken from the row tree.
            retval[row_offset].set(
                rtree()->get_value(m_rtraversal->get_tree_index(ridx)));
        }

        // Aggregate cells start at column 1; column 0 was handled above.
        for (t_index cidx = std::max( ext.m_scol, t_tvidx(1) ); cidx < ext.m_ecol; ++cidx)
        {
            t_index insert_idx = row_offset + (cidx - ext.m_scol);
            const t_cellinfo& cinfo = cells_info[insert_idx];

            if (cinfo.m_idx < 0)
            {
                retval[insert_idx].set(empty);
                continue;
            }

            // find() instead of operator[] so a miss does not grow the map;
            // a miss still yields a null column, exactly as before.
            auto aggiter = aggmap.find(
                t_aggpair(cinfo.m_treenum, cinfo.m_agg_index));
            const t_column* aggcol =
                aggiter == aggmap.end() ? nullptr : aggiter->second;

            t_ptidx p_idx =
                m_trees[cinfo.m_treenum]->get_parent_idx(cinfo.m_idx);

            t_uindex agg_ridx =
                m_trees[cinfo.m_treenum]->get_aggidx(cinfo.m_idx);

            // Aggregate-table row of the parent node, or INVALID_INDEX when
            // the node has no parent.
            t_uindex agg_pridx =
                p_idx == INVALID_INDEX
                    ? INVALID_INDEX
                    : m_trees[cinfo.m_treenum]->get_aggidx(p_idx);

            auto value = extract_aggregate(aggspecs[cinfo.m_agg_index],
                                           aggcol,
                                           agg_ridx,
                                           agg_pridx);

            if (!value.is_valid())
                value.set(empty);

            retval[insert_idx].set(value);
        }
    }

    return retval;
}
void
t_ctx2::sort_by(const t_sortsvec& sortby)
{
Expand Down
6 changes: 3 additions & 3 deletions packages/perspective/src/cpp/gnode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ t_gnode::_process()

t_colcptrvec fcolumns(flattened->num_columns());
t_uindex ncols = sschema.get_num_columns();

t_colcptrvec scolumns(ncols);
t_colptrvec dcolumns(ncols);
t_colptrvec pcolumns(ncols);
Expand Down Expand Up @@ -1261,7 +1261,7 @@ t_gnode::notify_contexts(const t_table& flattened)
t_sctxhvec ctxhvec(num_ctx);

t_index ctxh_count = 0;
for (t_sctxhmap::iterator iter = m_contexts.begin();
for (t_sctxhmap::const_iterator iter = m_contexts.begin();
iter != m_contexts.end();
++iter)
{
Expand All @@ -1272,7 +1272,7 @@ t_gnode::notify_contexts(const t_table& flattened)
auto notify_context_helper = [this, &ctxhvec, &flattened](
t_index ctxidx) {
const t_ctx_handle& ctxh = ctxhvec[ctxidx];
switch (ctxh.m_ctx_type)
switch (ctxh.get_type())
{
case TWO_SIDED_CONTEXT:
{
Expand Down
29 changes: 24 additions & 5 deletions packages/perspective/src/cpp/gnode_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ t_gstate::update_history(const t_table* tbl)
++idx)
{
const t_str& cname = fschema.m_columns[idx];
col_translation[count] = idx;
fcolumns[idx] = tbl->get_const_column(cname).get();
++count;
}
col_translation[count] = idx;
fcolumns[idx] = tbl->get_const_column(cname).get();
++count;
}

t_colptrvec scolumns(ncols);

Expand Down Expand Up @@ -762,10 +762,29 @@ t_gstate::_get_pkeyed_table(const t_schema& schema,
if (get_pkey_dtype() == DTYPE_STR)
{
static const t_tscalar empty = get_interned_tscalar("");
static bool const enable_pkeyed_table_vocab_reserve = true;

t_uindex offset = has_pkey(empty) ? 0 : 1;

pkey_col->set_vocabulary(order);
size_t total_string_size = 0;

if( enable_pkeyed_table_vocab_reserve )
{
total_string_size += offset;
for (t_uindex idx = 0, loop_end = order.size();
idx < loop_end;
++idx)
{
total_string_size += strlen(order[idx].first.get_char_ptr()) + 1;
}
}

// if the m_mapping is empty, get_pkey_dtype() may lie about our pkeys being strings
// don't try to reserve in this case
if( !order.size() )
total_string_size = 0;

pkey_col->set_vocabulary(order, total_string_size);
auto base = pkey_col->get_nth<t_uindex>(0);

for (t_uindex idx = 0, loop_end = order.size();
Expand Down
38 changes: 27 additions & 11 deletions packages/perspective/src/cpp/sparse_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1011,18 +1011,26 @@ t_stree::update_aggs_from_static(const t_dtree_ctx& ctx,
cols_topo_sorted.clear();
cols_topo_sorted.reserve(col_cnt);

static bool const enable_aggregate_reordering = true;
static bool const enable_fix_double_calculation = true;

std::unordered_set< t_column* > dst_visited;
auto push_column = [&](size_t idx)
auto push_column = [&]( size_t idx )
{
t_column* dst = agg_update_info.m_dst[idx];
if (dst_visited.find(dst) != dst_visited.end())
if ( enable_fix_double_calculation )
{
return;
t_column* dst = agg_update_info.m_dst[ idx ];
if ( dst_visited.find( dst ) != dst_visited.end() )
{
return;
}
dst_visited.insert( dst );
}
dst_visited.insert(dst);
cols_topo_sorted.push_back(idx);
cols_topo_sorted.push_back( idx );
};

if ( enable_aggregate_reordering )
{
// Move scaled agg columns to the end
// This does not handle case where scaled aggregate depends on other scaled aggregate
// ( not sure if that is possible )
Expand All @@ -1040,6 +1048,15 @@ t_stree::update_aggs_from_static(const t_dtree_ctx& ctx,
push_column(i);
}
}
}
else
{
// If backed out, use same column order as before ( not topo sorted )
for ( size_t i = 0; i < col_cnt; ++i )
{
push_column( i );
}
}

for (const auto& r : m_tree_unification_records)
{
Expand Down Expand Up @@ -1213,6 +1230,7 @@ t_stree::update_agg_table(t_uindex nidx,
t_index nstrands,
const t_gstate& gstate)
{
static bool const enable_sticky_nan_fix = true;
for (t_uindex idx : info.m_dst_topo_sorted)
{
const t_column* src = info.m_src[idx];
Expand All @@ -1231,16 +1249,14 @@ t_stree::update_agg_table(t_uindex nidx,
t_tscalar dst_scalar = dst->get_scalar(dst_ridx);
old_value.set(dst_scalar);
new_value.set(dst_scalar.add(src_scalar));

if(old_value.is_nan())
if( enable_sticky_nan_fix && old_value.is_nan() ) // is_nan returns false for non-float types
{
// if we previously had a NaN, add can't make it finite again; recalculate entire sum in case it is now finite
auto pkeys = get_pkeys(nidx);
t_f64vec values;
gstate.read_column(spec.get_dependencies()[0].name(), pkeys, values);
new_value.set(std::accumulate(values.begin(), values.end(), t_float64(0)));
}

dst->set_scalar(dst_ridx, new_value);
}
break;
Expand Down Expand Up @@ -2337,7 +2353,7 @@ t_stree::get_aggcols(const t_idxvec& agg_indices) const
// aggregates should be presized to be same size
// as agg_indices
void
t_stree::get_aggregates(t_uindex nidx,
t_stree::get_aggregates_for_sorting(t_uindex nidx,
const t_idxvec& agg_indices,
t_tscalvec& aggregates,
t_ctx2 * ctx2) const
Expand All @@ -2350,7 +2366,7 @@ t_stree::get_aggregates(t_uindex nidx,
auto which_agg = agg_indices[idx];
if(which_agg < 0)
{
aggregates[idx] = get_value(nidx);
aggregates[idx] = get_sortby_value(nidx);
}
else if( ctx2 || ( size_t(which_agg) >= m_aggcols.size() ) )
{
Expand Down
12 changes: 6 additions & 6 deletions packages/perspective/src/cpp/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ t_mask
t_table::filter_cpp(t_filter_op combiner,
const t_ftermvec& fterms_) const
{
static bool const enable_interned_filtering = true;

auto self = const_cast<t_table*>(this);
auto fterms = fterms_;
Expand All @@ -581,10 +582,9 @@ t_table::filter_cpp(t_filter_op combiner,
indices[idx] = m_schema.get_colidx(fterms[idx].m_colname);
columns[idx] = get_const_column(fterms[idx].m_colname).get();
fterms[idx].coerce_numeric(columns[idx]->get_dtype());
auto op = fterms[idx].m_op;
t_tscalar& thr = fterms[idx].m_threshold;
if (fterms[idx].m_use_interned)
if (fterms[idx].m_use_interned && enable_interned_filtering)
{
t_tscalar& thr = fterms[idx].m_threshold;
auto col = self->get_column(fterms[idx].m_colname);
auto interned = col->get_interned(thr.get_char_ptr());
thr.set(interned);
Expand All @@ -611,7 +611,7 @@ t_table::filter_cpp(t_filter_op combiner,
const auto& ft = fterms[cidx];
t_bool tval;

if (ft.m_use_interned)
if (ft.m_use_interned && enable_interned_filtering)
{
cell_val.set(
*(columns[cidx]->get_nth<t_stridx>(ridx)));
Expand All @@ -623,7 +623,7 @@ t_table::filter_cpp(t_filter_op combiner,
tval = ft(cell_val);
}

if (!tval)
if (!cell_val.is_valid() || !tval)
{
pass = false;
break;
Expand Down Expand Up @@ -875,7 +875,7 @@ t_table::fill_expr_helper(const t_svec& icol_names,
struct cmp_str
{
bool
operator()(const char* a, const char* b)
operator()(const char* a, const char* b) const
{
return std::strcmp(a, b) < 0;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/perspective/src/cpp/traversal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ t_traversal::expand_node(const t_sortsvec& sortby, t_tvidx exp_idx, t_ctx2 * ctx
iter != tchildren.end();
++iter)
{
m_tree->get_aggregates(
m_tree->get_aggregates_for_sorting(
iter->m_idx, sortby_agg_indices, aggregates, ctx2);
(*sortelems)[count] = t_mselem(aggregates, child_idx);
++count;
Expand Down
5 changes: 4 additions & 1 deletion packages/perspective/src/cpp/tree_context_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ notify_sparse_tree_common(t_table_sptr strands,

if (!leaf_paths.empty() && traversal.get() && traversal->size() == 1)
{
traversal->populate_root_children(tree);
if ( traversal->get_node( 0 ).m_expanded )
{
traversal->populate_root_children( tree );
}
}
else
{
Expand Down
Loading

0 comments on commit 91b5563

Please sign in to comment.