Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-23696 Stop WALProcedureStore after migration finishes #1050

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,63 +308,70 @@ private void tryMigrate(FileSystem fs) throws IOException {
}
LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir);
WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
store.start(numThreads);
store.recoverLease();
MutableLong maxProcIdSet = new MutableLong(-1);
MutableLong maxProcIdFromProcs = new MutableLong(-1);
store.load(new ProcedureLoader() {

@Override
public void setMaxProcId(long maxProcId) {
maxProcIdSet.setValue(maxProcId);
}
try {
store.start(numThreads);
store.recoverLease();
MutableLong maxProcIdSet = new MutableLong(-1);
MutableLong maxProcIdFromProcs = new MutableLong(-1);
store.load(new ProcedureLoader() {

@Override
public void setMaxProcId(long maxProcId) {
maxProcIdSet.setValue(maxProcId);
}

@Override
public void load(ProcedureIterator procIter) throws IOException {
long procCount = 0;
while (procIter.hasNext()) {
Procedure<?> proc = procIter.next();
update(proc);
procCount++;
if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
maxProcIdFromProcs.setValue(proc.getProcId());
@Override
public void load(ProcedureIterator procIter) throws IOException {
long procCount = 0;
while (procIter.hasNext()) {
Procedure<?> proc = procIter.next();
update(proc);
procCount++;
if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
maxProcIdFromProcs.setValue(proc.getProcId());
}
}
LOG.info("Migrated {} procedures", procCount);
}
LOG.info("Migrated {} procedures", procCount);
}

@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
long corruptedCount = 0;
while (procIter.hasNext()) {
LOG.error("Corrupted procedure {}", procIter.next());
corruptedCount++;
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
long corruptedCount = 0;
while (procIter.hasNext()) {
LOG.error("Corrupted procedure {}", procIter.next());
corruptedCount++;
}
if (corruptedCount > 0) {
throw new IOException("There are " + corruptedCount + " corrupted procedures when" +
" migrating from the old WAL based store to the new region based store, please" +
" fix them before upgrading again.");
}
}
if (corruptedCount > 0) {
throw new IOException("There are " + corruptedCount + " corrupted procedures when" +
" migrating from the old WAL based store to the new region based store, please" +
" fix them before upgrading again.");
});
LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
// Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
// anyway, let's do a check here.
if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
if (maxProcIdSet.longValue() > 0) {
// let's add a fake row to retain the max proc id
region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
}
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
}
});
LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
// Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
// anyway, let's do a check here.
if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
if (maxProcIdSet.longValue() > 0) {
// let's add a fake row to retain the max proc id
region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
if (!fs.delete(procWALDir, true)) {
throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
procWALDir);
}
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
}
if (!fs.delete(procWALDir, true)) {
throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
procWALDir);
LOG.info("Migration of WALProcedureStore finished");
} catch (IOException ioe) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the catch, or could just stop it on the finally and let the IOE be thrown upwards?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @wchevreuil . I only now noticed you'd just put up a fix for the same thing.

On this line, I should have rethrown the caught exception. I wanted to pass in 'true' for abort if exception as opposed to 'false' for the finally block. I might be overthinking it since IIRC, if problem here, we'll crash out the Master. Let me resolve this in favor of yours.

store.stop(true);
throw ioe;
} finally {
store.stop(false);
}
LOG.info("Migration of WALProcedureStore finished");
}

@Override
Expand Down