Skip to content

Commit

Permalink
doc: document the measurement algorithm
Browse files Browse the repository at this point in the history
No functional changes. Only adding newlines and comments.
  • Loading branch information
bassosimone committed Jun 28, 2024
1 parent 5d8e033 commit c4675c5
Showing 1 changed file with 49 additions and 3 deletions.
52 changes: 49 additions & 3 deletions pkg/oonimkall/taskrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
//
// - rootCtx is the root context and is controlled by the user;
//
// - loadCtx derives from rootCtx and is used to load inputs;
//
// - measCtx derives from rootCtx and is possibly tied to the
// maximum runtime and is used to choose when to stop measuring;
//
Expand All @@ -126,35 +128,44 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
// See https://github.com/ooni/probe/issues/2037.
var logger model.Logger = newTaskLogger(r.emitter, r.settings.LogLevel)
r.emitter.Emit(eventTypeStatusQueued, eventEmpty{})

// check whether we support the provided settings
if r.hasUnsupportedSettings() {
// event failureStartup already emitted
return
}
r.emitter.Emit(eventTypeStatusStarted, eventEmpty{})

// create a new measurement session
sess, err := r.newsession(rootCtx, logger)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}

// make sure we emit the status.end event when we're done
endEvent := new(eventStatusEnd)
defer func() {
_ = sess.Close()
r.emitter.Emit(eventTypeStatusEnd, endEvent)
}()

// create an experiment builder for the given experiment name
builder, err := sess.NewExperimentBuilder(r.settings.Name)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}

// choose the proper OONI backend to use
logger.Info("Looking up OONI backends... please, be patient")
if err := sess.MaybeLookupBackendsContext(rootCtx); err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
r.emitter.EmitStatusProgress(0.1, "contacted bouncer")

// discover the probe location
logger.Info("Looking up your location... please, be patient")
if err := sess.MaybeLookupLocationContext(rootCtx); err != nil {
r.emitter.EmitFailureGeneric(eventTypeFailureIPLookup, err.Error())
Expand All @@ -177,9 +188,10 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
ResolverNetworkName: sess.ResolverNetworkName(),
})

// configure the callbacks to emit events
builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter})

// Load targets.
// load targets using the experiment-specific loader
loader := builder.NewTargetLoader(&model.ExperimentTargetLoaderConfig{
CheckInConfig: &model.OOAPICheckInConfig{
// TODO(bassosimone,DecFox): to correctly load Web Connectivity targets
Expand All @@ -197,11 +209,16 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
return
}

// create the new experiment
experiment := builder.NewExperiment()

// make sure we account for the bytes sent and received
defer func() {
endEvent.DownloadedKB = experiment.KibiBytesReceived()
endEvent.UploadedKB = experiment.KibiBytesSent()
}()

// open a new report if possible
if !r.settings.Options.NoCollector {
logger.Info("Opening report... please, be patient")
if err := experiment.OpenReportContext(rootCtx); err != nil {
Expand All @@ -213,11 +230,18 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
ReportID: experiment.ReportID(),
})
}

// create the default context for measuring
measCtx, measCancel := context.WithCancel(rootCtx)
defer measCancel()

// create the default context for submitting
submitCtx, submitCancel := context.WithCancel(rootCtx)
defer submitCancel()

// Update measCtx and submitCtx to be timeout bound in case there's
// more than one input/target to measure.
//
// This deviates a little bit from measurement-kit, for which
// a zero timeout is actually valid. Since it does not make much
// sense, here we're changing the behaviour.
Expand Down Expand Up @@ -246,21 +270,28 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
defer cancelSubmit()
}

// prepare for cycling through the targets
inputCount := len(targets)
start := time.Now()
inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10
eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second)

for idx, target := range targets {
// handle the case where the time allocated for measuring has elapsed
if measCtx.Err() != nil {
break
}

// notify the mobile app that we are about to measure a specific target
logger.Infof("Starting measurement with index %d", idx)
r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{
CategoryCode: target.Category(),
CountryCode: target.Country(),
Idx: int64(idx),
Input: target.Input(),
})

// emit progress when there is more than one target to measure
if target.Input() != "" && inputCount > 0 {
var percentage float64
if r.settings.Options.MaxRuntime > 0 {
Expand All @@ -274,6 +305,8 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
))
}

// Perform the measurement proper.
//
// Richer input implementation note: in mobile, we only observe richer input
// for Web Connectivity and only store this kind of input into the database and
// otherwise we ignore richer input for other experiments, which are just
Expand All @@ -287,11 +320,14 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
target,
)

// Handle the case where our time for measuring has elapsed while
// we were measuring and assume the context interrupted the measurement
// midway, so it doesn't make sense to submit it.
if builder.Interruptible() && measCtx.Err() != nil {
// We want to stop here only if interruptible otherwise we want to
// submit measurement and stop at beginning of next iteration
break
}

// handle the case where the measurement has failed
if err != nil {
r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{
Failure: err.Error(),
Expand All @@ -304,14 +340,22 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
// now the only valid strategy here is to continue.
continue
}

// make sure the measurement contains the user-specified annotations
m.AddAnnotations(r.settings.Annotations)

// serialize the measurement to JSON (cannot fail in practice)
data, err := json.Marshal(m)
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")

// let the mobile app know about this measurement
r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{
Idx: int64(idx),
Input: target.Input(),
JSONStr: string(data),
})

// if possible, submit the measurement to the OONI backend
if !r.settings.Options.NoCollector {
logger.Info("Submitting measurement... please, be patient")
err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m)
Expand All @@ -323,6 +367,8 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
Failure: measurementSubmissionFailure(err),
})
}

// let the app know that we're done measuring this entry
r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{
Idx: int64(idx),
Input: target.Input(),
Expand Down

0 comments on commit c4675c5

Please sign in to comment.