diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index c67cb37ac95f..abdd6e4be6ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -151,26 +151,32 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, long blocksize) throws IOException, StreamLacksCapabilityException { - this.conf = conf; - boolean doCompress = initializeCompressionContext(conf, path); - this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); - int bufferSize = CommonFSUtils.getDefaultBufferSize(fs); - short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", - CommonFSUtils.getDefaultReplication(fs, path)); - - initOutput(fs, path, overwritable, bufferSize, replication, blocksize); - - boolean doTagCompress = doCompress - && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); - length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf, - WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)))); - - initAfterHeader(doCompress); - - // instantiate trailer to default value. - trailer = WALTrailer.newBuilder().build(); - if (LOG.isTraceEnabled()) { - LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress); + try { + this.conf = conf; + boolean doCompress = initializeCompressionContext(conf, path); + this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); + int bufferSize = CommonFSUtils.getDefaultBufferSize(fs); + short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", + CommonFSUtils.getDefaultReplication(fs, path)); + + initOutput(fs, path, overwritable, bufferSize, replication, blocksize); + + boolean doTagCompress = + doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); + length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf, + WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)))); + + initAfterHeader(doCompress); + + // instantiate trailer to default value. + trailer = WALTrailer.newBuilder().build(); + if (LOG.isTraceEnabled()) { + LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress); + } + } catch (Exception e) { + LOG.warn("Init output failed, path={}", path, e); + closeOutput(); + throw e; } } @@ -237,6 +243,11 @@ protected void writeWALTrailer() { protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException, StreamLacksCapabilityException; + /** + * simply close the output, do not need to write trailer like the Writer.close + */ + protected abstract void closeOutput(); + /** * return the file length after written. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index ec7d3259e37d..732b493d1171 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -195,6 +195,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu this.asyncOutputWrapper = new OutputStreamWrapper(output); } + @Override + protected void closeOutput() { + if (this.output != null) { + try { + this.output.close(); + } catch (IOException e) { + LOG.warn("Close output failed", e); + } + } + } + private long writeWALMetadata(Consumer> action) throws IOException { CompletableFuture future = new CompletableFuture<>(); action.accept(future); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 4bbc13d3ab88..a408eb4dd790 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -117,6 +117,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu } } + @Override + protected void closeOutput() { + if (this.output != null) { + try { + this.output.close(); + } catch (IOException e) { + LOG.warn("Close output failed", e); + } + } + } + @Override protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { output.write(magic); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 8f2ca0753e2f..430449395c90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -88,13 +88,6 @@ public static Writer createWriter(final Configuration conf, final FileSystem fs, } else { LOG.debug("Error instantiating log writer.", e); } - if (writer != null) { - try{ - writer.close(); - } catch(IOException ee){ - LOG.error("cannot close log writer", ee); - } - } throw new IOException("cannot get log writer", e); } }