Skip to content

Commit

Permalink
feat(archival,dslx,measurexlite): tag all observations
Browse files Browse the repository at this point in the history
  • Loading branch information
bassosimone committed May 26, 2023
1 parent 18db631 commit 715923b
Show file tree
Hide file tree
Showing 30 changed files with 539 additions and 178 deletions.
15 changes: 13 additions & 2 deletions internal/dslx/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ func DNSLookupOptionLogger(value model.Logger) DNSLookupOption {
}
}

// DNSLookupOptionTags allows to set tags to tag observations.
func DNSLookupOptionTags(value ...string) DNSLookupOption {
return func(dis *DomainToResolve) {
dis.Tags = append(dis.Tags, value...)
}
}

// DNSLookupOptionZeroTime configures the measurement's zero time.
// See DomainToResolve docs for more information.
func DNSLookupOptionZeroTime(value time.Time) DNSLookupOption {
Expand All @@ -52,6 +59,7 @@ func NewDomainToResolve(domain DomainName, options ...DNSLookupOption) *DomainTo
Domain: string(domain),
IDGenerator: &atomic.Int64{},
Logger: model.DiscardLogger,
Tags: []string{},
ZeroTime: time.Now(),
}
for _, option := range options {
Expand Down Expand Up @@ -81,6 +89,9 @@ type DomainToResolve struct {
// implemented by NewDomainToResolve uses model.DiscardLogger.
Logger model.Logger

// Tags contains OPTIONAL tags to tag observations.
Tags []string

// ZeroTime is the MANDATORY zero time of the measurement. We will
// use this field as the zero value to compute relative elapsed times
// when generating measurements. The default construction by
Expand Down Expand Up @@ -132,7 +143,7 @@ func (f *dnsLookupGetaddrinfoFunc) Apply(
ctx context.Context, input *DomainToResolve) *Maybe[*ResolvedAddresses] {

// create trace
trace := measurexlite.NewTrace(input.IDGenerator.Add(1), input.ZeroTime)
trace := measurexlite.NewTrace(input.IDGenerator.Add(1), input.ZeroTime, input.Tags...)

// start the operation logger
ol := measurexlite.NewOperationLogger(
Expand Down Expand Up @@ -195,7 +206,7 @@ func (f *dnsLookupUDPFunc) Apply(
ctx context.Context, input *DomainToResolve) *Maybe[*ResolvedAddresses] {

// create trace
trace := measurexlite.NewTrace(input.IDGenerator.Add(1), input.ZeroTime)
trace := measurexlite.NewTrace(input.IDGenerator.Add(1), input.ZeroTime, input.Tags...)

// start the operation logger
ol := measurexlite.NewOperationLogger(
Expand Down
15 changes: 14 additions & 1 deletion internal/dslx/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/ooni/probe-cli/v3/internal/mocks"
"github.com/ooni/probe-cli/v3/internal/model"
)
Expand Down Expand Up @@ -35,6 +36,7 @@ func TestNewDomainToResolve(t *testing.T) {
DNSLookupOptionIDGenerator(idGen),
DNSLookupOptionLogger(model.DiscardLogger),
DNSLookupOptionZeroTime(zt),
DNSLookupOptionTags("antani"),
)
if domainToResolve.Domain != "www.example.com" {
t.Fatalf("unexpected domain")
Expand All @@ -48,6 +50,9 @@ func TestNewDomainToResolve(t *testing.T) {
if domainToResolve.ZeroTime != zt {
t.Fatalf("unexpected zerotime")
}
if diff := cmp.Diff([]string{"antani"}, domainToResolve.Tags); diff != "" {
t.Fatal(diff)
}
})
})
}
Expand All @@ -73,13 +78,14 @@ func TestGetaddrinfo(t *testing.T) {
Domain: "example.com",
Logger: model.DiscardLogger,
IDGenerator: &atomic.Int64{},
Tags: []string{"antani"},
ZeroTime: time.Time{},
}

t.Run("with nil resolver", func(t *testing.T) {
f := dnsLookupGetaddrinfoFunc{}
ctx, cancel := context.WithCancel(context.Background())
cancel()
cancel() // immediately cancel the lookup
res := f.Apply(ctx, domain)
if res.Observations == nil || len(res.Observations) <= 0 {
t.Fatal("unexpected empty observations")
Expand Down Expand Up @@ -130,6 +136,9 @@ func TestGetaddrinfo(t *testing.T) {
if len(res.State.Addresses) != 1 || res.State.Addresses[0] != "93.184.216.34" {
t.Fatal("unexpected addresses")
}
if diff := cmp.Diff([]string{"antani"}, res.State.Trace.Tags()); diff != "" {
t.Fatal(diff)
}
})
})
}
Expand All @@ -155,6 +164,7 @@ func TestLookupUDP(t *testing.T) {
Domain: "example.com",
Logger: model.DiscardLogger,
IDGenerator: &atomic.Int64{},
Tags: []string{"antani"},
ZeroTime: time.Time{},
}

Expand Down Expand Up @@ -214,6 +224,9 @@ func TestLookupUDP(t *testing.T) {
if len(res.State.Addresses) != 1 || res.State.Addresses[0] != "93.184.216.34" {
t.Fatal("unexpected addresses")
}
if diff := cmp.Diff([]string{"antani"}, res.State.Trace.Tags()); diff != "" {
t.Fatal(diff)
}
})
})
}
11 changes: 11 additions & 0 deletions internal/dslx/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Endpoint struct {
// Network is the MANDATORY endpoint network.
Network string

// Tags contains OPTIONAL tags for tagging observations.
Tags []string

// ZeroTime is the MANDATORY zero time of the measurement.
ZeroTime time.Time
}
Expand Down Expand Up @@ -66,6 +69,13 @@ func EndpointOptionLogger(value model.Logger) EndpointOption {
}
}

// EndpointOptionTags allows to set tags to tag observations.
func EndpointOptionTags(value ...string) EndpointOption {
return func(es *Endpoint) {
es.Tags = append(es.Tags, value...)
}
}

// EndpointOptionZeroTime allows to set the zero time.
func EndpointOptionZeroTime(value time.Time) EndpointOption {
return func(es *Endpoint) {
Expand All @@ -92,6 +102,7 @@ func NewEndpoint(
IDGenerator: &atomic.Int64{},
Logger: model.DiscardLogger,
Network: string(network),
Tags: []string{},
ZeroTime: time.Now(),
}
for _, option := range options {
Expand Down
5 changes: 5 additions & 0 deletions internal/dslx/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/ooni/probe-cli/v3/internal/model"
)

Expand All @@ -21,6 +22,7 @@ func TestEndpoint(t *testing.T) {
EndpointOptionIDGenerator(idGen),
EndpointOptionLogger(model.DiscardLogger),
EndpointOptionZeroTime(zt),
EndpointOptionTags("antani"),
)
if testEndpoint.Network != "network" {
t.Fatalf("unexpected network")
Expand All @@ -40,5 +42,8 @@ func TestEndpoint(t *testing.T) {
if testEndpoint.ZeroTime != zt {
t.Fatalf("unexpected zero time")
}
if diff := cmp.Diff([]string{"antani"}, testEndpoint.Tags); diff != "" {
t.Fatal(diff)
}
})
}
37 changes: 35 additions & 2 deletions internal/dslx/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package dslx

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/ooni/probe-cli/v3/internal/measurexlite"
"github.com/ooni/probe-cli/v3/internal/mocks"
"github.com/ooni/probe-cli/v3/internal/model"
Expand Down Expand Up @@ -89,7 +92,7 @@ func TestHTTPRequest(t *testing.T) {
}
idGen := &atomic.Int64{}
zeroTime := time.Time{}
trace := measurexlite.NewTrace(idGen.Add(1), zeroTime)
trace := measurexlite.NewTrace(idGen.Add(1), zeroTime, "antani")

t.Run("with EOF", func(t *testing.T) {
httpTransport := HTTPTransport{
Expand Down Expand Up @@ -159,6 +162,35 @@ func TestHTTPRequest(t *testing.T) {
}
})

// makeSureObservationsContainTags ensures the observations you can extract from
// the given HTTPResponse contain the tags we configured when testing
makeSureObservationsContainTags := func(res *Maybe[*HTTPResponse]) error {
// exclude the case where there was an error
if res.Error != nil {
return fmt.Errorf("unexpected error: %w", res.Error)
}

// obtain the observations
for _, obs := range ExtractObservations(res) {

// check the network events
for _, ev := range obs.NetworkEvents {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
}

// check the HTTP events
for _, ev := range obs.Requests {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
}
}

return nil
}

t.Run("with success (https)", func(t *testing.T) {
httpTransport := HTTPTransport{
Address: "1.2.3.4:443",
Expand All @@ -178,6 +210,7 @@ func TestHTTPRequest(t *testing.T) {
if res.State.HTTPResponse == nil || res.State.HTTPResponse.Status != "expected" {
t.Fatal("unexpected request")
}
makeSureObservationsContainTags(res)
})

t.Run("with success (http)", func(t *testing.T) {
Expand All @@ -199,6 +232,7 @@ func TestHTTPRequest(t *testing.T) {
if res.State.HTTPResponse == nil || res.State.HTTPResponse.Status != "expected" {
t.Fatal("unexpected request")
}
makeSureObservationsContainTags(res)
})

t.Run("with header options", func(t *testing.T) {
Expand Down Expand Up @@ -239,7 +273,6 @@ func TestHTTPRequest(t *testing.T) {
t.Fatal("unexpected URL path", res.State.HTTPRequest.URL.Path)
}
})

})
}

Expand Down
8 changes: 7 additions & 1 deletion internal/dslx/httpcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,19 @@ func (f *httpRequestFunc) do(
) (*http.Response, []byte, []*Observations, error) {
const maxbody = 1 << 19 // TODO(bassosimone): allow to configure this value?
started := input.Trace.TimeSince(input.Trace.ZeroTime)

// manually create a single 1-length observations structure because
// the trace cannot automatically capture HTTP events
observations := []*Observations{
NewObservations(),
} // one entry
}

observations[0].NetworkEvents = append(observations[0].NetworkEvents,
measurexlite.NewAnnotationArchivalNetworkEvent(
input.Trace.Index,
started,
"http_transaction_start",
input.Trace.Tags()...,
))

resp, err := input.Transport.RoundTrip(req)
Expand All @@ -312,6 +316,7 @@ func (f *httpRequestFunc) do(
input.Trace.Index,
finished,
"http_transaction_done",
input.Trace.Tags()...,
))

observations[0].Requests = append(observations[0].Requests,
Expand All @@ -328,6 +333,7 @@ func (f *httpRequestFunc) do(
body,
err,
finished,
input.Trace.Tags()...,
))

return resp, body, observations, err
Expand Down
2 changes: 1 addition & 1 deletion internal/dslx/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type quicHandshakeFunc struct {
func (f *quicHandshakeFunc) Apply(
ctx context.Context, input *Endpoint) *Maybe[*QUICConnection] {
// create trace
trace := measurexlite.NewTrace(input.IDGenerator.Add(1), input.ZeroTime)
trace := measurexlite.NewTrace(input.IDGenerator.Add(1), input.ZeroTime, input.Tags...)

// use defaults or user-configured overrides
serverName := f.serverName(input)
Expand Down
Loading

0 comments on commit 715923b

Please sign in to comment.