Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26680 Close and do not write trailer for the broken WAL writer #4174

Merged
merged 2 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,38 +166,43 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
long blocksize, StreamSlowMonitor monitor) throws IOException,
StreamLacksCapabilityException {
this.conf = conf;
boolean doCompress = initializeCompressionContext(conf, path);
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
CommonFSUtils.getDefaultReplication(fs, path));

initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);

boolean doTagCompress = doCompress &&
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
boolean doValueCompress = doCompress &&
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
WALHeader.Builder headerBuilder = WALHeader.newBuilder()
.setHasCompression(doCompress)
.setHasTagCompression(doTagCompress)
.setHasValueCompression(doValueCompress);
if (doValueCompress) {
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, headerBuilder)));
try {
this.conf = conf;
boolean doCompress = initializeCompressionContext(conf, path);
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
CommonFSUtils.getDefaultReplication(fs, path));

initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);

boolean doTagCompress =
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
boolean doValueCompress =
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
WALHeader.Builder headerBuilder =
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)
.setHasValueCompression(doValueCompress);
if (doValueCompress) {
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, headerBuilder)));

initAfterHeader(doCompress);
initAfterHeader(doCompress);

// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();
// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();

if (LOG.isTraceEnabled()) {
LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" +
", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
if (LOG.isTraceEnabled()) {
LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}"
+ ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
}
} catch (Exception e) {
LOG.warn("Init output failed, path={}", path, e);
closeOutput();
throw e;
}
}

Expand Down Expand Up @@ -265,6 +270,11 @@ protected abstract void initOutput(FileSystem fs, Path path, boolean overwritabl
short replication, long blockSize, StreamSlowMonitor monitor)
throws IOException, StreamLacksCapabilityException;

/**
* simply close the output, do not need to write trailer like the Writer.close
*/
protected abstract void closeOutput();

/**
* return the file length after written.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

Expand Down Expand Up @@ -197,6 +196,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}

@Override
protected void closeOutput() {
if (this.output != null) {
try {
this.output.close();
} catch (IOException e) {
LOG.warn("Close output failed", e);
}
}
}

private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
CompletableFuture<Long> future = new CompletableFuture<>();
action.accept(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
}
}

@Override
protected void closeOutput() {
if (this.output != null) {
try {
this.output.close();
} catch (IOException e) {
LOG.warn("Close output failed", e);
}
}
}

@Override
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
output.write(magic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,6 @@ public static Writer createWriter(final Configuration conf, final FileSystem fs,
} else {
LOG.debug("Error instantiating log writer.", e);
}
if (writer != null) {
sunhelly marked this conversation as resolved.
Show resolved Hide resolved
try{
writer.close();
} catch(IOException ee){
LOG.error("cannot close log writer", ee);
}
}
throw new IOException("cannot get log writer", e);
}
}
Expand Down