diff --git a/pkg/oonimkall/taskrunner.go b/pkg/oonimkall/taskrunner.go index bf59e0477..4fa5785c3 100644 --- a/pkg/oonimkall/taskrunner.go +++ b/pkg/oonimkall/taskrunner.go @@ -117,6 +117,8 @@ func (r *runnerForTask) Run(rootCtx context.Context) { // // - rootCtx is the root context and is controlled by the user; // + // - loadCtx derives from rootCtx and is used to load inputs; + // // - measCtx derives from rootCtx and is possibly tied to the // maximum runtime and is used to choose when to stop measuring; // @@ -126,28 +128,36 @@ func (r *runnerForTask) Run(rootCtx context.Context) { // See https://github.com/ooni/probe/issues/2037. var logger model.Logger = newTaskLogger(r.emitter, r.settings.LogLevel) r.emitter.Emit(eventTypeStatusQueued, eventEmpty{}) + + // check whether we support the provided settings if r.hasUnsupportedSettings() { // event failureStartup already emitted return } r.emitter.Emit(eventTypeStatusStarted, eventEmpty{}) + + // create a new measurement session sess, err := r.newsession(rootCtx, logger) if err != nil { r.emitter.EmitFailureStartup(err.Error()) return } + + // make sure we emit the status.end event when we're done endEvent := new(eventStatusEnd) defer func() { _ = sess.Close() r.emitter.Emit(eventTypeStatusEnd, endEvent) }() + // create an experiment builder for the given experiment name builder, err := sess.NewExperimentBuilder(r.settings.Name) if err != nil { r.emitter.EmitFailureStartup(err.Error()) return } + // choose the proper OONI backend to use logger.Info("Looking up OONI backends... please, be patient") if err := sess.MaybeLookupBackendsContext(rootCtx); err != nil { r.emitter.EmitFailureStartup(err.Error()) @@ -155,6 +165,7 @@ func (r *runnerForTask) Run(rootCtx context.Context) { } r.emitter.EmitStatusProgress(0.1, "contacted bouncer") + // discover the probe location logger.Info("Looking up your location... please, be patient") if err := sess.MaybeLookupLocationContext(rootCtx); err != nil { r.emitter.EmitFailureGeneric(eventTypeFailureIPLookup, err.Error()) @@ -177,9 +188,10 @@ func (r *runnerForTask) Run(rootCtx context.Context) { ResolverNetworkName: sess.ResolverNetworkName(), }) + // configure the callbacks to emit events builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter}) - // Load targets. + // load targets using the experiment-specific loader loader := builder.NewTargetLoader(&model.ExperimentTargetLoaderConfig{ CheckInConfig: &model.OOAPICheckInConfig{ // TODO(bassosimone,DecFox): to correctly load Web Connectivity targets @@ -197,11 +209,16 @@ func (r *runnerForTask) Run(rootCtx context.Context) { return } + // create the new experiment experiment := builder.NewExperiment() + + // make sure we account for the bytes sent and received defer func() { endEvent.DownloadedKB = experiment.KibiBytesReceived() endEvent.UploadedKB = experiment.KibiBytesSent() }() + + // open a new report if possible if !r.settings.Options.NoCollector { logger.Info("Opening report... please, be patient") if err := experiment.OpenReportContext(rootCtx); err != nil { @@ -213,11 +230,18 @@ func (r *runnerForTask) Run(rootCtx context.Context) { ReportID: experiment.ReportID(), }) } + + // create the default context for measuring measCtx, measCancel := context.WithCancel(rootCtx) defer measCancel() + + // create the default context for submitting submitCtx, submitCancel := context.WithCancel(rootCtx) defer submitCancel() + // Update measCtx and submitCtx to be timeout bound in case there's + // more than one input/target to measure. + // // This deviates a little bit from measurement-kit, for which // a zero timeout is actually valid. Since it does not make much // sense, here we're changing the behaviour. @@ -246,14 +270,19 @@ func (r *runnerForTask) Run(rootCtx context.Context) { defer cancelSubmit() } + // prepare for cycling through the targets inputCount := len(targets) start := time.Now() inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10 eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second) + for idx, target := range targets { + // handle the case where the time allocated for measuring has elapsed if measCtx.Err() != nil { break } + + // notify the mobile app that we are about to measure a specific target logger.Infof("Starting measurement with index %d", idx) r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{ CategoryCode: target.Category(), @@ -261,6 +290,8 @@ func (r *runnerForTask) Run(rootCtx context.Context) { Idx: int64(idx), Input: target.Input(), }) + + // emit progress when there is more than one target to measure if target.Input() != "" && inputCount > 0 { var percentage float64 if r.settings.Options.MaxRuntime > 0 { @@ -274,6 +305,8 @@ func (r *runnerForTask) Run(rootCtx context.Context) { )) } + // Perform the measurement proper. + // // Richer input implementation note: in mobile, we only observe richer input // for Web Connectivity and only store this kind of input into the database and // otherwise we ignore richer input for other experiments, which are just @@ -287,11 +320,14 @@ func (r *runnerForTask) Run(rootCtx context.Context) { target, ) + // Handle the case where our time for measuring has elapsed while + // we were measuring and assume the context interrupted the measurement + // midway, so it doesn't make sense to submit it. if builder.Interruptible() && measCtx.Err() != nil { - // We want to stop here only if interruptible otherwise we want to - // submit measurement and stop at beginning of next iteration break } + + // handle the case where the measurement has failed if err != nil { r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{ Failure: err.Error(), @@ -304,14 +340,22 @@ func (r *runnerForTask) Run(rootCtx context.Context) { // now the only valid strategy here is to continue. continue } + + // make sure the measurement contains the user-specified annotations m.AddAnnotations(r.settings.Annotations) + + // serialize the measurement to JSON (cannot fail in practice) data, err := json.Marshal(m) runtimex.PanicOnError(err, "measurement.MarshalJSON failed") + + // let the mobile app know about this measurement r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{ Idx: int64(idx), Input: target.Input(), JSONStr: string(data), }) + + // if possible, submit the measurement to the OONI backend if !r.settings.Options.NoCollector { logger.Info("Submitting measurement... please, be patient") err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m) @@ -323,6 +367,8 @@ func (r *runnerForTask) Run(rootCtx context.Context) { Failure: measurementSubmissionFailure(err), }) } + + // let the app know that we're done measuring this entry r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{ Idx: int64(idx), Input: target.Input(),