Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-1448 - CWEL JSON output #976

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ jobs:
- name: Setup PATH
uses: microsoft/[email protected]
- id: build
run: win_build_vs.bat build /CI /S /A
run: |
PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.17763.0\x86
PATH %PATH%;C:\Program Files (x86)\Microsoft Visual Studio\2017\Enterprise\MSBuild\15.0\Bin\Roslyn
win_build_vs.bat build /CI /S /A
shell: cmd
windows_VS2019:
name: "windows-vs2019"
Expand All @@ -59,7 +62,10 @@ jobs:
- name: Setup PATH
uses: microsoft/[email protected]
- id: build
run: win_build_vs.bat build /2019 /64 /CI
run: |
PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.19041.0\x64
PATH %PATH%;C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\MSBuild\Current\Bin\Roslyn
win_build_vs.bat build /2019 /64 /CI
shell: cmd
ubuntu_16_04:
name: "ubuntu-16.04"
Expand Down
132 changes: 91 additions & 41 deletions extensions/windows-event-log/ConsumeWindowsEventLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "wel/MetadataWalker.h"
#include "wel/XMLString.h"
#include "wel/UnicodeConversion.h"
#include "wel/JSONUtils.h"

#include "io/BufferStream.h"
#include "core/ProcessContext.h"
Expand Down Expand Up @@ -134,8 +135,16 @@ core::Property ConsumeWindowsEventLog::OutputFormat(
core::PropertyBuilder::createProperty("Output Format")->
isRequired(true)->
withDefaultValue(Both)->
withAllowableValues<std::string>({XML, Plaintext, Both})->
withDescription("Set the output format type. In case \'Both\' is selected the processor generates two flow files for every event captured")->
withAllowableValues<std::string>({XML, Plaintext, Both, JSON})->
withDescription("Set the output format type. In case \'Both\' is selected the processor generates two flow files for every event captured in format XML and Plaintext")->
build());

core::Property ConsumeWindowsEventLog::JSONFormat(
core::PropertyBuilder::createProperty("JSON Format")->
isRequired(true)->
withDefaultValue(JSONSimple)->
withAllowableValues<std::string>({JSONSimple, JSONFlattened, JSONRaw})->
withDescription("Set the json format type. Only applicable if Output Format is set to 'JSON'")->
build());

core::Property ConsumeWindowsEventLog::BatchCommitSize(
Expand All @@ -162,17 +171,15 @@ core::Property ConsumeWindowsEventLog::ProcessOldEvents(
core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");

ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
: core::Processor(name, uuid), logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()), apply_identifier_function_(false), batch_commit_size_(0U) {
: core::Processor(name, uuid),
logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()) {
char buff[MAX_COMPUTERNAME_LENGTH + 1];
DWORD size = sizeof(buff);
if (GetComputerName(buff, &size)) {
computerName_ = buff;
} else {
LogWindowsError();
}

writeXML_ = false;
writePlainText_ = false;
}

void ConsumeWindowsEventLog::notifyStop() {
Expand All @@ -199,7 +206,7 @@ void ConsumeWindowsEventLog::initialize() {
//! Set the supported properties
setSupportedProperties({
Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes,
EventHeaderDelimiter, EventHeader, OutputFormat, BatchCommitSize, BookmarkRootDirectory, ProcessOldEvents
EventHeaderDelimiter, EventHeader, OutputFormat, JSONFormat, BatchCommitSize, BookmarkRootDirectory, ProcessOldEvents
});

//! Set the supported relationships
Expand Down Expand Up @@ -252,11 +259,31 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
std::string mode;
context->getProperty(OutputFormat.getName(), mode);

writeXML_ = (mode == Both || mode == XML);

writePlainText_ = (mode == Both || mode == Plaintext);
output_ = {};
if (mode == XML) {
output_.xml = true;
} else if (mode == Plaintext) {
output_.plaintext = true;
} else if (mode == Both) {
output_.xml = true;
output_.plaintext = true;
} else if (mode == JSON) {
std::string json_format;
context->getProperty(JSONFormat.getName(), json_format);
if (json_format == JSONRaw) {
output_.json.type = JSONType::Raw;
} else if (json_format == JSONSimple) {
output_.json.type = JSONType::Simple;
} else if (json_format == JSONFlattened) {
output_.json.type = JSONType::Flattened;
}
} else {
// in the future this might be considered an error, but for now due to backwards
// compatibility we just fall through and execute the processor outputing nothing
// throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unrecognized output format: " + mode);
adamdebreceni marked this conversation as resolved.
Show resolved Hide resolved
}

if (writeXML_ && !hMsobjsDll_) {
if ((output_.xml || output_.json) && !hMsobjsDll_) {
char systemDir[MAX_PATH];
if (GetSystemDirectory(systemDir, sizeof(systemDir))) {
hMsobjsDll_ = LoadLibrary((systemDir + std::string("\\msobjs.dll")).c_str());
Expand Down Expand Up @@ -564,7 +591,7 @@ bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& e

logger_->log_debug("Finish doc traversing, performing writing...");

if (writePlainText_) {
if (output_.plaintext) {
logger_->log_trace("Writing event in plain text");

auto handler = getEventLogHandler(providerName);
Expand All @@ -583,30 +610,47 @@ bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& e
// set the delimiter
log_header.setDelimiter(header_delimiter_);
// render the header.
eventRender.rendered_text_ = log_header.getEventHeader([&walker](wel::METADATA metadata) { return walker.getMetadata(metadata); });
eventRender.rendered_text_ += "Message" + header_delimiter_ + " ";
eventRender.rendered_text_ += message;
eventRender.plaintext = log_header.getEventHeader([&walker](wel::METADATA metadata) { return walker.getMetadata(metadata); });
eventRender.plaintext += "Message" + header_delimiter_ + " ";
eventRender.plaintext += message;
}
logger_->log_trace("Finish writing in plain text");
}

if (writeXML_) {
logger_->log_trace("Writing event in XML");
if (output_.xml || output_.json) {
substituteXMLPercentageItems(doc);
logger_->log_trace("Finish substituting %% in XML");

if (resolve_as_attributes_) {
eventRender.matched_fields_ = walker.getFieldValues();
eventRender.matched_fields = walker.getFieldValues();
}
}

if (output_.xml) {
logger_->log_trace("Writing event in XML");

wel::XmlString writer;
doc.print(writer, "", pugi::format_raw); // no indentation or formatting
xml = writer.xml_;

eventRender.text_ = std::move(xml);
eventRender.xml = std::move(xml);
logger_->log_trace("Finish writing in XML");
}

if (output_.json.type == JSONType::Raw) {
logger_->log_trace("Writing event in raw JSON");
eventRender.json = wel::jsonToString(wel::toRawJSON(doc));
logger_->log_trace("Finish writing in raw JSON");
} else if (output_.json.type == JSONType::Simple) {
logger_->log_trace("Writing event in simple JSON");
eventRender.json = wel::jsonToString(wel::toSimpleJSON(doc));
logger_->log_trace("Finish writing in simple JSON");
} else if (output_.json.type == JSONType::Flattened) {
logger_->log_trace("Writing event in flattened JSON");
eventRender.json = wel::jsonToString(wel::toFlattenedJSON(doc));
logger_->log_trace("Finish writing in flattened JSON");
}

return true;
}

Expand Down Expand Up @@ -658,39 +702,45 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&
const std::string& str_;
};

if (writeXML_) {
auto flowFile = session.create();
logger_->log_trace("Writing rendered XML to a flow file");

auto commitFlowFile = [&] (const std::shared_ptr<core::FlowFile>& flowFile, const std::string& content, const std::string& mimeType) {
{
WriteCallback wc{ eventRender.text_ };
WriteCallback wc{ content };
session.write(flowFile, &wc);
}
for (const auto &fieldMapping : eventRender.matched_fields_) {
if (!fieldMapping.second.empty()) {
session.putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
}
}
session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "application/xml");
session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
session.putAttribute(flowFile, "Timezone name", timezone_name_);
session.putAttribute(flowFile, "Timezone offset", timezone_offset_);
session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
session.transfer(flowFile, Success);
}
};

if (writePlainText_) {
if (output_.xml) {
auto flowFile = session.create();
logger_->log_trace("Writing rendered plain text to a flow file");
logger_->log_trace("Writing rendered XML to a flow file");

{
WriteCallback wc{ eventRender.rendered_text_ };
session.write(flowFile, &wc);
for (const auto &fieldMapping : eventRender.matched_fields) {
if (!fieldMapping.second.empty()) {
session.putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
}
}
session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "text/plain");
session.putAttribute(flowFile, "Timezone name", timezone_name_);
session.putAttribute(flowFile, "Timezone offset", timezone_offset_);
session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
session.transfer(flowFile, Success);

commitFlowFile(flowFile, eventRender.xml, "application/xml");
}

if (output_.plaintext) {
logger_->log_trace("Writing rendered plain text to a flow file");
commitFlowFile(session.create(), eventRender.plaintext, "text/plain");
}

if (output_.json.type == JSONType::Raw) {
logger_->log_trace("Writing rendered raw JSON to a flow file");
commitFlowFile(session.create(), eventRender.json, "application/json");
} else if (output_.json.type == JSONType::Simple) {
logger_->log_trace("Writing rendered simple JSON to a flow file");
commitFlowFile(session.create(), eventRender.json, "application/json");
} else if (output_.json.type == JSONType::Flattened) {
logger_->log_trace("Writing rendered flattened JSON to a flow file");
commitFlowFile(session.create(), eventRender.json, "application/json");
}
}

Expand Down
40 changes: 29 additions & 11 deletions extensions/windows-event-log/ConsumeWindowsEventLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ namespace minifi {
namespace processors {

struct EventRender {
std::map<std::string, std::string> matched_fields_;
std::string text_;
std::string rendered_text_;
std::map<std::string, std::string> matched_fields;
std::string xml;
std::string plaintext;
std::string json;
};

class Bookmark;
Expand Down Expand Up @@ -77,6 +78,7 @@ class ConsumeWindowsEventLog : public core::Processor {
static core::Property EventHeaderDelimiter;
static core::Property EventHeader;
static core::Property OutputFormat;
static core::Property JSONFormat;
static core::Property BatchCommitSize;
static core::Property BookmarkRootDirectory;
static core::Property ProcessOldEvents;
Expand Down Expand Up @@ -107,9 +109,13 @@ class ConsumeWindowsEventLog : public core::Processor {
bool createEventRender(EVT_HANDLE eventHandle, EventRender& eventRender);
void substituteXMLPercentageItems(pugi::xml_document& doc);

static constexpr const char * const XML = "XML";
static constexpr const char * const Both = "Both";
static constexpr const char * const Plaintext = "Plaintext";
static constexpr const char* XML = "XML";
static constexpr const char* Both = "Both";
static constexpr const char* Plaintext = "Plaintext";
static constexpr const char* JSON = "JSON";
static constexpr const char* JSONRaw = "Raw";
static constexpr const char* JSONSimple = "Simple";
static constexpr const char* JSONFlattened = "Flattened";

private:
struct TimeDiff {
Expand All @@ -132,18 +138,30 @@ class ConsumeWindowsEventLog : public core::Processor {
std::wstring wstrChannel_;
std::wstring wstrQuery_;
std::string regex_;
bool resolve_as_attributes_;
bool apply_identifier_function_;
bool resolve_as_attributes_{false};
bool apply_identifier_function_{false};
std::string provenanceUri_;
std::string computerName_;
uint64_t maxBufferSize_{};
DWORD lastActivityTimestamp_{};
std::mutex cache_mutex_;
std::map<std::string, wel::WindowsEventLogHandler > providers_;
uint64_t batch_commit_size_;
uint64_t batch_commit_size_{};

enum class JSONType {None, Raw, Simple, Flattened};

struct OutputFormat {
bool xml{false};
bool plaintext{false};
struct JSON {
JSONType type{JSONType::None};

explicit operator bool() const noexcept {
return type != JSONType::None;
}
} json;
} output_;

bool writeXML_;
bool writePlainText_;
std::unique_ptr<Bookmark> bookmark_;
std::mutex on_trigger_mutex_;
std::unordered_map<std::string, std::string> xmlPercentageItemsResolutions_;
Expand Down
10 changes: 9 additions & 1 deletion extensions/windows-event-log/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@
# under the License.
#

file(GLOB WEL_INTEGRATION_TESTS "*.cpp")
set(WEL_INTEGRATION_TESTS "BookmarkTests.cpp" "ConsumeWindowsEventLogTests.cpp" "MetadataWalkerTests.cpp")
if (TEST_CUSTOM_WEL_PROVIDER)
execute_process(COMMAND
"${CMAKE_CURRENT_LIST_DIR}/custom-provider/generate-and-register.bat"
"${CMAKE_CURRENT_LIST_DIR}/custom-provider"
)
list(APPEND WEL_INTEGRATION_TESTS "CWELCustomProviderTests.cpp")
endif()

SET(WEL_TEST_COUNT 0)
FOREACH(testfile ${WEL_INTEGRATION_TESTS})
get_filename_component(testfilename "${testfile}" NAME_WE)
Expand Down
Loading