forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pull apache spark #9
Merged
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## What changes were proposed in this pull request? Optimization of arbitrary Scala sequence deserialization introduced by #16240. The previous implementation constructed an array which was then converted by `to`. This required two passes in most cases. This implementation attempts to remedy that by using `Builder`s provided by the `newBuilder` method on every Scala collection's companion object to build the resulting collection directly. Example codegen for simple `List` (obtained using `Seq(List(1)).toDS().map(identity).queryExecution.debug.codegen`): Before: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean deserializetoobject_resultIsNull; /* 010 */ private java.lang.Object[] deserializetoobject_argValue; /* 011 */ private boolean MapObjects_loopIsNull1; /* 012 */ private int MapObjects_loopValue0; /* 013 */ private boolean deserializetoobject_resultIsNull1; /* 014 */ private scala.collection.generic.CanBuildFrom deserializetoobject_argValue1; /* 015 */ private UnsafeRow deserializetoobject_result; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 018 */ private scala.collection.immutable.List mapelements_argValue; /* 019 */ private UnsafeRow mapelements_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 022 */ private scala.collection.immutable.List serializefromobject_argValue; /* 023 */ private UnsafeRow serializefromobject_result; /* 024 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 025 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 027 */ /* 028 */ public GeneratedIterator(Object[] references) { /* 029 */ this.references = references; /* 030 */ } /* 031 */ /* 032 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 033 */ partitionIndex = index; /* 034 */ this.inputs = inputs; /* 035 */ inputadapter_input = inputs[0]; /* 036 */ /* 037 */ deserializetoobject_result = new UnsafeRow(1); /* 038 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 039 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 040 */ /* 041 */ mapelements_result = new UnsafeRow(1); /* 042 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 043 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 044 */ /* 045 */ serializefromobject_result = new UnsafeRow(1); /* 046 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 047 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 048 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 049 */ /* 050 */ } /* 051 */ /* 052 */ protected void processNext() throws java.io.IOException { /* 053 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 054 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 055 */ ArrayData inputadapter_value = inputadapter_row.getArray(0); /* 056 */ /* 057 */ deserializetoobject_resultIsNull = false; /* 058 */ /* 059 */ if (!deserializetoobject_resultIsNull) { /* 060 */ ArrayData deserializetoobject_value3 = null; /* 061 */ /* 062 */ if (!false) { /* 063 */ Integer[] deserializetoobject_convertedArray = null; /* 064 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 065 */ deserializetoobject_convertedArray = new Integer[deserializetoobject_dataLength]; /* 066 */ /* 067 */ int deserializetoobject_loopIndex = 0; /* 068 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 069 */ MapObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex)); /* 070 */ MapObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 071 */ /* 072 */ if (MapObjects_loopIsNull1) { /* 073 */ throw new RuntimeException(((java.lang.String) references[0])); /* 074 */ } /* 075 */ if (false) { /* 076 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; /* 077 */ } else { /* 078 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue0; /* 079 */ } /* 080 */ /* 081 */ deserializetoobject_loopIndex += 1; /* 082 */ } /* 083 */ /* 084 */ deserializetoobject_value3 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /* 085 */ } /* 086 */ boolean deserializetoobject_isNull2 = true; /* 087 */ java.lang.Object[] deserializetoobject_value2 = null; /* 088 */ if (!false) { /* 089 */ deserializetoobject_isNull2 = false; /* 090 */ if (!deserializetoobject_isNull2) { /* 091 */ Object deserializetoobject_funcResult = null; /* 092 */ deserializetoobject_funcResult = deserializetoobject_value3.array(); /* 093 */ if (deserializetoobject_funcResult == null) { /* 094 */ deserializetoobject_isNull2 = true; /* 095 */ } else { /* 096 */ deserializetoobject_value2 = (java.lang.Object[]) deserializetoobject_funcResult; /* 097 */ } /* 098 */ /* 099 */ } /* 100 */ deserializetoobject_isNull2 = deserializetoobject_value2 == null; /* 101 */ } /* 102 */ deserializetoobject_resultIsNull = deserializetoobject_isNull2; /* 103 */ deserializetoobject_argValue = deserializetoobject_value2; /* 104 */ } /* 105 */ /* 106 */ boolean deserializetoobject_isNull1 = deserializetoobject_resultIsNull; /* 107 */ final scala.collection.Seq deserializetoobject_value1 = deserializetoobject_resultIsNull ? null : scala.collection.mutable.WrappedArray.make(deserializetoobject_argValue); /* 108 */ deserializetoobject_isNull1 = deserializetoobject_value1 == null; /* 109 */ boolean deserializetoobject_isNull = true; /* 110 */ scala.collection.immutable.List deserializetoobject_value = null; /* 111 */ if (!deserializetoobject_isNull1) { /* 112 */ deserializetoobject_resultIsNull1 = false; /* 113 */ /* 114 */ if (!deserializetoobject_resultIsNull1) { /* 115 */ boolean deserializetoobject_isNull6 = false; /* 116 */ final scala.collection.generic.CanBuildFrom deserializetoobject_value6 = false ? null : scala.collection.immutable.List.canBuildFrom(); /* 117 */ deserializetoobject_isNull6 = deserializetoobject_value6 == null; /* 118 */ deserializetoobject_resultIsNull1 = deserializetoobject_isNull6; /* 119 */ deserializetoobject_argValue1 = deserializetoobject_value6; /* 120 */ } /* 121 */ /* 122 */ deserializetoobject_isNull = deserializetoobject_resultIsNull1; /* 123 */ if (!deserializetoobject_isNull) { /* 124 */ Object deserializetoobject_funcResult1 = null; /* 125 */ deserializetoobject_funcResult1 = deserializetoobject_value1.to(deserializetoobject_argValue1); /* 126 */ if (deserializetoobject_funcResult1 == null) { /* 127 */ deserializetoobject_isNull = true; /* 128 */ } else { /* 129 */ deserializetoobject_value = (scala.collection.immutable.List) deserializetoobject_funcResult1; /* 130 */ } /* 131 */ /* 132 */ } /* 133 */ deserializetoobject_isNull = deserializetoobject_value == null; /* 134 */ } /* 135 */ /* 136 */ boolean mapelements_isNull = true; /* 137 */ scala.collection.immutable.List mapelements_value = null; /* 138 */ if (!false) { /* 139 */ mapelements_argValue = deserializetoobject_value; /* 140 */ /* 141 */ mapelements_isNull = false; /* 142 */ if (!mapelements_isNull) { /* 143 */ Object mapelements_funcResult = null; /* 144 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 145 */ if (mapelements_funcResult == null) { /* 146 */ mapelements_isNull = true; /* 147 */ } else { /* 148 */ mapelements_value = (scala.collection.immutable.List) mapelements_funcResult; /* 149 */ } /* 150 */ /* 151 */ } /* 152 */ mapelements_isNull = mapelements_value == null; /* 153 */ } /* 154 */ /* 155 */ if (mapelements_isNull) { /* 156 */ throw new RuntimeException(((java.lang.String) references[2])); /* 157 */ } /* 158 */ serializefromobject_argValue = mapelements_value; /* 159 */ /* 160 */ final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue); /* 161 */ serializefromobject_holder.reset(); /* 162 */ /* 163 */ // Remember the current cursor so that we can calculate how many bytes are /* 164 */ // written later. /* 165 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 166 */ /* 167 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 168 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 169 */ // grow the global buffer before writing data. /* 170 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 171 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 172 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 173 */ /* 174 */ } else { /* 175 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 176 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 177 */ /* 178 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 179 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 180 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index); /* 181 */ } else { /* 182 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index); /* 183 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 184 */ } /* 185 */ } /* 186 */ } /* 187 */ /* 188 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 189 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 190 */ append(serializefromobject_result); /* 191 */ if (shouldStop()) return; /* 192 */ } /* 193 */ } /* 194 */ } ``` After: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean CollectObjects_loopIsNull1; /* 010 */ private int CollectObjects_loopValue0; /* 011 */ private UnsafeRow deserializetoobject_result; /* 012 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 013 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 014 */ private scala.collection.immutable.List mapelements_argValue; /* 015 */ private UnsafeRow mapelements_result; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 018 */ private scala.collection.immutable.List serializefromobject_argValue; /* 019 */ private UnsafeRow serializefromobject_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 023 */ /* 024 */ public GeneratedIterator(Object[] references) { /* 025 */ this.references = references; /* 026 */ } /* 027 */ /* 028 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 029 */ partitionIndex = index; /* 030 */ this.inputs = inputs; /* 031 */ inputadapter_input = inputs[0]; /* 032 */ /* 033 */ deserializetoobject_result = new UnsafeRow(1); /* 034 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 035 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 036 */ /* 037 */ mapelements_result = new UnsafeRow(1); /* 038 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 039 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 040 */ /* 041 */ serializefromobject_result = new UnsafeRow(1); /* 042 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 043 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 044 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 045 */ /* 046 */ } /* 047 */ /* 048 */ protected void processNext() throws java.io.IOException { /* 049 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 050 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 051 */ ArrayData inputadapter_value = inputadapter_row.getArray(0); /* 052 */ /* 053 */ scala.collection.immutable.List deserializetoobject_value = null; /* 054 */ /* 055 */ if (!false) { /* 056 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 057 */ scala.collection.mutable.Builder CollectObjects_builderValue2 = scala.collection.immutable.List$.MODULE$.newBuilder(); /* 058 */ CollectObjects_builderValue2.sizeHint(deserializetoobject_dataLength); /* 059 */ /* 060 */ int deserializetoobject_loopIndex = 0; /* 061 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 062 */ CollectObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex)); /* 063 */ CollectObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 064 */ /* 065 */ if (CollectObjects_loopIsNull1) { /* 066 */ throw new RuntimeException(((java.lang.String) references[0])); /* 067 */ } /* 068 */ if (false) { /* 069 */ CollectObjects_builderValue2.$plus$eq(null); /* 070 */ } else { /* 071 */ CollectObjects_builderValue2.$plus$eq(CollectObjects_loopValue0); /* 072 */ } /* 073 */ /* 074 */ deserializetoobject_loopIndex += 1; /* 075 */ } /* 076 */ /* 077 */ deserializetoobject_value = (scala.collection.immutable.List) CollectObjects_builderValue2.result(); /* 078 */ } /* 079 */ /* 080 */ boolean mapelements_isNull = true; /* 081 */ scala.collection.immutable.List mapelements_value = null; /* 082 */ if (!false) { /* 083 */ mapelements_argValue = deserializetoobject_value; /* 084 */ /* 085 */ mapelements_isNull = false; /* 086 */ if (!mapelements_isNull) { /* 087 */ Object mapelements_funcResult = null; /* 088 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 089 */ if (mapelements_funcResult == null) { /* 090 */ mapelements_isNull = true; /* 091 */ } else { /* 092 */ mapelements_value = (scala.collection.immutable.List) mapelements_funcResult; /* 093 */ } /* 094 */ /* 095 */ } /* 096 */ mapelements_isNull = mapelements_value == null; /* 097 */ } /* 098 */ /* 099 */ if (mapelements_isNull) { /* 100 */ throw new RuntimeException(((java.lang.String) references[2])); /* 101 */ } /* 102 */ serializefromobject_argValue = mapelements_value; /* 103 */ /* 104 */ final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue); /* 105 */ serializefromobject_holder.reset(); /* 106 */ /* 107 */ // Remember the current cursor so that we can calculate how many bytes are /* 108 */ // written later. /* 109 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 110 */ /* 111 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 112 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 113 */ // grow the global buffer before writing data. /* 114 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 115 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 116 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 117 */ /* 118 */ } else { /* 119 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 120 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 121 */ /* 122 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 123 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 124 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index); /* 125 */ } else { /* 126 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index); /* 127 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 128 */ } /* 129 */ } /* 130 */ } /* 131 */ /* 132 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 133 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 134 */ append(serializefromobject_result); /* 135 */ if (shouldStop()) return; /* 136 */ } /* 137 */ } /* 138 */ } ``` Benchmark results before: ``` OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH AMD A10-4600M APU with Radeon(tm) HD Graphics collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Seq 269 / 370 0.0 269125.8 1.0X List 154 / 176 0.0 154453.5 1.7X mutable.Queue 210 / 233 0.0 209691.6 1.3X ``` Benchmark results after: ``` OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH AMD A10-4600M APU with Radeon(tm) HD Graphics collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Seq 255 / 316 0.0 254697.3 1.0X List 152 / 177 0.0 152410.0 1.7X mutable.Queue 213 / 235 0.0 213470.0 1.2X ``` ## How was this patch tested? ```bash ./build/mvn -DskipTests clean package && ./dev/run-tests ``` Additionally in Spark Shell: ```scala case class QueueClass(q: scala.collection.immutable.Queue[Int]) spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect ``` Author: Michal Senkyr <[email protected]> Closes #16541 from michalsenkyr/dataset-seq-builder.
…cRedactionSuite ### What changes were proposed in this pull request? Changed the pattern to match the first n characters in the location field so that the string truncation does not affect it. ### How was this patch tested? N/A Author: Xiao Li <[email protected]> Closes #17448 from gatorsmile/fixTestCAse.
## What changes were proposed in this pull request? TPCDS q45 fails becuase: `ReorderJoin` collects all predicates and try to put them into join condition when creating ordered join. If a predicate with an IN subquery (`ListQuery`) is in a join condition instead of a filter condition, `RewritePredicateSubquery.rewriteExistentialExpr` would fail to convert the subquery to an `ExistenceJoin`, and thus result in error. We should prevent push down of IN subquery to Join operator. ## How was this patch tested? Add a new test case in `FilterPushdownSuite`. Author: wangzhenhua <[email protected]> Closes #17428 from wzhfy/noSubqueryInJoinCond.
…roject attributes ## What changes were proposed in this pull request? Join reorder algorithm should keep exactly the same order of output attributes in the top project. For example, if user want to select a, b, c, after reordering, we should output a, b, c in the same order as specified by user, instead of b, a, c or other orders. ## How was this patch tested? A new test case is added in `JoinReorderSuite`. Author: wangzhenhua <[email protected]> Closes #17453 from wzhfy/keepOrderInProject.
## What changes were proposed in this pull request? Commit ea36116 moved most of the logic from the SessionState classes into an accompanying builder. This makes the existence of the `HiveSessionState` redundant. This PR removes the `HiveSessionState`. ## How was this patch tested? Existing tests. Author: Herman van Hovell <[email protected]> Closes #17457 from hvanhovell/SPARK-20126.
…g of tokens in yarn client mode ## What changes were proposed in this pull request? In the current Spark on YARN code, we will obtain tokens from provided services, but we're not going to add these tokens to the current user's credentials. This will make all the following operations to these services still require TGT rather than delegation tokens. This is unnecessary since we already got the tokens, also this will lead to failure in user impersonation scenario, because the TGT is granted by real user, not proxy user. So here changing to put all the tokens to the current UGI, so that following operations to these services will honor tokens rather than TGT, and this will further handle the proxy user issue mentioned above. ## How was this patch tested? Local verified in secure cluster. vanzin tgravescs mridulm dongjoon-hyun please help to review, thanks a lot. Author: jerryshao <[email protected]> Closes #17335 from jerryshao/SPARK-19995.
## What changes were proposed in this pull request? When we build the deserializer expression for map type, we will use `StaticInvoke` to call `ArrayBasedMapData.toScalaMap`, and declare the return type as `scala.collection.immutable.Map`. If the map is inside an Option, we will wrap this `StaticInvoke` with `WrapOption`, which requires the input to be `scala.collect.Map`. Ideally this should be fine, as `scala.collection.immutable.Map` extends `scala.collect.Map`, but our `ObjectType` is too strict about this, this PR fixes it. ## How was this patch tested? new regression test Author: Wenchen Fan <[email protected]> Closes #17454 from cloud-fan/map.
## What changes were proposed in this pull request? We must set the taskset to zombie before the DAGScheduler handles the taskEnded event. It's possible the taskEnded event will cause the DAGScheduler to launch a new stage attempt (this happens when map output data was lost), and if this happens before the taskSet has been set to zombie, it will appear that we have conflicting task sets. Author: liujianhui <liujianhui@didichuxing> Closes #17208 from liujianhuiouc/spark-19868.
… for uppercase impurity type Gini Fix bug: DecisionTreeModel can't recongnize Impurity "Gini" when loading TODO: + [x] add unit test + [x] fix the bug Author: 颜发才(Yan Facai) <[email protected]> Closes #17407 from facaiy/BUG/decision_tree_loader_failer_with_Gini_impurity.
## What changes were proposed in this pull request? A pyspark wrapper for spark.ml.stat.ChiSquareTest ## How was this patch tested? unit tests doctests Author: Bago Amirbekian <[email protected]> Closes #17421 from MrBago/chiSquareTestWrapper.
…ver side metric updates ## What changes were proposed in this pull request? It is not super intuitive how to update SQLMetric on the driver side. This patch introduces a new SQLMetrics.postDriverMetricUpdates function to do that, and adds documentation to make it more obvious. ## How was this patch tested? Updated a test case to use this method. Author: Reynold Xin <[email protected]> Closes #17464 from rxin/SPARK-20134.
This change modifies the way block data is encrypted to make the more common cases faster, while penalizing an edge case. As a side effect of the change, all data that goes through the block manager is now encrypted only when needed, including the previous path (broadcast variables) where that did not happen. The way the change works is by not encrypting data that is stored in memory; so if a serialized block is in memory, it will only be encrypted once it is evicted to disk. The penalty comes when transferring that encrypted data from disk. If the data ends up in memory again, it is as efficient as before; but if the evicted block needs to be transferred directly to a remote executor, then there's now a performance penalty, since the code now uses a custom FileRegion implementation to decrypt the data before transferring. This also means that block data transferred between executors now is not encrypted (and thus relies on the network library encryption support for secrecy). Shuffle blocks are still transferred in encrypted form, since they're handled in a slightly different way by the code. This also keeps compatibility with existing external shuffle services, which transfer encrypted shuffle blocks, and avoids having to make the external service aware of encryption at all. The serialization and deserialization APIs in the SerializerManager now do not do encryption automatically; callers need to explicitly wrap their streams with an appropriate crypto stream before using those. As a result of these changes, some of the workarounds added in SPARK-19520 are removed here. Testing: a new trait ("EncryptionFunSuite") was added that provides an easy way to run a test twice, with encryption on and off; broadcast, block manager and caching tests were modified to use this new trait so that the existing tests exercise both encrypted and non-encrypted paths. I also ran some applications with encryption turned on to verify that they still work, including streaming tests that failed without the fix for SPARK-19520. Author: Marcelo Vanzin <[email protected]> Closes #17295 from vanzin/SPARK-19556.
…ovider ## What changes were proposed in this pull request? Currently we use system classloader to find HBase jars, if it is specified by `--jars`, then it will be failed with ClassNotFound issue. So here changing to use child classloader. Also putting added jars and main jar into classpath of submitted application in yarn cluster mode, otherwise HBase jars specified with `--jars` will never be honored in cluster mode, and fetching tokens in client side will always be failed. ## How was this patch tested? Unit test and local verification. Author: jerryshao <[email protected]> Closes #17388 from jerryshao/SPARK-20059.
## What changes were proposed in this pull request? Allow Jenkins Python tests to use the installed conda to test Python 2.7 support & test pip installability. ## How was this patch tested? Updated shell scripts, ran tests locally with installed conda, ran tests in Jenkins. Author: Holden Karau <[email protected]> Closes #17355 from holdenk/SPARK-19955-support-python-tests-with-conda.
…n listeners ## What changes were proposed in this pull request? Bugfix from [SPARK-19540.](#16826) Cloning SessionState does not clone query execution listeners, so cloned session is unable to listen to events on queries. ## How was this patch tested? - Unit test Author: Kunal Khamar <[email protected]> Closes #17379 from kunalkhamar/clone-bugfix.
…ns.from_json ## What changes were proposed in this pull request? This pr added `StructType.fromDDL` to convert a DDL format string into `StructType` for defining schemas in `functions.from_json`. ## How was this patch tested? Added tests in `JsonFunctionsSuite`. Author: Takeshi Yamamuro <[email protected]> Closes #17406 from maropu/SPARK-20009.
### What changes were proposed in this pull request? `FalseLiteral` and `TrueLiteral` should have been eliminated by optimizer rule `BooleanSimplification`, but null literals might be added by optimizer rule `NullPropagation`. For safety, our filter estimation should handle all the eligible literal cases. Our optimizer rule BooleanSimplification is unable to remove the null literal in many cases. For example, `a < 0 or null`. Thus, we need to handle null literal in filter estimation. `Not` can be pushed down below `And` and `Or`. Then, we could see two consecutive `Not`, which need to be collapsed into one. Because of the limited expression support for filter estimation, we just need to handle the case `Not(null)` for avoiding incorrect error due to the boolean operation on null. For details, see below matrix. ``` not NULL = NULL NULL or false = NULL NULL or true = true NULL or NULL = NULL NULL and false = false NULL and true = NULL NULL and NULL = NULL ``` ### How was this patch tested? Added the test cases. Author: Xiao Li <[email protected]> Closes #17446 from gatorsmile/constantFilterEstimation.
## What changes were proposed in this pull request? It is similar to Hive silent mode, just show the query result. see: [Hive LanguageManual+Cli](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli) and [the implementation of Hive silent mode](https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L948-L950). This PR set the Logger level to `WARN` to get similar result. ## How was this patch tested? manual tests ![manual test spark sql silent mode](https://cloud.githubusercontent.com/assets/5399861/24390165/989b7780-13b9-11e7-8496-6e68f55757e3.gif) Author: Yuming Wang <[email protected]> Closes #17449 from wangyum/SPARK-20120.
## What changes were proposed in this pull request? Commit 6c70a38 broke the build for scala 2.10. The commit uses some reflections which are not available in Scala 2.10. This PR fixes them. ## How was this patch tested? Existing tests. Author: Takuya UESHIN <[email protected]> Closes #17473 from ueshin/issues/SPARK-19088.
## What changes were proposed in this pull request? The column comment was missing while constructing the Hive TableSchema. This fix will preserve the original comment. ## How was this patch tested? I have added a new test case to test the column with/without comment. Author: bomeng <[email protected]> Closes #17470 from bomeng/SPARK-20146.
…n operator metrics ## What changes were proposed in this pull request? This patch adds explicit metadata operation timing and number of files in data source metrics. Those would be useful to include for performance profiling. Screenshot of a UI with this change (num files and metadata time are new metrics): <img width="321" alt="screen shot 2017-03-29 at 12 29 28 am" src="https://cloud.githubusercontent.com/assets/323388/24443272/d4ea58c0-1416-11e7-8940-ecb69375554a.png"> ## How was this patch tested? N/A Author: Reynold Xin <[email protected]> Closes #17465 from rxin/SPARK-20136.
… task commit messages ## What changes were proposed in this pull request? The internal FileCommitProtocol interface returns all task commit messages in bulk to the implementation when a job finishes. However, it is sometimes useful to access those messages before the job completes, so that the driver gets incremental progress updates before the job finishes. This adds an `onTaskCommit` listener to the internal api. ## How was this patch tested? Unit tests. cc rxin Author: Eric Liang <[email protected]> Closes #17475 from ericl/file-commit-api-ext.
## What changes were proposed in this pull request? There are two examples in r folder missing the run commands. In this PR, I just add the missing comment, which is consistent with other examples. ## How was this patch tested? Manual test. Author: [email protected] <[email protected]> Closes #17474 from wangmiao1981/stat.
…orithm.version option to configuration.md ## What changes were proposed in this pull request? Add `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version` option to `configuration.md`. Set `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2` can speed up [HadoopMapReduceCommitProtocol.commitJob](https://github.com/apache/spark/blob/v2.1.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L121) for many output files. All cloudera's hadoop 2.6.0-cdh5.4.0 or higher versions(see: https://github.com/cloudera/hadoop-common/commit/1c1236182304d4075276c00c4592358f428bc433 and https://github.com/cloudera/hadoop-common/commit/16b2de27321db7ce2395c08baccfdec5562017f0) and apache's hadoop 2.7.0 or higher versions support this improvement. More see: 1. [MAPREDUCE-4815](https://issues.apache.org/jira/browse/MAPREDUCE-4815): Speed up FileOutputCommitter#commitJob for many output files. 2. [MAPREDUCE-6406](https://issues.apache.org/jira/browse/MAPREDUCE-6406): Update the default version for the property mapreduce.fileoutputcommitter.algorithm.version to 2. ## How was this patch tested? Manual test and exist tests. Author: Yuming Wang <[email protected]> Closes #17442 from wangyum/SPARK-20107.
## What changes were proposed in this pull request? Implementations of strategies for resilient block replication for different resource managers that replicate the 3-replica strategy used by HDFS, where the first replica is on an executor, the second replica within the same rack as the executor and a third replica on a different rack. The implementation involves providing two pluggable classes, one running in the driver that provides topology information for every host at cluster start and the second prioritizing a list of peer BlockManagerIds. The prioritization itself can be thought of an optimization problem to find a minimal set of peers that satisfy certain objectives and replicating to these peers first. The objectives can be used to express richer constraints over and above HDFS like 3-replica strategy. ## How was this patch tested? This patch was tested with unit tests for storage, along with new unit tests to verify prioritization behaviour. Author: Shubham Chopra <[email protected]> Closes #13932 from shubhamchopra/PrioritizerStrategy.
…adoc ## What changes were proposed in this pull request? Use recommended values for row boundaries in Window's scaladoc, i.e. `Window.unboundedPreceding`, `Window.unboundedFollowing`, and `Window.currentRow` (that were introduced in 2.1.0). ## How was this patch tested? Local build Author: Jacek Laskowski <[email protected]> Closes #17417 from jaceklaskowski/window-expression-scaladoc.
…as supporting unaligned access java.nio.Bits.unaligned() does not return true for the ppc64le arch. see https://bugs.openjdk.java.net/browse/JDK-8165231 ## What changes were proposed in this pull request? check architecture ## How was this patch tested? unit test Author: samelamin <[email protected]> Author: samelamin <[email protected]> Closes #17472 from samelamin/SPARK-19999.
…ll if set by --conf or configure file ## What changes were proposed in this pull request? while submit apps with -v or --verbose, we can print the right queue name, but if we set a queue name with `spark.yarn.queue` by --conf or in the spark-default.conf, we just got `null` for the queue in Parsed arguments. ``` bin/spark-shell -v --conf spark.yarn.queue=thequeue Using properties file: /home/hadoop/spark-2.1.0-bin-apache-hdp2.7.3/conf/spark-defaults.conf .... Adding default property: spark.yarn.queue=default Parsed arguments: master yarn deployMode client ... queue null .... verbose true Spark properties used, including those specified through --conf and those from the properties file /home/hadoop/spark-2.1.0-bin-apache-hdp2.7.3/conf/spark-defaults.conf: spark.yarn.queue -> thequeue .... ``` ## How was this patch tested? ut and local verify Author: Kent Yao <[email protected]> Closes #17430 from yaooqinn/SPARK-20096.
…tion Fixed a few typos. There is one more I'm not sure of: ``` Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in `withWatermark()` as by the modes semantics, rows can be added to the Result Table only once after they are ``` Not sure how to change `is delayed the late threshold`. Author: Seigneurin, Alexis (CONT) <[email protected]> Closes #17443 from aseigneurin/typos.
…eported Intellij IDEA ## What changes were proposed in this pull request? Few changes related to Intellij IDEA inspection. ## How was this patch tested? Changes were tested by existing unit tests Author: Denis Bolshakov <[email protected]> Closes #17458 from dbolshak/SPARK-20127.
## What changes were proposed in this pull request? MLlib ```LogisticRegression``` should support bound constrained optimization (only for L2 regularization). Users can add bound constraints to coefficients to make the solver produce solution in the specified range. Under the hood, we call Breeze [```L-BFGS-B```](https://github.com/scalanlp/breeze/blob/master/math/src/main/scala/breeze/optimize/LBFGSB.scala) as the solver for bound constrained optimization. But in the current breeze implementation, there are some bugs in L-BFGS-B, and scalanlp/breeze#633 fixed them. We need to upgrade dependent breeze later, and currently we use the workaround L-BFGS-B in this PR temporary for reviewing. ## How was this patch tested? Unit tests. Author: Yanbo Liang <[email protected]> Closes #17715 from yanboliang/spark-20047.
…x the potential hang in CachedKafkaConsumer ## What changes were proposed in this pull request? This PR changes Executor's threads to `UninterruptibleThread` so that we can use `runUninterruptibly` in `CachedKafkaConsumer`. However, this is just best effort to avoid hanging forever. If the user uses`CachedKafkaConsumer` in another thread (e.g., create a new thread or Future), the potential hang may still happen. ## How was this patch tested? The new added test. Author: Shixiong Zhu <[email protected]> Closes #17761 from zsxwing/int.
…ion for batch Kafka DataFrame ## What changes were proposed in this pull request? Cancel a batch Kafka query but one of task cannot be cancelled, and rerun the same DataFrame may cause ConcurrentModificationException because it may launch two tasks sharing the same group id. This PR always create a new consumer when `reuseKafkaConsumer = false` to avoid ConcurrentModificationException. It also contains other minor fixes. ## How was this patch tested? Jenkins. Author: Shixiong Zhu <[email protected]> Closes #17752 from zsxwing/kafka-fix.
…xecutor side ## What changes were proposed in this pull request? When sending accumulator updates back to driver, the network overhead is pretty big as there are a lot of accumulators, e.g. `TaskMetrics` will send about 20 accumulators everytime, there may be a lot of `SQLMetric` if the query plan is complicated. Therefore, it's critical to reduce the size of serialized accumulator. A simple way is to not send the name of internal accumulators to executor side, as it's unnecessary. When executor sends accumulator updates back to driver, we can look up the accumulator name in `AccumulatorContext` easily. Note that, we still need to send names of normal accumulators, as the user code run at executor side may rely on accumulator names. In the future, we should reimplement `TaskMetrics` to not rely on accumulators and use custom serialization. Tried on the example in https://issues.apache.org/jira/browse/SPARK-12837, the size of serialized accumulator has been cut down by about 40%. ## How was this patch tested? existing tests. Author: Wenchen Fan <[email protected]> Closes #17596 from cloud-fan/oom.
## What changes were proposed in this pull request? add link to svmLinear in the SparkR programming document. ## How was this patch tested? Build doc manually and click the link to the document. It looks good. Author: wangmiao1981 <[email protected]> Closes #17797 from wangmiao1981/doc.
…he column names ### What changes were proposed in this pull request? ```SQL hive> create table t1(`a,` string); OK Time taken: 1.399 seconds hive> create table t2(`a,` string, b string); FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: MetaException(message:org.apache.hadoop.hive.serde2.SerDeException org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 3 elements while columns.types has 2 elements!) hive> create table t2(`a,` string, b string) stored as parquet; FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.IllegalArgumentException: ParquetHiveSerde initialization failed. Number of column name and column type differs. columnNames = [a, , b], columnTypes = [string, string] ``` It has a bug in Hive metastore. When users do not provide alias name in the SELECT query, we call `toPrettySQL` to generate the alias name. For example, the string `get_json_object(jstring, '$.f1')` will be the alias name for the function call in the statement ```SQL SELECT key, get_json_object(jstring, '$.f1') FROM tempView ``` Above is not an issue for the SELECT query statements. However, for CTAS, we hit the issue due to a bug in Hive metastore. Hive metastore does not like the column names containing commas and returned a confusing error message, like: ``` 17/04/26 23:12:56 ERROR [hive.log(397) -- main]: error in initSerDe: org.apache.hadoop.hive.serde2.SerDeException org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements! org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements! ``` Thus, this PR is to block users to create a table in Hive metastore when the table table has a column containing commas in the name. ### How was this patch tested? Added a test case Author: Xiao Li <[email protected]> Closes #17781 from gatorsmile/blockIllegalColumnNames.
## What changes were proposed in this pull request? This pr added a new rule in `Analyzer` to resolve aliases in `GROUP BY`. The current master throws an exception if `GROUP BY` clauses have aliases in `SELECT`; ``` scala> spark.sql("select a a1, a1 + 1 as b, count(1) from t group by a1") org.apache.spark.sql.AnalysisException: cannot resolve '`a1`' given input columns: [a]; line 1 pos 51; 'Aggregate ['a1], [a#83L AS a1#87L, ('a1 + 1) AS b#88, count(1) AS count(1)#90L] +- SubqueryAlias t +- Project [id#80L AS a#83L] +- Range (0, 10, step=1, splits=Some(8)) at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289) ``` ## How was this patch tested? Added tests in `SQLQuerySuite` and `SQLQueryTestSuite`. Author: Takeshi Yamamuro <[email protected]> Closes #17191 from maropu/SPARK-14471.
… could not be got ## What changes were proposed in this pull request? This PR proposes to throw an exception with better message rather than `ArrayIndexOutOfBoundsException` when temp directories could not be created. Running the commands below: ```bash ./bin/spark-shell --conf spark.local.dir=/NONEXISTENT_DIR_ONE,/NONEXISTENT_DIR_TWO ``` produces ... **Before** ``` Exception in thread "main" java.lang.ExceptionInInitializerError ... Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 ... ``` **After** ``` Exception in thread "main" java.lang.ExceptionInInitializerError ... Caused by: java.io.IOException: Failed to get a temp directory under [/NONEXISTENT_DIR_ONE,/NONEXISTENT_DIR_TWO]. ... ``` ## How was this patch tested? Unit tests in `LocalDirsSuite.scala`. Author: hyukjinkwon <[email protected]> Closes #17768 from HyukjinKwon/throws-temp-dir-exception.
## What changes were proposed in this pull request? We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka. ## How was this patch tested? New unit test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Bill Chambers <[email protected]> Closes #17804 from anabranch/SPARK-20496-2.
Upgrade Jetty so it can work with Hadoop 3 (alpha 2 release, in particular). Without this change, because of incompatibily between Jetty versions, Spark fails to compile when built against Hadoop 3 ## How was this patch tested? Unit tests being run. Author: Mark Grover <[email protected]> Closes #17790 from markgrover/spark-20514.
…hashmap is disabled but vectorized hashmap is enabled What changes were proposed in this pull request? remove AggregateBenchmark testsuite warning: such as '14:26:33.220 WARN org.apache.spark.sql.execution.aggregate.HashAggregateExec: Two level hashmap is disabled but vectorized hashmap is enabled.' How was this patch tested? unit tests: AggregateBenchmark Modify the 'ignore function for 'test funtion Author: caoxuewen <[email protected]> Closes #17771 from heary-cao/AggregateBenchmark.
## What changes were proposed in this pull request? This PR adds RDD checkpoint compression support and add a new config `spark.checkpoint.compress` to enable/disable it. Credit goes to aramesh117 Closes #17024 ## How was this patch tested? The new unit test. Author: Shixiong Zhu <[email protected]> Author: Aaditya Ramesh <[email protected]> Closes #17789 from zsxwing/pr17024.
…ned plan ## What changes were proposed in this pull request? This was a suggestion by rxin at #17780 (comment) ## How was this patch tested? - modified existing unit test - manual testing: ``` scala> hc.sql(" SELECT * FROM tejasp_bucketed_partitioned_1 where name = '' ").explain(true) == Parsed Logical Plan == 'Project [*] +- 'Filter ('name = ) +- 'UnresolvedRelation `tejasp_bucketed_partitioned_1` == Analyzed Logical Plan == user_id: bigint, name: string, ds: string Project [user_id#24L, name#25, ds#26] +- Filter (name#25 = ) +- SubqueryAlias tejasp_bucketed_partitioned_1 +- CatalogRelation `default`.`tejasp_bucketed_partitioned_1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#24L, name#25], [ds#26] == Optimized Logical Plan == Filter (isnotnull(name#25) && (name#25 = )) +- CatalogRelation `default`.`tejasp_bucketed_partitioned_1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#24L, name#25], [ds#26] == Physical Plan == *Filter (isnotnull(name#25) && (name#25 = )) +- HiveTableScan [user_id#24L, name#25, ds#26], CatalogRelation `default`.`tejasp_bucketed_partitioned_1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#24L, name#25], [ds#26] ``` Author: Tejas Patil <[email protected]> Closes #17806 from tejasapatil/add_serde.
…ing guide ## What changes were proposed in this pull request? Add hyper link in the SparkR programming guide. ## How was this patch tested? Build doc and manually check the doc link. Author: wangmiao1981 <[email protected]> Closes #17805 from wangmiao1981/doc.
## What changes were proposed in this pull request? Add a new section for fpm Add Example for FPGrowth in scala and Java updated: Rewrite transform to be more compact. ## How was this patch tested? local doc generation. Author: Yuhao Yang <[email protected]> Closes #17130 from hhbyyh/fpmdoc.
…ue should be lazy ## What changes were proposed in this pull request? MultilayerPerceptronClassifierWrapper model should be private. LogisticRegressionWrapper.scala rFeatures and rCoefficients should be lazy. ## How was this patch tested? Unit tests. Author: wangmiao1981 <[email protected]> Closes #17808 from wangmiao1981/lazy.
…in R ## What changes were proposed in this pull request? It seems we are using `SQLUtils.getSQLDataType` for type string in structField. It looks we can replace this with `CatalystSqlParser.parseDataType`. They look similar DDL-like type definitions as below: ```scala scala> Seq(Tuple1(Tuple1("a"))).toDF.show() ``` ``` +---+ | _1| +---+ |[a]| +---+ ``` ```scala scala> Seq(Tuple1(Tuple1("a"))).toDF.select($"_1".cast("struct<_1:string>")).show() ``` ``` +---+ | _1| +---+ |[a]| +---+ ``` Such type strings looks identical when R’s one as below: ```R > write.df(sql("SELECT named_struct('_1', 'a') as struct"), "/tmp/aa", "parquet") > collect(read.df("/tmp/aa", "parquet", structType(structField("struct", "struct<_1:string>")))) struct 1 a ``` R’s one is stricter because we are checking the types via regular expressions in R side ahead. Actual logics there look a bit different but as we check it ahead in R side, it looks replacing it would not introduce (I think) no behaviour changes. To make this sure, the tests dedicated for it were added in SPARK-20105. (It looks `structField` is the only place that calls this method). ## How was this patch tested? Existing tests - https://github.com/apache/spark/blob/master/R/pkg/inst/tests/testthat/test_sparkSQL.R#L143-L194 should cover this. Author: hyukjinkwon <[email protected]> Closes #17785 from HyukjinKwon/SPARK-20493.
…olumn API in PySpark ## What changes were proposed in this pull request? This PR proposes to fill up the documentation with examples for `bitwiseOR`, `bitwiseAND`, `bitwiseXOR`. `contains`, `asc` and `desc` in `Column` API. Also, this PR fixes minor typos in the documentation and matches some of the contents between Scala doc and Python doc. Lastly, this PR suggests to use `spark` rather than `sc` in doc tests in `Column` for Python documentation. ## How was this patch tested? Doc tests were added and manually tested with the commands below: `./python/run-tests.py --module pyspark-sql` `./python/run-tests.py --module pyspark-sql --python-executable python3` `./dev/lint-python` Output was checked via `make html` under `./python/docs`. The snapshots will be left on the codes with comments. Author: hyukjinkwon <[email protected]> Closes #17737 from HyukjinKwon/SPARK-20442.
…Ttl' should be 604800 in spark-standalone.md ## What changes were proposed in this pull request? Currently, our project needs to be set to clean up the worker directory cleanup cycle is three days. When I follow http://spark.apache.org/docs/latest/spark-standalone.html, configure the 'spark.worker.cleanup.appDataTtl' parameter, I configured to 3 * 24 * 3600. When I start the spark service, the startup fails, and the worker log displays the error log as follows: 2017-04-28 15:02:03,306 INFO Utils: Successfully started service 'sparkWorker' on port 48728. Exception in thread "main" java.lang.NumberFormatException: For input string: "3 * 24 * 3600" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Long.parseLong(Long.java:430) at java.lang.Long.parseLong(Long.java:483) at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276) at scala.collection.immutable.StringOps.toLong(StringOps.scala:29) at org.apache.spark.SparkConf$$anonfun$getLong$2.apply(SparkConf.scala:380) at org.apache.spark.SparkConf$$anonfun$getLong$2.apply(SparkConf.scala:380) at scala.Option.map(Option.scala:146) at org.apache.spark.SparkConf.getLong(SparkConf.scala:380) at org.apache.spark.deploy.worker.Worker.<init>(Worker.scala:100) at org.apache.spark.deploy.worker.Worker$.startRpcEnvAndEndpoint(Worker.scala:730) at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:709) at org.apache.spark.deploy.worker.Worker.main(Worker.scala) **Because we put 7 * 24 * 3600 as a string, forced to convert to the dragon type, will lead to problems in the program.** **So I think the default value of the current configuration should be a specific long value, rather than 7 * 24 * 3600,should be 604800. Because it would mislead users for similar configurations, resulting in spark start failure.** ## How was this patch tested? manual tests Please review http://spark.apache.org/contributing.html before opening a pull request. Author: 郭小龙 10207633 <[email protected]> Author: guoxiaolong <[email protected]> Author: guoxiaolongzte <[email protected]> Closes #17798 from guoxiaolongzte/SPARK-20521.
…ve types in parser ## What changes were proposed in this pull request? Currently, when the type string is invalid, it looks printing empty parentheses. This PR proposes a small improvement in an error message by removing it in the parse as below: ```scala spark.range(1).select($"col".cast("aa")) ``` **Before** ``` org.apache.spark.sql.catalyst.parser.ParseException: DataType aa() is not supported.(line 1, pos 0) == SQL == aa ^^^ ``` **After** ``` org.apache.spark.sql.catalyst.parser.ParseException: DataType aa is not supported.(line 1, pos 0) == SQL == aa ^^^ ``` ## How was this patch tested? Unit tests in `DataTypeParserSuite`. Author: hyukjinkwon <[email protected]> Closes #17784 from HyukjinKwon/SPARK-20492.
## What changes were proposed in this pull request? Ad R wrappers for - `o.a.s.sql.functions.explode_outer` - `o.a.s.sql.functions.posexplode_outer` ## How was this patch tested? Additional unit tests, manual testing. Author: zero323 <[email protected]> Closes #17809 from zero323/SPARK-20535.
…ue in fillna ## What changes were proposed in this pull request? Currently pyspark Dataframe.fillna API supports boolean type when we pass dict, but it is missing in documentation. ## How was this patch tested? >>> spark.createDataFrame([Row(a=True),Row(a=None)]).fillna({"a" : True}).show() +----+ | a| +----+ |true| |true| +----+ Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Srinivasa Reddy Vundela <[email protected]> Closes #17688 from vundela/fillna_doc_fix.
## What changes were proposed in this pull request? - Add null-safe equality operator `%<=>%` (sames as `o.a.s.sql.Column.eqNullSafe`, `o.a.s.sql.Column.<=>`) - Add boolean negation operator `!` and function `not `. ## How was this patch tested? Existing unit tests, additional unit tests, `check-cran.sh`. Author: zero323 <[email protected]> Closes #17783 from zero323/SPARK-20490.
## What changes were proposed in this pull request? Add without param for timeout - will need this to submit a job that runs until stopped Need this for 2.2 ## How was this patch tested? manually, unit test Author: Felix Cheung <[email protected]> Closes #17815 from felixcheung/rssawaitinfinite.
## What changes were proposed in this pull request? Adds Python bindings for `Column.eqNullSafe` ## How was this patch tested? Manual tests, existing unit tests, doc build. Author: zero323 <[email protected]> Closes #17605 from zero323/SPARK-20290.
## What changes were proposed in this pull request? Generate exec does not produce `null` values if the generator for the input row is empty and the generate operates in outer mode without join. This is caused by the fact that the `join=false` code path is different from the `join=true` code path, and that the `join=false` code path did deal with outer properly. This PR addresses this issue. ## How was this patch tested? Updated `outer*` tests in `GeneratorFunctionSuite`. Author: Herman van Hovell <[email protected]> Closes #17810 from hvanhovell/SPARK-20534.
The download link in history server UI is concatenated with: ``` <td><a href="{{uiroot}}/api/v1/applications/{{id}}/{{num}}/logs" class="btn btn-info btn-mini">Download</a></td> ``` Here `num` field represents number of attempts, this is not equal to REST APIs. In the REST API, if attempt id is not existed the URL should be `api/v1/applications/<id>/logs`, otherwise the URL should be `api/v1/applications/<id>/<attemptId>/logs`. Using `<num>` to represent `<attemptId>` will lead to the issue of "no such app". Manual verification. CC ajbozarth can you please review this change, since you add this feature before? Thanks! Author: jerryshao <[email protected]> Closes #17795 from jerryshao/SPARK-20517.
…es and fix cancellation of running jobs using the job group ## What changes were proposed in this pull request? Job group: adding a job group is required to properly cancel running jobs related to a query. Description: the new description makes it easier to group the batches of a query by sorting by name in the Spark Jobs UI. ## How was this patch tested? - Unit tests - UI screenshot - Order by job id: ![screen shot 2017-04-27 at 5 10 09 pm](https://cloud.githubusercontent.com/assets/7865120/25509468/15452274-2b6e-11e7-87ba-d929816688cf.png) - Order by description: ![screen shot 2017-04-27 at 5 10 22 pm](https://cloud.githubusercontent.com/assets/7865120/25509474/1c298512-2b6e-11e7-99b8-fef1ef7665c1.png) - Order by job id (no query name): ![screen shot 2017-04-27 at 5 21 33 pm](https://cloud.githubusercontent.com/assets/7865120/25509482/28c96dc8-2b6e-11e7-8df0-9d3cdbb05e36.png) - Order by description (no query name): ![screen shot 2017-04-27 at 5 21 44 pm](https://cloud.githubusercontent.com/assets/7865120/25509489/37674742-2b6e-11e7-9357-b5c38ec16ac4.png) Author: Kunal Khamar <[email protected]> Closes #17765 from kunalkhamar/sc-6696.
There are two problems fixed in this commit. First, the ExecutorAllocationManager sets a timeout to avoid requesting executors too often. However, the timeout is always updated based on its value and a timeout, not the current time. If the call is delayed by locking for more than the ongoing scheduler timeout, the manager will request more executors on every run. This seems to be the main cause of SPARK-20540. The second problem is that the total number of requested executors is not tracked by the CoarseGrainedSchedulerBackend. Instead, it calculates the value based on the current status of 3 variables: the number of known executors, the number of executors that have been killed, and the number of pending executors. But, the number of pending executors is never less than 0, even though there may be more known than requested. When executors are killed and not replaced, this can cause the request sent to YARN to be incorrect because there were too many executors due to the scheduler's state being slightly out of date. This is fixed by tracking the currently requested size explicitly. ## How was this patch tested? Existing tests. Author: Ryan Blue <[email protected]> Closes #17813 from rdblue/SPARK-20540-fix-dynamic-allocation.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Pull apache spark
How was this patch tested?
All in