diff --git a/server/src/main/java/org/elasticsearch/common/logging/LogConfigurator.java b/server/src/main/java/org/elasticsearch/common/logging/LogConfigurator.java index 6f8358e113d40..c6b4f0d0427a3 100644 --- a/server/src/main/java/org/elasticsearch/common/logging/LogConfigurator.java +++ b/server/src/main/java/org/elasticsearch/common/logging/LogConfigurator.java @@ -48,6 +48,8 @@ import java.io.IOException; import java.io.InputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitOption; import java.nio.file.FileVisitResult; import java.nio.file.Files; @@ -242,6 +244,11 @@ public FileVisitResult visitFile(final Path file, final BasicFileAttributes attr + "log4j2.properties but will stop this behavior in 7.0. You should manually replace `%node_name` with " + "`[%node_name]%marker ` in these locations:\n {}", deprecatedLocationsString); } + + // Redirect stdout/stderr to log4j. While we ensure Elasticsearch code does not write to those streams, + // third party libraries may do that + System.setOut(new PrintStream(new LoggingOutputStream(LogManager.getLogger("stdout"), Level.INFO), false, StandardCharsets.UTF_8)); + System.setOut(new PrintStream(new LoggingOutputStream(LogManager.getLogger("stderr"), Level.WARN), false, StandardCharsets.UTF_8)); } private static void configureStatusLogger() { diff --git a/server/src/main/java/org/elasticsearch/common/logging/LoggingOutputStream.java b/server/src/main/java/org/elasticsearch/common/logging/LoggingOutputStream.java new file mode 100644 index 0000000000000..72d4b34da702d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/logging/LoggingOutputStream.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.logging; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * A stream whose output is sent to the configured logger, line by line. + */ +class LoggingOutputStream extends OutputStream { + /** The starting length of the buffer */ + static final int DEFAULT_BUFFER_LENGTH = 1024; + + // limit a single log message to 64k + static final int MAX_BUFFER_LENGTH = DEFAULT_BUFFER_LENGTH * 64; + + class Buffer { + + /** The buffer of bytes sent to the stream */ + byte[] bytes = new byte[DEFAULT_BUFFER_LENGTH]; + + /** Number of used bytes in the buffer */ + int used = 0; + } + + // each thread gets its own buffer so messages don't get garbled + ThreadLocal threadLocal = ThreadLocal.withInitial(Buffer::new); + + private final Logger logger; + + private final Level level; + + LoggingOutputStream(Logger logger, Level level) { + this.logger = logger; + this.level = level; + } + + @Override + public void write(int b) throws IOException { + if (threadLocal == null) { + throw new IOException("buffer closed"); + } + if (b == 0) return; + if (b == '\n') { + // always flush with newlines instead of adding to the buffer + flush(); + return; + } + + Buffer buffer = threadLocal.get(); + + if (buffer.used == buffer.bytes.length) { + if (buffer.bytes.length >= MAX_BUFFER_LENGTH) { + // don't let the buffer get infinitely big + flush(); + // we reset the buffer in flush so get the new instance + buffer = threadLocal.get(); + } else { + // extend the buffer + buffer.bytes = Arrays.copyOf(buffer.bytes, 2 * buffer.bytes.length); + } + } + + buffer.bytes[buffer.used++] = (byte) b; + } + + @Override + public void flush() { + Buffer buffer = threadLocal.get(); + if (buffer.used == 0) return; + log(new String(buffer.bytes, 0, buffer.used, StandardCharsets.UTF_8)); + if (buffer.bytes.length != DEFAULT_BUFFER_LENGTH) { + threadLocal.set(new Buffer()); // reset size + } else { + buffer.used = 0; + } + } + + @Override + public void close() { + threadLocal = null; + } + + // pkg private for testing + void log(String msg) { + logger.log(level, msg); + } +} diff --git a/server/src/test/java/org/elasticsearch/common/logging/LoggingOutputStreamTests.java b/server/src/test/java/org/elasticsearch/common/logging/LoggingOutputStreamTests.java new file mode 100644 index 0000000000000..0787ff78fafe7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/logging/LoggingOutputStreamTests.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.logging; + +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.common.logging.LoggingOutputStream.DEFAULT_BUFFER_LENGTH; +import static org.elasticsearch.common.logging.LoggingOutputStream.MAX_BUFFER_LENGTH; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class LoggingOutputStreamTests extends ESTestCase { + + class TestLoggingOutputStream extends LoggingOutputStream { + List lines = new ArrayList<>(); + + TestLoggingOutputStream() { + super(null, null); + } + + @Override + void log(String msg) { + lines.add(msg); + } + } + + TestLoggingOutputStream loggingStream; + PrintStream printStream; + + @Before + public void createStream() { + loggingStream = new TestLoggingOutputStream(); + printStream = new PrintStream(loggingStream, false, StandardCharsets.UTF_8); + } + + public void testEmptyLine() { + printStream.println(""); + assertTrue(loggingStream.lines.isEmpty()); + printStream.flush(); + assertTrue(loggingStream.lines.isEmpty()); + } + + public void testNull() { + printStream.write(0); + printStream.flush(); + assertTrue(loggingStream.lines.isEmpty()); + } + + public void testFlushOnNewline() { + printStream.println("hello"); + printStream.println("world"); + assertThat(loggingStream.lines, contains("hello", "world")); + } + + public void testBufferExtension() { + String longStr = randomAlphaOfLength(DEFAULT_BUFFER_LENGTH); + String extraLongStr = randomAlphaOfLength(DEFAULT_BUFFER_LENGTH + 1); + printStream.println(longStr); + assertThat(loggingStream.threadLocal.get().bytes.length, equalTo(DEFAULT_BUFFER_LENGTH)); + printStream.println(extraLongStr); + assertThat(loggingStream.lines, contains(longStr, extraLongStr)); + assertThat(loggingStream.threadLocal.get().bytes.length, equalTo(DEFAULT_BUFFER_LENGTH)); + } + + public void testMaxBuffer() { + String longStr = randomAlphaOfLength(MAX_BUFFER_LENGTH); + String extraLongStr = longStr + "OVERFLOW"; + printStream.println(longStr); + printStream.println(extraLongStr); + assertThat(loggingStream.lines, contains(longStr, longStr, "OVERFLOW")); + } + + public void testClosed() { + loggingStream.close(); + IOException e = expectThrows(IOException.class, () -> loggingStream.write('a')); + assertThat(e.getMessage(), containsString("buffer closed")); + } + + public void testThreadIsolation() throws Exception { + printStream.print("from thread 1"); + Thread thread2 = new Thread(() -> { + printStream.println("from thread 2"); + }); + thread2.start(); + thread2.join(); + printStream.flush(); + assertThat(loggingStream.lines, contains("from thread 2", "from thread 1")); + } +}