Skip to content

Commit

Permalink
MINIFICPP-1448 CWEL JSON output
Browse files Browse the repository at this point in the history
This closes #976

Signed-off-by: Marton Szasz <[email protected]>
Co-authored-by: Marton Szasz <[email protected]>
  • Loading branch information
adebreceni and szaszm committed Jan 23, 2021
1 parent 4faf65c commit bb8ea06
Show file tree
Hide file tree
Showing 13 changed files with 846 additions and 99 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ jobs:
- name: Setup PATH
uses: microsoft/[email protected]
- id: build
run: win_build_vs.bat build /CI /S /A
run: |
PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.17763.0\x86
PATH %PATH%;C:\Program Files (x86)\Microsoft Visual Studio\2017\Enterprise\MSBuild\15.0\Bin\Roslyn
win_build_vs.bat build /CI /S /A
shell: cmd
windows_VS2019:
name: "windows-vs2019"
Expand All @@ -59,7 +62,10 @@ jobs:
- name: Setup PATH
uses: microsoft/[email protected]
- id: build
run: win_build_vs.bat build /2019 /64 /CI
run: |
PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.19041.0\x64
PATH %PATH%;C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\MSBuild\Current\Bin\Roslyn
win_build_vs.bat build /2019 /64 /CI
shell: cmd
ubuntu_16_04:
name: "ubuntu-16.04"
Expand Down
132 changes: 91 additions & 41 deletions extensions/windows-event-log/ConsumeWindowsEventLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "wel/MetadataWalker.h"
#include "wel/XMLString.h"
#include "wel/UnicodeConversion.h"
#include "wel/JSONUtils.h"

#include "io/BufferStream.h"
#include "core/ProcessContext.h"
Expand Down Expand Up @@ -134,8 +135,16 @@ core::Property ConsumeWindowsEventLog::OutputFormat(
core::PropertyBuilder::createProperty("Output Format")->
isRequired(true)->
withDefaultValue(Both)->
withAllowableValues<std::string>({XML, Plaintext, Both})->
withDescription("Set the output format type. In case \'Both\' is selected the processor generates two flow files for every event captured")->
withAllowableValues<std::string>({XML, Plaintext, Both, JSON})->
withDescription("Set the output format type. In case \'Both\' is selected the processor generates two flow files for every event captured in format XML and Plaintext")->
build());

core::Property ConsumeWindowsEventLog::JSONFormat(
core::PropertyBuilder::createProperty("JSON Format")->
isRequired(true)->
withDefaultValue(JSONSimple)->
withAllowableValues<std::string>({JSONSimple, JSONFlattened, JSONRaw})->
withDescription("Set the json format type. Only applicable if Output Format is set to 'JSON'")->
build());

core::Property ConsumeWindowsEventLog::BatchCommitSize(
Expand All @@ -162,17 +171,15 @@ core::Property ConsumeWindowsEventLog::ProcessOldEvents(
core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");

ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
: core::Processor(name, uuid), logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()), apply_identifier_function_(false), batch_commit_size_(0U) {
: core::Processor(name, uuid),
logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()) {
char buff[MAX_COMPUTERNAME_LENGTH + 1];
DWORD size = sizeof(buff);
if (GetComputerName(buff, &size)) {
computerName_ = buff;
} else {
LogWindowsError();
}

writeXML_ = false;
writePlainText_ = false;
}

void ConsumeWindowsEventLog::notifyStop() {
Expand All @@ -199,7 +206,7 @@ void ConsumeWindowsEventLog::initialize() {
//! Set the supported properties
setSupportedProperties({
Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes,
EventHeaderDelimiter, EventHeader, OutputFormat, BatchCommitSize, BookmarkRootDirectory, ProcessOldEvents
EventHeaderDelimiter, EventHeader, OutputFormat, JSONFormat, BatchCommitSize, BookmarkRootDirectory, ProcessOldEvents
});

//! Set the supported relationships
Expand Down Expand Up @@ -252,11 +259,31 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
std::string mode;
context->getProperty(OutputFormat.getName(), mode);

writeXML_ = (mode == Both || mode == XML);

writePlainText_ = (mode == Both || mode == Plaintext);
output_ = {};
if (mode == XML) {
output_.xml = true;
} else if (mode == Plaintext) {
output_.plaintext = true;
} else if (mode == Both) {
output_.xml = true;
output_.plaintext = true;
} else if (mode == JSON) {
std::string json_format;
context->getProperty(JSONFormat.getName(), json_format);
if (json_format == JSONRaw) {
output_.json.type = JSONType::Raw;
} else if (json_format == JSONSimple) {
output_.json.type = JSONType::Simple;
} else if (json_format == JSONFlattened) {
output_.json.type = JSONType::Flattened;
}
} else {
// in the future this might be considered an error, but for now due to backwards
// compatibility we just fall through and execute the processor outputing nothing
// throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unrecognized output format: " + mode);
}

if (writeXML_ && !hMsobjsDll_) {
if ((output_.xml || output_.json) && !hMsobjsDll_) {
char systemDir[MAX_PATH];
if (GetSystemDirectory(systemDir, sizeof(systemDir))) {
hMsobjsDll_ = LoadLibrary((systemDir + std::string("\\msobjs.dll")).c_str());
Expand Down Expand Up @@ -564,7 +591,7 @@ bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& e

logger_->log_debug("Finish doc traversing, performing writing...");

if (writePlainText_) {
if (output_.plaintext) {
logger_->log_trace("Writing event in plain text");

auto handler = getEventLogHandler(providerName);
Expand All @@ -583,30 +610,47 @@ bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& e
// set the delimiter
log_header.setDelimiter(header_delimiter_);
// render the header.
eventRender.rendered_text_ = log_header.getEventHeader([&walker](wel::METADATA metadata) { return walker.getMetadata(metadata); });
eventRender.rendered_text_ += "Message" + header_delimiter_ + " ";
eventRender.rendered_text_ += message;
eventRender.plaintext = log_header.getEventHeader([&walker](wel::METADATA metadata) { return walker.getMetadata(metadata); });
eventRender.plaintext += "Message" + header_delimiter_ + " ";
eventRender.plaintext += message;
}
logger_->log_trace("Finish writing in plain text");
}

if (writeXML_) {
logger_->log_trace("Writing event in XML");
if (output_.xml || output_.json) {
substituteXMLPercentageItems(doc);
logger_->log_trace("Finish substituting %% in XML");

if (resolve_as_attributes_) {
eventRender.matched_fields_ = walker.getFieldValues();
eventRender.matched_fields = walker.getFieldValues();
}
}

if (output_.xml) {
logger_->log_trace("Writing event in XML");

wel::XmlString writer;
doc.print(writer, "", pugi::format_raw); // no indentation or formatting
xml = writer.xml_;

eventRender.text_ = std::move(xml);
eventRender.xml = std::move(xml);
logger_->log_trace("Finish writing in XML");
}

if (output_.json.type == JSONType::Raw) {
logger_->log_trace("Writing event in raw JSON");
eventRender.json = wel::jsonToString(wel::toRawJSON(doc));
logger_->log_trace("Finish writing in raw JSON");
} else if (output_.json.type == JSONType::Simple) {
logger_->log_trace("Writing event in simple JSON");
eventRender.json = wel::jsonToString(wel::toSimpleJSON(doc));
logger_->log_trace("Finish writing in simple JSON");
} else if (output_.json.type == JSONType::Flattened) {
logger_->log_trace("Writing event in flattened JSON");
eventRender.json = wel::jsonToString(wel::toFlattenedJSON(doc));
logger_->log_trace("Finish writing in flattened JSON");
}

return true;
}

Expand Down Expand Up @@ -658,39 +702,45 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&
const std::string& str_;
};

if (writeXML_) {
auto flowFile = session.create();
logger_->log_trace("Writing rendered XML to a flow file");

auto commitFlowFile = [&] (const std::shared_ptr<core::FlowFile>& flowFile, const std::string& content, const std::string& mimeType) {
{
WriteCallback wc{ eventRender.text_ };
WriteCallback wc{ content };
session.write(flowFile, &wc);
}
for (const auto &fieldMapping : eventRender.matched_fields_) {
if (!fieldMapping.second.empty()) {
session.putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
}
}
session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "application/xml");
session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
session.putAttribute(flowFile, "Timezone name", timezone_name_);
session.putAttribute(flowFile, "Timezone offset", timezone_offset_);
session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
session.transfer(flowFile, Success);
}
};

if (writePlainText_) {
if (output_.xml) {
auto flowFile = session.create();
logger_->log_trace("Writing rendered plain text to a flow file");
logger_->log_trace("Writing rendered XML to a flow file");

{
WriteCallback wc{ eventRender.rendered_text_ };
session.write(flowFile, &wc);
for (const auto &fieldMapping : eventRender.matched_fields) {
if (!fieldMapping.second.empty()) {
session.putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
}
}
session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "text/plain");
session.putAttribute(flowFile, "Timezone name", timezone_name_);
session.putAttribute(flowFile, "Timezone offset", timezone_offset_);
session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
session.transfer(flowFile, Success);

commitFlowFile(flowFile, eventRender.xml, "application/xml");
}

if (output_.plaintext) {
logger_->log_trace("Writing rendered plain text to a flow file");
commitFlowFile(session.create(), eventRender.plaintext, "text/plain");
}

if (output_.json.type == JSONType::Raw) {
logger_->log_trace("Writing rendered raw JSON to a flow file");
commitFlowFile(session.create(), eventRender.json, "application/json");
} else if (output_.json.type == JSONType::Simple) {
logger_->log_trace("Writing rendered simple JSON to a flow file");
commitFlowFile(session.create(), eventRender.json, "application/json");
} else if (output_.json.type == JSONType::Flattened) {
logger_->log_trace("Writing rendered flattened JSON to a flow file");
commitFlowFile(session.create(), eventRender.json, "application/json");
}
}

Expand Down
40 changes: 29 additions & 11 deletions extensions/windows-event-log/ConsumeWindowsEventLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ namespace minifi {
namespace processors {

struct EventRender {
std::map<std::string, std::string> matched_fields_;
std::string text_;
std::string rendered_text_;
std::map<std::string, std::string> matched_fields;
std::string xml;
std::string plaintext;
std::string json;
};

class Bookmark;
Expand Down Expand Up @@ -77,6 +78,7 @@ class ConsumeWindowsEventLog : public core::Processor {
static core::Property EventHeaderDelimiter;
static core::Property EventHeader;
static core::Property OutputFormat;
static core::Property JSONFormat;
static core::Property BatchCommitSize;
static core::Property BookmarkRootDirectory;
static core::Property ProcessOldEvents;
Expand Down Expand Up @@ -107,9 +109,13 @@ class ConsumeWindowsEventLog : public core::Processor {
bool createEventRender(EVT_HANDLE eventHandle, EventRender& eventRender);
void substituteXMLPercentageItems(pugi::xml_document& doc);

static constexpr const char * const XML = "XML";
static constexpr const char * const Both = "Both";
static constexpr const char * const Plaintext = "Plaintext";
static constexpr const char* XML = "XML";
static constexpr const char* Both = "Both";
static constexpr const char* Plaintext = "Plaintext";
static constexpr const char* JSON = "JSON";
static constexpr const char* JSONRaw = "Raw";
static constexpr const char* JSONSimple = "Simple";
static constexpr const char* JSONFlattened = "Flattened";

private:
struct TimeDiff {
Expand All @@ -132,18 +138,30 @@ class ConsumeWindowsEventLog : public core::Processor {
std::wstring wstrChannel_;
std::wstring wstrQuery_;
std::string regex_;
bool resolve_as_attributes_;
bool apply_identifier_function_;
bool resolve_as_attributes_{false};
bool apply_identifier_function_{false};
std::string provenanceUri_;
std::string computerName_;
uint64_t maxBufferSize_{};
DWORD lastActivityTimestamp_{};
std::mutex cache_mutex_;
std::map<std::string, wel::WindowsEventLogHandler > providers_;
uint64_t batch_commit_size_;
uint64_t batch_commit_size_{};

enum class JSONType {None, Raw, Simple, Flattened};

struct OutputFormat {
bool xml{false};
bool plaintext{false};
struct JSON {
JSONType type{JSONType::None};

explicit operator bool() const noexcept {
return type != JSONType::None;
}
} json;
} output_;

bool writeXML_;
bool writePlainText_;
std::unique_ptr<Bookmark> bookmark_;
std::mutex on_trigger_mutex_;
std::unordered_map<std::string, std::string> xmlPercentageItemsResolutions_;
Expand Down
10 changes: 9 additions & 1 deletion extensions/windows-event-log/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@
# under the License.
#

file(GLOB WEL_INTEGRATION_TESTS "*.cpp")
set(WEL_INTEGRATION_TESTS "BookmarkTests.cpp" "ConsumeWindowsEventLogTests.cpp" "MetadataWalkerTests.cpp")
if (TEST_CUSTOM_WEL_PROVIDER)
execute_process(COMMAND
"${CMAKE_CURRENT_LIST_DIR}/custom-provider/generate-and-register.bat"
"${CMAKE_CURRENT_LIST_DIR}/custom-provider"
)
list(APPEND WEL_INTEGRATION_TESTS "CWELCustomProviderTests.cpp")
endif()

SET(WEL_TEST_COUNT 0)
FOREACH(testfile ${WEL_INTEGRATION_TESTS})
get_filename_component(testfilename "${testfile}" NAME_WE)
Expand Down
Loading

0 comments on commit bb8ea06

Please sign in to comment.