diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 42335a52b981..a56a31a5a632 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -166,38 +166,43 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, long blocksize, StreamSlowMonitor monitor) throws IOException, StreamLacksCapabilityException { - this.conf = conf; - boolean doCompress = initializeCompressionContext(conf, path); - this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); - int bufferSize = CommonFSUtils.getDefaultBufferSize(fs); - short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", - CommonFSUtils.getDefaultReplication(fs, path)); - - initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor); - - boolean doTagCompress = doCompress && - conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); - boolean doValueCompress = doCompress && - conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); - WALHeader.Builder headerBuilder = WALHeader.newBuilder() - .setHasCompression(doCompress) - .setHasTagCompression(doTagCompress) - .setHasValueCompression(doValueCompress); - if (doValueCompress) { - headerBuilder.setValueCompressionAlgorithm( - CompressionContext.getValueCompressionAlgorithm(conf).ordinal()); - } - length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, - buildWALHeader(conf, headerBuilder))); + try { + this.conf = conf; + boolean doCompress = initializeCompressionContext(conf, path); + this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); + int bufferSize = CommonFSUtils.getDefaultBufferSize(fs); + short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", + CommonFSUtils.getDefaultReplication(fs, path)); + + initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor); + + boolean doTagCompress = + doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); + boolean doValueCompress = + doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); + WALHeader.Builder headerBuilder = + WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress) + .setHasValueCompression(doValueCompress); + if (doValueCompress) { + headerBuilder.setValueCompressionAlgorithm( + CompressionContext.getValueCompressionAlgorithm(conf).ordinal()); + } + length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, + buildWALHeader(conf, headerBuilder))); - initAfterHeader(doCompress); + initAfterHeader(doCompress); - // instantiate trailer to default value. - trailer = WALTrailer.newBuilder().build(); + // instantiate trailer to default value. + trailer = WALTrailer.newBuilder().build(); - if (LOG.isTraceEnabled()) { - LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" + - ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress); + if (LOG.isTraceEnabled()) { + LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" + + ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress); + } + } catch (Exception e) { + LOG.warn("Init output failed, path={}", path, e); + closeOutput(); + throw e; } } @@ -265,6 +270,11 @@ protected abstract void initOutput(FileSystem fs, Path path, boolean overwritabl short replication, long blockSize, StreamSlowMonitor monitor) throws IOException, StreamLacksCapabilityException; + /** + * simply close the output, do not need to write trailer like the Writer.close + */ + protected abstract void closeOutput(); + /** * return the file length after written. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index bc2998b9fd50..fbd3882d4f73 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -47,7 +47,6 @@ import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; - import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; @@ -197,6 +196,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu this.asyncOutputWrapper = new OutputStreamWrapper(output); } + @Override + protected void closeOutput() { + if (this.output != null) { + try { + this.output.close(); + } catch (IOException e) { + LOG.warn("Close output failed", e); + } + } + } + private long writeWALMetadata(Consumer> action) throws IOException { CompletableFuture future = new CompletableFuture<>(); action.accept(future); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index c75998e1f868..dd586b3e0a96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -131,6 +131,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu } } + @Override + protected void closeOutput() { + if (this.output != null) { + try { + this.output.close(); + } catch (IOException e) { + LOG.warn("Close output failed", e); + } + } + } + @Override protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { output.write(magic); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 95d6a24b5d21..f5c39c0edf27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -90,13 +90,6 @@ public static Writer createWriter(final Configuration conf, final FileSystem fs, } else { LOG.debug("Error instantiating log writer.", e); } - if (writer != null) { - try{ - writer.close(); - } catch(IOException ee){ - LOG.error("cannot close log writer", ee); - } - } throw new IOException("cannot get log writer", e); } }