
[SPARK-12951] [SQL] support spilling in generated aggregate #10998

Closed
wants to merge 5 commits

Conversation

@davies (Contributor) commented Jan 31, 2016

This PR adds spilling support for the generated TungstenAggregate.

Once spilling has happened, falling back to the iterator-based sort-merge aggregate (not code-generated) is not that costly.

The changes are covered by TungstenAggregationQueryWithControlledFallbackSuite.
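The high-level flow described above can be sketched in plain Java. This is toy code under assumed names, not Spark's actual UnsafeFixedWidthAggregationMap / UnsafeKVExternalSorter APIs: aggregate into a hash map until it "runs out of memory", spill the map's contents as a sorted run, and finish with a sort-merge over all spilled runs plus the final map.

```java
import java.util.*;

// Toy sketch of the spill-and-fallback flow: hash aggregation until the
// map is "full", spill sorted runs, then sort-merge-aggregate the runs.
public class SpillingAggregateSketch {
    static final int MAX_MAP_SIZE = 4; // stand-in for "out of memory"

    public static SortedMap<Long, Long> sumByKey(long[] keys, long[] values) {
        Map<Long, Long> hashMap = new HashMap<>();
        List<TreeMap<Long, Long>> spilledRuns = new ArrayList<>();

        for (int i = 0; i < keys.length; i++) {
            if (!hashMap.containsKey(keys[i]) && hashMap.size() >= MAX_MAP_SIZE) {
                // "spill": hand off the current map as a sorted run, reset it
                spilledRuns.add(new TreeMap<>(hashMap));
                hashMap = new HashMap<>();
            }
            hashMap.merge(keys[i], values[i], Long::sum);
        }

        // sort-merge-aggregate: combine the sorted runs and the last map
        TreeMap<Long, Long> result = new TreeMap<>(hashMap);
        for (TreeMap<Long, Long> run : spilledRuns) {
            for (Map.Entry<Long, Long> e : run.entrySet()) {
                result.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] keys = {1, 2, 3, 4, 5, 1, 2, 6};
        long[] vals = {10, 10, 10, 10, 10, 5, 5, 10};
        System.out.println(sumByKey(keys, vals));
    }
}
```

In the real generated code the spill target is an external sorter rather than an in-memory TreeMap, but the merge step is the same shape.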

@rxin (Contributor) commented Jan 31, 2016

Can you add some documentation in the code somewhere appropriate to explain the high level flow (e.g. when X happens, we switch to Y through Z)?

@hvanhovell (Contributor) commented:

retest this please

@SparkQA commented Jan 31, 2016

Test build #50460 has finished for PR 10998 at commit 07c4119.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 1, 2016

Test build #50494 has finished for PR 10998 at commit 02f2a25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nongli (Contributor) commented Feb 1, 2016

Can you include the generated code?

@davies (Contributor, Author) commented Feb 2, 2016

For the query:

sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()

the following code will be generated:

/* 001 */
/* 002 */ public Object generate(Object[] references) {
/* 003 */   return new GeneratedIterator(references);
/* 004 */ }
/* 005 */
/* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */
/* 008 */   private Object[] references;
/* 009 */   private boolean agg_initAgg0;
/* 010 */   private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan1;
/* 011 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap2;
/* 012 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter3;
/* 013 */   private org.apache.spark.unsafe.KVIterator agg_mapIter4;
/* 014 */   private boolean range_initRange6;
/* 015 */   private long range_partitionEnd7;
/* 016 */   private long range_number8;
/* 017 */   private boolean range_overflow9;
/* 018 */   private UnsafeRow agg_result19;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder20;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter21;
/* 021 */   private int agg_count38;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner agg_unsafeRowJoiner41;
/* 023 */
/* 024 */   private void initRange(int idx) {
/* 049 */   }
/* 052 */   private void agg_doAggregateWithKeys5() throws java.io.IOException {
/* 054 */     // initialize Range
/* 055 */     if (!range_initRange6) {
/* 056 */       range_initRange6 = true;
/* 057 */       if (input.hasNext()) {
/* 058 */         initRange(((InternalRow) input.next()).getInt(0));
/* 059 */       } else {
/* 060 */         return;
/* 061 */       }
/* 062 */     }
/* 063 */
/* 064 */     while (!range_overflow9 && range_number8 < range_partitionEnd7) {
/* 065 */       long range_value10 = range_number8;
/* 066 */       range_number8 += 1L;
/* 067 */       if (range_number8 < range_value10 ^ 1L < 0) {
/* 068 */         range_overflow9 = true;
/* 069 */       }
/* 076 */       long project_value12 = -1L;
/* 077 */       project_value12 = range_value10 & 65535L;
/* 080 */       // generate grouping key
/* 086 */       agg_rowWriter21.write(0, project_value12);
/* 087 */
/* 088 */
/* 089 */       UnsafeRow agg_aggBuffer23 = null;
/* 090 */       if (true) {
/* 091 */         agg_aggBuffer23 = agg_hashMap2.getAggregationBufferFromUnsafeRow(agg_result19);
/* 092 */       }
/* 093 */       if (agg_aggBuffer23 == null) {
/* 094 */         if (agg_sorter3 == null) {
/* 095 */           agg_sorter3 = agg_hashMap2.destructAndCreateExternalSorter();
/* 096 */         } else {
/* 097 */           agg_sorter3.merge(agg_hashMap2.destructAndCreateExternalSorter());
/* 098 */         }
/* 099 */         agg_count38 = 0;
/* 100 */         agg_aggBuffer23 = agg_hashMap2.getAggregationBufferFromUnsafeRow(agg_result19);
/* 101 */         if (agg_aggBuffer23 == null) {
/* 102 */           // failed to allocate the first page
/* 103 */           throw new OutOfMemoryError("No enough memory for aggregation");
/* 104 */         }
/* 105 */       }
/* 106 */       agg_count38 += 1;
/* 107 */
/* 108 */       // evaluate aggregate function
/* 112 */       boolean agg_isNull28 = agg_aggBuffer23.isNullAt(0);
/* 113 */       long agg_value29 = agg_isNull28 ? -1L : (agg_aggBuffer23.getLong(0));
/* 114 */       boolean agg_isNull26 = agg_isNull28;
/* 115 */       long agg_value27 = agg_value29;
/* 116 */
/* 117 */       if (agg_isNull26) {
/* 121 */         boolean agg_isNull30 = false;
/* 122 */         long agg_value31 = -1L;
/* 123 */         if (!false) {
/* 124 */           agg_value31 = (long) 0;
/* 125 */         }
/* 126 */         if (!agg_isNull30) {
/* 127 */           agg_isNull26 = false;
/* 128 */           agg_value27 = agg_value31;
/* 129 */         }
/* 130 */       }
/* 134 */       boolean agg_isNull34 = false;
/* 135 */       long agg_value35 = -1L;
/* 136 */       if (!false) {
/* 137 */         agg_value35 = project_value12;
/* 138 */       }
/* 139 */       long agg_value25 = -1L;
/* 140 */       agg_value25 = agg_value27 + agg_value35;
/* 141 */       // update aggregate buffer
/* 142 */       agg_aggBuffer23.setLong(0, agg_value25);
/* 145 */     }
/* 148 */     agg_mapIter4 = agg_plan1.finishAggregate(agg_hashMap2, agg_sorter3);
/* 149 */   }
/* 152 */   public GeneratedIterator(Object[] references) {
/* 153 */     this.references = references;
/* 154 */     agg_initAgg0 = false;
/* 155 */     this.agg_plan1 = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0];
/* 156 */     agg_hashMap2 = agg_plan1.createHashMap();

/* 159 */     range_initRange6 = false;
/* 160 */     range_partitionEnd7 = 0L;
/* 161 */     range_number8 = 0L;
/* 162 */     range_overflow9 = false;
/* 163 */     agg_result19 = new UnsafeRow(1);
/* 164 */     this.agg_holder20 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result19, 0);
/* 165 */     this.agg_rowWriter21 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder20, 1);
/* 166 */     agg_count38 = 0;
/* 167 */     agg_unsafeRowJoiner41 = agg_plan1.createUnsafeJoiner();
/* 168 */   }
/* 169 */
/* 170 */   protected void processNext() throws java.io.IOException {
/* 171 */
/* 172 */     if (!agg_initAgg0) {
/* 173 */       agg_initAgg0 = true;
/* 174 */       agg_doAggregateWithKeys5();
/* 175 */     }
/* 176 */
/* 177 */     // output the result
/* 178 */     while (agg_mapIter4.next()) {
/* 179 */       UnsafeRow agg_aggKey39 = (UnsafeRow) agg_mapIter4.getKey();
/* 180 */       UnsafeRow agg_aggBuffer40 = (UnsafeRow) agg_mapIter4.getValue();
/* 182 */       UnsafeRow agg_resultRow42 = agg_unsafeRowJoiner41.join(agg_aggKey39, agg_aggBuffer40);
/* 184 */       currentRow = agg_resultRow42;
/* 185 */       return;
/* 188 */     }
/* 189 */
/* 190 */     agg_mapIter4.close();
/* 191 */     if (agg_sorter3 == null) {
/* 192 */       agg_hashMap2.free();
/* 193 */     }
/* 195 */   }
/* 196 */ }
/* 197 */
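For reference, what the generated code above computes can be written as a few lines of plain Java. This is a hypothetical toy equivalent, not Spark API: group each id by (id & 65535) and sum the key column, exactly as project_value12 and the update at line 142 do.

```java
import java.util.*;

// Plain-Java equivalent of the aggregation in the generated code:
// key = id & 65535, aggregate = sum of the key column.
public class RangeGroupBySum {
    public static Map<Long, Long> run(long numValues) {
        Map<Long, Long> sums = new HashMap<>();
        for (long id = 0; id < numValues; id++) {
            long k = id & 65535L;          // grouping key (project_value12)
            sums.merge(k, k, Long::sum);   // sum("k") sums the key column
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(RangeGroupBySum.run(100000).size());
    }
}
```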

s"""
// generate grouping key
${keyCode.code}
-UnsafeRow $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key);
+UnsafeRow $buffer = null;
+if ($checkFallback) {
Contributor:

I don't get this. You seem to do the lookup twice, here and at line 539. Is that intentional?

Contributor:

Nvm. I get it now.

Contributor:

If it is confusing, we should consider adding documentation.

Contributor Author:

Added comments.

@nongli (Contributor) commented Feb 2, 2016

In the generated code, what is agg_count38 used for?

@nongli (Contributor) commented Feb 2, 2016

This naming is getting really hard to follow. As a follow-up, we should make the variable names better. In this case, for example, it should be clear whether the values are input values or intermediate result values.

@davies (Contributor, Author) commented Feb 2, 2016

@nongli agg_count38 is used for checking fallback (only for testing).

@nongli (Contributor) commented Feb 2, 2016

@davies If this debugging is generally useful to have, can you comment it in the code and possibly rename the variable to something more specific than "count"? If this is a one-off you used for this PR, let's remove it.
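The testing hook being discussed can be sketched as follows. This is a hedged illustration under invented names, not Spark's actual fallback mechanism: a per-operator row counter (like agg_count38 in the generated code) forces the spill branch after every N probed rows, so tests exercise the fallback path deterministically.

```java
import java.util.*;

// Illustrative controlled-fallback aggregator: every N lookups we pretend
// the hash map is out of memory, "spill" it, and reset the counter's map.
public class ControlledFallback {
    private final int fallbackEvery;  // force a "spill" every N rows (0 = never)
    private int count = 0;            // plays the role of agg_count38
    public int spills = 0;
    private final Map<Long, long[]> map = new HashMap<>();

    public ControlledFallback(int fallbackEvery) {
        this.fallbackEvery = fallbackEvery;
    }

    public long[] bufferFor(long key) {
        count++;
        if (fallbackEvery > 0 && count % fallbackEvery == 0) {
            spills++;      // real code: sorter.merge(map.destructAndCreateExternalSorter())
            map.clear();   // the hash map is handed off and reset
        }
        return map.computeIfAbsent(key, k -> new long[1]);
    }

    public static void main(String[] args) {
        ControlledFallback agg = new ControlledFallback(3);
        for (long id = 0; id < 10; id++) {
            agg.bufferFor(id % 4)[0] += id;
        }
        System.out.println(agg.spills);
    }
}
```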

@SparkQA commented Feb 2, 2016

Test build #50598 has finished for PR 10998 at commit 0c52897.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nongli (Contributor) commented Feb 3, 2016

LGTM

@SparkQA commented Feb 3, 2016

Test build #2497 has finished for PR 10998 at commit 0c52897.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor, Author) commented Feb 3, 2016

Merged into master.

@asfgit asfgit closed this in 99a6e3c Feb 3, 2016
5 participants