diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 41fc99b537b7..d1db90f6335d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1427,8 +1427,9 @@ private void startServiceThreads() throws IOException { // Start log cleaner thread int cleanerInterval = conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL); - this.logCleaner = new LogCleaner(cleanerInterval, this, conf, - getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), logCleanerPool); + this.logCleaner = + new LogCleaner(cleanerInterval, this, conf, getMasterWalManager().getFileSystem(), + getMasterWalManager().getOldLogDir(), logCleanerPool, params); getChoreService().scheduleChore(logCleaner); // start the hfile archive cleaner thread diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index dadcc36994dd..90e498583c0c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -72,9 +73,9 @@ public class LogCleaner extends CleanerChore * @param pool the thread pool used to scan directories */ public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, - Path oldLogDir, DirScanPool pool) { + Path oldLogDir, DirScanPool pool, Map params) { super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS, - pool); + pool, params); this.pendingDelete = new LinkedBlockingQueue<>(); int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); this.oldWALsCleaner = createOldWalsCleaner(size); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 138095f223dd..a355c61e621d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -19,10 +19,12 @@ import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -35,6 +37,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Predicate; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; /** * Implementation of a log cleaner that checks if a log is still scheduled for replication before @@ -43,7 +46,8 @@ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class ReplicationLogCleaner extends BaseLogCleanerDelegate { private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class); - private ZKWatcher zkw; + private ZKWatcher zkw = null; + private boolean shareZK = false; private ReplicationQueueStorage queueStorage; private boolean stopped = false; private Set wals; @@ -92,12 +96,20 @@ public boolean apply(FileStatus file) { } @Override - public void setConf(Configuration config) { - // Make my own Configuration. Then I'll have my own connection to zk that - // I can close myself when comes time. - Configuration conf = new Configuration(config); + public void init(Map params) { + super.init(params); try { - setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null)); + if (MapUtils.isNotEmpty(params)) { + Object master = params.get(HMaster.MASTER); + if (master != null && master instanceof HMaster) { + zkw = ((HMaster) master).getZooKeeper(); + shareZK = true; + } + } + if (zkw == null) { + zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null); + } + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); } catch (IOException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } @@ -118,7 +130,7 @@ public void setConf(Configuration conf, ZKWatcher zk) { public void stop(String why) { if (this.stopped) return; this.stopped = true; - if (this.zkw != null) { + if (!shareZK && this.zkw != null) { LOG.info("Stopping " + this.zkw); this.zkw.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 1947ed90200e..43b04ff7bad7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -187,7 +187,7 @@ public void testLogCleaning() throws Exception { // 10 procedure WALs assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); - LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL); + LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null); cleaner.chore(); // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which @@ -281,7 +281,7 @@ public void testOnConfigurationChange() throws Exception { Server server = new DummyServer(); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); - LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL); + LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null); int size = cleaner.getSizeOfCleaners(); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java index b82f6f26c845..6a596a6f138c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java @@ -72,7 +72,7 @@ public void stop(String why) { public boolean isStopped() { return stopped; } - }, conf, fs, globalWALArchiveDir, logCleanerPool); + }, conf, fs, globalWALArchiveDir, logCleanerPool, null); choreService.scheduleChore(logCleaner); }