HBASE-27215 Add support for sync replication (#4762)
Signed-off-by: Xiaolin Ha <[email protected]>
Apache9 committed Apr 21, 2023
1 parent 68bcae9 commit f2398c1
Showing 3 changed files with 28 additions and 30 deletions.
ReplicationSource.java
@@ -462,7 +462,7 @@ private void uncaughtException(Thread t, Throwable e, ReplicationSourceManager m
             t.getName());
           manager.refreshSources(peerId);
           break;
-        } catch (IOException e1) {
+        } catch (IOException | ReplicationException e1) {
           LOG.error("Replication sources refresh failed.", e1);
           sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
         }
ReplicationSourceManager.java
@@ -404,38 +404,44 @@ public void drainSources(String peerId) throws IOException, ReplicationException
     // TODO: use empty initial offsets for now, revisit when adding support for sync replication
     ReplicationSourceInterface src =
       createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
-    // synchronized here to avoid race with preLogRoll where we add new log to source and also
+    // synchronized here to avoid race with postLogRoll where we add new log to source and also
     // walsById.
     ReplicationSourceInterface toRemove;
-    Map<String, NavigableSet<String>> wals = new HashMap<>();
+    ReplicationQueueData queueData;
     synchronized (latestPaths) {
+      // Here we make a copy of all the remaining wal files and then delete them from the
+      // replication queue storage after releasing the lock. It is not safe to just remove the old
+      // map from walsById since later we may fail to update the replication queue storage, and when
+      // we retry next time, we can not know the wal files that needs to be set to the replication
+      // queue storage
+      ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
+      synchronized (walsById) {
+        walsById.get(queueId).forEach((group, wals) -> {
+          if (!wals.isEmpty()) {
+            builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
+          }
+        });
+      }
+      queueData = new ReplicationQueueData(queueId, builder.build());
+      src = createSource(queueData, peer);
       toRemove = sources.put(peerId, src);
       if (toRemove != null) {
         LOG.info("Terminate replication source for " + toRemove.getPeerId());
         toRemove.terminate(terminateMessage);
         toRemove.getSourceMetrics().clear();
       }
-      // Here we make a copy of all the remaining wal files and then delete them from the
-      // replication queue storage after releasing the lock. It is not safe to just remove the old
-      // map from walsById since later we may fail to delete them from the replication queue
-      // storage, and when we retry next time, we can not know the wal files that need to be deleted
-      // from the replication queue storage.
-      walsById.get(queueId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
     }
+    for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
+      queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
+    }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
-    for (NavigableSet<String> walsByGroup : wals.values()) {
-      // TODO: just need to reset the replication offset
-      // for (String wal : walsByGroup) {
-      //   queueStorage.removeWAL(server.getServerName(), peerId, wal);
-      // }
-    }
     synchronized (walsById) {
-      Map<String, NavigableSet<String>> oldWals = walsById.get(queueId);
-      wals.forEach((k, v) -> {
-        NavigableSet<String> walsByGroup = oldWals.get(k);
+      Map<String, NavigableSet<String>> wals = walsById.get(queueId);
+      queueData.getOffsets().forEach((group, offset) -> {
+        NavigableSet<String> walsByGroup = wals.get(group);
         if (walsByGroup != null) {
-          walsByGroup.removeAll(v);
+          walsByGroup.headSet(offset.getWal(), true).clear();
         }
       });
     }
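The new drainSources bookkeeping above can be read in two steps: record the last WAL of each group as that group's replication offset, persist the offsets, then trim the in-memory WAL sets. Below is a minimal, self-contained sketch of that trimming logic (illustrative only, not HBase code; the class name, the literal WAL names, and the plain Map standing in for ReplicationGroupOffset and the queue storage are made up for the example):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

public class DrainOffsetSketch {

  public static void main(String[] args) {
    // In-memory view of queued WALs, keyed by WAL group (walsById analogue).
    Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
    walsByGroup.put("group-a", new TreeSet<>(List.of("wal.1", "wal.2", "wal.3")));
    walsByGroup.put("group-b", new TreeSet<>(List.of("wal.7")));

    // Step 1: record the last WAL of each non-empty group as that group's offset
    // (the diff uses new ReplicationGroupOffset(wals.last(), -1) for this).
    Map<String, String> offsets = new HashMap<>();
    walsByGroup.forEach((group, wals) -> {
      if (!wals.isEmpty()) {
        offsets.put(group, wals.last());
      }
    });

    // Step 2: pretend the offsets were persisted to the replication queue storage here
    // (queueStorage.setOffset in the diff), then trim the in-memory sets: every WAL up
    // to and including the offset WAL is no longer needed by the old source.
    offsets.forEach((group, offsetWal) -> {
      NavigableSet<String> wals = walsByGroup.get(group);
      if (wals != null) {
        wals.headSet(offsetWal, true).clear();
      }
    });

    System.out.println(walsByGroup); // both groups end up empty in this example
  }
}

The key point is that headSet(offsetWal, true) is an inclusive view of the sorted set, so clear() drops everything up to and including the recorded offset WAL, matching the walsByGroup.headSet(offset.getWal(), true).clear() line in the diff.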
@@ -458,13 +464,8 @@ public void drainSources(String peerId) throws IOException, ReplicationException
   }
 
   private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
-    ReplicationPeer peer) throws IOException {
-    Map<String, ReplicationGroupOffset> offsets;
-    try {
-      offsets = queueStorage.getOffsets(queueId);
-    } catch (ReplicationException e) {
-      throw new IOException(e);
-    }
+    ReplicationPeer peer) throws IOException, ReplicationException {
+    Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
     return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
   }

@@ -474,7 +475,7 @@ private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queu
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws IOException {
+  public void refreshSources(String peerId) throws ReplicationException, IOException {
     String terminateMessage = "Peer " + peerId
       + " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
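Taken together with the first hunk, these changes move the error handling from wrap-and-rethrow to plain propagation: createRefreshedSource no longer converts ReplicationException into an IOException, refreshSources declares both checked exceptions, and the retry loop in ReplicationSource.uncaughtException catches them together. A small stand-alone sketch of that shape (illustrative only; the nested ReplicationException class and the simulated failure are stand-ins, not the real HBase types):

import java.io.IOException;

public class RefreshRetrySketch {

  // Stand-in for org.apache.hadoop.hbase.replication.ReplicationException.
  static class ReplicationException extends Exception {
    ReplicationException(String msg) {
      super(msg);
    }
  }

  private int attempts;

  // After the change, a queue-storage failure propagates as-is instead of being
  // rethrown as an IOException.
  void refreshSources(String peerId) throws IOException, ReplicationException {
    if (++attempts < 3) {
      throw new ReplicationException("simulated queue storage failure for " + peerId);
    }
  }

  // The caller keeps retrying on either failure type, mirroring the widened
  // multi-catch in the first hunk.
  void refreshWithRetries(String peerId) {
    while (true) {
      try {
        refreshSources(peerId);
        break;
      } catch (IOException | ReplicationException e) {
        System.out.println("Refresh failed, retrying: " + e.getMessage());
      }
    }
  }

  public static void main(String[] args) {
    new RefreshRetrySketch().refreshWithRetries("peer-1");
  }
}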
TestDrainReplicationQueuesForStandBy.java
@@ -35,12 +35,9 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-// TODO: revisit later
-@Ignore
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {
