Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Make sure historical state events don't come down /transactions for application services (MSC2716) #221

Closed
34 changes: 34 additions & 0 deletions internal/b/blueprints.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ import (
"crypto/rand"
"encoding/hex"
"fmt"
"net"
"strconv"
"strings"
)

var (
// HostnameRunningComplement is the hostname of Complement from the perspective of a Homeserver.
HostnameRunningComplement = "host.docker.internal"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't import this from docker.HostnameRunningComplement because then we will have a circular dependency

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't mean you should make the constant here, that means you should move the logic to where we actually care about the host/port aka /docker.

)

// KnownBlueprints lists static blueprints
var KnownBlueprints = map[string]*Blueprint{
BlueprintCleanHS.Name: &BlueprintCleanHS,
Expand Down Expand Up @@ -190,6 +196,22 @@ func normaliseUser(u string, hsName string) (string, error) {
return u, nil
}

// Asks the kernel for a free open port that is ready to use.
// via https://github.com/phayes/freeport/blob/95f893ade6f232a5f1511d61735d89b1ae2df543/freeport.go#L7-L20
func getFreePort() (int, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't the right place for this.

addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}

func normalizeApplicationService(as ApplicationService) (ApplicationService, error) {
hsToken := make([]byte, 32)
_, err := rand.Read(hsToken)
Expand All @@ -206,6 +228,18 @@ func normalizeApplicationService(as ApplicationService) (ApplicationService, err
as.HSToken = hex.EncodeToString(hsToken)
as.ASToken = hex.EncodeToString(asToken)

if as.URL == "" {
// Since we're just checking and not reserving the port, we could
// potentially run into an issue where the port is no longer available when
// we actually try to bind to it later on
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests are run concurrently, so we really do care about this.

port, err := getFreePort()
if err != nil {
return as, err
}

as.URL = fmt.Sprintf("http://%s:%d", HostnameRunningComplement, port)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking, do we even care about having a random port for the application service?

Previously I just had it run on port 9111 but changed it to a random port in 06935a7

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, tests are run concurrently so we can't bind to the same port all the time.

}

return as, err
}

Expand Down
1 change: 0 additions & 1 deletion internal/b/hs_with_application_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ var BlueprintHSWithApplicationService = MustValidate(Blueprint{
ApplicationServices: []ApplicationService{
{
ID: "my_as_id",
URL: "http://localhost:9000",
SenderLocalpart: "the-bridge-user",
RateLimited: false,
},
Expand Down
155 changes: 145 additions & 10 deletions tests/msc2716_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ package tests

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/tidwall/gjson"

"github.com/matrix-org/complement/internal/b"
Expand Down Expand Up @@ -58,6 +61,9 @@ var createPrivateRoomOpts = map[string]interface{}{
"room_version": "org.matrix.msc2716v3",
}

// Find the URL and port of the application service in some registration yaml text
var asURLRegexp = regexp.MustCompile(`url: '(.+):(\d+)'`)

func TestImportHistoricalMessages(t *testing.T) {
deployment := Deploy(t, b.BlueprintHSWithApplicationService)
defer deployment.Destroy(t)
Expand Down Expand Up @@ -255,7 +261,7 @@ func TestImportHistoricalMessages(t *testing.T) {
})
})

t.Run("Historical events from /batch_send do not come down in an incremental sync", func(t *testing.T) {
t.Run("Historical events from batch_send do not come down in an incremental sync", func(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the / in the test name so I can actually run it individually, COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS=1 COMPLEMENT_DIR=../complement ./scripts-dev/complement.sh TestImportHistoricalMessages/parallel/Historical_events_from_batch_send_do_not_come_down_in_an_incremental_sync

See https://gist.github.com/MadLittleMods/4ab08f51609fab759247f299a4e33406 for why there is a problem using a / to match a single test.

t.Parallel()

roomID := as.CreateRoom(t, createPublicRoomOpts)
Expand All @@ -269,6 +275,10 @@ func TestImportHistoricalMessages(t *testing.T) {
// Create some "live" events to saturate and fill up the /sync response
createMessagesInRoom(t, alice, roomID, 5)

// Get a /sync `since` pagination token we can try paginating from later
// on
since := doInitialSync(t, alice)

// Import a historical event
batchSendRes := batchSendHistoricalMessages(
t,
Expand All @@ -283,25 +293,139 @@ func TestImportHistoricalMessages(t *testing.T) {
)
batchSendResBody := client.ParseJSON(t, batchSendRes)
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
historicalEventId := historicalEventIDs[0]
historicalStateEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")

// This is just a dummy event we search for after the historicalEventId
// This is just a dummy event we search for after the historicalEventIDs/historicalStateEventIDs
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1)
eventIDAfterHistoricalImport := eventIDsAfterHistoricalImport[0]

// Sync until we find the eventIDAfterHistoricalImport.
// If we're able to see the eventIDAfterHistoricalImport that occurs after
// the historicalEventId without seeing eventIDAfterHistoricalImport in
// between, we're probably safe to assume it won't sync
alice.SyncUntil(t, "", `{ "room": { "timeline": { "limit": 3 } } }`, "rooms.join."+client.GjsonEscape(roomID)+".timeline.events", func(r gjson.Result) bool {
if r.Get("event_id").Str == historicalEventId {
t.Fatalf("We should not see the %s historical event in /sync response but it was present", historicalEventId)
// Sync from before we did any batch sending until we find the
// eventIDAfterHistoricalImport. If we're able to see
// eventIDAfterHistoricalImport without any of the
// historicalEventIDs/historicalStateEventIDs in between, we're probably
// safe to assume it won't sync.
alice.SyncUntil(t, since, "", "rooms.join."+client.GjsonEscape(roomID)+".timeline.events", func(r gjson.Result) bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this test more clear on what's happening. We now paginate sync from before we /batch_send until an event that occurred after /batch_send and just make sure we didn't see any historical state or events in between.

if includes(r.Get("event_id").Str, historicalEventIDs) || includes(r.Get("event_id").Str, historicalStateEventIDs) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where we additionally check for the historicalStateEventIDs now

t.Fatalf("We should not see the %s historical event in /sync response but it was present", r.Get("event_id").Str)
}

return r.Get("event_id").Str == eventIDAfterHistoricalImport
})
})

t.Run("Historical events from batch_send do not get pushed out as application service transactions", func(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please split out the /sync stuff into a different PR (which can be accepted) and leave this PR with me.

Thanks for the review @kegsay 🐢, I've split out the /sync stuff to #235

t.Parallel()

// Find the application service port defined in the registration file
asRegistration := deployment.HS["hs1"].ApplicationServices["my_as_id"]
asURLMatches := asURLRegexp.FindStringSubmatch(asRegistration)
if asURLMatches == nil {
t.Fatalf("Unable to find application service `url` in registration=%s", asRegistration)
}
asPort := asURLMatches[2]
Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parsing yaml with regex probably isn't very good, "not a regular language, blah" but didn't want to pull in a yaml parser here. Feel free to poke in a different direction.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pull in a YAML parser.


// Create a listener and handler to stub an application service listening
// for transactions from a homeserver.
Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we want to create NewApplicationService to mirror federation.NewServer?

The Application service here just needs to listen to a single endpoint though (simple http listener), nothing special.

handler := mux.NewRouter()
// Application Service API: /_matrix/app/v1/transactions/{txnId}
waiter := NewWaiter()
var eventIDsWeSawOverTransactions []string
var eventIDAfterHistoricalImport string
handler.HandleFunc("/transactions/{txnId}", func(w http.ResponseWriter, req *http.Request) {
must.MatchRequest(t, req, match.HTTPRequest{
JSON: []match.JSON{
match.JSONArrayEach("events", func(r gjson.Result) error {
// Add to our running list of events
eventIDsWeSawOverTransactions = append(eventIDsWeSawOverTransactions, r.Get("event_id").Str)

// If we found the event that occurs after our batch send, we can
// probably safely assume the historical events won't come later.
if r.Get("event_id").Str != "" && r.Get("event_id").Str == eventIDAfterHistoricalImport {
defer waiter.Finish()
}

return nil
}),
},
})

// Acknowledge that we've seen the transaction
w.WriteHeader(200)
w.Write([]byte("{}"))
}).Methods("PUT")

srv := &http.Server{
Addr: fmt.Sprintf(":%s", asPort),
Handler: handler,
}
go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
// Note that running s.t.FailNow is not allowed in a separate goroutine
// Tests will likely fail if the server is not listening anyways
t.Logf("Failed to listen and serve our fake application service: %s", err)
}
}()
defer func() {
err := srv.Shutdown(context.Background())
if err != nil {
t.Fatalf("Failed to shutdown our fake application service: %s", err)
}
}()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is inspired by the Server.Listen code

// ----------------------------------------------------------

// Create the room all of the action is going to happen in
roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

// Create the "live" event we are going to insert our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

// Import a historical event
batchSendRes := batchSendHistoricalMessages(
t,
as,
roomID,
eventIdBefore,
"",
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore),
createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 1),
// Status
200,
)
batchSendResBody := client.ParseJSON(t, batchSendRes)
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
historicalStateEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")

// This is just a dummy event we search for after the historicalEventIDs/historicalStateEventIDs
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1)
eventIDAfterHistoricalImport = eventIDsAfterHistoricalImport[0]

// Check if eventIDAfterHistoricalImport already came over `/transactions`.
if !includes(eventIDAfterHistoricalImport, eventIDsWeSawOverTransactions) {
// If not, wait 5 seconds to see if it happens. The waiter will only
// resolve if we see eventIDAfterHistoricalImport, otherwise timeout
waiter.Wait(t, 5*time.Second)
}

// Now, that we know eventIDAfterHistoricalImport came over /transactions,
// we can probably safely assume the historical events won't come later.

// Check to make sure the historical events didn't come over /transactions
for _, historicalEventID := range historicalEventIDs {
if includes(historicalEventID, eventIDsWeSawOverTransactions) {
t.Fatalf("We should not see the %s historical event come over /transactions but it did", historicalEventID)
}
}
// Check to make sure the historical state events didn't come over /transactions
for _, historicalStateEventID := range historicalStateEventIDs {
if includes(historicalStateEventID, eventIDsWeSawOverTransactions) {
t.Fatalf("We should not see the %s historical state event come over /transactions but it did", historicalStateEventID)
}
}
})

t.Run("Batch send endpoint only returns state events that we passed in via state_events_at_start", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -926,6 +1050,17 @@ func reversed(in []string) []string {
return out
}

// includes reports whether needle is exactly equal to at least one element
// of haystack. Comparison is case-sensitive; a nil or empty haystack yields
// false.
func includes(needle string, haystack []string) bool {
	found := false
	// Stop scanning as soon as a match has been recorded.
	for idx := 0; idx < len(haystack) && !found; idx++ {
		found = haystack[idx] == needle
	}
	return found
}

func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, check func(gjson.Result) bool) {
t.Helper()
start := time.Now()
Expand Down