feat(vrl): add caching feature for VRL #21348

Draft · wants to merge 24 commits into master

Conversation

@esensar (Contributor) commented Sep 24, 2024

This adds additional VRL functions for reading data from and storing data into caches that can be configured in global options. Caches can store any VRL value and are meant to hold data for short periods. Every entry is assigned a TTL (time-to-live) based on the cache configuration and is removed when that TTL expires.

@github-actions bot added the `domain: topology` and `domain: transforms` labels on Sep 24, 2024
@esensar (Contributor, Author) commented Sep 24, 2024

@jszwedko @pront
I hope I have tagged the right people 😄

This is a very rough draft of a caching feature for VRL. The idea is to have separate storage for VRL values that can be used across different VRL runs. I have opted for a very simple interface, with just cache_get and cache_set functions (maybe cache_put makes more sense).

There are many things to consider for this:

  • Currently TTL is not implemented. The idea is to have a scan_interval for each of the caches, to prevent scanning them for expired entries too often. The scan would have to run periodically anyway, but it would be best to share one thread across the different caches; that thread would first check each cache's scan_interval to see whether scanning it is even needed. It could either check them all at some static period, or at a period calculated from all their intervals (greatest common divisor?).
  • It would probably be good to have some kind of customizable size limit on these caches. In that case, I assume it would be best to remove the oldest data from the cache when it runs out of room; simply refusing new data is an option too, but maybe not an obvious one for the user.
  • Since VRL runs concurrently, we can run into issues with concurrent reads and writes. I haven't done much about that yet, except for putting the caches behind an RwLock (a rough sketch of what I mean follows this list). Maybe there is a better way to handle this?
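
To make the interface concrete, here is a minimal, hypothetical Rust sketch of the kind of store cache_get/cache_set would sit on top of; names and types are illustrative only, not the PR's actual code:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

#[derive(Clone, Debug)]
struct CacheEntry {
    value: String, // stands in for an arbitrary VRL value
    written_at: Instant,
}

#[derive(Clone)]
struct Cache {
    inner: Arc<RwLock<HashMap<String, CacheEntry>>>,
    ttl: Duration,
}

impl Cache {
    fn new(ttl: Duration) -> Self {
        Self { inner: Arc::default(), ttl }
    }

    // cache_set: takes the write lock only for the duration of the insert.
    fn set(&self, key: String, value: String) {
        let entry = CacheEntry { value, written_at: Instant::now() };
        self.inner.write().unwrap().insert(key, entry);
    }

    // cache_get: concurrent VRL runs can all read under the shared read lock.
    fn get(&self, key: &str) -> Option<String> {
        self.inner.read().unwrap().get(key).map(|e| e.value.clone())
    }

    // Periodic scan (driven by scan_interval) that drops expired entries.
    fn expire(&self) {
        let now = Instant::now();
        self.inner
            .write()
            .unwrap()
            .retain(|_, e| now.duration_since(e.written_at) < self.ttl);
    }
}
```

The scan_interval discussion above would then only control how often expire() is called for each configured cache.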

I hope this feature does not break any of the promises VRL gives. Let me know your thoughts on this and whether you think this feature makes sense. If you think it does, let me know if this interface fits VRL well, or if I need to figure something else out.

@pront (Member) commented Sep 24, 2024

@jszwedko @pront I hope I have tagged the right people 😄

Thank you @esensar! I will definitely review this at some point this week.

@johnhtodd commented Sep 24, 2024

(chiming in here since this is for some of the work that we need to have done, and esensar and I have talked about this concept offline)

Cache deletions: would this just be writing to a key with an empty object to imply a deletion, or should there be an explicit cache_delete that can be called?

Monitoring: this is probably another requirement for the feature. Some ideas that could be discussed (a rough sketch of possible counters follows the list):

  • for each defined cache (which would be a tag name in the metric set)
    • TTL of cache elements as defined by cache declaration (for monitoring consumption)
    • number of current objects
    • total size of memory
    • number of successful "read" events where key was found in this cache set
    • number of bytes of successful "read" events where key was found and copied into active event
    • number of unsuccessful "read" events where key was not found in this cache set
    • number of cache expiration events (deletions) due to TTL
    • number of cache insertion or update events
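
As an illustration of the counters listed above (the names are made up here, and in Vector these would presumably be emitted through the usual internal metrics machinery rather than raw atomics):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// One of these per defined cache; the cache name would become the metric tag.
#[derive(Default)]
struct CacheMetrics {
    objects: AtomicU64,         // number of current objects
    read_hits: AtomicU64,       // "read" events where the key was found
    read_hit_bytes: AtomicU64,  // bytes copied into the active event on hits
    read_misses: AtomicU64,     // "read" events where the key was not found
    ttl_expirations: AtomicU64, // deletions caused by TTL expiry
    writes: AtomicU64,          // insertion or update events
}

impl CacheMetrics {
    fn record_hit(&self, bytes: u64) {
        self.read_hits.fetch_add(1, Ordering::Relaxed);
        self.read_hit_bytes.fetch_add(bytes, Ordering::Relaxed);
    }

    fn record_miss(&self) {
        self.read_misses.fetch_add(1, Ordering::Relaxed);
    }
}
```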

Concurrency: It seems that the window for contention would be very short if there is a lock on the object only for as long as it takes to read or write the memory space. The user would just have to understand that cache data may change or vanish between a read and a write, or between two reads. For our purposes, the cache is there to prevent multiple "heavy" lookups from happening. Each lookup predictably produces the same outcome for some reasonably long period of time, but the results do eventually change. (Our environment: DNS data.)

Cache updates: having the TTL be "visible" for cached objects within VRL is necessary, since I can envision a rather crude predictive cache-update method that uses randomness to refresh a cache item based on its TTL. Otherwise, if a highly-used cached object expires, there may be many threads trying to update it at the same time, which would be very wasteful and probably a heavier load than desirable (after all, a cached object implies that the cache is faster than some other method). Better to apply a predictive decline to the chances that an object will be refreshed.
Here's a free-form description of how to update a cache that has a 10 minute TTL when refreshed - this would be done in VRL by the person writing the logic that includes cache items: "If the TTL is less than 10 seconds, then there is a 1:10 chance that this event will also update the cache. If the TTL is less than 5 seconds, then there is a 1:5 chance that this event will also update the cache. If the TTL is less than 2 seconds, then there is a 1:2 chance that this event will also update the cache." Of course, this means that the frequency of events that query that particular cache and that particular object will dictate the chances of the cache being refreshed, but that is for the user to figure out if that's acceptable. The higher the frequency of access of that cached object, the more likely it is that an early cache refresh will prevent too many threads from being burned doing the "heavy" lookup function.
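
That free-form description could translate into something like the following sketch (Rust rather than VRL, purely illustrative, assuming the rand crate; the thresholds and odds are the ones from the example above):

```rust
use rand::Rng;

/// Decide whether this event should refresh a cache entry early,
/// given the remaining TTL in seconds.
fn should_refresh(remaining_ttl_secs: f64) -> bool {
    let mut rng = rand::thread_rng();
    if remaining_ttl_secs <= 0.0 {
        true // already expired: refresh unconditionally
    } else if remaining_ttl_secs < 2.0 {
        rng.gen_bool(1.0 / 2.0) // 1:2 chance
    } else if remaining_ttl_secs < 5.0 {
        rng.gen_bool(1.0 / 5.0) // 1:5 chance
    } else if remaining_ttl_secs < 10.0 {
        rng.gen_bool(1.0 / 10.0) // 1:10 chance
    } else {
        false // plenty of TTL left: keep using the cached value
    }
}
```

The higher the access frequency of a given key, the more likely one of these rolls succeeds before the entry actually expires, which is the smoothing effect described above.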

@esensar (Contributor, Author) commented Sep 25, 2024

Regarding cache deletions, I think an explicit function is better (cache_delete or something like that).

When it comes to TTL updates, would it make sense for reads to also refresh the TTL? I guess the idea of TTL is to avoid storing data that is not needed, and frequent reads would mean that the data is still used.

@pront (Member) commented Sep 25, 2024

Monitoring: One of the other things that is probably a requirement for this would be monitoring.

This is a great point, we should create metrics for this. I would also argue that we should gather some numbers as proof of the perf gains from this optimization and add them to the PR description.

@johnhtodd commented:

Regarding the cache deletions, I think explicit function is better (cache_delete or something like that).

Sure - I have no opinion on the method for deletions, other than that it needs to be possible somehow from events within VRL contexts.

When it comes to TTL updates, would it make sense for reads to also update TTL, because I guess the idea of TTL is to avoid storing data that is not needed and frequent reads would mean that the data is still used.

I would strongly disagree here, or this would have to be a separate TTL. A cache entry has an expiration because it is "volatile" data that loses accuracy over time, and it needs to be refreshed at the end of the TTL regardless of how many times it has been used. If there were both a "last read" TTL and an "expiry" TTL that would be useful, and I can see that sometimes both would be useful, but we cannot combine them into a single TTL that gets refreshed/reset upon read.

@esensar (Contributor, Author) commented Sep 26, 2024

I would strongly disagree here, or this would be a separate TTL. A cache entry has an expiration because it is "volatile" data that reduces in accuracy over time, and needs to be refreshed at the end of the TTL regardless of how many times it has been used. If there is a "last read" TTL and a "expiry TTL" that would be useful and I can see sometimes that both would be useful, but we cannot combine them into a single TTL that gets refreshed/reset upon read.

Right, I hadn't thought about that. It makes sense to just have a single TTL, but with an additional VRL function to read it, to be able to better control it.

@johnhtodd commented:

Right, I haven't thought about that. It makes sense to just have a single TTL, but with additional VRL function to read it, to be able to better control it.

Is it necessary or desirable to have an additional VRL function to read the TTL, or should it somehow happen at the exact moment of the "cache_get"? I think it could create race conditions if another function is required, since then the fetching of the cached object itself and the TTL would not have the same timestamp.

"cache_get(,[,ttl_variable])" might work,perhaps? so then ttl_variable would be an object which would be set to the value of the TTL, and the user could define what the name of that object was.

I'm not sure that's the best way to do it, but I think simultaneous fetching of the cache object AND setting the TTL is a good idea.
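
A self-contained toy sketch (hypothetical API, not from this PR) of fetching the value and its remaining TTL in a single lookup, so that both come from the same instant:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct Entry {
    value: String,
    expires_at: Instant,
}

// Returns the cached value together with how long it has left to live.
fn cache_get_with_ttl(map: &HashMap<String, Entry>, key: &str) -> Option<(String, Duration)> {
    map.get(key).map(|e| {
        let remaining = e.expires_at.saturating_duration_since(Instant::now());
        (e.value.clone(), remaining)
    })
}
```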

@pront (Member) commented Sep 27, 2024

Hey, I did a quick pass as promised. A general comment here is that the proposed change is semantically more like "global VRL state" than a "VRL cache".

@fuchsnj also made the following point:

Lua transforms have global state that sequential transforms can access. It really hurts performance because we can't concurrently run transforms since they can depend on previous global state.

So back to the original problem, I would expect a caching solution to be hidden from the users, i.e. no new VRL functions. For example, imagine that the line vrl_function_foo(1, 'a') returns 42. Then, the next time the VRL runtime encounters this line, it would skip execution and return 42, which would make sense for idempotent functions.
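
To illustrate the kind of transparent memoization described here (a toy sketch, not a design proposal): the result is keyed by the call's arguments, so a repeated idempotent call is only evaluated once.

```rust
use std::collections::HashMap;

// Cache keyed by the function's arguments; `eval` stands in for the real
// (expensive, idempotent) function body.
fn memoized_call(
    cache: &mut HashMap<(i64, char), i64>,
    arg1: i64,
    arg2: char,
    eval: impl Fn(i64, char) -> i64,
) -> i64 {
    *cache.entry((arg1, arg2)).or_insert_with(|| eval(arg1, arg2))
}

fn main() {
    let mut cache = HashMap::new();
    let expensive = |x: i64, _c: char| x + 41;
    // The first call evaluates the function; the second returns the cached 42.
    assert_eq!(memoized_call(&mut cache, 1, 'a', &expensive), 42);
    assert_eq!(memoized_call(&mut cache, 1, 'a', &expensive), 42);
}
```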

@johnhtodd commented:

Hey, I did a quick pass as promised. A general comment here is that the proposed change semantically is more like a "global VRL state" vs a "VRL cache".

I think it can be both. From my view, the "cache" part comes from the concept that objects in the store have a timer applied to them, and there is a clearing process, separate from any event processing, that will delete objects in the store whose timers have gone below zero.

True, an object which has an exceptionally long TTL, longer than the expected runtime of the system (or perhaps a special high value that is treated as infinite), would therefore be treated as a permanent item, and so becomes "state" instead of "cache".

@fuchsnj also made the following point:

Lua transforms have global state that sequential transforms can access. It really hurts performance because we can't concurrently run transforms since they can depend on previous global state.

So back to the original problem, I would expect a caching solution to be hidden from the users i.e. no new VRL functions. For example, imagine that the following line vrl_function_foo(1, 'a') returns 42.

I'm not quite understanding how this would work. Somehow you'd have to indicate that the "vrl_function_foo" call should look at the cache, instead of doing whatever that function does in the first place. This would mean either some sort of tagging (?) to turn a "look at the cache for every function from here on" mode on or off, or universally applying caching to all functions, which would perhaps be useful but possibly exceptionally wasteful in memory for things that were never meant to be cached. Imagine "vrl_function(1, 'a')" where "a" is a random string of 50 characters that may or may not ever appear again, the event stream is 100k per second, the result of the function is 900 bytes, and those 900 bytes change every 2 hours. This is essentially what we're trying to solve.

The proposed method using new functions would allow very specific values to be inserted into or looked up in the cache, allowing for very focused scope of memory use, and also giving more granular control over including/not including certain things on a per-event basis.

Other than using new VRL functions, the only other way I could see this working with maximum transparency would be to use specific "magic" object namespace prefixes to indicate cached data, but that is not very clean, though I suppose I haven't thought about it enough.

Then, next time the VRL runtime encounter this line, it would skip execution and return 42 which would make sense for idempotent functions.

I'm not sure how a timed cache function could ever be expected to be reliably idempotent, since at some point the TTL will expire, the cache value will be removed and/or updated to a different result, and the result may differ between iterations of examination. The user would need to understand this, and make accommodations for no data appearing in the cache, so that the heavy or slow function would then be called and the result (hopefully) stored in the cache for subsequent references over the next TTL window.

The original intention of this "cache" is that items which are relatively slow to compute, and whose values may change across some window of time, can be accessed faster from a memory store that is more lightweight than the function that generates them. There is an implicit understanding in such timed cache models that the value stored in memory is "almost as good" as the computationally costly or slow function used to insert the object into the cache, but that over time that value diminishes in usefulness until a threshold where the item is expunged or refreshed.

Our use case requires TTL, because we sometimes will see items inserted in the cache which will only be accessed for a few times over a few minutes, and then never again. If we cannot have those items removed automatically after a time period (regardless of how many times they are used or not used) then this is effectively a catastrophic memory leak. They also need to be refreshed on occasion, as the data loses accuracy over time.

@esensar (Contributor, Author) commented Sep 28, 2024

Hey, I did a quick pass as promised. A general comment here is that the proposed change semantically is more like a "global VRL state" vs a "VRL cache".

Maybe we can think of a better name for it, I agree that VRL cache might be misleading.

@fuchsnj also made the following point:

Lua transforms have global state that sequential transforms can access. It really hurts performance because we can't concurrently run transforms since they can depend on previous global state.

So back to the original problem, I would expect a caching solution to be hidden from the users i.e. no new VRL functions. For example, imagine that the following line vrl_function_foo(1, 'a') returns 42. Then, next time the VRL runtime encounter this line, it would skip execution and return 42 which would make sense for idempotent functions.

But in this case, we are directly accessing the storage in Rust code, which is behind an RwLock, so runs can at least proceed concurrently as long as they are all reading. I assume that in the case of Lua, such locking had to be applied at the level of the whole transform, since that code could freely access the state.

In general I think these two are addressing different problems. Proper VRL caching as you described it is a bigger undertaking, because we would need to think about when that caching makes sense: many VRL functions are very fast and would probably take a hit from cache lookups instead of getting a speedup. We would either have to apply it selectively to some functions, or provide a way to configure it (although then it wouldn't be hidden from the users and would probably be confusing to configure).

Does adding some kind of global state to VRL (optional, with no effect unless the user specifically calls these functions) make sense to you? Does it break any of the promises VRL makes? There is still a lot of work to be done on this PR, so I would just like to know in advance. If something like this is not an option for VRL, we can think about other solutions for slow functions, something a bit more hidden from the user (though I think it would always have to provide at least some configuration options, to ensure users can control the size of the cache).

@pront (Member) commented Sep 30, 2024

Does adding some kind of global state to VRL (optional, it would have no effect unless user specifically calls these functions) make sense for you? Does it break any of the promises VRL makes? There is still a lot of work to be done for this PR, so I would just like to know in advance. If something like this is not an option for VRL, we can think about other solutions for slow functions, something a bit more hidden from the user (but I think it would always have to provide at least some configuration options, to ensure users can control the size of the cache).

If we want to pursue this idea of global state, we might benefit from an RFC. If there are other ideas that do not require such a big change we can probably avoid the RFC. In both cases, some perf stats will make a more compelling case.

@pront (Member) commented Sep 30, 2024

Ideally we would build on enrichment tables, which are currently used as external state for VRL. cc @lukesteensen for future discussions.

@jszwedko (Member) commented Oct 1, 2024

This is a very interesting feature, but I'm wondering about the use cases it is solving, to make sure it is the best solution for those use cases, since it diverges from one of VRL's design principles: that calls should be non-blocking. Could you describe the use cases you have for this feature @esensar @johnhtodd? I think that would help us identify whether adding these functions to VRL is the correct approach or whether a separate transform or enrichment table would be better suited.

@lukesteensen (Member) commented:

Yes, as @pront mentioned, I believe that extending enrichment tables is the better path here. At a high-level, there are some important characteristics of VRL that we want to maintain:

  1. Predictable performance due to never blocking on external calls
  2. No external side effects that could introduce ordering dependencies, etc
  3. Simple to execute concurrently without affecting the output relative to sequential execution

Introducing shared mutable state between VRL invocations would complicate these quite a bit. Instead, I think it would be better to separate writes from reads by putting them in different components. This dramatically simplifies the data flow and makes it clearer that state is being shared.

One way to do this would be to introduce a new component that is basically both a sink and an enrichment table. It would look similar to the reduce transform in many ways, but instead of outputting events it would make the contents of its accumulated state available to VRL components as an enrichment table, using the existing functions. This would allow you to route relevant data to the table, store it there, and then do lookups as needed. That component's config would allow tuning things like TTL, maximum size, etc.

@esensar (Contributor, Author) commented Oct 2, 2024

Yes, as @pront mentioned, I believe that extending enrichment tables is the better path here. At a high-level, there are some important characteristics of VRL that we want to maintain:

1. Predictable performance due to never blocking on external calls

2. No external side effects that could introduce ordering dependencies, etc

3. Simple to execute concurrently without affecting the output relative to sequential execution

Introducing shared mutable state between VRL invocations would complicate these quite a bit. Instead, I think it would be better to separate writes from reads by putting them in different components. This dramatically simplifies the data flow and makes it clearer that state is being shared.

One way to do this would be to introduce a new component that is basically both a sink and an enrichment table. It would look similar to the reduce transform in many ways, but instead of outputting events it would make the contents of its accumulated state available to VRL components as an enrichment table, using the existing functions. This would allow you to route relevant data to the table, store it there, and then do lookups as needed. That component's config would allow tuning things like TTL, maximum size, etc.

Alright, that makes sense to me. A separate sink feeding into an enrichment table could be feasible for this. That would mean a new kind of enrichment table, stored in memory instead of being backed by files.

I will try that and see if something like that would work instead of this. Thanks for taking the time to review this.

@esensar (Contributor, Author) commented Oct 3, 2024

Yes, as @pront mentioned, I believe that extending enrichment tables is the better path here. At a high-level, there are some important characteristics of VRL that we want to maintain:

1. Predictable performance due to never blocking on external calls

2. No external side effects that could introduce ordering dependencies, etc

3. Simple to execute concurrently without affecting the output relative to sequential execution

Introducing shared mutable state between VRL invocations would complicate these quite a bit. Instead, I think it would be better to separate writes from reads by putting them in different components. This dramatically simplifies the data flow and makes it clearer that state is being shared.

One way to do this would be to introduce a new component that is basically both a sink and an enrichment table. It would look similar to the reduce transform in many ways, but instead of outputting events it would make the contents of its accumulated state available to VRL components as an enrichment table, using the existing functions. This would allow you to route relevant data to the table, store it there, and then do lookups as needed. That component's config would allow tuning things like TTL, maximum size, etc.

Thinking about this a bit more, is this solution preferred just because of the simplified data flow? I think we still have the same issue of sharing mutable state; it is just no longer shared between different VRL invocations, but between different components.

If I understood that solution correctly, something like this would be implemented:

  • new enrichment_table type - stored in-memory as opposed to a file
  • new sink component that would store data in this new enrichment table - that data would come from a source, or more probably from a remap component that would prepare data for the cache
  • instead of these cache-related functions, the existing enrichment table functions would be used
  • TTL, maximum size, cleanups and all of that would be implemented on the level of that new enrichment_table

Now, when it comes to writing to that table, some kind of lock would still have to be used (or maybe there is a way to do it lock-free, but I guess that would come with other limitations).

Does this sound right @lukesteensen ?

@lukesteensen (Member) commented:

@esensar Yes, it sounds like you have the right idea. One small point of clarification (which you may already know) is that it would only be one new component, a new enrichment table type, which would behave in some ways like a sink (i.e. it would accept input events from other components), but wouldn't actually involve creating a new type of sink. It would involve some work in our topology code to support hooking up this enrichment table to other components, likely mirroring the logic for sinks.

And yes, you're right that it is still fundamentally shared state. The difference is that now we have exactly one writer (the new enrichment table component) and all VRL components are purely readers, which maintains the property that VRL scripts can be run many times, in any order, concurrently, etc, without changing the results (i.e. they behave more like pure functions without side effects). This will make it easier to avoid unexpected behavior and high levels of contention on the shared state, and implement/maintain optimizations around how we schedule VRL scripts to be run.

One potentially useful library for implementing this would be evmap, but I'm sure there are others as well. Constraining ourselves to a single writer makes the design compatible with some of these data structures that have desirable properties.
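
A minimal sketch of the single-writer / many-readers pattern evmap enables (assuming the evmap crate; keys, values, and the flow are placeholders rather than the actual table implementation):

```rust
fn main() {
    let (read_handle, mut write_handle) = evmap::new::<String, String>();

    // The single writer (the enrichment-table component) applies updates...
    write_handle.insert("user:1234".to_string(), "cached enrichment data".to_string());
    // ...and they only become visible to readers on refresh(), which is the one
    // point where the writer and readers synchronize.
    write_handle.refresh();

    // Each VRL component would hold a cheap clone of the read handle and never
    // block (or be blocked by) the writer.
    let reader = read_handle.clone();
    if let Some(value) = reader.get_one("user:1234") {
        println!("cache hit: {}", *value);
    }
}
```

A configurable refresh period (as discussed below) would then only control how often refresh() is called, trading write-visibility latency for less synchronization.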

@esensar (Contributor, Author) commented Oct 11, 2024

Thank you. Alright, I was initially going to add a new sink, but that approach makes sense.

Thanks for the evmap suggestion, I was looking for something like that for this cache. Since we would note that it should not be used for critical state and that it is meant as a cache only, some delay in write visibility would be alright. It would probably make sense to expose configuration for that refresh period (to periodically make writes visible, since I assume many events would be coming in and it wouldn't make sense to refresh constantly).

This implementation is based on `evmap`, for "lock-free" reading and writing. There is still a lock
when data is refreshed, but that can be controlled to reduce interruptions.
@github-actions bot removed the `domain: transforms` label on Oct 24, 2024
@esensar (Contributor, Author) commented Oct 24, 2024

I have also moved the events and metrics from the previous cache implementation over to this memory enrichment table. Since most of these events contained the name of the cache (or in this case, they should contain the name of the table), I would need a way to find out the table name from inside the table. Would it make sense to provide the name of the table in the build function of EnrichmentTableConfig, or do you think it is a bad idea for that data to be provided to table implementations?

@pront (Member) commented Oct 25, 2024

I have prepared the implementation of StreamSink trait for this component, but I am not sure what would be the best way to connect it in as a sink. Looking at the code in topology::builder it works with SinkOuter, which holds many things that this sink may not need, and I am also not sure how to properly generate it from MemoryConfig.

I have also moved these events and metrics from previous cache implementation to this memory enrichment table. Since most of these events contained the name of the cache (or in this case, they should contain the name of the table), I would need a way to find out the table name from inside of it. Would it make sense to provide the name of the table in the build function of EnrichmentTableConfig, or do you think it is a bad idea for that data to be provided to table implementations?

Hi @esensar, thanks for following up; @lukesteensen should be able to help with the above questions.

@lukesteensen (Member) commented:

@esensar sorry for the delayed response! As you can probably tell, this is fundamentally a new capability so the path forward is not super straightforward.

I have prepared the implementation of StreamSink trait for this component, but I am not sure what would be the best way to connect it in as a sink. Looking at the code in topology::builder it works with SinkOuter, which holds many things that this sink may not need, and I am also not sure how to properly generate it from MemoryConfig.

I initially tried to get something similar built for this, but that would mean that the sink would be built inside that builder, but this component is already built as an enrichment table, so we would ideally just connect it both as a sink and as an enrichment table.

The most "correct" way to do this would likely be to change how enrichment tables are handled in the topology builder, adding the ability to have an (optional) set of inputs. This would be a relatively involved change, as you'd need to duplicate some of the logic from either transforms or sinks to handle the input, run the task like a sink, etc.

One option that might be simpler is to keep this as a sink component, but add the ability for sinks to register new enrichment tables. We already pass the enrichment table registry to the sink building process here, which is currently just used for checking schemas (I believe). If we passed a mutable reference to this registry (and maybe refactored some of the enrichment table registration process to methods on the registry), we could add a new optional method to SinkConfig that takes this mutable reference to the registry and can initialize the enrichment table there.

We'd still need to test how this works with things like config reloading, etc, but I think it should give you a relatively simple path to add this functionality without needing to fundamentally change how any of the topology logic works.

Since most of these events contained the name of the cache (or in this case, they should contain the name of the table), I would need a way to find out the table name from inside of it.

This might not be necessary, as the name of the component is generally added to logs and metrics output via the tracing context we add to the running task, rather than explicitly being included in the events themselves.

@esensar (Contributor, Author) commented Nov 6, 2024

The most "correct" way to do this would likely be to change how enrichment tables are handled in the topology builder, adding the ability to have an (optional) set of inputs. This would be a relatively involved change, as you'd need to duplicate some of the logic from either transforms or sinks to handle the input, run the task like a sink, etc.

This is kind of the solution I was leaning towards, since, like you said, it seems the most correct. The thing that I would (maybe) have to change is that tables are currently meant to be read-only, so this might require some changes in the registry (I am not 100% sure, I haven't tried hooking everything up).

One option that might be simpler is to keep this as a sink component, but add the ability for sinks to register new enrichment tables. We already pass the enrichment table registry to the sink building process here, which is currently just used for checking schemas (I believe). If we passed a mutable reference to this registry (and maybe refactored some of the enrichment table registration process to methods on the registry), we could add a new optional method to SinkConfig that takes this mutable reference to the registry and can initialize the enrichment table there.

I don't really have a complete understanding of the topology and its building process, but I think this might be an involved change too, and it seems backwards to me (making this table primarily a sink which then sets its own table up). Also, I'm not sure if I understood the table registry correctly, but it looks to me like it is set up to be loaded once and then used as read-only afterwards. Another thing is that I think the table is accessed during VRL compilation (at least I think it is), so that would probably be something to figure out if this path is chosen.

This might not be necessary, as the name of the component is generally added to logs and metrics output via the tracing context we add to the running task, rather than explicitly being included in the events themselves.

That makes sense, I was wondering why that name was missing from other events that I have checked 😄
I will remove that table name from the events, so that will no longer be needed.

Thanks for the suggestions, I will give the first suggestion a shot. There will probably be some duplication in the sink task running, but I don't think this will require everything that other sinks require. Either way, I hope to have something reviewable in the next couple of days, so we can discuss further then.

@pront (Member) commented Nov 6, 2024

@esensar let me know if you are blocked on this

@esensar (Contributor, Author) commented Nov 6, 2024

@esensar let me know if you are blocked on this

Thanks, I will take a shot with the above suggestion.

I was planning on adding a sink for this enrichment table in the process of building enrichment tables (so when we run into this table, it would produce a sink as well). I have no idea if that will actually work out or if some different ordering is required. 😄
I will let you know if I get stuck there.

@pront (Member) commented Nov 6, 2024

Awesome.

😅 Apologies for the confusion, I commented before I refreshed the page so I didn't see the new activity in this PR.

This adds a `SinkConfig` implementation for `MemoryConfig`, making it a shared configuration for
both the enrichment table component and the sink component. To ensure that the exact same `Memory`
instance is used, the created `Memory` is cached inside the config instance and then shared whenever
one of the `build` variants is invoked.

`Memory` is already built with sharing between different instances in mind, since it had to be
shared across different threads; it is cloneable while keeping the same backing structure for
data.

The actual sink building is a placeholder.
@github-actions bot added the `domain: topology` label on Nov 21, 2024
@esensar (Contributor, Author) commented Nov 21, 2024

I have finally made some small progress on this.

I have added a SinkConfig implementation for MemoryConfig, so it is now a shared configuration for building both the enrichment table and the sink component. To ensure correct behavior, I had to cache the created Memory instance inside MemoryConfig so that both the sink and the enrichment table work with the same data (it is not the same instance, it is cloned, but the underlying data managed by evmap is shared; I probably need to update the additional fields in Memory to be shared across different instances too).
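
For illustration only, a hypothetical sketch of what "cloned, but sharing the underlying data" means; in the draft the evmap read/write handles play this role, and a plain Arc<Mutex<...>> stands in below:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Cloning `Memory` copies the handle, not the map, so the enrichment-table side
// and the sink side built from the same cached config see the same data.
#[derive(Clone, Default)]
struct Memory {
    shared: Arc<Mutex<HashMap<String, String>>>,
}

fn main() {
    let table_side = Memory::default();
    let sink_side = table_side.clone();

    sink_side.shared.lock().unwrap().insert("key".into(), "value".into());
    assert!(table_side.shared.lock().unwrap().contains_key("key"));
}
```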

Is it an issue if MemoryConfig holds state like that? Will that mess up something?

To build a sink, I have copied most of the code from build_sinks. This feels bad to me, but I haven't found an easy way to reuse that build_sinks function. I could extract the code for building a single sink, or maybe I could figure out a way to let enrichment tables go through the same process as sinks (I would have to somehow generate a SinkOuter from that config, which might be needed in the end anyway, since we need to define inputs somehow and we probably want to share some of the other common sink configuration options).

@pront @lukesteensen
Let me know if you have thoughts on this.

@esensar (Contributor, Author) commented Nov 27, 2024

@pront
I think I am now stuck on this 😄

Maybe there is a better way to handle this, but it feels like I need to define some of the sink-related configuration on MemoryConfig. I tried to reuse the sink code in the topology builder, but I would need to produce a SinkOuter for that, and that requires everything that a sink may have. It gets complicated fast and I feel like I am going down the wrong path with that approach.

@esensar (Contributor, Author) commented Dec 4, 2024

@pront @lukesteensen
I have finally managed to get some more time to focus on this and wrap my head around the whole topology building and running process. After a lot of trial and error, I think I have this memory table correctly hooked up as a sink (at least when the config is not changed; I still need to handle those cases).

There are a couple of things I don't like about this:

  • The way I have created SinkOuter - it was good for reuse purposes, but I had to make more of its fields public to be able to build it, which doesn't really make much sense; still, it allowed me to reuse the same code path for sinks and for enrichment tables that can act as a sink
  • There are now even more changes, to ensure that inputs are correctly hooked up and that tasks are started for enrichment tables that can act as a sink (similar to the code needed for sources, transforms and sinks)

More work needs to be done for this too:

  • the memory table needs some kind of size limit, and a strategy for handling cases when it is full
  • cover cases when the config changes (I think I didn't update topology/running for the case when an enrichment table is removed)

@pront (Member) commented Dec 4, 2024

Hi @esensar, I just wanted to assure you that this is still on our radar to review.

@esensar (Contributor, Author) commented Dec 4, 2024

Hi @esensar, I just wanted to assure you that this is still on our radar to review.

Alright, thanks. Sorry for all the mentions; I got stuck at one point, but I have now managed to hook everything back up, though I am not too happy about the solution.

@lukesteensen (Member) left a comment

Sorry again for the delayed review! It takes me an unfortunately long time to page in all of the relevant context when I try to come back to this 😅

I think what you have here looks pretty reasonable! I've left some comments about things that could potentially be simplified, or ways to adjust how we currently lay things out that may make it a bit simpler of a change, but I don't see any particular design decisions that would be a blocker.

@@ -37,24 +37,28 @@ impl Graph {
sources: &IndexMap<ComponentKey, SourceOuter>,
transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
enrichment_tables: &IndexMap<ComponentKey, EnrichmentTableOuter<String>>,
Member:

If you wanted to simplify somewhat, I think that these tables could be added to the graph as sinks. There's not really any functional difference between them in this context.

Contributor Author:

Yeah, that would probably make things much easier to handle, similar to how it was done in the topology builder. I will make that update.

}
}

pub fn as_sink(&self) -> Option<SinkOuter<T>> {
Member:

I think this is a reasonable way to do it the way things are currently designed, but it definitely shows the limitations of how we do things with SinkOuter, etc.

If I were to re-approach it, I think I would limit SinkOuter to config deserialization (and drop the generic to just String), and map to some more granular things to build the topology (e.g. something for inputs, something for healthchecks, etc). That way we could try to unify where we handle "things that have inputs", etc.

You certainly don't need to do any of that, but it could be something to explore if you'd like to make this feel cleaner.

Contributor Author:

Yeah, not sure how often components like this one would come up, but it definitely makes sense for components that don't really fit just one type.

I can think about it, but I think that would be too big an undertaking for this PR.

@@ -444,6 +444,7 @@ impl RunningTopology {
.filter(|&(existing_sink, _)| existing_sink)
.map(|(_, key)| key.clone());

// TODO: also remove this for enrichment tables that act as sinks
Member:

I think we would be better off disallowing buffers in front of enrichment sinks, at least to start with. They shouldn't be needed and we can avoid this particular bit of complexity.

Contributor Author:

Right now, that would require different branches for sinks and enrichment tables, since sinks always have buffers (if I understand the code correctly). Currently I have hardcoded it to always use the default buffer (which is a single memory buffer).

That might not really be that hard to do, and maybe it would be the smarter thing to do, since some of the things a sink has just don't make sense for these sinks built out of the table. But I think that brings us back to the above point, of splitting SinkOuter usage into smaller, more granular components.
