
Dynamic attribute injection in transform processor #28573

Closed
xvilaca-te opened this issue Oct 24, 2023 · 12 comments
Labels
closed as inactive, discussion needed, enhancement, pkg/ottl, processor/transform, Stale

Comments

@xvilaca-te

Component(s)

processor/transform

Is your feature request related to a problem? Please describe.

In our work at ThousandEyes, we developed an OpenTelemetry collector pipeline where we had to dynamically inject attributes with values retrieved from a local store. Specifically, we needed to:

  • tag every data point with an attribute test.id in the receiver
  • in some processor, read the test.id from every data point, retrieve a matching slice of pairs (key,[val]) from a local store, and inject those pairs as attributes with keys of type string and values of type array of strings
  • keep the test.id attribute

Unfortunately, we could not find a processor that would allow us to do this, so we opted to develop our own custom processor to do this dynamic attribute injection.

Describe the solution you'd like

We would like to extend the transformprocessor with the ability to dynamically inject attributes and use this extension in our pipelines. We are open to discussion regarding how to do this, but after studying the transformprocessor we had some initial thoughts to start the discussion:

  • since reading the attributes values from a local store might be very specific to our use case and the transformprocessor is very generic, we think it makes more sense to provide a custom function that returns the slice of dynamically injected attributes so that the implementation lies on the user side
  • we could pass custom functions to the processor factory in a similar fashion to what is done with the kafka receiver unmarshallers, which worked quite well for us
  • we would need to either define a new editor or extend the existing set editor to allow for the injection of multiple attributes in the same statement

So, to summarise, our initial idea would be to support custom functions in the factory like the following example inspired by the kafkareceiver:

func NewFactory(options ...transformprocessor.FactoryOption) processor.Factory {
    opts := append(options, transformprocessor.WithCustomFunctions(dynamicAttributes))
    return transformprocessor.NewFactory(opts...)
}

func dynamicAttributes(testId string) ([]transformprocessor.KeyValue, error) {
    attributes, err := localStore.get(testId)
    if err != nil {
        return nil, err
    }
    pairs := make([]transformprocessor.KeyValue, len(attributes))
    for i, attr := range attributes {
        pairs[i] = transformprocessor.KeyValue{
            Key:   attr.Key,
            Value: attr.Value,
        }
    }
    return pairs, nil
}

I'm assuming here that there is a KeyValue type (a hypothetical type for the injected attributes; we could reuse an existing one) and that error handling happens on the processor side, though maybe it can be done on the user side if needed. (Still learning about the transformprocessor, so I'm not sure how to handle errors yet.)

Then, we would be able to build a statement like the following that would do the dynamic injection with the hypothetical add editor:

metric_statements:
    - context: datapoint
      statements:
      - add(attributes, dynamicAttributes(attributes["test.id"]))

Eventually, we could call our custom function in a variety of statements, as in the following example:

metric_statements:
    - context: datapoint
      statements:
      - set(description, toString(dynamicAttributes(attributes["test.id"])))
      - set(attributes["test.id"], dynamicSource(attributes["test.id"]))

I'm assuming here that there is a toString that converts the slice to a string, that I can call two functions in the same statement, and that we provided a custom function dynamicSource that returns a single string to be set as the value of attribute test.id. We may find better examples if the above assumptions do not apply.

Describe alternatives you've considered

We first considered the possibility of extending the attributesprocessor for this purpose, but after talking with the code owner, we realised that the transformprocessor would be the most suited processor.

Additional context

In our team capacity planning, we have some room for contributing to OpenTelemetry, so we are more than happy to help with the implementation once we reach an agreement regarding what should be done.

@xvilaca-te xvilaca-te added the enhancement and needs triage labels Oct 24, 2023
@github-actions github-actions bot added the processor/transform label Oct 24, 2023
@github-actions
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@TylerHelmuth
Member

> retrieve a matching slice of pairs (key,[val]) from a local store

Can you expand more on what you mean by local store? As you stated in the issue, the transformprocessor, as a rule, should not have functions that interact with anything external to the function. If you need the function to make an http call or something we cannot use the transformprocessor.

> So, to summarise, our initial idea would be to support custom functions in the factory like the following example inspired by the kafkareceiver:

At the moment the transformprocessor has a static list of functions. Although we are open to more functions, we currently have no way to inject functions at runtime: #16098

That said, I feel OTTL does fit this use case, and you are free to use OTTL in a custom component.

@TylerHelmuth TylerHelmuth added the discussion needed and pkg/ottl labels and removed the needs triage label Oct 24, 2023
@xvilaca-te
Author

xvilaca-te commented Oct 24, 2023

> Can you expand more on what you mean by local store? As you stated in the issue, the transformprocessor, as a rule, should not have functions that interact with anything external to the function. If you need the function to make an http call or something we cannot use the transformprocessor.

Our local store is a local key-value map in memory, but I understand that custom functions could open a hole. So maybe we would have to consider a more controlled solution, perhaps building the storage internally, but then I'm concerned that we would be tied to a specific implementation.
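To make the discussion concrete, here is a minimal sketch of what we mean by a local store; the KeyValue type and the method names are hypothetical, not an existing API. The map is filled once (e.g. by an extension at startup) and then only read by the processor, so a read-write mutex is enough:

```go
package main

import (
	"fmt"
	"sync"
)

// KeyValue is a hypothetical attribute pair: a string key and an
// array-of-strings value, matching the (key,[val]) pairs described above.
type KeyValue struct {
	Key    string
	Values []string
}

// localStore sketches the in-memory map: populated once at startup
// (e.g. by an extension) and then read concurrently by the processor.
type localStore struct {
	mu   sync.RWMutex
	data map[string][]KeyValue
}

func newLocalStore() *localStore {
	return &localStore{data: make(map[string][]KeyValue)}
}

// put registers the attributes to inject for a given test id.
func (s *localStore) put(testID string, attrs []KeyValue) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[testID] = attrs
}

// get returns the attributes to inject for a given test id.
func (s *localStore) get(testID string) ([]KeyValue, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	attrs, ok := s.data[testID]
	if !ok {
		return nil, fmt.Errorf("no attributes for test id %q", testID)
	}
	return attrs, nil
}

func main() {
	store := newLocalStore()
	store.put("t-1", []KeyValue{{Key: "region", Values: []string{"eu", "us"}}})
	attrs, err := store.get("t-1")
	fmt.Println(err, attrs[0].Key, attrs[0].Values)
}
```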

> At the moment the transformprocessor has a static list of functions. Although we are open to more functions, we currently have no way to inject functions at runtime: #16098

Pardon my ignorance (I have not looked into the implementation details; we are still in the phase of deciding what to do), but how are the functions mapped internally? Since the config is read at runtime, I imagine you have something like a map, built using reflection, from function name to the actual function. If so, maybe you could add more entries to that map based on the provided options?

> That said, I feel OTTL does fit this use case, and you are free to use OTTL in a custom component.

We already have a working pipeline in our production environments with this custom processor (and other custom modules), and it's working well. We just wish to contribute to the OpenTelemetry community. We are not tied to any particular processor; we are still in the process of understanding where our proposal would fit best, and your input is invaluable. So far, it looks like the transformprocessor is the best fit, but we can even create our own processor if that's the best option.

@TylerHelmuth
Member

> Our local store is a local key-value map in memory

In memory as in the pdata payload? Or somewhere else?

Components that use OTTL must, as part of startup, provide OTTL a map of function names (strings) and function implementations. OTTL uses this map to match incoming statements to functions.
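As a toy model of that mechanism (this is not the real ottl API, which uses typed factories, but a sketch of the name-to-implementation map just described):

```go
package main

import (
	"fmt"
	"strings"
)

// ottlFunc is a stand-in for an OTTL function implementation; the real
// package uses typed function factories rather than plain funcs.
type ottlFunc func(args []string) (string, error)

// newFunctionMap mimics what a component does at startup: start from a
// standard function set and merge in component-specific functions.
func newFunctionMap(custom map[string]ottlFunc) map[string]ottlFunc {
	funcs := map[string]ottlFunc{
		// stand-in for one of the standard converters
		"Concat": func(args []string) (string, error) {
			return strings.Join(args, "-"), nil
		},
	}
	for name, f := range custom {
		funcs[name] = f // component-specific functions are merged in
	}
	return funcs
}

// resolve matches a function name from a parsed statement to an
// implementation, which is how statements are bound to functions.
func resolve(funcs map[string]ottlFunc, name string) (ottlFunc, error) {
	f, ok := funcs[name]
	if !ok {
		return nil, fmt.Errorf("undefined function %q", name)
	}
	return f, nil
}

func main() {
	funcs := newFunctionMap(map[string]ottlFunc{
		"DynamicAttributes": func(args []string) (string, error) {
			return "attrs-for-" + args[0], nil
		},
	})
	f, _ := resolve(funcs, "DynamicAttributes")
	out, _ := f([]string{"test-1"})
	fmt.Println(out)
}
```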

@xvilaca-te
Author

> In memory as in the pdata payload? Or somewhere else?

In a global variable initialised by an extension.

> Components that use OTTL must, as part of startup, provide OTTL a map of function names (strings) and function implementations. OTTL uses this map to match incoming statements to functions.

Correct me if I'm wrong, but this looks similar to what kafkareceiver does and to what our proposal would be (as in the factory example in the description):

  • They have a static set of unmarshaller functions
  • The user can provide their own custom functions
  • The receiver combines both sets of static and custom functions when the factory is invoked to create the component (so, before startup)
  • The configuration specifies which function to use as the unmarshaller based on its name
  • At runtime, the receiver applies the selected function to every read Kafka message
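A minimal sketch of that pattern, with illustrative names rather than the kafkareceiver's real identifiers: user-supplied implementations are merged with the defaults when the factory is built, before component startup, and the configuration later selects one by name.

```go
package main

import "fmt"

// unmarshaler is a stand-in for the kafkareceiver's unmarshaller
// interface; these names are illustrative, not the real API.
type unmarshaler interface {
	Encoding() string
	Unmarshal(data []byte) (string, error)
}

type factory struct {
	unmarshalers map[string]unmarshaler
}

type factoryOption func(*factory)

// withUnmarshalers mirrors the bullets above: user-supplied
// implementations are merged into the factory's set, keyed by the
// encoding name that the configuration will reference.
func withUnmarshalers(us ...unmarshaler) factoryOption {
	return func(f *factory) {
		for _, u := range us {
			f.unmarshalers[u.Encoding()] = u
		}
	}
}

// newFactory applies all options before the component is created,
// i.e. before startup.
func newFactory(opts ...factoryOption) *factory {
	f := &factory{unmarshalers: map[string]unmarshaler{}}
	for _, opt := range opts {
		opt(f)
	}
	return f
}

// rawUnmarshaler is an example custom implementation that the
// configuration would select by the name "raw".
type rawUnmarshaler struct{}

func (rawUnmarshaler) Encoding() string { return "raw" }
func (rawUnmarshaler) Unmarshal(data []byte) (string, error) {
	return string(data), nil
}

func main() {
	f := newFactory(withUnmarshalers(rawUnmarshaler{}))
	msg, _ := f.unmarshalers["raw"].Unmarshal([]byte("hello"))
	fmt.Println(msg)
}
```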

@TylerHelmuth
Member

@xvilaca-te but how would you pass in the actual Go implementation of the function, if not during compilation? It looks like the kafkareceiver provides access to pre-determined functions, like OTTL does. Can you provide an example configuration that you're imagining for this use case?

@xvilaca-te
Author

xvilaca-te commented Oct 24, 2023

> @xvilaca-te but how would you pass in the actual Go implementation of the function, if not during compilation? It looks like the kafkareceiver provides access to pre-determined functions, like OTTL does. Can you provide an example configuration that you're imagining for this use case?

I did not explain myself well. In the kafkareceiver case, we define our own receiver whose NewFactory function calls the original NewFactory, passing in our custom implementations, and we reference our extension in our build file. The custom function is compiled when we build our pipeline and linked with the static functions at compile time:

func NewFactory(options ...transformprocessor.FactoryOption) processor.Factory {
    opts := append(options, transformprocessor.WithCustomFunctions(dynamicAttributes))
    return transformprocessor.NewFactory(opts...)
}

This is where we would reference our extension of the transformprocessor in the build file:

processors:
  - import: "github.com/thousandeyes/otel-collector/processors/transform"
    gomod: "github.com/thousandeyes/otel-collector master"

The configuration then uses the name of the function as if it were statically defined. It's not as easy as just configuring the processor, but we don't have to implement our own logic either. I believe the authors of the kafkareceiver designed it this way precisely to allow users to extend it with custom functions. It does have the downside that users can do whatever they want in their custom implementations; for instance, if a user performs a remote call, then your rule is broken.

@xvilaca-te
Author

Here is a link to where in kafkareceiver they apply the custom functions in the factory function.

@xvilaca-te
Author

@TylerHelmuth Are the examples above clear?

@TylerHelmuth
Member

@xvilaca-te if you are already implementing a custom receiver then I don't believe we need to get the transformprocessor involved. OTTL is a standalone package and can be used in your receiver. If your receiver defines custom functions then they can be used in your receiver's implementation of OTTL.

Contributor

github-actions bot commented Jan 1, 2024

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@github-actions github-actions bot added the Stale label Jan 1, 2024
Contributor

github-actions bot commented Mar 1, 2024

This issue has been closed as inactive because it has been stale for 120 days with no activity.

@github-actions github-actions bot closed this as not planned Mar 1, 2024