Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27109 Move replication queue storage from zookeeper to a separated HBase table #5202

Merged
merged 16 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
91406d0
HBASE-27212 Implement a new table based replication queue storage and…
Apache9 Aug 15, 2022
254f4bc
HBASE-27213 Add support for claim queue operation (#4708)
Apache9 Aug 20, 2022
3966835
HBASE-27214 Implement the new replication hfile/log cleaner (#4722)
Apache9 Aug 31, 2022
70945cd
HBASE-27215 Add support for sync replication (#4762)
Apache9 Sep 15, 2022
3f294ae
HBASE-27392 Add a new procedure type for implementing some global ope…
Apache9 Sep 29, 2022
d49a8a1
HBASE-27405 Fix the replication hfile/log cleaner report that the rep…
2005hithlj Oct 12, 2022
dfaff40
HBASE-27218 Support rolling upgrading (#4808)
Apache9 Nov 6, 2022
487f0c6
HBASE-27217 Revisit the DumpReplicationQueues tool (#4810)
2005hithlj Nov 13, 2022
6bb7d99
HBASE-27429 Add exponential retry backoff support for MigrateReplicat…
Apache9 Oct 18, 2022
9c5bfcf
HBASE-27430 Should disable replication log cleaner when migrating rep…
Apache9 Dec 3, 2022
acd05c7
HBASE-27216 Revisit the ReplicationSyncUp tool (#4966)
Apache9 Mar 18, 2023
252c4d7
HBASE-27623 Start a new ReplicationSyncUp after the previous failed (…
2005hithlj Apr 5, 2023
565ff49
HBASE-27775 Use a separate WAL provider for hbase:replication table (…
Apache9 Apr 8, 2023
14db76f
HBASE-27274 Re-enable the disabled tests when implementing HBASE-2721…
2005hithlj Apr 18, 2023
79265c9
HBASE-27809 Attach move replication queue storage from zookeeper to a…
2005hithlj Apr 23, 2023
168587e
HBASE-27516 Document the table based replication queue storage in ref…
2005hithlj May 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ public static String writeMapAsString(Map<String, Object> map) throws IOExceptio
public static String writeObjectAsString(Object object) throws IOException {
return GSON.toJson(object);
}

public static <T> T fromJson(String json, Class<T> clazz) {
return GSON.fromJson(json, clazz);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ public String getRsPath(ServerName sn) {
* @param suffix ending of znode name
* @return result of properly joining prefix with suffix
*/
public static String joinZNode(String prefix, String suffix) {
return prefix + ZNodePaths.ZNODE_PATH_SEPARATOR + suffix;
public static String joinZNode(String prefix, String... suffix) {
StringBuilder sb = new StringBuilder(prefix);
for (String s : suffix) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(s);
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ public enum LockedResourceType {
TABLE,
REGION,
PEER,
META
META,
GLOBAL
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram;
Expand All @@ -33,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
Expand Down Expand Up @@ -1011,6 +1013,19 @@ final void doReleaseLock(TEnvironment env, ProcedureStore store) {
releaseLock(env);
}

protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
throws ProcedureSuspendedException {
if (jitter) {
// 10% possible jitter
double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
timeoutMillis += add;
}
setTimeout(timeoutMillis);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}

@Override
public int compareTo(final Procedure<TEnvironment> other) {
return Long.compare(getProcId(), other.getProcId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,13 @@ public void add(InlineChore chore) {
}

public void add(Procedure<TEnvironment> procedure) {
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
procedure.getTimeoutTimestamp());
queue.add(new DelayedProcedure<>(procedure));
if (procedure.getTimeout() > 0) {
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
procedure.getTimeoutTimestamp());
queue.add(new DelayedProcedure<>(procedure));
} else {
LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure);
}
}

public boolean remove(Procedure<TEnvironment> procedure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ message UpdatePeerConfigStateData {

message RemovePeerStateData {
optional ReplicationPeer peer_config = 1;
repeated int64 ongoing_assign_replication_queues_proc_ids = 2;
}

message EnablePeerStateData {
Expand Down Expand Up @@ -679,16 +680,13 @@ message ClaimReplicationQueueRemoteStateData {
required ServerName crashed_server = 1;
required string queue = 2;
required ServerName target_server = 3;
optional ServerName source_server = 4;
}

message ClaimReplicationQueueRemoteParameter {
required ServerName crashed_server = 1;
required string queue = 2;
}

enum ClaimReplicationQueuesState {
CLAIM_REPLICATION_QUEUES_DISPATCH = 1;
CLAIM_REPLICATION_QUEUES_FINISH = 2;
optional ServerName source_server = 3;
}

enum ModifyTableDescriptorState {
Expand All @@ -715,3 +713,27 @@ message ModifyStoreFileTrackerStateData {
message ModifyColumnFamilyStoreFileTrackerStateData {
required bytes family = 1;
}

enum AssignReplicationQueuesState {
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES = 3;
}

message AssignReplicationQueuesStateData {
required ServerName crashed_server = 1;
}

enum MigrateReplicationQueueFromZkToTableState {
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER = 1;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 2;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 3;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 4;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 5;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 6;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER = 7;
}

message MigrateReplicationQueueFromZkToTableStateData {
repeated string disabled_peer_id = 1;
}
10 changes: 10 additions & 0 deletions hbase-replication/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class ReplicationGroupOffset {

public static final ReplicationGroupOffset BEGIN = new ReplicationGroupOffset("", 0L);

private final String wal;

private final long offset;

public ReplicationGroupOffset(String wal, long offset) {
this.wal = wal;
this.offset = offset;
}

public String getWal() {
return wal;
}

/**
* A negative value means this file has already been fully replicated out
*/
public long getOffset() {
return offset;
}

@Override
public String toString() {
return wal + ":" + offset;
}

public static ReplicationGroupOffset parse(String str) {
int index = str.lastIndexOf(':');
return new ReplicationGroupOffset(str.substring(0, index),
Long.parseLong(str.substring(index + 1)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
* Representing all the information for a replication queue.
*/
@InterfaceAudience.Private
public class ReplicationQueueData {

private final ReplicationQueueId id;

private final ImmutableMap<String, ReplicationGroupOffset> offsets;

public ReplicationQueueData(ReplicationQueueId id,
ImmutableMap<String, ReplicationGroupOffset> offsets) {
this.id = id;
this.offsets = offsets;
}

public ReplicationQueueId getId() {
return id;
}

public ImmutableMap<String, ReplicationGroupOffset> getOffsets() {
return offsets;
}
}
Loading