Skip to content

Commit

Permalink
Fix Audit Message Logging Interceptor Race Condition (#938)
Browse files Browse the repository at this point in the history
* Fix race condition in  GrpcMessageInterceptor to revert a empty message if message cannot be recorded.

* Fix GrpcMessageInterceptor race condition by allow audit log message to be called from multiple async calls

* Revert option change

* Add CoreLoggingIT integration test to test message audit logging.

* Fix GrpcMessageInterceptor race condition by moving allowing request to be unset.

* Fix lint

* Increase wait for logs in CoreLoggingIT

* Fix to compare the correct lob JsonObject with the right response.

* Debug response

* Reduce load size to make test less flaky

* Update test to only check request and response for one call.

* Fix compile failure due to uncaught exception.

* Add method name filter to prevent logs from tests from interfering with each other.

* Add intergration test to check that message logs are produced correctly under load.

* Fix imports and log4j2 not able to find config file

* Fix issue with CoreLoggingIT TestLogAppender being null due to class not found by log4j2

* Remove unused getters in MessageAuditLogEntry.

* Update CoreLoggingIT test to check that expected contents of logs produced under load.
  • Loading branch information
mrzzy authored Sep 7, 2020
1 parent eb150a2 commit e1f6ab7
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ public GrpcMessageInterceptor(@Nullable SecurityProperties securityProperties) {
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
MessageAuditLogEntry.Builder entryBuilder = MessageAuditLogEntry.newBuilder();
// default response message to empty proto in log entry.
// default response/request message to empty proto in log entry.
// request could be empty when the client closes the connection before sending a request
// message.
// response could be unset when the service encounters an error when processsing the service
// call.
entryBuilder.setRequest(Empty.newBuilder().build());
entryBuilder.setResponse(Empty.newBuilder().build());

// Unpack service & method name from call
Expand Down
232 changes: 232 additions & 0 deletions core/src/test/java/feast/core/logging/CoreLoggingIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.logging;

import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Streams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.common.it.BaseIT;
import feast.common.it.DataGenerator;
import feast.common.logging.entry.AuditLogEntryKind;
import feast.proto.core.CoreServiceGrpc;
import feast.proto.core.CoreServiceGrpc.CoreServiceBlockingStub;
import feast.proto.core.CoreServiceGrpc.CoreServiceFutureStub;
import feast.proto.core.CoreServiceProto.GetFeastCoreVersionRequest;
import feast.proto.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.proto.core.CoreServiceProto.ListStoresRequest;
import feast.proto.core.CoreServiceProto.ListStoresResponse;
import feast.proto.core.CoreServiceProto.UpdateStoreRequest;
import feast.proto.core.CoreServiceProto.UpdateStoreResponse;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(
properties = {
"feast.logging.audit.enabled=true",
"feast.logging.audit.messageLoggingEnabled=true",
})
public class CoreLoggingIT extends BaseIT {
private static TestLogAppender testAuditLogAppender;
private static CoreServiceBlockingStub coreService;
private static CoreServiceFutureStub asyncCoreService;

@BeforeAll
public static void globalSetUp(@Value("${grpc.server.port}") int coreGrpcPort)
throws InterruptedException, ExecutionException {
LoggerContext logContext = (LoggerContext) LogManager.getContext(false);
// NOTE: As log appender state is shared across tests use a different method
// for each test and filter by method name to ensure that you only get logs
// for a specific test.
testAuditLogAppender = logContext.getConfiguration().getAppender("TestAuditLogAppender");

// Connect to core service.
Channel channel =
ManagedChannelBuilder.forAddress("localhost", coreGrpcPort).usePlaintext().build();
coreService = CoreServiceGrpc.newBlockingStub(channel);
asyncCoreService = CoreServiceGrpc.newFutureStub(channel);

// Preflight a request to core service stubs to verify connection
coreService.getFeastCoreVersion(GetFeastCoreVersionRequest.getDefaultInstance());
asyncCoreService.getFeastCoreVersion(GetFeastCoreVersionRequest.getDefaultInstance()).get();
}

/** Check that messsage audit log are produced on service call */
@Test
public void shouldProduceMessageAuditLogsOnCall()
throws InterruptedException, InvalidProtocolBufferException {
// Generate artifical load on feast core.
UpdateStoreRequest request =
UpdateStoreRequest.newBuilder().setStore(DataGenerator.getDefaultStore()).build();
UpdateStoreResponse response = coreService.updateStore(request);

// Wait required to ensure audit logs are flushed into test audit log appender
Thread.sleep(1000);
// Check message audit logs are produced for each audit log.
JsonFormat.Parser protoJSONParser = JsonFormat.parser();
// Pull message audit logs logs from test log appender
List<JsonObject> logJsonObjects =
parseMessageJsonLogObjects(testAuditLogAppender.getLogs(), "UpdateStore");
assertEquals(1, logJsonObjects.size());
JsonObject logObj = logJsonObjects.get(0);

// Extract & Check that request/response are returned correctly
String requestJson = logObj.getAsJsonObject("request").toString();
UpdateStoreRequest.Builder gotRequest = UpdateStoreRequest.newBuilder();
protoJSONParser.merge(requestJson, gotRequest);

String responseJson = logObj.getAsJsonObject("response").toString();
UpdateStoreResponse.Builder gotResponse = UpdateStoreResponse.newBuilder();
protoJSONParser.merge(responseJson, gotResponse);

assertThat(gotRequest.build(), equalTo(request));
assertThat(gotResponse.build(), equalTo(response));
}

/** Check that message audit logs are produced when server encounters an error */
@Test
public void shouldProduceMessageAuditLogsOnError() throws InterruptedException {
// Send a bad request which should cause Core to error
ListFeatureSetsRequest request =
ListFeatureSetsRequest.newBuilder()
.setFilter(
ListFeatureSetsRequest.Filter.newBuilder()
.setProject("*")
.setFeatureSetName("nop")
.build())
.build();

boolean hasExpectedException = false;
Code statusCode = null;
try {
coreService.listFeatureSets(request);
} catch (StatusRuntimeException e) {
hasExpectedException = true;
statusCode = e.getStatus().getCode();
}
assertTrue(hasExpectedException);

// Wait required to ensure audit logs are flushed into test audit log appender
Thread.sleep(1000);
// Pull message audit logs logs from test log appender
List<JsonObject> logJsonObjects =
parseMessageJsonLogObjects(testAuditLogAppender.getLogs(), "ListFeatureSets");

assertEquals(1, logJsonObjects.size());
JsonObject logJsonObject = logJsonObjects.get(0);
// Check correct status code is tracked on error.
assertEquals(logJsonObject.get("statusCode").getAsString(), statusCode.toString());
}

/** Check that expected message audit logs are produced when under load. */
@Test
public void shouldProduceExpectedAuditLogsUnderLoad()
throws InterruptedException, ExecutionException {
// Generate artifical requests on core to simulate load.
int LOAD_SIZE = 40; // Total number of requests to send.
int BURST_SIZE = 5; // Number of requests to send at once.

ListStoresRequest request = ListStoresRequest.getDefaultInstance();
List<ListStoresResponse> responses = new LinkedList<>();
for (int i = 0; i < LOAD_SIZE; i += 5) {
List<ListenableFuture<ListStoresResponse>> futures = new LinkedList<>();
for (int j = 0; j < BURST_SIZE; j++) {
futures.add(asyncCoreService.listStores(request));
}

responses.addAll(Futures.allAsList(futures).get());
}
// Wait required to ensure audit logs are flushed into test audit log appender
Thread.sleep(1000);

// Pull message audit logs from test log appender
List<JsonObject> logJsonObjects =
parseMessageJsonLogObjects(testAuditLogAppender.getLogs(), "ListStores");
assertEquals(responses.size(), logJsonObjects.size());

// Extract & Check that request/response are returned correctly
JsonFormat.Parser protoJSONParser = JsonFormat.parser();
Streams.zip(
responses.stream(),
logJsonObjects.stream(),
(response, logObj) -> Pair.of(response, logObj))
.forEach(
responseLogJsonPair -> {
ListStoresResponse response = responseLogJsonPair.getLeft();
JsonObject logObj = responseLogJsonPair.getRight();

ListStoresRequest.Builder gotRequest = null;
ListStoresResponse.Builder gotResponse = null;
try {
String requestJson = logObj.getAsJsonObject("request").toString();
gotRequest = ListStoresRequest.newBuilder();
protoJSONParser.merge(requestJson, gotRequest);

String responseJson = logObj.getAsJsonObject("response").toString();
gotResponse = ListStoresResponse.newBuilder();
protoJSONParser.merge(responseJson, gotResponse);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}

assertThat(gotRequest.build(), equalTo(request));
assertThat(gotResponse.build(), equalTo(response));
});
}

/**
* Filter and Parse out Message Audit Logs from the given logsStrings for the given method name
*/
private List<JsonObject> parseMessageJsonLogObjects(List<String> logsStrings, String methodName) {
JsonParser jsonParser = new JsonParser();
// copy to prevent concurrent modification.
return logsStrings.stream()
.map(logJSON -> jsonParser.parse(logJSON).getAsJsonObject())
// Filter to only include message audit logs
.filter(
logObj ->
logObj
.getAsJsonPrimitive("kind")
.getAsString()
.equals(AuditLogEntryKind.MESSAGE.toString())
// filter by method name to ensure logs from other tests do not interfere with
// test
&& logObj.get("method").getAsString().equals(methodName))
.collect(Collectors.toList());
}
}
68 changes: 68 additions & 0 deletions core/src/test/java/feast/core/logging/TestLogAppender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.logging;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;

/** Test Log Appender used for collecting logs for testing logging. */
@Plugin(
name = "TestLogAppender",
category = Core.CATEGORY_NAME,
elementType = Appender.ELEMENT_TYPE)
@Getter
public class TestLogAppender extends AbstractAppender {
private List<String> logs;

protected TestLogAppender(String name, Filter filter, Layout<? extends Serializable> layout) {
super(name, filter, layout, false, new Property[] {});
logs = new ArrayList<>();
}

@Override
public void append(LogEvent event) {
getLogs().add(event.getMessage().toString());
}

@PluginFactory
public static TestLogAppender createAppender(
@PluginAttribute("name") String name,
@PluginElement("Layout") Layout<? extends Serializable> layout,
@PluginElement("Filter") final Filter filter) {
if (name == null) {
return null;
}
if (layout == null) {
layout = PatternLayout.createDefaultLayout();
}
return new TestLogAppender(name, filter, layout);
}
}
53 changes: 53 additions & 0 deletions core/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018 The Feast Authors
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ https://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<Configuration status="WARN" packages="feast.core.logging">
<Properties>
<Property name="LOG_PATTERN">
%d{yyyy-MM-dd HH:mm:ss.SSS} %5p ${hostName} --- [%15.15t] %-40.40c{1.} : %m%n%ex
</Property>
<Property name="JSON_LOG_PATTERN">
{"time":"%d{yyyy-MM-dd'T'HH:mm:ssXXX}","hostname":"${hostName}","severity":"%p","message":%m}%n%ex
</Property>
</Properties>
<Appenders>
<Console name="ConsoleAppender" target="SYSTEM_OUT" follow="true">
<MarkerFilter marker="AUDIT_MARK" onMatch="DENY" onMismatch="ACCEPT"/>
<PatternLayout pattern="${LOG_PATTERN}"/>
</Console>
<Console name="JSONAppender" target="SYSTEM_OUT" follow="true">
<MarkerFilter marker="AUDIT_MARK" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="${JSON_LOG_PATTERN}"/>
</Console>
<TestLogAppender name="TestAuditLogAppender">
<MarkerFilter marker="AUDIT_MARK" onMatch="ACCEPT" onMismatch="DENY"/>
</TestLogAppender>
</Appenders>
<Loggers>
<Logger name="feast.core" level="info" additivity="false">
<AppenderRef ref="ConsoleAppender"/>
<AppenderRef ref="JSONAppender"/>
<AppenderRef ref="TestAuditLogAppender"/>
</Logger>
<Root level="info">
<AppenderRef ref="ConsoleAppender"/>
<AppenderRef ref="JSONAppender"/>
<AppenderRef ref="TestAuditLogAppender"/>
</Root>
</Loggers>
</Configuration>

0 comments on commit e1f6ab7

Please sign in to comment.