[SPARK-24257][SQL] LongToUnsafeRowMap calculate the new size may be wrong #21311
Conversation
@gatorsmile @hvanhovell Could you trigger tests?
cc @cloud-fan
ensureAcquireMemory(used * 8L * 2)
val newPage = new Array[Long](used * 2)
val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
ensureAcquireMemory(used * 8L * multiples)
Do we move the size check to before ensureAcquireMemory()? IIUC, we have to check `used * multiples <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH` now.
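A standalone sketch of the proposed multiples arithmetic (hypothetical names, not the actual Spark code), assuming `needSize` is in bytes and the page currently holds `used` 8-byte longs:

```java
public class GrowMultiples {
    // Grow factor: at least 2, or more when the needed size exceeds
    // twice the current page (used * 8 bytes).
    static int multiples(long needSize, long used) {
        return (int) Math.max((long) Math.ceil(needSize / (double) (used * 8L)), 2L);
    }

    public static void main(String[] args) {
        // Doubling suffices when the new record fits in twice the page.
        System.out.println(multiples(1L << 20, 1L << 17)); // 2
        // A record bigger than oldSize * 2 forces a larger multiple.
        System.out.println(multiples(5L << 20, 1L << 17)); // 5
    }
}
```

This is exactly where the MAX_ROUNDED_ARRAY_LENGTH check would have to happen: after computing the multiple but before acquiring memory.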
How about shaping up this logic along with the other similar ones (splitting this func into two parts: grow/append)? e.g., UTF8StringBuilder
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java#L43
+1 on grow/append
OK. Splitting the append func into two parts: grow/append.
if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
val needSize = cursor + 8 + row.getSizeInBytes
val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
if (needSize > nowSize) {
val used = page.length
if (used >= (1 << 30)) {
sys.error("Can not build a HashedRelation that is larger than 8G") |
This is not related to this PR though: `sys.error` instead of `UnsupportedOperationException`?
Line 45 in b6c50d7
throw new UnsupportedOperationException(
ok. sys.error instead of UnsupportedOperationException
ok to test
val used = page.length
if (used >= (1 << 30)) {
sys.error("Can not build a HashedRelation that is larger than 8G")
}
ensureAcquireMemory(used * 8L * 2)
Doubling the size when growing is very typical; it seems what you want to address is the case where memory is enough for the requested size but not enough for doubling. I'd suggest we double the size most of the time, as long as there is enough memory.
OK. Doubling the size when growing.
…n growing;sys.error instead of UnsupportedOperationException
Thanks for your review. @maropu @kiszk @cloud-fan I submitted a modification including the following:
Test build #90575 has finished for PR 21311 at commit
Test build #90574 has finished for PR 21311 at commit
@cxzl25, to clarify: is this a potential cause of a wrong-answer correctness bug? If so, we should be sure to backport the resulting fix to maintenance branches. /cc @cloud-fan @gatorsmile
@JoshRosen @cloud-fan @gatorsmile
But without this patch there is no error; we just read back a wrong value.
Can you explain more about it? IIUC if we don't have enough memory for
@cloud-fan
@@ -626,6 +618,32 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
}

private def grow(neededSize: Int): Unit = {
// There is 8 bytes for the pointer to next value
val totalNeededSize = cursor + 8 + neededSize
The grow logic should be: we must grow to fit the new row, otherwise OOM should be thrown. If possible, grow to oldSize * 2.
private def grow(inputRowSize: Int): Unit = {
val neededNumWords = (cursor - Platform.LONG_ARRAY_OFFSET + 8 + inputRowSize + 7) / 8
if (neededNumWords > page.length) {
if (neededNumWords > (1 << 30)) fail...
val newNumWords = math.max(neededNumWords, math.min(page.length * 2, 1 << 30))
ensureAcquireMemory(newNumWords * 8L)
...
}
}
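A runnable standalone version of this sketch, per my reading of it (Platform.LONG_ARRAY_OFFSET is assumed to be 16 here, and the fail/copy parts are elided):

```java
public class GrowSizing {
    static final long LONG_ARRAY_OFFSET = 16;  // assumed value of Platform.LONG_ARRAY_OFFSET
    static final long MAX_WORDS = 1L << 30;    // cap: 8 GB worth of longs

    // Words needed to hold the data written so far, plus the 8-byte pointer
    // to the next value and the new row, rounded up to whole 8-byte words.
    static long neededNumWords(long cursor, long inputRowSize) {
        return (cursor - LONG_ARRAY_OFFSET + 8 + inputRowSize + 7) / 8;
    }

    // Prefer doubling the old page, but never allocate less than needed.
    static long newNumWords(long neededNumWords, long oldNumWords) {
        return Math.max(neededNumWords, Math.min(oldNumWords * 2, MAX_WORDS));
    }

    public static void main(String[] args) {
        long cursor = LONG_ARRAY_OFFSET + (1L << 20);    // 1 MB already written
        long needed = neededNumWords(cursor, 1L << 22);  // append a 4 MB row
        // Doubling a 1 << 17-word page (1 MB) is not enough here,
        // so the new size comes straight from `needed`.
        System.out.println(newNumWords(needed, 1L << 17));
    }
}
```

The key difference from the old code is the `math.max`: the doubled size is only a lower preference, never a ceiling on what the new row needs.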
@cloud-fan Thank you for your suggestion and code.
…grow to oldSize * 2
@@ -626,6 +618,29 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
}

private def grow(inputRowSize: Int): Unit = {
val neededNumWords = (cursor - Platform.LONG_ARRAY_OFFSET + 8 + inputRowSize + 7) / 8
Don't forget the comment for the 8-byte pointer.
"Can not build a HashedRelation that is larger than 8G")
}
val newNumWords = math.max(neededNumWords, math.min(page.length * 2, 1 << 30))
if (newNumWords > ARRAY_MAX) {
we won't need this check now; `newNumWords` is guaranteed to be less than (1 << 30), which is much smaller than ARRAY_MAX
val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, StringType, false)))
val keys = Seq(0L)
val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
val bigStr = UTF8String.fromString("x" * 1024 * 1024 * 2)
let's add a comment to say: the page array is initialized with length 1 << 17, so here we need a value larger than 1 << 18, to trigger the bug
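For what it's worth, the arithmetic behind that comment as I read it (page lengths are counted in 8-byte longs, so the initial page is 1 MB and one doubling gives 2 MB):

```java
public class PageSizeArithmetic {
    public static void main(String[] args) {
        long initialWords = 1L << 17;           // initial page length, in longs
        long initialBytes = initialWords * 8;   // 1 << 20 bytes = 1 MB
        long doubledBytes = initialBytes * 2;   // 1 << 21 bytes = 2 MB
        System.out.println(initialBytes == (1L << 20)); // true
        System.out.println(doubledBytes == (1L << 21)); // true
        // A value above 1 << 18 longs (2 MB of bytes) overflows the doubled
        // page; e.g. a 4 MB string ("x" * (1 << 22)) certainly does.
        System.out.println((1L << 22) > doubledBytes);  // true
    }
}
```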
val keys = Seq(0L)
val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
val bigStr = UTF8String.fromString("x" * 1024 * 1024 * 2)
keys.foreach { k =>
We just have one key; why use a loop?
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.types.LongType
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.array.ByteArrayMethods
not needed
Long.MaxValue,
1),
0)
val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, StringType, false)))
nit: UnsafeProjection.create(Array(StringType))
map.append(k, unsafeProj(InternalRow(bigStr)))
}
map.optimize()
val row = unsafeProj(InternalRow(bigStr)).copy()
val resultRow = new UnsafeRow(1)
val key = 0L
// the page array is initialized with length 1 << 17,
// so here we need a value larger than 1 << 18
val bigStr = UTF8String.fromString("x" * 1024 * 1024 * 2)
nit: can we just do `"x" * (1 << 19)` here?
LGTM, good catch!
Test build #90966 has finished for PR 21311 at commit
val key = 0L
// the page array is initialized with length 1 << 17 (1M bytes),
// so here we need a value larger than 1 << 18 (2M bytes), to trigger the bug
val bigStr = UTF8String.fromString("x" * (1 << 22))
to double check, do we have to use 1 << 22 to trigger this bug?
Not necessarily. I just chose a larger value to make it easier to lose data.
do you mean this bug can't be reproduced consistently? e.g. if we pick (1 << 18) + 1, we may not expose this bug, so we have to use 1 << 22 to 100% reproduce this bug?
In LongToUnsafeRowMap#getRow:
resultRow = UnsafeRow#pointTo(page(1 << 18), baseOffset(16), sizeInBytes(1 << 21 + 16))
In UTF8String#getBytes:
copyMemory(base(page), offset, bytes, BYTE_ARRAY_OFFSET, numBytes(1 << 21 + 16));
When the sizes are close, the read can sometimes still return the original value. Since SPARK-10399 was introduced, UnsafeRow#getUTF8String checks the size at this point.
If we pick (1 << 18) + 1, this bug reproduces 100% of the time. But before this patch, differences that are too small sometimes do not trigger it, so I chose a larger value.
My understanding may be wrong. Please advise. Thank you.
// Demo: Unsafe.copyMemory does no bounds checks, so copying 5 bytes into a
// 3-byte array silently writes past its end, and the "lost" bytes can still
// be read back. 16 is the byte-array base offset on this JVM.
sun.misc.Unsafe unsafe;
try {
    Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
    unsafeField.setAccessible(true);
    unsafe = (sun.misc.Unsafe) unsafeField.get(null);
} catch (Throwable cause) {
    unsafe = null;
}
String value = "xxxxx";
byte[] src = value.getBytes();
byte[] dst = new byte[3];    // too small for the 5 copied bytes
byte[] newDst = new byte[5];
unsafe.copyMemory(src, 16, dst, 16, src.length);
unsafe.copyMemory(dst, 16, newDst, 16, src.length);
System.out.println("dst:" + new String(dst));
System.out.println("newDst:" + new String(newDst));
output:
dst:xxx
newDst:xxxxx
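For contrast, a bounds-checked copy with plain System.arraycopy rejects the same oversized copy up front instead of silently corrupting memory (a minimal sketch, not from the PR):

```java
public class BoundsCheckDemo {
    public static void main(String[] args) {
        byte[] src = "xxxxx".getBytes();
        byte[] dst = new byte[3];
        try {
            // copying 5 bytes into a 3-byte array throws before writing anything
            System.arraycopy(src, 0, dst, 0, src.length);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("bounds check caught the oversized copy");
        }
    }
}
```

This is why the original bug is silent: the Unsafe path has no such check, so a too-small page just loses data.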
then 1 << 19 should be good enough as it doubles the size?
Yes. I think so.
Test build #90967 has finished for PR 21311 at commit
Test build #90970 has finished for PR 21311 at commit
Test build #90981 has finished for PR 21311 at commit
retest this please
Test build #91009 has finished for PR 21311 at commit
retest this please
Test build #91018 has finished for PR 21311 at commit
retest this please
Test build #91041 has finished for PR 21311 at commit
retest this please
Test build #91052 has finished for PR 21311 at commit
retest this please
Test build #91066 has finished for PR 21311 at commit
…rong LongToUnsafeRowMap has a mistake when growing its page array: it blindly grows to `oldSize * 2`, while the new record may be larger than `oldSize * 2`. Then we may have a malformed UnsafeRow when querying this map, whose actual data is smaller than its declared size, and the data is corrupted. Author: sychen <[email protected]> Closes #21311 from cxzl25/fix_LongToUnsafeRowMap_page_size. (cherry picked from commit 8883401) Signed-off-by: Wenchen Fan <[email protected]>
thanks, merging to master/2.3/2.2/2.1/2.0! There is no conflict so I backported all the way to 2.0. I'll watch the jenkins build in the next few days.
@cloud-fan Thank you very much for your help. |
What changes were proposed in this pull request?
LongToUnsafeRowMap calculated the new page size simply by multiplying the old size by 2. The requested size may then not be large enough to store the data, so some data is lost and the data read back is dirty.
How was this patch tested?
Added a test to HashedRelationSuite: test("LongToUnsafeRowMap with big values").