Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27818 Split TestReplicationDroppedTables #5206

Merged
merged 1 commit into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import static org.junit.Assert.fail;

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
Expand All @@ -37,29 +35,23 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);
/**
* Base class for testing replication for dropped tables.
*/
public class ReplicationDroppedTablesTestBase extends TestReplicationBase {

private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
private static final int ROWS_COUNT = 1000;
private static final Logger LOG = LoggerFactory.getLogger(ReplicationDroppedTablesTestBase.class);
protected static final int ROWS_COUNT = 1000;

@Before
@Override
public void setUpBase() throws Exception {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
Expand Down Expand Up @@ -103,38 +95,12 @@ public void setUpBase() throws Exception {
CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
}

@Test
public void testEditsStuckBehindDroppedTable() throws Exception {
// Sanity check Make sure by default edits for dropped tables stall the replication queue, even
// when the table(s) in question have been deleted on both ends.
testEditsBehindDroppedTable(false, "test_dropped");
}

@Test
public void testEditsDroppedWithDroppedTable() throws Exception {
// Make sure by default edits for dropped tables are themselves dropped when the
// table(s) in question have been deleted on both ends.
testEditsBehindDroppedTable(true, "test_dropped");
}

@Test
public void testEditsDroppedWithDroppedTableNS() throws Exception {
// also try with a namespace
UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
try {
testEditsBehindDroppedTable(true, "NS:test_dropped");
} finally {
UTIL1.getAdmin().deleteNamespace("NS");
UTIL2.getAdmin().deleteNamespace("NS");
}
}

private byte[] generateRowKey(int id) {
protected final byte[] generateRowKey(int id) {
return Bytes.toBytes(String.format("NormalPut%03d", id));
}

private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
protected final void testEditsBehindDroppedTable(boolean allowProceeding, String tName)
throws Exception {
CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

Expand Down Expand Up @@ -205,80 +171,6 @@ private void testEditsBehindDroppedTable(boolean allowProceeding, String tName)
CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
}

@Test
public void testEditsBehindDroppedTableTiming() throws Exception {
CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

// make sure we have a single region server only, so that all
// edits for all tables go there
restartSourceCluster(1);

TableName tablename = TableName.valueOf("testdroppedtimed");
byte[] familyName = Bytes.toBytes("fam");
byte[] row = Bytes.toBytes("row");

TableDescriptor table =
TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();

Connection connection1 = ConnectionFactory.createConnection(CONF1);
Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table);
}
UTIL1.waitUntilAllRegionsAssigned(tablename);
UTIL2.waitUntilAllRegionsAssigned(tablename);

// now suspend replication
try (Admin admin1 = connection1.getAdmin()) {
admin1.disableReplicationPeer(PEER_ID2);
}

// put some data (lead with 0 so the edit gets sorted before the other table's edits
// in the replication batch) write a bunch of edits, making sure we fill a batch
try (Table droppedTable = connection1.getTable(tablename)) {
byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
Put put = new Put(rowKey);
put.addColumn(familyName, row, row);
droppedTable.put(put);
}

try (Table table1 = connection1.getTable(tableName)) {
for (int i = 0; i < ROWS_COUNT; i++) {
Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
table1.put(put);
}
}

try (Admin admin2 = connection2.getAdmin()) {
admin2.disableTable(tablename);
admin2.deleteTable(tablename);
}

// edit should still be stuck
try (Admin admin1 = connection1.getAdmin()) {
// enable the replication peer.
admin1.enableReplicationPeer(PEER_ID2);
// the source table still exists, replication should be stalled
verifyReplicationStuck();

admin1.disableTable(tablename);
// still stuck, source table still exists
verifyReplicationStuck();

admin1.deleteTable(tablename);
// now the source table is gone, replication should proceed, the
// offending edits be dropped
verifyReplicationProceeded();
}
// just to be safe
CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
}

private boolean peerHasAllNormalRows() throws IOException {
try (ResultScanner scanner = htable2.getScanner(new Scan())) {
Result[] results = scanner.next(ROWS_COUNT);
Expand All @@ -292,7 +184,7 @@ private boolean peerHasAllNormalRows() throws IOException {
}
}

private void verifyReplicationProceeded() throws Exception {
protected final void verifyReplicationProceeded() throws Exception {
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too much time for put replication");
Expand All @@ -306,7 +198,7 @@ private void verifyReplicationProceeded() throws Exception {
}
}

private void verifyReplicationStuck() throws Exception {
protected final void verifyReplicationStuck() throws Exception {
for (int i = 0; i < NB_RETRIES; i++) {
if (peerHasAllNormalRows()) {
fail("Edit should have been stuck behind dropped tables");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestEditsBehindDroppedTableTiming extends ReplicationDroppedTablesTestBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestEditsBehindDroppedTableTiming.class);

@Override
public void setUpBase() throws Exception {
CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
super.setUpBase();
// make sure we have a single region server only, so that all
// edits for all tables go there
restartSourceCluster(1);
}

@Test
public void testEditsBehindDroppedTableTiming() throws Exception {
TableName tablename = TableName.valueOf("testdroppedtimed");
byte[] familyName = Bytes.toBytes("fam");
byte[] row = Bytes.toBytes("row");

TableDescriptor table =
TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();

Connection connection1 = ConnectionFactory.createConnection(CONF1);
Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table);
}
UTIL1.waitUntilAllRegionsAssigned(tablename);
UTIL2.waitUntilAllRegionsAssigned(tablename);

// now suspend replication
try (Admin admin1 = connection1.getAdmin()) {
admin1.disableReplicationPeer(PEER_ID2);
}

// put some data (lead with 0 so the edit gets sorted before the other table's edits
// in the replication batch) write a bunch of edits, making sure we fill a batch
try (Table droppedTable = connection1.getTable(tablename)) {
byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
Put put = new Put(rowKey);
put.addColumn(familyName, row, row);
droppedTable.put(put);
}

try (Table table1 = connection1.getTable(tableName)) {
for (int i = 0; i < ROWS_COUNT; i++) {
Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
table1.put(put);
}
}

try (Admin admin2 = connection2.getAdmin()) {
admin2.disableTable(tablename);
admin2.deleteTable(tablename);
}

// edit should still be stuck
try (Admin admin1 = connection1.getAdmin()) {
// enable the replication peer.
admin1.enableReplicationPeer(PEER_ID2);
// the source table still exists, replication should be stalled
verifyReplicationStuck();

admin1.disableTable(tablename);
// still stuck, source table still exists
verifyReplicationStuck();

admin1.deleteTable(tablename);
// now the source table is gone, replication should proceed, the
// offending edits be dropped
verifyReplicationProceeded();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestEditsDroppedWithDroppedTable extends ReplicationDroppedTablesTestBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestEditsDroppedWithDroppedTable.class);

@Test
public void testEditsDroppedWithDroppedTable() throws Exception {
// Make sure by default edits for dropped tables are themselves dropped when the
// table(s) in question have been deleted on both ends.
testEditsBehindDroppedTable(true, "test_dropped");
}

}
Loading