diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index e078722b157f..0784a87711bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -465,7 +465,7 @@ private void uncaughtException(Thread t, Throwable e, ReplicationSourceManager m t.getName()); manager.refreshSources(peerId); break; - } catch (IOException e1) { + } catch (IOException | ReplicationException e1) { LOG.error("Replication sources refresh failed.", e1); sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 03569be86fce..f3d07315240d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -403,38 +403,44 @@ public void drainSources(String peerId) throws IOException, ReplicationException // TODO: use empty initial offsets for now, revisit when adding support for sync replication ReplicationSourceInterface src = createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer); - // synchronized here to avoid race with preLogRoll where we add new log to source and also + // synchronized here to avoid race with postLogRoll where we add new log to source and also // walsById. ReplicationSourceInterface toRemove; - Map> wals = new HashMap<>(); + ReplicationQueueData queueData; synchronized (latestPaths) { + // Here we make a copy of all the remaining wal files and then delete them from the + // replication queue storage after releasing the lock. It is not safe to just remove the old + // map from walsById since later we may fail to update the replication queue storage, and when + // we retry next time, we can not know the wal files that needs to be set to the replication + // queue storage + ImmutableMap.Builder builder = ImmutableMap.builder(); + synchronized (walsById) { + walsById.get(queueId).forEach((group, wals) -> { + if (!wals.isEmpty()) { + builder.put(group, new ReplicationGroupOffset(wals.last(), -1)); + } + }); + } + queueData = new ReplicationQueueData(queueId, builder.build()); + src = createSource(queueData, peer); toRemove = sources.put(peerId, src); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); toRemove.terminate(terminateMessage); toRemove.getSourceMetrics().clear(); } - // Here we make a copy of all the remaining wal files and then delete them from the - // replication queue storage after releasing the lock. It is not safe to just remove the old - // map from walsById since later we may fail to delete them from the replication queue - // storage, and when we retry next time, we can not know the wal files that need to be deleted - // from the replication queue storage. - walsById.get(queueId).forEach((k, v) -> wals.put(k, new TreeSet<>(v))); + } + for (Map.Entry entry : queueData.getOffsets().entrySet()) { + queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap()); } LOG.info("Startup replication source for " + src.getPeerId()); src.startup(); - for (NavigableSet walsByGroup : wals.values()) { - // TODO: just need to reset the replication offset - // for (String wal : walsByGroup) { - // queueStorage.removeWAL(server.getServerName(), peerId, wal); - // } - } synchronized (walsById) { - Map> oldWals = walsById.get(queueId); - wals.forEach((k, v) -> { - NavigableSet walsByGroup = oldWals.get(k); + Map> wals = walsById.get(queueId); + queueData.getOffsets().forEach((group, offset) -> { + NavigableSet walsByGroup = wals.get(group); if (walsByGroup != null) { - walsByGroup.removeAll(v); + walsByGroup.headSet(offset.getWal(), true).clear(); } }); } @@ -457,13 +463,8 @@ public void drainSources(String peerId) throws IOException, ReplicationException } private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId, - ReplicationPeer peer) throws IOException { - Map offsets; - try { - offsets = queueStorage.getOffsets(queueId); - } catch (ReplicationException e) { - throw new IOException(e); - } + ReplicationPeer peer) throws IOException, ReplicationException { + Map offsets = queueStorage.getOffsets(queueId); return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer); } @@ -473,7 +474,7 @@ private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queu * replication queue storage and only to enqueue all logs to the new replication source * @param peerId the id of the replication peer */ - public void refreshSources(String peerId) throws IOException { + public void refreshSources(String peerId) throws ReplicationException, IOException { String terminateMessage = "Peer " + peerId + " state or config changed. Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java index 8918f8422e1d..0189d4755754 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java @@ -35,12 +35,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -// TODO: revisit later -@Ignore @Category({ ReplicationTests.class, MediumTests.class }) public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {