Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Implement Push Notifications #1842

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
151d52c
Add Pushserver component with Pushers API
PiotrKozimor Nov 17, 2021
2db65e1
Wire Pushserver component
PiotrKozimor Nov 17, 2021
bd9a4f5
Merge remote-tracking branch 'origin/master' into pushers-api
tommie Dec 3, 2021
cb10d9c
Add PushGatewayClient.
tommie Oct 10, 2021
c6b3048
Add a pushrules module.
tommie Oct 10, 2021
d8c8bfc
Change user API account creation to use the new pushrules module's de…
tommie Oct 19, 2021
f5d8f0e
Add push rules query/put API in Pushserver.
tommie Oct 19, 2021
0548a88
Add clientapi routes for push rules to Pushserver.
tommie Oct 19, 2021
54ece78
Output rooms.join.unread_notifications in /sync.
tommie Oct 28, 2021
9262526
Implement pushserver/storage for notifications.
tommie Oct 27, 2021
ce1255e
Use PushGatewayClient and the pushrules module in Pushserver's room c…
tommie Oct 10, 2021
74b74bf
Implement read receipt consumers in Pushserver.
tommie Oct 27, 2021
bbf4b70
Add clientapi route for /unstable/notifications.
tommie Oct 27, 2021
1dbcd84
Rename to UpsertPusher for clarity and handle pusher update
PiotrKozimor Dec 3, 2021
5ccfe77
Fix linter errors
PiotrKozimor Dec 6, 2021
b89da40
Ignore body.Close() error check
PiotrKozimor Dec 6, 2021
c9e94b4
Fix push server internal http wiring
PiotrKozimor Dec 6, 2021
da437f0
Add 40 newly passing 61push tests to whitelist
PiotrKozimor Dec 10, 2021
394ea6b
Add next 12 newly passing 61push tests to whitelist
PiotrKozimor Dec 16, 2021
192e9db
Send notification data before notifying users in EDU server consumer
PiotrKozimor Dec 17, 2021
87dc8c7
Merge branch 'main' into implement-push-notifications
neilalexander Feb 10, 2022
2582a05
NATS JetStream
neilalexander Feb 10, 2022
8ee18cd
Goodbye sarama
neilalexander Feb 10, 2022
b01b918
Fix `NewStreamTokenFromString`
neilalexander Feb 10, 2022
81b71da
Consume on the correct topic for the roomserver
neilalexander Feb 10, 2022
8d5426a
Merge branch 'main' into implement-push-notifications
neilalexander Feb 10, 2022
23d175d
Don't panic, NAK instead
neilalexander Feb 10, 2022
b20bbc8
Merge branch 'main' into implement-push-notifications
neilalexander Feb 14, 2022
0cd3b5c
Merge branch 'main' into implement-push-notifications
neilalexander Feb 17, 2022
857b75d
Merge branch 'main' into implement-push-notifications
neilalexander Feb 18, 2022
d96623f
Move push notifications into the User API
neilalexander Feb 18, 2022
fa7d249
Merge branch 'main' into implement-push-notifications
neilalexander Feb 18, 2022
6d63096
Don't set null values since that apparently causes Element upsetti
neilalexander Feb 18, 2022
0d928c6
Also set omitempty on conditions
neilalexander Feb 18, 2022
6ad6b9d
Fix bug so that we don't override the push rules unnecessarily
neilalexander Feb 18, 2022
4ce5fec
Tweak defaults
neilalexander Feb 18, 2022
1aa25b3
Update defaults
neilalexander Feb 21, 2022
3a4add0
More tweaks
neilalexander Feb 21, 2022
a57ce77
Merge branch 'main' into implement-push-notifications
neilalexander Feb 21, 2022
26f20d5
Move `/notifications` onto `r0`/`v3` mux
neilalexander Feb 21, 2022
f7a4825
Merge branch 'main' into implement-push-notifications
neilalexander Feb 22, 2022
dd2518c
Merge branch 'main' into implement-push-notifications
neilalexander Mar 1, 2022
977d6b8
Merge branch 'main' into implement-push-notifications
neilalexander Mar 2, 2022
4d71df8
Merge branch 'main' into implement-push-notifications
neilalexander Mar 2, 2022
67ef8e3
User API will consume events and read/fully read markers from the syn…
neilalexander Mar 3, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ dendrite.yaml
*.db

# Log files
*.log*
*.log*

# Generated code
cmd/dendrite-demo-yggdrasil/embed/fs*.go

# Test dependencies
test/wasm/node_modules

media_store/
# Ignore complement folder when running locally
complement/

media_store/
11 changes: 11 additions & 0 deletions build/docker/config/dendrite.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,17 @@ user_api:
max_idle_conns: 2
conn_max_lifetime: -1

# Configuration for the Push Server API.
push_server:
neilalexander marked this conversation as resolved.
Show resolved Hide resolved
internal_api:
listen: http://localhost:7782
connect: http://localhost:7782
database:
connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_pushserver?sslmode=disable
max_open_conns: 10
max_idle_conns: 2
conn_max_lifetime: -1

# Configuration for Opentracing.
# See https://github.com/matrix-org/dendrite/tree/master/docs/tracing for information on
# how this works and how to set it up.
Expand Down
2 changes: 1 addition & 1 deletion build/gobind-pinecone/monolith.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (m *DendriteMonolith) Start() {
)

keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
m.userAPI = userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
m.userAPI = userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(m.userAPI)

eduInputAPI := eduserver.NewInternalAPI(
Expand Down
2 changes: 1 addition & 1 deletion build/gobind-yggdrasil/monolith.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (m *DendriteMonolith) Start() {
)

keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(userAPI)

eduInputAPI := eduserver.NewInternalAPI(
Expand Down
3 changes: 2 additions & 1 deletion clientapi/clientapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func AddPublicRoutes(
routing.Setup(
router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, userAPI, federation,
syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider, mscCfg,
syncProducer, transactionsCache, fsAPI, keyAPI,
extRoomsProvider, mscCfg,
)
}
7 changes: 4 additions & 3 deletions clientapi/producers/syncapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ type SyncAPIProducer struct {
}

// SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error {
m := &nats.Msg{
Subject: p.Topic,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)

data := eventutil.AccountData{
RoomID: roomID,
Type: dataType,
RoomID: roomID,
Type: dataType,
ReadMarker: readMarker,
}
var err error
m.Data, err = json.Marshal(data)
Expand Down
12 changes: 4 additions & 8 deletions clientapi/routing/account_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/userapi/api"

Expand Down Expand Up @@ -127,7 +128,7 @@ func SaveAccountData(
}

// TODO: user API should do this since it's account data
if err := syncProducer.SendData(userID, roomID, dataType); err != nil {
if err := syncProducer.SendData(userID, roomID, dataType, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError()
}
Expand All @@ -138,11 +139,6 @@ func SaveAccountData(
}
}

type readMarkerJSON struct {
FullyRead string `json:"m.fully_read"`
Read string `json:"m.read"`
}

type fullyReadEvent struct {
EventID string `json:"event_id"`
}
Expand All @@ -159,7 +155,7 @@ func SaveReadMarker(
return *resErr
}

var r readMarkerJSON
var r eventutil.ReadMarkerJSON
resErr = httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
return *resErr
Expand Down Expand Up @@ -189,7 +185,7 @@ func SaveReadMarker(
return util.ErrorResponse(err)
}

if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read"); err != nil {
if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError()
}
Expand Down
63 changes: 63 additions & 0 deletions clientapi/routing/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2021 Dan Peleg <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routing

import (
"net/http"
"strconv"

"github.com/matrix-org/dendrite/clientapi/jsonerror"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)

// GetNotifications handles /_matrix/client/r0/notifications
func GetNotifications(
req *http.Request, device *userapi.Device,
userAPI userapi.UserInternalAPI,
) util.JSONResponse {
var limit int64
if limitStr := req.URL.Query().Get("limit"); limitStr != "" {
var err error
limit, err = strconv.ParseInt(limitStr, 10, 64)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("ParseInt(limit) failed")
return jsonerror.InternalServerError()
}
}

var queryRes userapi.QueryNotificationsResponse
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SplitID failed")
return jsonerror.InternalServerError()
}
err = userAPI.QueryNotifications(req.Context(), &userapi.QueryNotificationsRequest{
Localpart: localpart,
From: req.URL.Query().Get("from"),
Limit: int(limit),
Only: req.URL.Query().Get("only"),
}, &queryRes)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("QueryNotifications failed")
return jsonerror.InternalServerError()
}
util.GetLogger(req.Context()).WithField("from", req.URL.Query().Get("from")).WithField("limit", limit).WithField("only", req.URL.Query().Get("only")).WithField("next", queryRes.NextToken).Infof("QueryNotifications: len %d", len(queryRes.Notifications))
return util.JSONResponse{
Code: http.StatusOK,
JSON: queryRes,
}
}
15 changes: 15 additions & 0 deletions clientapi/routing/password.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
userdb "github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)

type newPasswordRequest struct {
Expand All @@ -37,6 +38,11 @@ func Password(
var r newPasswordRequest
r.LogoutDevices = true

logrus.WithFields(logrus.Fields{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

util.GetLogger(req.Context()) would be better as it would have request IDs associated with it.

"sessionId": device.SessionID,
"userId": device.UserID,
}).Debug("Changing password")

// Unmarshal the request.
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
Expand Down Expand Up @@ -116,6 +122,15 @@ func Password(
util.GetLogger(req.Context()).WithError(err).Error("PerformDeviceDeletion failed")
return jsonerror.InternalServerError()
}

pushersReq := &api.PerformPusherDeletionRequest{
Localpart: localpart,
SessionID: device.SessionID,
}
if err := userAPI.PerformPusherDeletion(req.Context(), pushersReq, &struct{}{}); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("PerformPusherDeletion failed")
return jsonerror.InternalServerError()
}
}

// Return a success code.
Expand Down
114 changes: 114 additions & 0 deletions clientapi/routing/pusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2021 Dan Peleg <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routing

import (
"net/http"
"net/url"

"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)

// GetPushers handles /_matrix/client/r0/pushers
func GetPushers(
req *http.Request, device *userapi.Device,
userAPI userapi.UserInternalAPI,
) util.JSONResponse {
var queryRes userapi.QueryPushersResponse
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SplitID failed")
return jsonerror.InternalServerError()
}
err = userAPI.QueryPushers(req.Context(), &userapi.QueryPushersRequest{
Localpart: localpart,
}, &queryRes)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("QueryPushers failed")
return jsonerror.InternalServerError()
}
for i := range queryRes.Pushers {
queryRes.Pushers[i].SessionID = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you doing this? Needs comments.

}
return util.JSONResponse{
Code: http.StatusOK,
JSON: queryRes,
}
}

// SetPusher handles /_matrix/client/r0/pushers/set
// This endpoint allows the creation, modification and deletion of pushers for this user ID.
// The behaviour of this endpoint varies depending on the values in the JSON body.
func SetPusher(
req *http.Request, device *userapi.Device,
userAPI userapi.UserInternalAPI,
) util.JSONResponse {
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SplitID failed")
return jsonerror.InternalServerError()
}
body := userapi.PerformPusherSetRequest{}
if resErr := httputil.UnmarshalJSONRequest(req, &body); resErr != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally have a layer of indirection where we go from Matrix specification structs to internal API structs as they may not always match up and can become error-prone (it's generally safe to assume specification structs needs validation whereas internal API structs do not). It would be nice to do the same here.

return *resErr
}
if len(body.AppID) > 64 {
return invalidParam("length of app_id must be no more than 64 characters")
}
if len(body.PushKey) > 512 {
return invalidParam("length of pushkey must be no more than 512 bytes")
}
uInt := body.Data["url"]
if uInt != nil {
u, ok := uInt.(string)
if !ok {
return invalidParam("url must be string")
}
if u != "" {
var pushUrl *url.URL
pushUrl, err = url.Parse(u)
if err != nil {
return invalidParam("malformed url passed")
}
if pushUrl.Scheme != "https" {
return invalidParam("only https scheme is allowed")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this validation could be part of a struct function for pushserverapi.PerformPusherSetRequest which would help tidy this function up a bit.

}
}

}
body.Localpart = localpart
body.SessionID = device.SessionID
err = userAPI.PerformPusherSet(req.Context(), &body, &struct{}{})
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("PerformPusherSet failed")
return jsonerror.InternalServerError()
}

return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}

func invalidParam(msg string) util.JSONResponse {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidParam(msg),
}
}
Loading