Skip to content

Commit

Permalink
HBASE-27556 Reuse Zookeeper session of Master in LogCleaner (apache#4946
Browse files Browse the repository at this point in the history
)

Backport of HBASE-23340
hmaster /hbase/replication/rs session expired (hbase replication default value is true, we don't use )
causes logcleaner can not clean oldWALs, which resulits in oldWALs too large (more than 2TB))

Signed-off-by: Wellington Ramos Chevreuil <[email protected]>
Signed-off-by: Pankaj Kumar<[email protected]>
(cherry picked from commit 695dce5)
Change-Id: Id47c83dc38b54c62a57138cfd839279b6c26caa1
  • Loading branch information
petersomogyi authored and Jenkins User committed Jan 9, 2023
1 parent d8d31a9 commit 60b6f40
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1450,8 +1450,9 @@ private void startServiceThreads() throws IOException {
// Start log cleaner thread
int cleanerInterval =
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), logCleanerPool);
this.logCleaner =
new LogCleaner(cleanerInterval, this, conf, getMasterWalManager().getFileSystem(),
getMasterWalManager().getOldLogDir(), logCleanerPool, params);
getChoreService().scheduleChore(logCleaner);

// start the hfile archive cleaner thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -72,9 +73,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
* @param pool the thread pool used to scan directories
*/
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path oldLogDir, DirScanPool pool) {
Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
pool);
pool, params);
this.pendingDelete = new LinkedBlockingQueue<>();
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
this.oldWALsCleaner = createOldWalsCleaner(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
Expand All @@ -35,6 +37,7 @@

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
* Implementation of a log cleaner that checks if a log is still scheduled for replication before
Expand All @@ -43,7 +46,8 @@
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
private ZKWatcher zkw;
private ZKWatcher zkw = null;
private boolean shareZK = false;
private ReplicationQueueStorage queueStorage;
private boolean stopped = false;
private Set<String> wals;
Expand Down Expand Up @@ -92,12 +96,20 @@ public boolean apply(FileStatus file) {
}

@Override
public void setConf(Configuration config) {
// Make my own Configuration. Then I'll have my own connection to zk that
// I can close myself when comes time.
Configuration conf = new Configuration(config);
public void init(Map<String, Object> params) {
super.init(params);
try {
setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null));
if (MapUtils.isNotEmpty(params)) {
Object master = params.get(HMaster.MASTER);
if (master != null && master instanceof HMaster) {
zkw = ((HMaster) master).getZooKeeper();
shareZK = true;
}
}
if (zkw == null) {
zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
}
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
} catch (IOException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
Expand All @@ -118,7 +130,7 @@ public void setConf(Configuration conf, ZKWatcher zk) {
public void stop(String why) {
if (this.stopped) return;
this.stopped = true;
if (this.zkw != null) {
if (!shareZK && this.zkw != null) {
LOG.info("Stopping " + this.zkw);
this.zkw.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void testLogCleaning() throws Exception {
// 10 procedure WALs
assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);

LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL);
LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
cleaner.chore();

// In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
Expand Down Expand Up @@ -281,7 +281,7 @@ public void testOnConfigurationChange() throws Exception {
Server server = new DummyServer();

FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL);
LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
int size = cleaner.getSizeOfCleaners();
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
cleaner.getCleanerThreadTimeoutMsec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void stop(String why) {
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalWALArchiveDir, logCleanerPool);
}, conf, fs, globalWALArchiveDir, logCleanerPool, null);
choreService.scheduleChore(logCleaner);
}

Expand Down

0 comments on commit 60b6f40

Please sign in to comment.