Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
module: added support for xep-0313
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
  • Loading branch information
ortuman committed Jun 25, 2022
1 parent 8c60656 commit ce3d1be
Show file tree
Hide file tree
Showing 41 changed files with 1,770 additions and 93 deletions.
33 changes: 19 additions & 14 deletions config/example.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@
# cert_file: ""
# privkey_file: ""

#storage:
# type: pgsql
# pgsql:
# host: 127.0.0.1:5432
# user: jackal
# password: a-secret-key
# database: jackal
# max_open_conns: 16
#
# cache:
# type: redis
# redis:
# addresses:
# - localhost:6379
storage:
type: pgsql
pgsql:
host: 127.0.0.1:5432
user: jackal
password: a-secret-key
database: jackal
max_open_conns: 16

cache:
type: redis
redis:
addresses:
- localhost:6379

#cluster:
# type: kv
Expand Down Expand Up @@ -128,6 +128,7 @@ modules:
# - ping # XEP-0199: XMPP Ping
# - time # XEP-0202: Entity Time
# - carbons # XEP-0280: Message Carbons
# - mam # XEP-0313: Message Archive Management
#
# version:
# show_os: true
Expand All @@ -140,6 +141,10 @@ modules:
# interval: 3m
# send_pings: true
# timeout_action: kill
#
# mam:
# queue_size: 1500
#

components:
secret: a-super-secret-key
Expand Down
22 changes: 22 additions & 0 deletions helm/sql/postgres.up.psql
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,25 @@ CREATE TABLE IF NOT EXISTS vcards (
);

SELECT enable_updated_at('vcards');

-- archives

CREATE TABLE IF NOT EXISTS archives (
serial SERIAL PRIMARY KEY,
archive_id VARCHAR(1023),
id VARCHAR(255) NOT NULL,
"from" TEXT NOT NULL,
from_bare TEXT NOT NULL,
"to" TEXT NOT NULL,
to_bare TEXT NOT NULL,
message BYTEA NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS i_archives_archive_id ON archives(archive_id);
CREATE INDEX IF NOT EXISTS i_archives_id ON archives(id);
CREATE INDEX IF NOT EXISTS i_archives_to ON archives("to");
CREATE INDEX IF NOT EXISTS i_archives_to_bare ON archives(to_bare);
CREATE INDEX IF NOT EXISTS i_archives_from ON archives("from");
CREATE INDEX IF NOT EXISTS i_archives_from_bare ON archives(from_bare);
CREATE INDEX IF NOT EXISTS i_archives_created_at ON archives(created_at);
4 changes: 4 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ jackal:
- ping # XEP-0199: XMPP Ping
- time # XEP-0202: Entity Time
- carbons # XEP-0280: Message Carbons
- mam # XEP-0313: Message Archive Management

version:
show_os: true
Expand All @@ -138,6 +139,9 @@ jackal:
send_pings: true
timeout_action: kill

mam:
queue_size: 1500

components:
# listeners:
# - port: 5275
Expand Down
22 changes: 13 additions & 9 deletions pkg/c2s/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ func (s *inC2S) processIQ(ctx context.Context, iq *stravaganza.IQ) error {
case router.ErrRemoteServerTimeout:
return s.sendElement(ctx, stanzaerror.E(stanzaerror.RemoteServerTimeout, iq).Element())

case nil:
_, err := s.runHook(ctx, hook.C2SStreamIQRouted, &hook.C2SStreamInfo{
case nil, router.ErrUserNotAvailable:
_, err = s.runHook(ctx, hook.C2SStreamIQRouted, &hook.C2SStreamInfo{
ID: s.ID().String(),
JID: s.JID(),
Presence: s.Presence(),
Expand Down Expand Up @@ -647,7 +647,7 @@ func (s *inC2S) processPresence(ctx context.Context, presence *stravaganza.Prese
}
targets, err := s.router.Route(ctx, outPr)
switch err {
case nil:
case nil, router.ErrUserNotAvailable:
_, err = s.runHook(ctx, hook.C2SStreamPresenceRouted, &hook.C2SStreamInfo{
ID: s.ID().String(),
JID: s.JID(),
Expand Down Expand Up @@ -718,18 +718,21 @@ sendMsg:
case router.ErrRemoteServerTimeout:
return s.sendElement(ctx, stanzaerror.E(stanzaerror.RemoteServerTimeout, message).Element())

case router.ErrUserNotAvailable:
return s.sendElement(ctx, stanzaerror.E(stanzaerror.ServiceUnavailable, message).Element())

case nil:
_, err = s.runHook(ctx, hook.C2SStreamMessageRouted, &hook.C2SStreamInfo{
case nil, router.ErrUserNotAvailable:
halted, hErr := s.runHook(ctx, hook.C2SStreamMessageRouted, &hook.C2SStreamInfo{
ID: s.ID().String(),
JID: s.JID(),
Presence: s.Presence(),
Targets: targets,
Element: msg,
})
return err
if halted {
return nil
}
if errors.Is(err, router.ErrUserNotAvailable) {
return s.sendElement(ctx, stanzaerror.E(stanzaerror.ServiceUnavailable, message).Element())
}
return hErr

default:
return err
Expand Down Expand Up @@ -1105,6 +1108,7 @@ func (s *inC2S) close(ctx context.Context, disconnectErr error) error {
halted, err := s.runHook(ctx, hook.C2SStreamDisconnected, &hook.C2SStreamInfo{
ID: s.ID().String(),
JID: s.JID(),
Presence: s.Presence(),
DisconnectError: disconnectErr,
})
if halted {
Expand Down
8 changes: 4 additions & 4 deletions pkg/hook/c2s.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ const (
// C2SStreamWillRouteElement hook runs when an XMPP element is about to be routed over a C2S stream.
C2SStreamWillRouteElement = "c2s.stream.will_route_element"

// C2SStreamIQRouted hook runs when an iq stanza is successfully routed to one ore more C2S streams.
// C2SStreamIQRouted hook runs when an iq stanza is successfully routed to zero or more C2S streams.
C2SStreamIQRouted = "c2s.stream.iq_routed"

// C2SStreamPresenceRouted hook runs when a presence stanza is successfully routed to one ore more C2S streams.
// C2SStreamPresenceRouted hook runs when a presence stanza is successfully routed to zero or more C2S streams.
C2SStreamPresenceRouted = "c2s.stream.presence_routed"

// C2SStreamMessageRouted hook runs when a message stanza is successfully routed to one ore more C2S streams.
// C2SStreamMessageRouted hook runs when a message stanza is successfully routed to zero or more C2S streams.
C2SStreamMessageRouted = "c2s.stream.message_routed"

// C2SStreamElementSent hook runs when a XMPP element is sent over a C2S stream.
// C2SStreamElementSent hook runs when an XMPP element is sent over a C2S stream.
C2SStreamElementSent = "c2s.stream.element_sent"
)

Expand Down
10 changes: 8 additions & 2 deletions pkg/hook/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@ type Priority int32

const (
// LowestPriority defines lowest hook execution priority.
LowestPriority = Priority(math.MinInt32 + 100)
LowestPriority = Priority(math.MinInt32)

// LowPriority defines low hook execution priority.
LowPriority = Priority(math.MinInt32 + 1000)

// DefaultPriority defines default hook execution priority.
DefaultPriority = Priority(0)

// HighPriority defines high hook execution priority.
HighPriority = Priority(math.MaxInt32 - 1000)

// HighestPriority defines highest hook execution priority.
HighestPriority = Priority(math.MaxInt32 - 100)
HighestPriority = Priority(math.MaxInt32)
)

// Handler defines a generic hook handler function.
Expand Down
14 changes: 9 additions & 5 deletions pkg/hook/s2s.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package hook

import (
"github.com/jackal-xmpp/stravaganza"
"github.com/jackal-xmpp/stravaganza/jid"
)

const (
Expand All @@ -25,7 +26,7 @@ const (
// S2SOutStreamDisconnected hook runs when an outgoing S2S connection is unregistered.
S2SOutStreamDisconnected = "s2s.out.stream.disconnected"

// S2SOutStreamElementSent hook runs whenever a XMPP element is sent over an outgoing S2S stream.
// S2SOutStreamElementSent hook runs whenever an XMPP element is sent over an outgoing S2S stream.
S2SOutStreamElementSent = "s2s.out.stream.element_sent"

// S2SInStreamRegistered hook runs when an incoming S2S connection is registered.
Expand All @@ -34,7 +35,7 @@ const (
// S2SInStreamUnregistered hook runs when an incoming S2S connection is unregistered.
S2SInStreamUnregistered = "s2s.in.stream.unregistered"

// S2SInStreamElementReceived hook runs when a XMPP element is received over an incoming S2S stream.
// S2SInStreamElementReceived hook runs when an XMPP element is received over an incoming S2S stream.
S2SInStreamElementReceived = "s2s.in.stream.stanza_received"

// S2SInStreamIQReceived hook runs when an iq stanza is received over an incoming S2S stream.
Expand All @@ -49,13 +50,13 @@ const (
// S2SInStreamWillRouteElement hook runs when an XMPP element is about to be routed on an incoming S2S stream.
S2SInStreamWillRouteElement = "s2s.in.stream.will_route_element"

// S2SInStreamIQRouted hook runs when an iq stanza is successfully routed to one ore more S2S streams.
// S2SInStreamIQRouted hook runs when an iq stanza is successfully routed to zero or more C2S streams.
S2SInStreamIQRouted = "s2s.in.stream.iq_routed"

// S2SInStreamPresenceRouted hook runs when a presence stanza is successfully routed to one ore more S2S streams.
// S2SInStreamPresenceRouted hook runs when a presence stanza is successfully routed to zero or more C2S streams.
S2SInStreamPresenceRouted = "s2s.in.stream.presence_routed"

// S2SInStreamMessageRouted hook runs when a message stanza is successfully routed to one ore more S2S streams.
// S2SInStreamMessageRouted hook runs when a message stanza is successfully routed to zero or more C2S streams.
S2SInStreamMessageRouted = "s2s.in.stream.message_routed"
)

Expand All @@ -70,6 +71,9 @@ type S2SStreamInfo struct {
// Target is the S2S target domain.
Target string

// Targets contains all JIDs to which event stanza was routed.
Targets []jid.JID

// Element is the event associated XMPP element.
Element stravaganza.Element
}
5 changes: 5 additions & 0 deletions pkg/jackal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package jackal
import (
"path/filepath"

"github.com/ortuman/jackal/pkg/module/xep0313"

"github.com/kkyr/fig"
adminserver "github.com/ortuman/jackal/pkg/admin/server"
"github.com/ortuman/jackal/pkg/auth/pepper"
Expand Down Expand Up @@ -95,6 +97,9 @@ type ModulesConfig struct {

// XEP-0199: XMPP Ping
Ping xep0199.Config `fig:"ping"`

// XEP-0313: Message Archive Management
Mam xep0313.Config `fig:"mam"`
}

// Config defines jackal application configuration.
Expand Down
3 changes: 1 addition & 2 deletions pkg/jackal/jackal.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"syscall"
"time"

streamqueue "github.com/ortuman/jackal/pkg/module/xep0198/queue"

kitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand All @@ -47,6 +45,7 @@ import (
"github.com/ortuman/jackal/pkg/host"
"github.com/ortuman/jackal/pkg/log"
"github.com/ortuman/jackal/pkg/module"
streamqueue "github.com/ortuman/jackal/pkg/module/xep0198/queue"
"github.com/ortuman/jackal/pkg/router"
"github.com/ortuman/jackal/pkg/s2s"
"github.com/ortuman/jackal/pkg/shaper"
Expand Down
9 changes: 8 additions & 1 deletion pkg/jackal/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ortuman/jackal/pkg/module/xep0199"
"github.com/ortuman/jackal/pkg/module/xep0202"
"github.com/ortuman/jackal/pkg/module/xep0280"
"github.com/ortuman/jackal/pkg/module/xep0313"
)

var defaultModules = []string{
Expand All @@ -45,6 +46,7 @@ var defaultModules = []string{
xep0198.ModuleName,
xep0199.ModuleName,
xep0280.ModuleName,
xep0313.ModuleName,
}

var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{
Expand All @@ -56,7 +58,7 @@ var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{
// Offline
// (https://xmpp.org/extensions/xep-0160.html)
offline.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module {
return offline.New(cfg.Offline, j.router, j.hosts, j.resMng, j.rep, j.hk, j.logger)
return offline.New(cfg.Offline, j.router, j.hosts, j.rep, j.hk, j.logger)
},
// XEP-0012: Last Activity
// (https://xmpp.org/extensions/xep-0012.html)
Expand Down Expand Up @@ -114,4 +116,9 @@ var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{
xep0280.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module {
return xep0280.New(j.router, j.hosts, j.resMng, j.hk, j.logger)
},
// XEP-0313: Message Archive Management
// (https://xmpp.org/extensions/xep-0313.html)
xep0313.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module {
return xep0313.New(cfg.Mam, j.router, j.hosts, j.rep, j.hk, j.logger)
},
}
Loading

0 comments on commit ce3d1be

Please sign in to comment.