-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[aggregator] m3aggregator with pass-through functionality #2105
base: master
Are you sure you want to change the base?
Conversation
glide.lock
Outdated
@@ -278,7 +278,7 @@ imports: | |||
subpackages: | |||
- regexp | |||
- name: github.com/magiconair/properties | |||
version: de8848e004dd33dc07a2947b3d76f618a7fc7ef1 | |||
version: 7757cc9fdb852f7579b24170bcacda2c7471bb6a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need this change?
callStart := agg.nowFn() | ||
agg.metrics.passThrough.Inc(1) | ||
|
||
if agg.state != aggregatorOpen { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -74,6 +76,9 @@ type Aggregator interface { | |||
// AddForwarded adds a forwarded metric with metadata. | |||
AddForwarded(metric aggregated.ForwardedMetric, metadata metadata.ForwardMetadata) error | |||
|
|||
// AddPassThrough add a pass-through metric with metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: period at the end of the comment.
src/aggregator/integration/client.go
Outdated
metric aggregated.Metric, | ||
metadata metadata.TimedMetadata, | ||
) error { | ||
// Siyu? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need to delete this?
// passThroughWriter writes passthrough metrics to backends. | ||
type passThroughWriter struct { | ||
numShards int | ||
writers []writer.Writer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you make a new writer.Writer w/ the locking inside it? will make this simpler to read.
sp policy.StoragePolicy, | ||
callback m3msg.Callbackable, | ||
) { | ||
// The type of a pass-through metric does not matter as it is written directly into m3db. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm, doesn't the incoming metric have a type?
Value: value, | ||
} | ||
metadata := metadata.TimedMetadata{ | ||
AggregationID: aggregation.MustCompressTypes(aggregation.Last), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make a const for this rather than do this at every invocation
} | ||
|
||
if err := aggregator.AddPassThrough(metric, metadata); err != nil { | ||
log.Info("[FAIL] to write pass-through metric", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you remove these logs?
|
||
if err := aggregator.AddPassThrough(metric, metadata); err != nil { | ||
log.Info("[FAIL] to write pass-through metric", | ||
zap.String("metric", metric.String()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use zap.Stringer
in cases like this
callback.Callback(m3msg.OnRetriableError) | ||
} | ||
|
||
if s != nil && s.Sample() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should definitely remove these logs.
} | ||
} | ||
|
||
type addPassThroughError struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't see any uses of this type, do you need it?
5ba28fb
to
20390d8
Compare
var multiErr xerrors.MultiError | ||
for i := 0; i < w.numShards; i++ { | ||
w.locks[i].Lock() | ||
multiErr = multiErr.Add(w.writers[i].Close()) | ||
w.locks[i].Unlock() | ||
} | ||
|
||
if multiErr.Empty() { | ||
return nil | ||
} | ||
|
||
return fmt.Errorf("failed to close sharded writer: %v", multiErr.FinalError()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably start to standardize on go.uber.org/multierr and github.com/pkg/errors as that's what we use everywhere else now.
This then becomes:
var err error
for i := 0; i < w.numShards; i++ {
w.locks[i].Lock()
err = multierr.Append(err, w.writers[i].Close())
w.locks[i].Unlock()
}
return errors.WithMessage(err, "failed to close sharded writer")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
talked offline, added errors.WithMessage
w.Lock() | ||
defer w.Unlock() | ||
|
||
if w.closed { | ||
return errShardedWriterClosed | ||
} | ||
w.closed = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts on breaking reads of closed
out into a helper to simplify mutex usage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm need the lock for the duration of the functions in all three: Write(), Close() and Flush(). Don't see the formulation you're thinking of here.
sp policy.StoragePolicy, | ||
callback m3msg.Callbackable, | ||
) { | ||
// The type of a pass-through metric does not matter. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(just update this docstring)
agg.RLock() | ||
if agg.state != aggregatorOpen { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth making this a helper?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
closed bool | ||
|
||
numShards int | ||
writers []Writer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: no spacing except for embeds/non-embeds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done for this instance but don't always agree: sometimes i break things into groups based on vars that are mutated & the locks that guard them.
PassThroughTopicName *string `yaml:"passThroughTopicName"` | ||
|
||
// The number of passthrough writers | ||
NumPassThroughWriters *int `yaml:"numPassThroughWriters"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do these need to be nullable? Can we just use defaults + config population?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
for _, handler := range flushCfg.Handlers { | ||
if handler.DynamicBackend != nil && c.PassThroughTopicName != nil { | ||
handler.DynamicBackend.Producer.Writer.TopicName = *c.PassThroughTopicName | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... not the most friendly code, esp for how simple it is. IMO further datapoints to not use config pointers if we can avoid it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
talked offline: going to leave as is because the changes are pretty pervasive to make handler not be a pointer type.
@@ -56,6 +63,12 @@ func Serve( | |||
defer httpServer.Close() | |||
log.Infof("http server: listening on %s", httpAddr) | |||
|
|||
if err := m3msgServer.ListenAndServe(); err != nil { | |||
return fmt.Errorf("could not start m3msg server at %s: %v", m3msgAddr, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
errors.WithMessage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
041df7c
to
c439889
Compare
Codecov Report
@@ Coverage Diff @@
## master #2105 +/- ##
========================================
- Coverage 65.5% 64.4% -1.2%
========================================
Files 961 1008 +47
Lines 85488 86916 +1428
========================================
- Hits 56021 55983 -38
- Misses 25367 27017 +1650
+ Partials 4100 3916 -184
Continue to review full report at Codecov.
|
d1cbd2a
to
6749a21
Compare
sp policy.StoragePolicy, | ||
callback m3msg.Callbackable, | ||
) { | ||
// The type of a pass-through metric does not matter. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(just update this docstring)
@@ -27,6 +27,13 @@ import ( | |||
httpserver "github.com/m3db/m3/src/aggregator/server/http" | |||
rawtcpserver "github.com/m3db/m3/src/aggregator/server/rawtcp" | |||
"github.com/m3db/m3/src/x/instrument" | |||
"github.com/m3db/m3/src/x/server" | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove space
"github.com/m3db/m3/src/cmd/services/m3aggregator/config" | ||
"github.com/m3db/m3/src/cmd/services/m3aggregator/serve" | ||
xconfig "github.com/m3db/m3/src/x/config" | ||
"github.com/m3db/m3/src/x/config/configflag" | ||
"github.com/m3db/m3/src/x/etcd" | ||
"github.com/m3db/m3/src/x/instrument" | ||
"github.com/m3db/m3/src/x/server" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove this space (even though you didn't add it)
834e25e
to
99f874f
Compare
e130b7e
to
3cf877e
Compare
Siyu Yang seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
What this PR does / why we need it:
This PR introduces the pass-through functionality into m3aggregator.
Special notes for your reviewer:
This is a PR squashing commits in #1802
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: