-
Notifications
You must be signed in to change notification settings - Fork 207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
File source improvements and change to event model #601
File source improvements and change to event model #601
Conversation
Codecov Report
@@ Coverage Diff @@
## main #601 +/- ##
============================================
- Coverage 91.70% 91.59% -0.11%
- Complexity 571 584 +13
============================================
Files 72 74 +2
Lines 1747 1784 +37
Branches 145 148 +3
============================================
+ Hits 1602 1634 +32
- Misses 113 115 +2
- Partials 32 35 +3
Continue to review full report at Codecov.
|
A source plugin to read input data from the specified file path. | ||
|
||
- path (String): absolute input data file path | ||
A source plugin to read input data from the specified file path. The file source creates a new Record for eadh line of data in the file. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: each
* `json`: Reads data that is in the form of a JSON string from a file. If the json string is unable to be parsed, the file source will treat it as a plaintext line. | ||
|
||
|
||
* type (String): The Event type that will be stored in the metadata of the Event. Default is `event`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: `type`
{ "message": "Example log line in file" } | ||
``` | ||
|
||
* `json`: Reads data that is in the form of a JSON string from a file. If the json string is unable to be parsed, the file source will treat it as a plaintext line. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the code change, maybe we should specify that this format expects json lines as follows
{ "key1": "val1" }
{ "key2": "val2" }
{ "key3": "val3" }
instead of a single json blob
|
||
@DataPrepperPluginConstructor | ||
public FileSource(final FileSourceConfig fileSourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { | ||
Objects.requireNonNull(fileSourceConfig.getFilePathToRead(), "File path is required"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional: we can tighten our validation on file path by Files.exists(...) check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Looks like the DataPrepperTests are using a file source and that's failing the test. It's a little weird that this was being used but I will make the change to it. |
@@ -1,8 +1,7 @@ | |||
# this configuration file is solely for testing formatting | |||
test-pipeline-1: | |||
source: | |||
file: | |||
path: "/tmp/file-source.tmp" | |||
stdin: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the failure traceback with file source here? Asking b/c we might run into the failure again when stdin is being migrated to use Event model
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually a problem with how the plugin is being loaded and the fact that I changed to using @DataPrepperPluginConstructor
and PluginSetting is not being used anymore. StdinSink still uses PluginSetting so it works. We should have a separate issue for this bug as these tests should be refactored.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolved. Validation of path existing was breaking the test
} | ||
|
||
jacocoTestCoverageVerification { | ||
dependsOn jacocoTestReport | ||
violationRules { | ||
rule { //in addition to core projects rule | ||
limit { | ||
minimum = 0.90 | ||
minimum = 0.89 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to lower code coverage here?
|
||
|
||
|
||
* `type` (String): The Event type that will be stored in the metadata of the Event. Default is `event`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type
is very generic. Let's call this even_type
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to call it record_type
for now as I am using this type to support both event
and string
|
||
* `path` (String): absolute input data file path. It is required | ||
|
||
* `write_timeout` (int): The amount of time to attempt writing a Record to the Buffer before timing out. Unit is milliseconds and default is `5,000` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can a user disable the timeout? Is there a maximum or minimum value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just going to make this unconfigurable for now. It is too coupled to Data Prepper internals, and it takes away from the current use case of the FileSource, which is to test and get things up and running quickly
public String getType() { | ||
return type; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's introduce validation to this config. We should be able to do this while we are waiting on JSR-303. Check out the HttpSourceConfig for an example.
|
||
final List<Record<Event>> expectedEventsPlain = new ArrayList<>(); | ||
final List<Record<Event>> expectedEventsJson = new ArrayList<>(); | ||
final List<Record<Event>> expectedEventsInvalidJson = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current way you are instantiating and adding to this list is problemmatic. I would recommend removing the final modifier from these class variables so you can reassign the values for each test as part ofthe @BeforeEach
method. This will ensure your unit tests are passing some left over state through the class variables.
structuredLine.put(MESSAGE_KEY, line); | ||
break; | ||
default: | ||
LOG.error("The file source type is [{}]. It must be \"json\" or \"plain\"", fileSourceConfig.getType().toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to create an event with no data. Is that what we want?
Introducing config validation will prevent this.
} | ||
|
||
private Record<Event> getEventRecordFromLine(final String line) { | ||
Map<String, Object> structuredLine = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can't be final
it's getting assigned in the JSON case. I could pass it as an argument but I like how it is now better.
Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: Taylor Gray <[email protected]>
…or json and plaintext Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: Taylor Gray <[email protected]>
…repperTest Signed-off-by: Taylor Gray <[email protected]>
…ts call Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: Taylor Gray <[email protected]>
f1d8751
to
187a3f3
Compare
``` | ||
|
||
|
||
* `record_type` (String): The Event type that will be stored in the metadata of the Event. Default is `event`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this, even though it is temporary.
I'd generally prefer to keep the default as string
since that is the default behavior. But, since this is more for test and non-Trace Analytics pipelines, it could be a reasonable breaking change. If the default is event
, the release notes should state this, and how to get past it. #618 .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I can make the default string, but will have to temporarily change some grok documentation, which is less painful in the end than making it a breaking change.
@@ -14,6 +14,7 @@ plugins { | |||
} | |||
dependencies { | |||
api project(':data-prepper-api') | |||
implementation project(':data-prepper-plugins:blocking-buffer') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you only need this for the tests. It can be made to be testImplementation
and should sit with the other testImplementation
dependencies.
final HashMap<String, Object> integerHashMap = new HashMap<>(); | ||
integerHashMap.put("buffer_size", 2); | ||
integerHashMap.put("batch_size", 2); | ||
final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap) {{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an anti-pattern which I'm hoping to get away from in Data Prepper. This is creating an anonymous class with a static initializer. It is better to to break this into two statements - one for new
and the other to setPipelineName
.
You can probably even create a mock which is better.
Signed-off-by: Taylor Gray <[email protected]>
Description
@DataPrepperPluginConstructor
format
andtype
configuration options.type
configuration will be added to theeventType
of the eventMetada. Once conditionals are implemented, this could be useful for users to route their data depending on thetype
format
configuration is an enum that allows for eitherjson
orplain
lines to be read from a file. Theplain
format will add amessage
key to the Event. Thismessage
, or whatever else is decided, should be a standard for Data Prepper going forward when dealing with plaintext.Issues Resolved
Closes #600
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.