
Add telemetry pipeline structure to new_sdk/main #2833

Closed
MrAlias opened this issue Apr 19, 2022 · 15 comments · Fixed by #3044
Assignees
Labels
area:metrics Part of OpenTelemetry Metrics pkg:SDK Related to an SDK package

Comments

@MrAlias (Contributor) commented Apr 19, 2022

Blocked by #2799

@MrAlias MrAlias added pkg:SDK Related to an SDK package area:metrics Part of OpenTelemetry Metrics labels Apr 19, 2022
@MrAlias MrAlias added this to the Metric SDK: Alpha milestone Apr 19, 2022
@MrAlias MrAlias moved this from Blocked to Todo in OpenTelemetry Go: Metric SDK (Alpha) Apr 20, 2022
@MrAlias MrAlias moved this from Todo to In Progress in OpenTelemetry Go: Metric SDK (Alpha) May 11, 2022
@MadVikingGod (Contributor):

Here are my ideas for the internal view. Views seem to have two responsibilities for datastreams in the SDK:

  1. Filter and transform data points from instruments and direct the points into the correct aggregation/storage.
  2. On-demand produce all datastreams from all current aggregations/storages.

An easy optimization is for the SDK to query all views when an instrument is created to determine whether the instrument matches each view. This lets us avoid checking for a match on every datapoint update. To be able to match, the instrument descriptor would need at least Name, Description, Unit, InstrumentAttributes, and any other match criteria.

When an instrument is used, e.g. Add() or Observe(), it should Update() ALL Views that it matches. This method should block the caller for as short a time as possible. The responsibilities of Update() are to create the aggregation that results from applying the view, if needed, and to update that aggregation with the provided value. The datastream Descriptor should describe the meter and the instrument that the stream belongs to, and the resolved attributes of that stream (both the instrument attributes and the attributes provided by Observe()).

A View should be able to read from all of its aggregations and produce the composite datastreams. This should have a structure similar to the Produce method that readers are registered with. Because this should minimize blocking of updates, a snapshot of the current state of the aggregations should be taken and the data read from that.

package view // import "go.opentelemetry.io/otel/sdk/metric/internal/view"

type View interface {

    // Match checks if the instrument matches this View's filter. Only
    // instruments that match should Update the view.
    Match(instrument.Descriptor) bool

    // Update is used by instruments to record a new datapoint.
    Update(datastream.Descriptor, number) 

    // Produce snapshots the current aggregations and serializes the data out.
    Produce(context.Context?) (export.Metrics, error?)
    // The Context and error are only needed if this method will respect timeouts.  If this is deemed simple enough then both can be removed.

}

type SingleView struct {
    // config to do matching
    ...
    // config for transforms
    ...
    // All aggregations for datastreams
    data map[meter]map[instrument]Aggregation
}
var _ View = &SingleView{}

func NewView(cfg view.Config) *SingleView

To be able to support multiple views we should have an object that can both distribute and aggregate data for different SingleViews

type MultiView struct {
    views []SingleView
}

func NewMultiView(configs ...view.Config) *MultiView

@MadVikingGod (Contributor):

From the SIG discussion, the interface was modified as follows:

type View interface {

    // Match checks if the instrument matches this View's filter. Only
    // instruments that match should Update the view.
    Match(instrument.Descriptor) Updater

    // Produce snapshots the current aggregations and serializes the data out.
    Produce(context.Context?) (export.Metrics, error?)
    // The Context and error are only needed if this method will respect timeouts.  If this is deemed simple enough then both can be removed.
}

type Updater interface {
    // Update is used by instruments to record a new datapoint.
    Update([]attribute.KeyValue, number) // could be attribute.Set
}
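The benefit of Match returning an Updater is that matching happens once, at instrument creation, and the hot Add()/Observe() path only iterates over cached updaters. A minimal self-contained sketch of that idea (the View, Updater, and Counter types here are simplified stand-ins, not the real SDK API):

```go
package main

import "fmt"

// Updater records a new datapoint (simplified: attributes omitted).
type Updater interface {
	Update(value int64)
}

// View matches instruments by name; Match returns nil when the
// instrument does not match, so no Updater is cached for it.
type View struct {
	match string
	sum   int64
}

func (v *View) Match(instName string) Updater {
	if v.match != instName {
		return nil
	}
	return v
}

func (v *View) Update(value int64) { v.sum += value }

// Counter caches the Updaters of every matching View at creation,
// so Add never re-evaluates the match criteria.
type Counter struct {
	updaters []Updater
}

func NewCounter(name string, views []*View) *Counter {
	c := &Counter{}
	for _, v := range views {
		if u := v.Match(name); u != nil {
			c.updaters = append(c.updaters, u)
		}
	}
	return c
}

func (c *Counter) Add(value int64) {
	for _, u := range c.updaters {
		u.Update(value)
	}
}

func main() {
	v1 := &View{match: "requests"}
	v2 := &View{match: "latency"}
	c := NewCounter("requests", []*View{v1, v2})
	c.Add(5)
	c.Add(3)
	fmt.Println(v1.sum, v2.sum) // only the matching view accumulated
}
```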

@jmacd (Contributor) commented May 13, 2022

I'm concerned that the Produce() method here doesn't know which Reader to produce data for. How would that be handled here? I'm not sure how you'll handle the differences between asynchronous vs. synchronous instruments in the collect/produce path. (Edit: I see how this will work, but this appears to increase code path length, see below.)

I'm concerned that one View returning export.Metrics means that the caller will have to merge Metrics together. In my branch, the signature of the similar thing over a set of Views is Collect(sequence Sequence, output *[]Instrument). For one view clause, it could be Collect(Sequence) Instrument. Each view should use the same timestamp, so I imagine you'll also pass in something similar to Sequence?

I don't think Produce() should return an error. The only potential error I see is failure to allocate memory, where it's OK to panic IMO.

@jmacd (Contributor) commented May 13, 2022

I think it would be good for us, in the future, if we were comparing end-to-end implementations that we could benchmark while having these discussions. Looking at the API above, I think it's possible to structure the SDK this way but I see a loss of performance or two.

First, the organization in #2865 uses one sync.Map lookup per synchronous operation. Even if you replace []attribute.KeyValue with attribute.Set to pay the cost of sorting/deduplicating once, the operation to lookup the attribute.Set in the map is expensive and with the APIs above I think you're going to perform the lookup once per View. This means synchronous instrument performance is expected to grow with the number of registered readers, which is not the case in my branch.

Another minor optimization in my branch is that the return type used for Instruments and Accumulators, particularly for synchronous instruments (where performance really matters), will not use the "multi" form when there is only a single behavior. This means, when there is only one Reader and only one View behavior, which I expect to be the common case, the SDK will not have to allocate size-1 slices to store a single instrument and stream, because a single interface is used for the synchronous code path regardless of the number of view behaviors.
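The single-vs-multi point can be illustrated with a small sketch (the names here are illustrative, not the branch's actual types): one interface covers both cases, so the one-reader/one-view path pays no slice wrapper.

```go
package main

import "fmt"

// Accumulator is the single interface the synchronous code path uses,
// regardless of how many view behaviors are behind it.
type Accumulator interface {
	Record(value float64)
}

type sumAccumulator struct{ total float64 }

func (a *sumAccumulator) Record(v float64) { a.total += v }

// multiAccumulator fans a record out to several accumulators.
type multiAccumulator []Accumulator

func (m multiAccumulator) Record(v float64) {
	for _, a := range m {
		a.Record(v)
	}
}

// combine avoids allocating a size-1 slice wrapper in the common
// single-behavior case: the lone accumulator is returned directly.
func combine(accs []Accumulator) Accumulator {
	if len(accs) == 1 {
		return accs[0]
	}
	return multiAccumulator(accs)
}

func main() {
	a := &sumAccumulator{}
	single := combine([]Accumulator{a})
	single.Record(2.5)
	fmt.Println(single == Accumulator(a)) // true: no wrapper in the single case
	fmt.Println(a.total)
}
```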

@jmacd (Contributor) commented May 13, 2022

Here is a correctness question: In the API sketched above, it looks like each View has no awareness of the other views. The reason I have a single view compiler object is that there is a namespace shared by all View clauses in a single Meter. When one View is compiled, in order to know about the conflicts you have to know about the other Views. Again I think this is a reason for the organization in #2865.

@MadVikingGod (Contributor):

Let me start with your correctness comment by setting some bounds. I think this is only an issue when you have multiple views tied to one reader, and furthermore, it is only a true error when creating instruments. With a single view the only way to have a conflict is to have a wildcard match and a rename (which the spec says you shouldn't do). If a reader has multiple views, then we can detect at creation of a MultiView whether they might conflict, e.g. a specific name and *, or a rename matching a name/*, but that is only an error if instruments are created that actually cause the conflict (the first case is far more problematic). I think the solution here is to put an error on the New methods.

As for merging multiple export.Metrics: no, the reader wouldn't have to do that; again, this is a responsibility of the MultiView. You only get different streams from a Produce if you have multiple views. When there is no conflict, this is just merging InstrumentationScopes + Instruments. When there is a conflict we do need to define behavior (I don't think this is defined in the spec). For that, I would suggest we include the conflicting data in the output stream. What does it mean to have two instruments with the same name and data streams with overlapping times? A conflict in your instruments.

@jmacd (Contributor) commented May 13, 2022

it is only a true error when creating instruments

Yes. I just don't see how the Views will know about the conflicting names. The specification does say what a conflict is, and it can happen with or without multiple views. For example, if a View renames "foo" to "bar" and there is already a "bar" instrument, or a "bar" created by a different view, you've got a conflict at the moment one of the instruments is created. The view that's performing renames needs a way to see the names produced by the other views, right?

@jmacd (Contributor) commented May 13, 2022

For that, I would suggest we have the conflicting data in the output stream.

That is the specified behavior. I'm asking how the Instrument constructor will show a conflict to the user when there are multiple views defined for a single reader and a single instrumentation library.

@jmacd (Contributor) commented May 13, 2022

Just so the specification text is included, I am referring to https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#opentelemetry-protocol-data-model-producer-recommendations

4. Generally, for potential conflicts involving an identifying
   property (i.e., all properties except `description`), the producer
   SHOULD inform the user of a semantic error and pass through
   conflicting data.

The point is that if multiple View behaviors ever create a conflicting definition for the same metric name inside a scope, the user should have seen an error when they registered the instrument.

@MadVikingGod (Contributor):

For example, if a View renames "foo" to "bar" and there is already a "bar" instrument

This is only a conflict if they will exist in the same data stream. Their mere existence doesn't necessarily mean they are a true conflict. If the different datastreams, created by different views, go to different readers (and destinations), then there will be a bar in one system that came from foo, and a bar in a different system that came from bar. If they go to different readers but the same destination, this is no different from two applications sending the same instrument with different semantics.

If a conflict happens within a MultiView, then the logic should be contained there, perhaps with some helper functions to determine what the name of the instrument will be after processing with the view. The point is that there is a place that knows about the views relevant to a datastream, but it doesn't have to know about ALL views.

@jmacd (Contributor) commented May 13, 2022

This is only a conflict if they will exist in the same data stream.

I see. I think it would help me to see an end-to-end implementation that uses this interface.

@jmacd jmacd closed this as completed May 13, 2022
@jmacd jmacd reopened this May 13, 2022
Repository owner moved this from In Progress to Done in OpenTelemetry Go: Metric SDK (Alpha) May 13, 2022
@MadVikingGod (Contributor):

I've put together a working example of how this can fit together, and there are a few proposals I want to make from it.

First, I think we need to put a constraint on the public view (view.Config in the example; I will use View to represent this idea). Because this is the locus of knowledge of both matching and transformations, I think it should have methods that reflect that, e.g.:

  • func (v *View) Transform(instrument.Descriptor) instrument.Descriptor - this could return an empty Descriptor if it doesn't match, or additionally a bool
  • func (v *View) FilterAttributes([]attribute.KeyValue) []attribute.KeyValue // or attribute.Set
  • func (v *View) GetAggregation(instrument.Descriptor) aggregation.Kind

This would allow the MeterProvider, the current object that knows all instruments, to find conflicting instruments and return errors.
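A self-contained sketch of what those three methods might look like (the Descriptor and Kind types here are simplified stand-ins for the real instrument and aggregation packages, and attribute keys are plain strings rather than attribute.KeyValue):

```go
package main

import (
	"fmt"
	"strings"
)

// Descriptor is a simplified stand-in for instrument.Descriptor.
type Descriptor struct {
	Name, Description, Unit string
}

// Kind is a simplified stand-in for aggregation.Kind.
type Kind int

const (
	KindDefault Kind = iota
	KindSum
	KindHistogram
)

// View holds both matching and transformation config.
type View struct {
	matchPrefix string          // instruments whose name has this prefix match
	rename      string          // if non-empty, the matched instrument's new name
	keepKeys    map[string]bool // attribute keys to keep
	aggregation Kind
}

// Transform returns the transformed descriptor and whether the view matched.
func (v *View) Transform(d Descriptor) (Descriptor, bool) {
	if !strings.HasPrefix(d.Name, v.matchPrefix) {
		return Descriptor{}, false
	}
	if v.rename != "" {
		d.Name = v.rename
	}
	return d, true
}

// FilterAttributes drops attribute keys the view does not keep.
func (v *View) FilterAttributes(keys []string) []string {
	var out []string
	for _, k := range keys {
		if v.keepKeys[k] {
			out = append(out, k)
		}
	}
	return out
}

// GetAggregation returns the aggregation the view selects.
func (v *View) GetAggregation(Descriptor) Kind { return v.aggregation }

func main() {
	v := &View{
		matchPrefix: "http.",
		rename:      "http.requests",
		keepKeys:    map[string]bool{"method": true},
		aggregation: KindSum,
	}
	d, ok := v.Transform(Descriptor{Name: "http.server.requests"})
	fmt.Println(ok, d.Name)
	fmt.Println(v.FilterAttributes([]string{"method", "url"}))
}
```

Because Transform exposes the post-view name, a MeterProvider can compute every view's output name at instrument creation and report conflicts immediately.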

That would mean that what we are working with in this internal package has three responsibilities (plus an optional optimization):

  1. Create new space in memory (aggregations) to hold datastream updates, and provide the instrument with access to the update method
  2. Track all of the allocated memory ([]aggregations)
  3. Gather all of the aggregate data, and marshal it into an exportable format (export.Metrics)
  4. (optional) To keep the input path fast most of the time, the Gather step (3) should take a snapshot of the updated datastream before doing any compute-intensive transforms.

package viewstate

type AttributeFilter func([]attribute.KeyValue) []attribute.KeyValue

type Pipeline struct {
    accumulators []aggregator.Aggregator
}

// GetAggregator creates an Aggregator of the given Kind for the instrument.Descriptor and AttributeFilter.
func (p *Pipeline) GetAggregator(aggregator.Kind, instrument.Descriptor, AttributeFilter) Updater

// Produce will aggregate all data-streams recorded and return the aggregated Metrics.
func (p *Pipeline) Produce() export.Metrics

type Updater interface {
    Update([]attribute.KeyValue, number)
}

This will also put requirements on the aggregator package,

package aggregator

// Kind is the type of aggregator that should be used.  This should be mapped to 
type Kind int
// ... enum values not shown

type Aggregator interface {
    // Update adds a single data point to the working memory of the aggregator.
    Update([]attribute.KeyValue, number)
    // Snapshot (optional) captures data from working memory and prepares space for new working memory.
    Snapshot()
    // Accumulate processes data points in snapshot memory into the aggregated form.
    Accumulate() export.Instrument // This is analogous to the [proto Metric](https://github.com/open-telemetry/opentelemetry-proto/blob/c31b2d8fd2e84684fa09de862e0fd7c3aa95f7ed/opentelemetry/proto/metrics/v1/metrics.proto#L207) or the example's [reader.Instrument](https://github.com/open-telemetry/opentelemetry-go/blob/8b6f0c6c4841d67e9d82562a6be0f3f3bb5c8844/sdk/metric/reader/reader.go#L114)
}

The one thing this explicitly doesn't solve is that the aggregators aren't all locked at the same time, so there is no way to synchronize a snapshot of all of them (which would happen at the start of Produce) with concurrent updates. The only way to solve this is to serialize all writes and snapshots within one pipeline. This doesn't seem to be done in other languages.
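A minimal sketch of that serialization (hypothetical toy types, not the SDK's): a single mutex makes every write and the whole snapshot mutually exclusive, so Produce sees a consistent cut across all aggregations, at the cost of blocking writers for the duration of the snapshot.

```go
package main

import (
	"fmt"
	"sync"
)

// pipeline serializes all writes and snapshots behind one mutex.
type pipeline struct {
	mu   sync.Mutex
	sums map[string]int64 // instrument name -> running sum (toy aggregation)
}

func newPipeline() *pipeline {
	return &pipeline{sums: map[string]int64{}}
}

func (p *pipeline) Update(name string, value int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.sums[name] += value
}

// Produce snapshots all aggregations atomically; no Update can
// interleave between reading one instrument and the next.
func (p *pipeline) Produce() map[string]int64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	out := make(map[string]int64, len(p.sums))
	for k, v := range p.sums {
		out[k] = v
	}
	return out
}

func main() {
	p := newPipeline()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				p.Update("requests", 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(p.Produce()["requests"]) // 4000
}
```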

@MrAlias MrAlias moved this from Done to In Progress in OpenTelemetry Go: Metric SDK (Alpha) May 24, 2022
@MrAlias MrAlias changed the title Add sdk/metric/internal/viewstate structure to new_sdk/main Add telemetry pipeline structure to new_sdk/main Jun 23, 2022
@MadVikingGod (Contributor):

When creating an instrument, four things need to happen, not necessarily in this order:

  1. Use the readers and views to select which readers the instrument should be considered for, and apply instrument transforms (name, description, and aggregation). This functionality is provided by readers and views currently.
  2. Reserve some memory to store updates. This functionality is provided by the internal.Aggregation currently.
  3. Wire the instrument, e.g. syncint64.Counter, to the Aggregate() portion of the Aggregator. ( Implement the stubbed sync instruments #2814, Implement the stubbed async instruments #2815 )
  4. Wire the reader, via its producer, to the Aggregation() portion of the Aggregator. (This issue)

To solve number 4, the simplest way would be for the MeterProvider to store a reference to each Meter (scope), which stores all created instruments, and iterate through them. This has two downsides:

  1. Every time a reader collects, we have to iterate through all meters and, for every instrument, check if there is a view it matches.
  2. We won't know if a transformed instrument collides until a collection happens.

A more optimized solution would be to calculate the transformed instrument (1) and store that information in a way that's easy to query. This would address both of the previous downsides.

What would this look like?

type pipeline struct {
	sync.Mutex
	// aggregations are structured for ease of creating the export.ResourceMetrics in produce.
	aggregations map[instrumentation.Scope]map[instrumentKey]aggregator

	resource *resource.Resource
}

// addAggregator adds an instrument's aggregator to the pipeline.  If a duplicate instrument is registered an error is returned.
func (p *pipeline) addAggregator(scope, instrumentKey, aggregator) error

// produce returns aggregated metrics from a single collection.
// produce returns aggregated metrics from a single collection.
func (p *pipeline) produce(ctx context.Context) (export.ResourceMetrics, error)
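A self-contained sketch of how addAggregator could surface duplicates at registration time rather than at collection time (the scope, key, and aggregator types here are simplified stand-ins for the real instrumentation.Scope, instrumentKey, and aggregator):

```go
package main

import "fmt"

type scope struct{ name, version string }

// instrumentKey identifies an instrument within a scope; registering
// the same key twice indicates a conflicting instrument definition.
type instrumentKey struct{ name, unit string }

type aggregator interface{ value() int64 }

type sum struct{ total int64 }

func (s *sum) value() int64 { return s.total }

type pipeline struct {
	aggregations map[scope]map[instrumentKey]aggregator
}

func newPipeline() *pipeline {
	return &pipeline{aggregations: map[scope]map[instrumentKey]aggregator{}}
}

// addAggregator registers an aggregator, returning an error on a
// duplicate registration so conflicts surface at instrument creation
// instead of at the first collection.
func (p *pipeline) addAggregator(s scope, k instrumentKey, a aggregator) error {
	if _, ok := p.aggregations[s]; !ok {
		p.aggregations[s] = map[instrumentKey]aggregator{}
	}
	if _, ok := p.aggregations[s][k]; ok {
		return fmt.Errorf("duplicate instrument %q in scope %q", k.name, s.name)
	}
	p.aggregations[s][k] = a
	return nil
}

func main() {
	p := newPipeline()
	sc := scope{name: "app"}
	k := instrumentKey{name: "requests", unit: "1"}
	fmt.Println(p.addAggregator(sc, k, &sum{})) // <nil>
	fmt.Println(p.addAggregator(sc, k, &sum{})) // duplicate error
}
```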

@MadVikingGod MadVikingGod linked a pull request Aug 2, 2022 that will close this issue
@MrAlias (Contributor, Author) commented Aug 11, 2022

The structure has been added. Implementation is ready and has started.

@MrAlias MrAlias closed this as completed Aug 11, 2022
Repository owner moved this from In Progress to Done in OpenTelemetry Go: Metric SDK (Alpha) Aug 11, 2022