Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Peeking via MSC2753 #1370

Merged
merged 66 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
b9342d9
a very very WIP first cut of peeking via MSC2753.
ara4n Aug 30, 2020
d7bdf71
make PeekingDeviceSet private
ara4n Aug 30, 2020
cfa0be5
merge master
ara4n Aug 31, 2020
9b79f9a
add server_name param
ara4n Aug 31, 2020
d343b8f
blind stab at adding a `peek` section to /sync
ara4n Aug 31, 2020
c4e5f60
make it build
ara4n Aug 31, 2020
d1e4d66
make it launch
ara4n Aug 31, 2020
f006b37
add peeking to getResponseWithPDUsForCompleteSync
ara4n Aug 31, 2020
6c3a896
cancel any peeks when we join a room
ara4n Aug 31, 2020
7b38d48
spell out how to runoutside of docker if you want speed
ara4n Aug 31, 2020
e589984
fix SQL
ara4n Aug 31, 2020
0bb2c2c
remove unnecessary txn for SelectPeeks
ara4n Aug 31, 2020
28219c6
Merge branch 'master' into matthew/peeking
ara4n Sep 1, 2020
86e9736
fix s/join/peek/ cargocult fail
ara4n Sep 1, 2020
d0d5f70
Merge branch 'master' into matthew/peeking
ara4n Sep 1, 2020
bfecc8e
HACK: Track goroutine IDs to determine when we write by the wrong thread
kegsay Sep 1, 2020
7bf2a27
Track partition offsets and only log unsafe for non-selects
kegsay Sep 1, 2020
fcdb90c
Put redactions in the writer goroutine
kegsay Sep 1, 2020
6410b70
Update filters on writer goroutine
kegsay Sep 1, 2020
ed4b3a5
Merge branch 'kegan/redact-txn' into matthew/peeking
ara4n Sep 1, 2020
3cebd8d
Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking
ara4n Sep 1, 2020
5d7f688
wrap peek storage in goid hack
ara4n Sep 1, 2020
6424117
use exclusive writer, and MarkPeeksAsOld more efficiently
ara4n Sep 1, 2020
85bce11
don't log ascii in binary at sql trace...
ara4n Sep 1, 2020
75b91ac
strip out empty roomd deltas
ara4n Sep 1, 2020
b6cc441
re-add txn to SelectPeeks
ara4n Sep 2, 2020
f6af656
re-add accidentally deleted field
ara4n Sep 2, 2020
8712ea3
Merge branch 'master' into matthew/peeking
ara4n Sep 3, 2020
eda84cd
reject peeks for non-worldreadable rooms
ara4n Sep 3, 2020
da3742c
move perform_peek
ara4n Sep 3, 2020
3c5e079
fix package
ara4n Sep 3, 2020
c775643
correctly refactor perform_peek
ara4n Sep 3, 2020
4f5c8ca
Merge branch 'master' into matthew/peeking
ara4n Sep 3, 2020
2b8f0b8
Merge branch 'master' into matthew/peeking
neilalexander Sep 4, 2020
56d772f
Cancel peeks on join again
neilalexander Sep 4, 2020
9ad05e7
Merge branch 'master' into matthew/peeking
neilalexander Sep 4, 2020
ca0406e
Fix unit test
neilalexander Sep 4, 2020
83c559c
Merge branch 'master' into matthew/peeking
neilalexander Sep 4, 2020
64fe274
Merge branch 'master' into matthew/peeking
neilalexander Sep 4, 2020
5c04c52
Add PerformPeek full-HTTP serverside
neilalexander Sep 4, 2020
ff65f0e
Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matth…
neilalexander Sep 4, 2020
2ccd4fa
Clear a whole bunch of lint issues
neilalexander Sep 4, 2020
8bbccab
Merge branch 'matthew/peeking' of github.com:matrix-org/dendrite into…
neilalexander Sep 4, 2020
843b7a7
Fix PerformPeekPath
neilalexander Sep 4, 2020
55c7f2c
add (broken) postgres; advance streampos whenever sync output changes
ara4n Sep 7, 2020
034ff32
move peek-cancelling to consumer
ara4n Sep 8, 2020
7f41f39
remove erroneous commentary
ara4n Sep 8, 2020
9f2bc62
(broken) rewrite to use SelectPeeksInRange rather than MarkPeeksAsOld…
ara4n Sep 8, 2020
b96a31d
track transitions more clearly
ara4n Sep 8, 2020
7a76f49
notify new streampos after joining peeked rooms
ara4n Sep 8, 2020
56001d0
Only call Membership() on membership events
neilalexander Sep 8, 2020
6fe281a
Don't return sql.ErrNoRows on DeletePeeks since that is not an error …
neilalexander Sep 8, 2020
bcbe651
Merge branch 'master' into matthew/peeking
neilalexander Sep 8, 2020
faa070b
Don't panic
neilalexander Sep 8, 2020
de53608
fix nightmare bug where sqlite doesn't let you use out of order sub s…
ara4n Sep 9, 2020
b45436a
handle exclusive writer txn for cleanliness
ara4n Sep 9, 2020
052351e
go fmt
ara4n Sep 9, 2020
2300549
fix lint
ara4n Sep 9, 2020
d2a0bad
Merge branch 'master' into matthew/peeking
ara4n Sep 9, 2020
f09afe0
Remove accidental formerly-untracked file
neilalexander Sep 9, 2020
15349d8
Return errors from SQL statements to handle rollbacks correctly
kegsay Sep 9, 2020
34b8961
Fix sqlite DeletePeeks API to match postgres; fix bug which incorrect…
kegsay Sep 9, 2020
af47237
Add peeking tests to whitelist
kegsay Sep 9, 2020
8cec7e0
fix lots of places where we didn't rollback txns upon go errs (#1417)
ara4n Sep 9, 2020
b1d1ccd
allow duplicate stream ids in the peeks table
ara4n Sep 10, 2020
9561843
stop the peek table growing by reusing rows correctly
ara4n Sep 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ matrixdotorg/sytest-dendrite:latest tests/50federation/40devicelists.pl
```
See [sytest.md](docs/sytest.md) for the full description of these flags.

You can try running sytest outside of docker for faster runs, but the dependencies can be temperamental
and we recommend using docker where possible.
```
cd sytest
export PERL5LIB=$HOME/lib/perl5
export PERL_MB_OPT=--install_base=$HOME
export PERL_MM_OPT=INSTALL_BASE=$HOME
./install-deps.pl

./run-tests.pl -I Dendrite::Monolith -d $PATH_TO_DENDRITE_BINARIES
```

Sometimes Sytest is testing the wrong thing or is flakey, so it will need to be patched.
Ask on `#dendrite-dev:matrix.org` if you think this is the case for you and we'll be happy to help.

Expand Down
2 changes: 1 addition & 1 deletion clientapi/routing/joinroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func JoinRoomByIDOrAlias(
}
}

// If content was provided in the request then incude that
// If content was provided in the request then include that
// in the request. It'll get used as a part of the membership
// event content.
_ = httputil.UnmarshalJSONRequest(req, &joinReq.Content)
Expand Down
79 changes: 79 additions & 0 deletions clientapi/routing/peekroom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2020 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routing

import (
"net/http"

roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)

func PeekRoomByIDOrAlias(
req *http.Request,
device *api.Device,
rsAPI roomserverAPI.RoomserverInternalAPI,
accountDB accounts.Database,
roomIDOrAlias string,
) util.JSONResponse {
// if this is a remote roomIDOrAlias, we have to ask the roomserver (or federation sender?) to
// to call /peek and /state on the remote server.
// TODO: in future we could skip this if we know we're already participating in the room,
// but this is fiddly in case we stop participating in the room.

// then we create a local peek.
peekReq := roomserverAPI.PerformPeekRequest{
RoomIDOrAlias: roomIDOrAlias,
UserID: device.UserID,
DeviceID: device.ID,
}
peekRes := roomserverAPI.PerformPeekResponse{}

// Check to see if any ?server_name= query parameters were
// given in the request.
if serverNames, ok := req.URL.Query()["server_name"]; ok {
for _, serverName := range serverNames {
peekReq.ServerNames = append(
peekReq.ServerNames,
gomatrixserverlib.ServerName(serverName),
)
}
}

// Ask the roomserver to perform the peek.
rsAPI.PerformPeek(req.Context(), &peekReq, &peekRes)
if peekRes.Error != nil {
return peekRes.Error.JSONResponse()
}

// if this user is already joined to the room, we let them peek anyway
// (given they might be about to part the room, and it makes things less fiddly)

// Peeking stops if none of the devices who started peeking have been
// /syncing for a while, or if everyone who was peeking calls /leave
// (or /unpeek with a server_name param? or DELETE /peek?)
// on the peeked room.

return util.JSONResponse{
Code: http.StatusOK,
// TODO: Put the response struct somewhere internal.
JSON: struct {
RoomID string `json:"room_id"`
}{peekRes.RoomID},
}
}
11 changes: 11 additions & 0 deletions clientapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ func Setup(
)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/peek/{roomIDOrAlias}",
httputil.MakeAuthAPI(gomatrixserverlib.Peek, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return PeekRoomByIDOrAlias(
req, device, rsAPI, accountDB, vars["roomIDOrAlias"],
)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/joined_rooms",
httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return GetJoinedRooms(req, device, rsAPI)
Expand Down
19 changes: 19 additions & 0 deletions docs/peeking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
## Peeking

Peeking is implemented as per [MSC2753](https://github.com/matrix-org/matrix-doc/pull/2753).

Implementationwise, this means:
* Users call `/peek` and `/unpeek` on the clientapi from a given device.
* The clientapi delegates these via HTTP to the roomserver, which coordinates peeking in general for a given room
* The roomserver writes an NewPeek event into the kafka log headed to the syncserver
* The syncserver tracks the existence of the local peek in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peek` section of the /sync response.

Questions (given this is [my](https://github.com/ara4n) first time hacking on Dendrite):
* The whole clientapi -> roomserver -> syncapi flow to initiate a peek seems very indirect. Is there a reason not to just let syncapi itself host the implementation of `/peek`?
ara4n marked this conversation as resolved.
Show resolved Hide resolved

In future, peeking over federation will be added as per [MSC2444](https://github.com/matrix-org/matrix-doc/pull/2444).
* The `roomserver` will kick the `federationsender` much as it does for a federated `/join` in order to trigger a federated `/peek`
* The `federationsender` tracks the existence of the remote peek in question
* The `federationsender` regularly renews the remote peek as long as there are still peeking devices syncing for it.
ara4n marked this conversation as resolved.
Show resolved Hide resolved
* TBD: how do we tell if there are no devices currently syncing for a given peeked room? The syncserver needs to tell the roomserver
somehow who then needs to warn the federationsender.
7 changes: 7 additions & 0 deletions federationapi/routing/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ func (t *testRoomserverAPI) PerformJoin(
) {
}

func (t *testRoomserverAPI) PerformPeek(
ctx context.Context,
req *api.PerformPeekRequest,
res *api.PerformPeekResponse,
) {
}

func (t *testRoomserverAPI) PerformPublish(
ctx context.Context,
req *api.PerformPublishRequest,
Expand Down
2 changes: 1 addition & 1 deletion federationsender/storage/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (d *Database) StoreJSON(
var err error
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js)
return nil
return err
})
if err != nil {
return nil, fmt.Errorf("d.insertQueueJSON: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/sqlutil/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (in *traceInterceptor) RowsNext(c context.Context, rows driver.Rows, dest [

b := strings.Builder{}
for i, val := range dest {
b.WriteString(fmt.Sprintf("%v", val))
b.WriteString(fmt.Sprintf("%q", val))
if i+1 <= len(dest)-1 {
b.WriteString(" | ")
}
Expand Down
2 changes: 1 addition & 1 deletion keyserver/storage/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d *Database) ExistingOneTimeKeys(ctx context.Context, userID, deviceID str
func (d *Database) StoreOneTimeKeys(ctx context.Context, keys api.OneTimeKeys) (counts *api.OneTimeKeysCount, err error) {
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
counts, err = d.OneTimeKeysTable.InsertOneTimeKeys(ctx, txn, keys)
return nil
return err
})
return
}
Expand Down
6 changes: 6 additions & 0 deletions roomserver/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ type RoomserverInternalAPI interface {
res *PerformLeaveResponse,
) error

PerformPeek(
ctx context.Context,
req *PerformPeekRequest,
res *PerformPeekResponse,
)

PerformPublish(
ctx context.Context,
req *PerformPublishRequest,
Expand Down
9 changes: 9 additions & 0 deletions roomserver/api/api_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ func (t *RoomserverInternalAPITrace) PerformInvite(
return t.Impl.PerformInvite(ctx, req, res)
}

func (t *RoomserverInternalAPITrace) PerformPeek(
ctx context.Context,
req *PerformPeekRequest,
res *PerformPeekResponse,
) {
t.Impl.PerformPeek(ctx, req, res)
util.GetLogger(ctx).Infof("PerformPeek req=%+v res=%+v", js(req), js(res))
}

func (t *RoomserverInternalAPITrace) PerformJoin(
ctx context.Context,
req *PerformJoinRequest,
Expand Down
15 changes: 14 additions & 1 deletion roomserver/api/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const (
// - Redact the event and set the corresponding `unsigned` fields to indicate it as redacted.
// - Replace the event in the database.
OutputTypeRedactedEvent OutputType = "redacted_event"

// OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek
OutputTypeNewPeek OutputType = "new_peek"
)

// An OutputEvent is an entry in the roomserver output kafka log.
Expand All @@ -59,8 +62,10 @@ type OutputEvent struct {
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
// The content of event with type OutputTypeRetireInviteEvent
RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"`
// The content of event with type OutputTypeRedactedEvent
// The content of event with type OutputTypeRedactedEvent
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
// The content of event with type OutputTypeNewPeek
NewPeek *OutputNewPeek `json:"new_peek,omitempty"`
}

// An OutputNewRoomEvent is written when the roomserver receives a new event.
Expand Down Expand Up @@ -195,3 +200,11 @@ type OutputRedactedEvent struct {
// The value of `unsigned.redacted_because` - the redaction event itself
RedactedBecause gomatrixserverlib.HeaderedEvent
}

// An OutputNewPeek is written whenever a user starts peeking into a room
// using a given device.
type OutputNewPeek struct {
ara4n marked this conversation as resolved.
Show resolved Hide resolved
RoomID string
UserID string
DeviceID string
}
14 changes: 14 additions & 0 deletions roomserver/api/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ type PerformInviteResponse struct {
Error *PerformError
}

type PerformPeekRequest struct {
RoomIDOrAlias string `json:"room_id_or_alias"`
UserID string `json:"user_id"`
DeviceID string `json:"device_id"`
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}

type PerformPeekResponse struct {
// The room ID, populated on success.
RoomID string `json:"room_id"`
// If non-nil, the join request failed. Contains more information why it failed.
Error *PerformError
}

// PerformBackfillRequest is a request to PerformBackfill.
type PerformBackfillRequest struct {
// The room to backfill
Expand Down
8 changes: 8 additions & 0 deletions roomserver/internal/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type RoomserverInternalAPI struct {
*query.Queryer
*perform.Inviter
*perform.Joiner
*perform.Peeker
*perform.Leaver
*perform.Publisher
*perform.Backfiller
Expand Down Expand Up @@ -83,6 +84,13 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
r.Peeker = &perform.Peeker{
ServerName: r.Cfg.Matrix.ServerName,
Cfg: r.Cfg,
DB: r.DB,
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
r.Leaver = &perform.Leaver{
Cfg: r.Cfg,
DB: r.DB,
Expand Down
Loading