Skip to content

Commit

Permalink
HBASE-28376 Column family ns does not exist in region during upgrade …
Browse files Browse the repository at this point in the history
…3.0.0-beta-2 (#5697)

Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
Apache9 authored Mar 10, 2024
1 parent 936d267 commit a5748a2
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -771,3 +771,12 @@ enum MigrateReplicationQueueFromZkToTableState {
message MigrateReplicationQueueFromZkToTableStateData {
repeated string disabled_peer_id = 1;
}

enum MigrateNamespaceTableProcedureState {
MIGRATE_NAMESPACE_TABLE_ADD_FAMILY = 1;
MIGRATE_NAMESPACE_TABLE_MIGRATE_DATA = 2;
MIGRATE_NAMESPACE_TABLE_DISABLE_TABLE = 3;
}

message MigrateNamespaceTableProcedureStateData {
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ protected synchronized void doStart() {
try {
notifyStarted();
this.tableNamespaceManager.start();
} catch (IOException ioe) {
notifyFailed(ioe);
} catch (IOException | InterruptedException e) {
notifyFailed(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,29 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MigrateNamespaceTableProcedure;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -66,61 +66,112 @@ public class TableNamespaceManager {

private final MasterServices masterServices;

private volatile boolean migrationDone;

TableNamespaceManager(MasterServices masterServices) {
this.masterServices = masterServices;
}

private void migrateNamespaceTable() throws IOException {
try (Table nsTable = masterServices.getConnection().getTable(TableName.NAMESPACE_TABLE_NAME);
ResultScanner scanner = nsTable.getScanner(
new Scan().addFamily(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES).readAllVersions());
BufferedMutator mutator =
masterServices.getConnection().getBufferedMutator(TableName.META_TABLE_NAME)) {
private void tryMigrateNamespaceTable() throws IOException, InterruptedException {
Optional<MigrateNamespaceTableProcedure> opt = masterServices.getProcedures().stream()
.filter(p -> p instanceof MigrateNamespaceTableProcedure)
.map(p -> (MigrateNamespaceTableProcedure) p).findAny();
if (!opt.isPresent()) {
// the procedure is not present, check whether have the ns family in meta table
TableDescriptor metaTableDesc =
masterServices.getTableDescriptors().get(TableName.META_TABLE_NAME);
if (metaTableDesc.hasColumnFamily(HConstants.NAMESPACE_FAMILY)) {
// normal case, upgrading is done or the cluster is created with 3.x code
migrationDone = true;
} else {
// submit the migration procedure
MigrateNamespaceTableProcedure proc = new MigrateNamespaceTableProcedure();
masterServices.getMasterProcedureExecutor().submitProcedure(proc);
}
} else {
if (opt.get().isFinished()) {
// the procedure is already done
migrationDone = true;
}
// we have already submitted the procedure, continue
}
}

private void addToCache(Result result, byte[] family, byte[] qualifier) throws IOException {
Cell cell = result.getColumnLatestCell(family, qualifier);
NamespaceDescriptor ns =
ProtobufUtil.toNamespaceDescriptor(HBaseProtos.NamespaceDescriptor.parseFrom(CodedInputStream
.newInstance(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
cache.put(ns.getName(), ns);
}

private void loadFromMeta() throws IOException {
try (Table table = masterServices.getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = table.getScanner(HConstants.NAMESPACE_FAMILY)) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
break;
}
Put put = new Put(result.getRow());
result
.getColumnCells(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES,
TableDescriptorBuilder.NAMESPACE_COL_DESC_BYTES)
.forEach(c -> put.addColumn(HConstants.NAMESPACE_FAMILY,
HConstants.NAMESPACE_COL_DESC_QUALIFIER, c.getTimestamp(), CellUtil.cloneValue(c)));
mutator.mutate(put);
addToCache(result, HConstants.NAMESPACE_FAMILY, HConstants.NAMESPACE_COL_DESC_QUALIFIER);
}
}
// schedule a disable procedure instead of block waiting here, as when disabling a table we will
// wait until master is initialized, but we are part of the initialization...
masterServices.getMasterProcedureExecutor().submitProcedure(
new DisableTableProcedure(masterServices.getMasterProcedureExecutor().getEnvironment(),
TableName.NAMESPACE_TABLE_NAME, false));
}

private void loadNamespaceIntoCache() throws IOException {
try (Table table = masterServices.getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = table.getScanner(HConstants.NAMESPACE_FAMILY)) {
private void loadFromNamespace() throws IOException {
try (Table table = masterServices.getConnection().getTable(TableName.NAMESPACE_TABLE_NAME);
ResultScanner scanner =
table.getScanner(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES)) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
break;
}
Cell cell = result.getColumnLatestCell(HConstants.NAMESPACE_FAMILY,
HConstants.NAMESPACE_COL_DESC_QUALIFIER);
NamespaceDescriptor ns = ProtobufUtil
.toNamespaceDescriptor(HBaseProtos.NamespaceDescriptor.parseFrom(CodedInputStream
.newInstance(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
cache.put(ns.getName(), ns);
addToCache(result, TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES,
TableDescriptorBuilder.NAMESPACE_COL_DESC_BYTES);
}
}
}

public void start() throws IOException {
TableState nsTableState = MetaTableAccessor.getTableState(masterServices.getConnection(),
TableName.NAMESPACE_TABLE_NAME);
if (nsTableState != null && nsTableState.isEnabled()) {
migrateNamespaceTable();
private boolean shouldLoadFromMeta() throws IOException {
if (migrationDone) {
return true;
}
// the implementation is bit tricky
// if there is already a disable namespace table procedure or the namespace table is already
// disabled, we are safe to read from meta table as the migration is already done. If not, since
// we are part of the master initialization work, so we can make sure that when reaching here,
// the master has not been marked as initialize yet. And DisableTableProcedure can only be
// executed after master is initialized, so here we are safe to read from namespace table,
// without worrying about that the namespace table is disabled while we are reading and crash
// the master startup.
if (
masterServices.getTableStateManager().isTableState(TableName.NAMESPACE_TABLE_NAME,
TableState.State.DISABLED)
) {
return true;
}
if (
masterServices.getProcedures().stream().filter(p -> p instanceof DisableTableProcedure)
.anyMatch(
p -> ((DisableTableProcedure) p).getTableName().equals(TableName.NAMESPACE_TABLE_NAME))
) {
return true;
}
return false;
}

private void loadNamespaceIntoCache() throws IOException {
if (shouldLoadFromMeta()) {
loadFromMeta();
} else {
loadFromNamespace();
}

}

public void start() throws IOException, InterruptedException {
tryMigrateNamespaceTable();
loadNamespaceIntoCache();
}

Expand All @@ -135,7 +186,14 @@ public NamespaceDescriptor get(String name) throws IOException {
return cache.get(name);
}

private void checkMigrationDone() throws IOException {
if (!migrationDone) {
throw new HBaseIOException("namespace migration is ongoing, modification is disallowed");
}
}

public void addOrUpdateNamespace(NamespaceDescriptor ns) throws IOException {
checkMigrationDone();
insertNamespaceToMeta(masterServices.getConnection(), ns);
cache.put(ns.getName(), ns);
}
Expand All @@ -152,6 +210,7 @@ public static void insertNamespaceToMeta(Connection conn, NamespaceDescriptor ns
}

public void deleteNamespace(String namespaceName) throws IOException {
checkMigrationDone();
Delete d = new Delete(Bytes.toBytes(namespaceName));
try (Table table = masterServices.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.delete(d);
Expand All @@ -174,6 +233,10 @@ public void validateTableAndRegionCount(NamespaceDescriptor desc) throws IOExcep
}
}

public void setMigrationDone() {
migrationDone = true;
}

public static long getMaxTables(NamespaceDescriptor ns) throws IOException {
String value = ns.getConfigurationValue(KEY_MAX_TABLES);
long maxTables = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateNamespaceTableProcedureState;

/**
* Migrate the namespace data to meta table's namespace family while upgrading
*/
@InterfaceAudience.Private
public class MigrateNamespaceTableProcedure
extends StateMachineProcedure<MasterProcedureEnv, MigrateNamespaceTableProcedureState>
implements GlobalProcedureInterface {

private static final Logger LOG = LoggerFactory.getLogger(MigrateNamespaceTableProcedure.class);

private RetryCounter retryCounter;

@Override
public String getGlobalId() {
return getClass().getSimpleName();
}

private void migrate(MasterProcedureEnv env) throws IOException {
Connection conn = env.getMasterServices().getConnection();
try (Table nsTable = conn.getTable(TableName.NAMESPACE_TABLE_NAME);
ResultScanner scanner = nsTable.getScanner(
new Scan().addFamily(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES).readAllVersions());
BufferedMutator mutator = conn.getBufferedMutator(TableName.META_TABLE_NAME)) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
break;
}
Put put = new Put(result.getRow());
result
.getColumnCells(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES,
TableDescriptorBuilder.NAMESPACE_COL_DESC_BYTES)
.forEach(c -> put.addColumn(HConstants.NAMESPACE_FAMILY,
HConstants.NAMESPACE_COL_DESC_QUALIFIER, c.getTimestamp(), CellUtil.cloneValue(c)));
mutator.mutate(put);
}
}
}

@Override
protected Flow executeFromState(MasterProcedureEnv env, MigrateNamespaceTableProcedureState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
try {
switch (state) {
case MIGRATE_NAMESPACE_TABLE_ADD_FAMILY:
TableDescriptor metaTableDesc =
env.getMasterServices().getTableDescriptors().get(TableName.META_TABLE_NAME);
if (!metaTableDesc.hasColumnFamily(HConstants.NAMESPACE_FAMILY)) {
TableDescriptor newMetaTableDesc = TableDescriptorBuilder.newBuilder(metaTableDesc)
.setColumnFamily(
FSTableDescriptors.getNamespaceFamilyDescForMeta(env.getMasterConfiguration()))
.build();
addChildProcedure(new ModifyTableProcedure(env, newMetaTableDesc));
}
setNextState(MigrateNamespaceTableProcedureState.MIGRATE_NAMESPACE_TABLE_MIGRATE_DATA);
return Flow.HAS_MORE_STATE;
case MIGRATE_NAMESPACE_TABLE_MIGRATE_DATA:
migrate(env);
setNextState(MigrateNamespaceTableProcedureState.MIGRATE_NAMESPACE_TABLE_DISABLE_TABLE);
return Flow.HAS_MORE_STATE;
case MIGRATE_NAMESPACE_TABLE_DISABLE_TABLE:
addChildProcedure(new DisableTableProcedure(env, TableName.NAMESPACE_TABLE_NAME, false));
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("Unhandled state=" + state);
}
} catch (IOException e) {
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed migrating namespace data, suspend {}secs {}", backoff / 1000, this, e);
throw suspend(Math.toIntExact(backoff), true);
}
}

@Override
protected void rollbackState(MasterProcedureEnv env, MigrateNamespaceTableProcedureState state)
throws IOException, InterruptedException {
}

@Override
protected MigrateNamespaceTableProcedureState getState(int stateId) {
return MigrateNamespaceTableProcedureState.forNumber(stateId);
}

@Override
protected int getStateId(MigrateNamespaceTableProcedureState state) {
return state.getNumber();
}

@Override
protected MigrateNamespaceTableProcedureState getInitialState() {
return MigrateNamespaceTableProcedureState.MIGRATE_NAMESPACE_TABLE_ADD_FAMILY;
}

@Override
protected void completionCleanup(MasterProcedureEnv env) {
env.getMasterServices().getClusterSchema().getTableNamespaceManager().setMigrationDone();
}
}
Loading

0 comments on commit a5748a2

Please sign in to comment.