Skip to content

Commit

Permalink
get matrics for fast verification per orchestrator (livepeer#2397)
Browse files Browse the repository at this point in the history
We should be able to determine the fast verification success/failure rate for each O
  • Loading branch information
oscar-davids authored and ad-astra-video committed May 25, 2022
1 parent 0580610 commit 48122e5
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
14 changes: 8 additions & 6 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,14 +755,14 @@ func InitCensus(nodeType NodeType, version string) {
Name: "fast_verification_done",
Measure: census.mFastVerificationDone,
Description: "Number of fast verifications done",
TagKeys: baseTagsWithManifestID,
TagKeys: append([]tag.Key{census.kOrchestratorURI}, baseTagsWithManifestID...),
Aggregation: view.Count(),
},
{
Name: "fast_verification_failed",
Measure: census.mFastVerificationFailed,
Description: "Number of fast verifications failed",
TagKeys: baseTagsWithManifestID,
TagKeys: append([]tag.Key{census.kOrchestratorURI}, baseTagsWithManifestID...),
Aggregation: view.Count(),
},
{
Expand Down Expand Up @@ -1617,16 +1617,18 @@ func fracwei2gwei(wei *big.Rat) float64 {
return floatWei / gweiConversionFactor
}

func FastVerificationDone(ctx context.Context) {
func FastVerificationDone(ctx context.Context, uri string) {
if err := stats.RecordWithTags(census.ctx,
manifestIDTag(ctx), census.mFastVerificationDone.M(1)); err != nil {
manifestIDTag(ctx, tag.Insert(census.kOrchestratorURI, uri)),
census.mFastVerificationDone.M(1)); err != nil {
clog.Errorf(ctx, "Error recording metrics err=%q", err)
}
}

func FastVerificationFailed(ctx context.Context) {
func FastVerificationFailed(ctx context.Context, uri string) {
if err := stats.RecordWithTags(census.ctx,
manifestIDTag(ctx), census.mFastVerificationFailed.M(1)); err != nil {
manifestIDTag(ctx, tag.Insert(census.kOrchestratorURI, uri)),
census.mFastVerificationFailed.M(1)); err != nil {
clog.Errorf(ctx, "Error recording metrics err=%q", err)
}
}
13 changes: 7 additions & 6 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,18 +517,19 @@ func (bsm *BroadcastSessionsManager) chooseResults(ctx context.Context, submitRe
// verify untrusted hashes
var sessionsToSuspend []*BroadcastSession
for _, untrustedResult := range untrustedResults {
ouri := untrustedResult.Session.Transcoder()
untrustedHash, err := drivers.GetSegmentData(ctx, untrustedResult.TranscodeResult.Segments[segmToCheckIndex].PerceptualHashUrl)
if err != nil {
err = fmt.Errorf("error downloading perceptual hash from url=%s err=%w",
err = fmt.Errorf("error uri=%s downloading perceptual hash from url=%s err=%w", ouri,
untrustedResult.TranscodeResult.Segments[segmToCheckIndex].PerceptualHashUrl, err)
return nil, nil, err
}
equal, err := ffmpeg.CompareSignatureByBuffer(trustedHash, untrustedHash)
if monitor.Enabled {
monitor.FastVerificationDone(ctx)
monitor.FastVerificationDone(ctx, ouri)
}
if err != nil {
clog.Errorf(ctx, "error comparing perceptual hashes from url=%s err=%q",
clog.Errorf(ctx, "error uri=%s comparing perceptual hashes from url=%s err=%q", ouri,
untrustedResult.TranscodeResult.Segments[segmToCheckIndex].PerceptualHashUrl, err)
}
clog.Infof(ctx, "Hashes from url=%s and url=%s are equal=%v",
Expand All @@ -539,13 +540,13 @@ func (bsm *BroadcastSessionsManager) chooseResults(ctx context.Context, submitRe
// download untrusted video segment
untrustedSegm, err := drivers.GetSegmentData(ctx, untrustedResult.TranscodeResult.Segments[segmToCheckIndex].Url)
if err != nil {
err = fmt.Errorf("error downloading segment from url=%s err=%w",
err = fmt.Errorf("error uri=%s downloading segment from url=%s err=%w", ouri,
untrustedResult.TranscodeResult.Segments[segmToCheckIndex].Url, err)
return nil, nil, err
}
vequal, err = ffmpeg.CompareVideoByBuffer(trustedSegm, untrustedSegm)
if err != nil {
clog.Errorf(ctx, "error comparing video from url=%s err=%q",
clog.Errorf(ctx, "error uri=%s comparing video from url=%s err=%q", ouri,
untrustedResult.TranscodeResult.Segments[segmToCheckIndex].Url, err)
return nil, nil, err
}
Expand All @@ -563,7 +564,7 @@ func (bsm *BroadcastSessionsManager) chooseResults(ctx context.Context, submitRe
} else {
sessionsToSuspend = append(sessionsToSuspend, untrustedResult.Session)
if monitor.Enabled {
monitor.FastVerificationFailed(ctx)
monitor.FastVerificationFailed(ctx, ouri)
}
}
}
Expand Down

0 comments on commit 48122e5

Please sign in to comment.