Skip to content

Commit

Permalink
HBASE-27218 Support rolling upgrading (#4808)
Browse files Browse the repository at this point in the history
Signed-off-by: Yu Li <[email protected]>
  • Loading branch information
Apache9 committed Jan 19, 2023
1 parent f0332a6 commit da08965
Show file tree
Hide file tree
Showing 22 changed files with 1,917 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ public String getRsPath(ServerName sn) {
* @param suffix ending of znode name
* @return result of properly joining prefix with suffix
*/
public static String joinZNode(String prefix, String suffix) {
return prefix + ZNodePaths.ZNODE_PATH_SEPARATOR + suffix;
public static String joinZNode(String prefix, String... suffix) {
StringBuilder sb = new StringBuilder(prefix);
for (String s : suffix) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(s);
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram;
Expand All @@ -33,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
Expand Down Expand Up @@ -1011,6 +1013,19 @@ final void doReleaseLock(TEnvironment env, ProcedureStore store) {
releaseLock(env);
}

protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
throws ProcedureSuspendedException {
if (jitter) {
// 10% possible jitter
double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
timeoutMillis += add;
}
setTimeout(timeoutMillis);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}

@Override
public int compareTo(final Procedure<TEnvironment> other) {
return Long.compare(getProcId(), other.getProcId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,15 @@ enum AssignReplicationQueuesState {
message AssignReplicationQueuesStateData {
required ServerName crashed_server = 1;
}

enum MigrateReplicationQueueFromZkToTableState {
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
}

message MigrateReplicationQueueFromZkToTableStateData {
repeated string disabled_peer_id = 1;
}
10 changes: 10 additions & 0 deletions hbase-replication/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -184,4 +185,22 @@ void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
* @return Whether the replication queue table exists
*/
boolean hasData() throws ReplicationException;

// the below 3 methods are used for migrating
/**
* Update the replication queue datas for a given region server.
*/
void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
throws ReplicationException;

/**
* Update last pushed sequence id for the given regions and peers.
*/
void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
throws ReplicationException;

/**
* Add the given hfile refs to the given peer.
*/
void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
Expand All @@ -46,6 +48,7 @@
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
Expand Down Expand Up @@ -74,12 +77,6 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {

private final TableName tableName;

@FunctionalInterface
private interface TableCreator {

void create() throws IOException;
}

public TableReplicationQueueStorage(Connection conn, TableName tableName) {
this.conn = conn;
this.tableName = tableName;
Expand Down Expand Up @@ -541,4 +538,60 @@ public boolean hasData() throws ReplicationException {
throw new ReplicationException("failed to get replication queue table", e);
}
}

@Override
public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
throws ReplicationException {
List<Put> puts = new ArrayList<>();
for (ReplicationQueueData data : datas) {
if (data.getOffsets().isEmpty()) {
continue;
}
Put put = new Put(Bytes.toBytes(data.getId().toString()));
data.getOffsets().forEach((walGroup, offset) -> {
put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
});
puts.add(put);
}
try (Table table = conn.getTable(tableName)) {
table.put(puts);
} catch (IOException e) {
throw new ReplicationException("failed to batch update queues", e);
}
}

@Override
public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
throws ReplicationException {
Map<String, Put> peerId2Put = new HashMap<>();
for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
peerId2Put
.computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
.addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
}
try (Table table = conn.getTable(tableName)) {
table
.put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList()));
} catch (IOException e) {
throw new ReplicationException("failed to batch update last pushed sequence ids", e);
}
}

@Override
public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
throws ReplicationException {
if (hfileRefs.isEmpty()) {
return;
}
Put put = new Put(Bytes.toBytes(peerId));
for (String ref : hfileRefs) {
put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
}
try (Table table = conn.getTable(tableName)) {
table.put(put);
} catch (IOException e) {
throw new ReplicationException("failed to batch update hfile references", e);
}
}
}
Loading

0 comments on commit da08965

Please sign in to comment.