HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF #2975
// After we removed the WAL from the queue, we should
// try shipping the existing batch of entries, we do not want to reset
// stream since entry stream doesn't have the correct data at this point
isBatchQueuedToBeShipped(entryStream, batch, hasMoreData, true);
This is the fix to avoid permanent data loss.
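The idea in the code comment above can be illustrated with a toy model. This is only a sketch under stated assumptions: `EofBatchSketch`, `ToyWal`, `readAll`, and `readInto` are hypothetical names invented for illustration and are not HBase classes or methods. It shows a reader that hits an EOF on one log, drops that log from the queue, and still hands the already-accumulated batch to the shipper instead of discarding it.

```java
import java.io.EOFException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: none of these types are HBase classes.
class EofBatchSketch {

    /** Hypothetical WAL: some readable entries, optionally ending in an EOF. */
    static final class ToyWal {
        final List<String> entries;
        final boolean endsWithEof;

        ToyWal(List<String> entries, boolean endsWithEof) {
            this.entries = entries;
            this.endsWithEof = endsWithEof;
        }
    }

    /** Copies the WAL's entries into the batch, then fails if the WAL is truncated. */
    static void readInto(ToyWal wal, List<String> batch) throws EOFException {
        batch.addAll(wal.entries);
        if (wal.endsWithEof) {
            throw new EOFException("unexpected end of toy WAL");
        }
    }

    /** Drains the queue and returns the batches that were handed to the shipper. */
    static List<List<String>> readAll(Deque<ToyWal> queue) {
        List<List<String>> shipped = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        while (!queue.isEmpty()) {
            ToyWal wal = queue.peek();
            try {
                readInto(wal, batch);
                queue.remove();
            } catch (EOFException e) {
                // Drop the unreadable WAL, but ship what has been read so far
                // rather than resetting and losing the pending batch.
                queue.remove();
                if (!batch.isEmpty()) {
                    shipped.add(batch);
                    batch = new ArrayList<>();
                }
            }
        }
        if (!batch.isEmpty()) {
            shipped.add(batch);
        }
        return shipped;
    }
}
```

In this toy version the batch read before the EOF is shipped as its own unit, and reading continues with the next log in the queue. The real patch works against `WALEntryStream` and `WALEntryBatch`, whose behavior is not reproduced here.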
...server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -840,7 +844,7 @@ public void uncaughtException(final Thread t, final Throwable e) {

  // If this is a recovered queue, the queue is already full and the first log
  // normally has a position (unless the RS failed between 2 logs)
- private long getRecoveredQueueStartPos(long startPosition) {
+ public long getRecoveredQueueStartPos(long startPosition) {
Is this change needed? I don't see any changes to a caller of this method in this patch.
No, I will revert it.
.../java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
assertEquals(0, queue.size());
}

@Test
Nice test
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
queue, 0, fs, conf, getDummyFilter(),
new MetricsSource("1"), (ReplicationSource) source);
reader.run();
Do you need to wait here? How did you drain the queue? I might be missing something here. Thanks.
@xcangCRM I think you are confusing it with the .start() method, which is an asynchronous call. We are executing .run(), which is a synchronous call.
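For readers less familiar with the distinction, here is a minimal standalone sketch of the difference between calling `run()` and `start()` on a `java.lang.Thread` subclass. `RunVersusStart` and `RecordingThread` are hypothetical names used only for illustration; they are not part of the patch or of HBase.

```java
// Illustrative only: RecordingThread is a hypothetical stand-in for a reader thread.
class RunVersusStart {

    /** Records the name of the thread that actually executed run(). */
    static final class RecordingThread extends Thread {
        volatile String executedOn;

        RecordingThread() {
            super("reader");
        }

        @Override
        public void run() {
            executedOn = Thread.currentThread().getName();
        }
    }

    /** Calls run() directly: an ordinary method call on the caller's thread. */
    static String executedOnViaRun() {
        RecordingThread t = new RecordingThread();
        t.run(); // returns only after the body has finished
        return t.executedOn;
    }

    /** Calls start(): the body runs on a new thread, so the caller must join. */
    static String executedOnViaStart() {
        RecordingThread t = new RecordingThread();
        t.start(); // returns immediately
        try {
            t.join(); // wait for the new thread to finish before reading the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return t.executedOn;
    }

    public static void main(String[] args) {
        System.out.println("run()   executed on: " + executedOnViaRun());
        System.out.println("start() executed on: " + executedOnViaStart());
    }
}
```

With `run()` the recorded name is that of the calling thread, so a test that invokes `reader.run()` needs no extra wait: the call has already completed when the next statement executes. With `start()` the recorded name is `reader`, and the caller has to wait explicitly.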
.../java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 * @throws InterruptedException throws interrupted exception
 * @throws IOException throws io exception from stream
 */
private boolean updateBatchAndAddInShippingQueue(WALEntryStream entryStream, WALEntryBatch batch,
nit: the name is a bit long. How about updateBatchAndShippingQueue?
💔 -1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
Unless there is a change in review status I will merge this in a couple of hours.
Please find the details in jira:
https://issues.apache.org/jira/browse/HBASE-25596