From 0e5f627ff2ad0049274928aa6a59ea4f902ac102 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 4 Sep 2024 13:00:58 +0200 Subject: [PATCH 1/8] Add PFCPEntity.Stop() close #28 --- README.md | 2 ++ pfcp/entity.go | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/README.md b/README.md index 02913ed..06c0237 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ ```golang upNode := NewPFCPEntityUP(UPF_NODE_ID, UPF_IP_ADDR) // node id can be an IP Address or a FQDN upNode.Start() +defer upNode.Close() // Access list of associations associations := upNode.GetPFCPAssociations() // Access list of sessions @@ -23,6 +24,7 @@ sessions := upNode.GetPFCPSessions() ```golang cpNode := NewPFCPEntityCP(SMF_NODE_ID, SMF_IP_ADDR) // node id can be an IP Address or a FQDN cpNode.Start() +defer cpNode.Close() association, _ := cpNode.NewEstablishedPFCPAssociation(ie.NewNodeIDHeuristic(UPFADDR)) session, _ := a.CreateSession(pdrs, fars) diff --git a/pfcp/entity.go b/pfcp/entity.go index 4d3800c..fa6e158 100644 --- a/pfcp/entity.go +++ b/pfcp/entity.go @@ -206,6 +206,10 @@ func (e *PFCPEntity) Start() error { return nil } +func (e *PFCPEntity) Stop() error { + return e.conn.Close() +} + func (e *PFCPEntity) IsUserPlane() bool { return e.kind == "UP" } From ab49b6e82eb6010e8cac3fc2493e5a3c3812d0d0 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 4 Sep 2024 16:03:02 +0200 Subject: [PATCH 2/8] Update to allow to use different context than background --- README.md | 4 +- pfcp/api/entity_interface.go | 3 - pfcp/entity.go | 164 +++++++++++++++++++++++------------ pfcp/handlers.go | 80 ++++++++--------- pfcp/outcoming_message.go | 17 ++++ pfcp/pfcp_conn.go | 48 ++++++++++ pfcp/received_message.go | 22 ++--- 7 files changed, 223 insertions(+), 115 deletions(-) create mode 100644 pfcp/outcoming_message.go create mode 100644 pfcp/pfcp_conn.go diff --git a/README.md b/README.md index 06c0237..42ce48b 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ ```golang upNode := NewPFCPEntityUP(UPF_NODE_ID, UPF_IP_ADDR) // node id can be an IP Address or a FQDN -upNode.Start() +upNode.ListenAndServe() // starts the node in a new goroutine defer upNode.Close() // Access list of associations associations := upNode.GetPFCPAssociations() @@ -23,7 +23,7 @@ sessions := upNode.GetPFCPSessions() ```golang cpNode := NewPFCPEntityCP(SMF_NODE_ID, SMF_IP_ADDR) // node id can be an IP Address or a FQDN -cpNode.Start() +cpNode.ListenAndServe() // starts the node in a new goroutine defer cpNode.Close() association, _ := cpNode.NewEstablishedPFCPAssociation(ie.NewNodeIDHeuristic(UPFADDR)) session, _ := a.CreateSession(pdrs, fars) diff --git a/pfcp/api/entity_interface.go b/pfcp/api/entity_interface.go index c7e1f97..41950c1 100644 --- a/pfcp/api/entity_interface.go +++ b/pfcp/api/entity_interface.go @@ -6,8 +6,6 @@ package api import ( - "net" - "github.com/wmnsk/go-pfcp/ie" ) @@ -19,7 +17,6 @@ type PFCPEntityInterface interface { NewEstablishedPFCPAssociation(nodeID *ie.IE) (association PFCPAssociationInterface, err error) RemovePFCPAssociation(association PFCPAssociationInterface) error GetPFCPAssociation(nid string) (association PFCPAssociationInterface, err error) - SendTo(msg []byte, dst net.Addr) error GetPFCPSessions() []PFCPSessionInterface GetPFCPSession(localIP string, seid SEID) (PFCPSessionInterface, error) AddEstablishedPFCPSession(session PFCPSessionInterface) error diff --git a/pfcp/entity.go b/pfcp/entity.go index fa6e158..028a8b7 100644 --- a/pfcp/entity.go +++ b/pfcp/entity.go @@ -6,11 +6,11 @@ package pfcp_networking import ( + "context" "fmt" "log" "net" "sync" - "time" "github.com/nextmn/go-pfcp-networking/pfcp/api" "github.com/nextmn/go-pfcp-networking/pfcputil" @@ -23,8 +23,6 @@ type PFCPEntity struct { listenAddr string recoveryTimeStamp *ie.IE handlers map[pfcputil.MessageType]PFCPMessageHandler - conn *net.UDPConn - connMu sync.Mutex associationsMap AssociationsMap // each session is associated with a specific PFCPAssociation // (can be changed with some requests) @@ -33,6 +31,50 @@ type PFCPEntity struct { sessionsMap api.SessionsMapInterface kind string // "CP" or "UP" options api.EntityOptionsInterface + + mu sync.Mutex + pfcpConns []*onceClosePfcpConn + closeFunc context.CancelFunc +} + +// onceClosePfcpConn wraps a PfcpConn, protecting it from multiple Close calls. +type onceClosePfcpConn struct { + *PFCPConn + once sync.Once + closeErr error +} + +func (oc *onceClosePfcpConn) Close() error { + oc.once.Do(oc.close) + return oc.closeErr +} + +func (oc *onceClosePfcpConn) close() { + oc.closeErr = oc.PFCPConn.Close() +} + +func (e *PFCPEntity) registerPfcpConn(conn *onceClosePfcpConn) { + e.mu.Lock() + defer e.mu.Unlock() + if e.pfcpConns == nil { + e.pfcpConns = make([]*onceClosePfcpConn, 1) + } + e.pfcpConns = append(e.pfcpConns, conn) +} + +func (e *PFCPEntity) closePfcpConn() error { + e.mu.Lock() + defer e.mu.Unlock() + if e.pfcpConns == nil { + return nil + } + for _, v := range e.pfcpConns { + if err := v.Close(); err != nil { + return err + } + } + e.pfcpConns = nil + return nil } func (e *PFCPEntity) Options() api.EntityOptionsInterface { @@ -52,15 +94,6 @@ func (e *PFCPEntity) GetPFCPSession(localIP string, seid api.SEID) (api.PFCPSess return e.sessionsMap.GetPFCPSession(localIP, seid) } -func (e *PFCPEntity) SendTo(msg []byte, dst net.Addr) error { - e.connMu.Lock() - defer e.connMu.Unlock() - if _, err := e.conn.WriteTo(msg, dst); err != nil { - return err - } - return nil -} - func (e *PFCPEntity) NodeID() *ie.IE { return e.nodeID } @@ -80,32 +113,14 @@ func NewPFCPEntity(nodeID string, listenAddr string, kind string, options api.En listenAddr: listenAddr, recoveryTimeStamp: nil, handlers: newDefaultPFCPEntityHandlers(), - conn: nil, - connMu: sync.Mutex{}, associationsMap: NewAssociationsMap(), sessionsMap: NewSessionsMap(), kind: kind, options: options, + pfcpConns: nil, } } -func (e *PFCPEntity) listen() error { - e.recoveryTimeStamp = ie.NewRecoveryTimeStamp(time.Now()) - // TODO: listen on multiple ip addresses (ipv4 + ipv6) - ipAddr := e.listenAddr - udpAddr := pfcputil.CreateUDPAddr(ipAddr, pfcputil.PFCP_PORT) - laddr, err := net.ResolveUDPAddr("udp", udpAddr) - if err != nil { - return err - } - e.conn, err = net.ListenUDP("udp", laddr) - if err != nil { - return err - } - - return nil -} - func (e *PFCPEntity) GetHandler(t pfcputil.MessageType) (h PFCPMessageHandler, err error) { if f, exists := e.handlers[t]; exists { return f, nil @@ -176,38 +191,73 @@ func (e *PFCPEntity) NewEstablishedPFCPAssociation(nodeID *ie.IE) (association a } -func (e *PFCPEntity) Start() error { - if err := e.listen(); err != nil { +// Listen PFCP and run the entity with the provided context. +// Always return a non-nil error. +func (e *PFCPEntity) ListenAndServe() error { + // TODO: listen on both ipv4 and ipv6 + ipaddr, err := net.ResolveIPAddr("ip", e.listenAddr) + if err != nil { return err } - buf := make([]byte, pfcputil.DEFAULT_MTU) // TODO: get MTU of interface instead of using DEFAULT_MTU - go func() error { - for { - n, addr, err := e.conn.ReadFrom(buf) - if err != nil { - return err - } - msg, err := message.Parse(buf[:n]) - if err != nil { - // undecodable pfcp message - continue - } - f, err := e.GetHandler(msg.MessageType()) - if err != nil { - log.Println("No Handler for message of this type:", err) - continue - } - err = f(ReceivedMessage{Message: msg, SenderAddr: addr, Entity: e}) - if err != nil { - log.Println(err) + if conn, err := ListenPFCP("udp", ipaddr); err != nil { + return err + } else { + e.Serve(context.Background(), conn) + return fmt.Errorf("Server closed") + } +} + +// Run the entity with the provided context. +// Always return a non-nil error and close the PFCPConn. +func (e *PFCPEntity) Serve(ctx context.Context, conn *PFCPConn) error { + if conn == nil { + return fmt.Errorf("Conn is nil") + } + newconn := &onceClosePfcpConn{PFCPConn: conn} + e.registerPfcpConn(newconn) + serveCtx, cancel := context.WithCancel(ctx) + e.closeFunc = cancel + defer newconn.Close() + for { + select { + case <-serveCtx.Done(): + // Stop signal received + return serveCtx.Err() + default: + buf := make([]byte, pfcputil.DEFAULT_MTU) // TODO: get MTU of interface instead of using DEFAULT_MTU + if n, addr, err := conn.ReadFrom(buf); err == nil { + go func(ctx context.Context, buffer []byte, sender net.Addr) { + msg, err := message.Parse(buffer) + if err != nil { + // undecodable pfcp message + return + } + f, err := e.GetHandler(msg.MessageType()) + if err != nil { + log.Println("No Handler for message of this type:", err) + return + } + if resp, err := f(ctx, ReceivedMessage{Message: msg, SenderAddr: addr, Entity: e}); err != nil { + log.Println(err) + } else { + select { + case <-ctx.Done(): + return + default: + conn.Write(resp) + } + } + }(serveCtx, buf[:n], addr) } } - }() - return nil + } } -func (e *PFCPEntity) Stop() error { - return e.conn.Close() +// Close stop the server and closes active PFCP connection. +func (e *PFCPEntity) Close() error { + e.closeFunc() + e.closePfcpConn() + return nil } func (e *PFCPEntity) IsUserPlane() bool { diff --git a/pfcp/handlers.go b/pfcp/handlers.go index bfe29d9..f08e99b 100644 --- a/pfcp/handlers.go +++ b/pfcp/handlers.go @@ -6,6 +6,7 @@ package pfcp_networking import ( + "context" "fmt" "io" "log" @@ -16,47 +17,47 @@ import ( "github.com/wmnsk/go-pfcp/message" ) -type PFCPMessageHandler = func(receivedMessage ReceivedMessage) error +type PFCPMessageHandler = func(ctx context.Context, receivedMessage ReceivedMessage) (*OutcomingMessage, error) -func DefaultHeartbeatRequestHandler(msg ReceivedMessage) error { +func DefaultHeartbeatRequestHandler(ctx context.Context, msg ReceivedMessage) (*OutcomingMessage, error) { log.Println("Received Heartbeat Request") res := message.NewHeartbeatResponse(msg.Sequence(), msg.Entity.RecoveryTimeStamp()) - return msg.ReplyTo(res) + return msg.NewResponse(res) } -func DefaultAssociationSetupRequestHandler(msg ReceivedMessage) error { +func DefaultAssociationSetupRequestHandler(ctx context.Context, msg ReceivedMessage) (*OutcomingMessage, error) { log.Println("Received Association Setup Request") m, ok := msg.Message.(*message.AssociationSetupRequest) if !ok { - return fmt.Errorf("Issue with Association Setup Request") + return nil, fmt.Errorf("Issue with Association Setup Request") } switch { case msg.Message == nil: - return fmt.Errorf("msg is nil") + return nil, fmt.Errorf("msg is nil") case msg.Entity == nil: - return fmt.Errorf("entity is nil") + return nil, fmt.Errorf("entity is nil") case msg.Entity.NodeID() == nil: - return fmt.Errorf("entity.NodeID() is nil") + return nil, fmt.Errorf("entity.NodeID() is nil") case msg.Entity.RecoveryTimeStamp() == nil: - return fmt.Errorf("entity.RecoveryTimeStamp() is nil") + return nil, fmt.Errorf("entity.RecoveryTimeStamp() is nil") } if _, err := msg.Entity.NewEstablishedPFCPAssociation(m.NodeID); err != nil { log.Println("Rejected Association:", err) res := message.NewAssociationSetupResponse(msg.Sequence(), msg.Entity.NodeID(), ie.NewCause(ie.CauseRequestRejected), msg.Entity.RecoveryTimeStamp()) - return msg.ReplyTo(res) + return msg.NewResponse(res) } log.Println("Association Accepted") res := message.NewAssociationSetupResponse(msg.Sequence(), msg.Entity.NodeID(), ie.NewCause(ie.CauseRequestAccepted), msg.Entity.RecoveryTimeStamp()) - return msg.ReplyTo(res) + return msg.NewResponse(res) } -func DefaultSessionEstablishmentRequestHandler(msg ReceivedMessage) error { +func DefaultSessionEstablishmentRequestHandler(ctx context.Context, msg ReceivedMessage) (*OutcomingMessage, error) { log.Println("Received Session Establishment Request") m, ok := msg.Message.(*message.SessionEstablishmentRequest) if !ok { - return fmt.Errorf("Issue with Session Establishment Request") + return nil, fmt.Errorf("Issue with Session Establishment Request") } // If F-SEID is missing or malformed, SEID shall be set to 0 @@ -67,7 +68,7 @@ func DefaultSessionEstablishmentRequestHandler(msg ReceivedMessage) error { // other than the one(s) communicated in the Node ID during Association Establishment Procedure if m.CPFSEID == nil { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(ie.CauseMandatoryIEMissing), ie.NewOffendingIE(ie.FSEID)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } fseid, err := m.CPFSEID.FSEID() if err != nil { @@ -76,8 +77,7 @@ func DefaultSessionEstablishmentRequestHandler(msg ReceivedMessage) error { cause = ie.CauseInvalidLength } res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(cause), ie.NewOffendingIE(ie.FSEID)) - return msg.ReplyTo(res) - return err + return msg.NewResponse(res) } rseid = fseid.SEID @@ -85,13 +85,13 @@ func DefaultSessionEstablishmentRequestHandler(msg ReceivedMessage) error { if _, err := checkSenderAssociation(msg.Entity, msg.SenderAddr); err != nil { log.Println(err) res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(ie.CauseNoEstablishedPFCPAssociation)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // NodeID is a mandatory IE if m.NodeID == nil { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(ie.CauseMandatoryIEMissing), ie.NewOffendingIE(ie.NodeID)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } nid, err := m.NodeID.NodeID() if err != nil { @@ -100,7 +100,7 @@ func DefaultSessionEstablishmentRequestHandler(msg ReceivedMessage) error { cause = ie.CauseInvalidLength } res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(cause), ie.NewOffendingIE(ie.NodeID)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // NodeID is used to define which PFCP Association is associated the PFCP Session @@ -111,33 +111,33 @@ func DefaultSessionEstablishmentRequestHandler(msg ReceivedMessage) error { association, err := msg.Entity.GetPFCPAssociation(nid) if err != nil { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(ie.CauseNoEstablishedPFCPAssociation)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // CreatePDR is a Mandatory IE if m.CreatePDR == nil || len(m.CreatePDR) == 0 { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(ie.CauseMandatoryIEMissing), ie.NewOffendingIE(ie.CreatePDR)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // CreateFAR is a Mandatory IE if m.CreateFAR == nil || len(m.CreateFAR) == 0 { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(ie.CauseMandatoryIEMissing), ie.NewOffendingIE(ie.CreateFAR)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // create PDRs pdrs, err, cause, offendingie := NewPDRMap(m.CreatePDR) if err != nil { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(cause), ie.NewOffendingIE(offendingie)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // create FARs fars, err, cause, offendingie := NewFARMap(m.CreateFAR) if err != nil { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(cause), ie.NewOffendingIE(offendingie)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // create session with PDRs and FARs @@ -145,20 +145,20 @@ func DefaultSessionEstablishmentRequestHandler(msg ReceivedMessage) error { if err != nil { // Send cause(Rule creation/modification failure) res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(ie.CauseRuleCreationModificationFailure)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // TODO: Create other type IEs // XXX: QER ie are ignored for the moment // send response: session creation accepted res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(ie.CauseRequestAccepted), session.LocalFSEID()) - return msg.ReplyTo(res) + return msg.NewResponse(res) } -func DefaultSessionModificationRequestHandler(msg ReceivedMessage) error { +func DefaultSessionModificationRequestHandler(ctx context.Context, msg ReceivedMessage) (*OutcomingMessage, error) { log.Println("Received Session Modification Request") m, ok := msg.Message.(*message.SessionModificationRequest) if !ok { - return fmt.Errorf("Issue with Session Modification Request") + return nil, fmt.Errorf("Issue with Session Modification Request") } // PFCP session related messages for sessions that are already established are sent to the IP address received // in the F-SEID allocated by the peer function or to the IP address of an alternative SMF in the SMF set @@ -170,20 +170,20 @@ func DefaultSessionModificationRequestHandler(msg ReceivedMessage) error { ielocalnodeid := msg.Entity.NodeID() localnodeid, err := ielocalnodeid.NodeID() if err != nil { - return err + return nil, err } var localip string switch ielocalnodeid.Payload[0] { case ie.NodeIDIPv4Address: ip4, err := net.ResolveIPAddr("ip4", localnodeid) if err != nil { - return err + return nil, err } localip = ip4.String() case ie.NodeIDIPv6Address: ip6, err := net.ResolveIPAddr("ip6", localnodeid) if err != nil { - return err + return nil, err } localip = ip6.String() case ie.NodeIDFQDN: @@ -196,19 +196,19 @@ func DefaultSessionModificationRequestHandler(msg ReceivedMessage) error { case ip4 != nil: localip = ip4.String() default: - return fmt.Errorf("Cannot resolve NodeID") + return nil, fmt.Errorf("Cannot resolve NodeID") } } localseid := msg.SEID() session, err := msg.Entity.GetPFCPSession(localip, localseid) if err != nil { res := message.NewSessionModificationResponse(0, 0, 0, msg.Sequence(), 0, ie.NewCause(ie.CauseSessionContextNotFound)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } rseid, err := session.RemoteSEID() if err != nil { - return err + return nil, err } // // CP F-SEID @@ -222,35 +222,35 @@ func DefaultSessionModificationRequestHandler(msg ReceivedMessage) error { createpdrs, err, cause, offendingie := NewPDRMap(m.CreatePDR) if err != nil { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(cause), ie.NewOffendingIE(offendingie)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // create FARs createfars, err, cause, offendingie := NewFARMap(m.CreateFAR) if err != nil { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(cause), ie.NewOffendingIE(offendingie)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // update PDRs updatepdrs, err, cause, offendingie := NewPDRMap(m.UpdatePDR) if err != nil { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(cause), ie.NewOffendingIE(offendingie)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } // update FARs updatefars, err, cause, offendingie := NewFARMap(m.UpdateFAR) if err != nil { res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(cause), ie.NewOffendingIE(offendingie)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } err = session.AddUpdatePDRsFARs(createpdrs, createfars, updatepdrs, updatefars) if err != nil { //XXX, offending IE res := message.NewSessionModificationResponse(0, 0, rseid, msg.Sequence(), 0, ie.NewCause(ie.CauseRequestRejected)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } //XXX: QER modification/creation is ignored for the moment @@ -259,7 +259,7 @@ func DefaultSessionModificationRequestHandler(msg ReceivedMessage) error { //XXX: RemoveQER res := message.NewSessionModificationResponse(0, 0, rseid, msg.Sequence(), 0, ie.NewCause(ie.CauseRequestAccepted)) - return msg.ReplyTo(res) + return msg.NewResponse(res) } func checkSenderAssociation(entity api.PFCPEntityInterface, senderAddr net.Addr) (api.PFCPAssociationInterface, error) { diff --git a/pfcp/outcoming_message.go b/pfcp/outcoming_message.go new file mode 100644 index 0000000..aa93d29 --- /dev/null +++ b/pfcp/outcoming_message.go @@ -0,0 +1,17 @@ +// Copyright 2024 Louis Royer and the go-pfcp-networking contributors. All rights reserved. +// Use of this source code is governed by a MIT-style license that can be +// found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package pfcp_networking + +import ( + "net" + + "github.com/wmnsk/go-pfcp/message" +) + +type OutcomingMessage struct { + message.Message + Destination net.Addr +} diff --git a/pfcp/pfcp_conn.go b/pfcp/pfcp_conn.go new file mode 100644 index 0000000..fbcee07 --- /dev/null +++ b/pfcp/pfcp_conn.go @@ -0,0 +1,48 @@ +// Copyright 2024 Louis Royer and the go-pfcp-networking contributors. All rights reserved. +// Use of this source code is governed by a MIT-style license that can be +// found in the LICENSE file. +// SPDX-License-Identifier: MIT +package pfcp_networking + +import ( + "fmt" + "net" + + "github.com/nextmn/go-pfcp-networking/pfcputil" +) + +type PFCPConn struct { + net.UDPConn +} + +func ListenPFCP(network string, laddr *net.IPAddr) (*PFCPConn, error) { + switch network { + case "udp", "udp4", "udp6": + default: + return nil, fmt.Errorf("unknown network") + } + udpAddr := pfcputil.CreateUDPAddr(laddr.String(), pfcputil.PFCP_PORT) + ludpaddr, err := net.ResolveUDPAddr(network, udpAddr) + if err != nil { + return nil, err + } + if conn, err := net.ListenUDP(network, ludpaddr); err == nil { + return &PFCPConn{ + UDPConn: *conn, + }, nil + } else { + return nil, err + } +} + +func (conn *PFCPConn) Write(m *OutcomingMessage) error { + //XXX: message.Message interface does not implement Marshal() + b := make([]byte, m.MarshalLen()) + if err := m.MarshalTo(b); err != nil { + return err + } + if _, err := conn.WriteTo(b, m.Destination); err != nil { + return err + } + return nil +} diff --git a/pfcp/received_message.go b/pfcp/received_message.go index 8b20924..c2d7da5 100644 --- a/pfcp/received_message.go +++ b/pfcp/received_message.go @@ -20,23 +20,19 @@ type ReceivedMessage struct { Entity api.PFCPEntityInterface } -func (receivedMessage *ReceivedMessage) ReplyTo(responseMessage message.Message) error { +func (receivedMessage *ReceivedMessage) NewResponse(responseMessage message.Message) (*OutcomingMessage, error) { if !pfcputil.IsMessageTypeRequest(receivedMessage.MessageType()) { - return fmt.Errorf("receivedMessage shall be a Request Message") + return nil, fmt.Errorf("receivedMessage shall be a Request Message") } if !pfcputil.IsMessageTypeResponse(responseMessage.MessageType()) { - return fmt.Errorf("responseMessage shall be a Response Message") + return nil, fmt.Errorf("responseMessage shall be a Response Message") } if receivedMessage.Sequence() != responseMessage.Sequence() { - return fmt.Errorf("responseMessage shall have the same Sequence Number than receivedMessage") + return nil, fmt.Errorf("responseMessage shall have the same Sequence Number than receivedMessage") } - //XXX: message.Message interface does not implement Marshal() - b := make([]byte, responseMessage.MarshalLen()) - if err := responseMessage.MarshalTo(b); err != nil { - return err - } - if err := receivedMessage.Entity.SendTo(b, receivedMessage.SenderAddr); err != nil { - return err - } - return nil + return &OutcomingMessage{ + Message: responseMessage, + Destination: receivedMessage.SenderAddr, + }, nil + } From 61fa356f332eeb50ca7e88bee5df26011cc038b8 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 4 Sep 2024 16:32:09 +0200 Subject: [PATCH 3/8] Logrus --- README.md | 4 +-- go.mod | 9 ++++-- go.sum | 16 +++++++++++ pfcp/api/entity_interface.go | 2 +- pfcp/association.go | 6 ++-- pfcp/entity.go | 53 +++++++++++++++++++++--------------- pfcp/entity_cp.go | 6 ++-- pfcp/entity_up.go | 10 +++---- pfcp/handlers.go | 16 +++++------ pfcp/session.go | 4 +-- pfcp/session_id_pool.go | 4 +-- 11 files changed, 79 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index 42ce48b..555efeb 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ ### UPF ```golang -upNode := NewPFCPEntityUP(UPF_NODE_ID, UPF_IP_ADDR) // node id can be an IP Address or a FQDN +upNode, _ := NewPFCPEntityUP(UPF_NODE_ID, UPF_IP_ADDR) // node id can be an IP Address or a FQDN upNode.ListenAndServe() // starts the node in a new goroutine defer upNode.Close() // Access list of associations @@ -22,7 +22,7 @@ sessions := upNode.GetPFCPSessions() ### SMF ```golang -cpNode := NewPFCPEntityCP(SMF_NODE_ID, SMF_IP_ADDR) // node id can be an IP Address or a FQDN +cpNode, _ := NewPFCPEntityCP(SMF_NODE_ID, SMF_IP_ADDR) // node id can be an IP Address or a FQDN cpNode.ListenAndServe() // starts the node in a new goroutine defer cpNode.Close() association, _ := cpNode.NewEstablishedPFCPAssociation(ie.NewNodeIDHeuristic(UPFADDR)) diff --git a/go.mod b/go.mod index 3bc7d08..4fa1296 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,10 @@ module github.com/nextmn/go-pfcp-networking -go 1.22 +go 1.22.1 -require github.com/wmnsk/go-pfcp v0.0.24 +require ( + github.com/sirupsen/logrus v1.9.3 + github.com/wmnsk/go-pfcp v0.0.24 +) + +require golang.org/x/sys v0.24.0 // indirect diff --git a/go.sum b/go.sum index 617cbc1..2efe932 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,20 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/wmnsk/go-pfcp v0.0.24 h1:sv4F3U/IphsPUMXMkTJW877CRvXZ1sF5onWHGBvxx/A= github.com/wmnsk/go-pfcp v0.0.24/go.mod h1:8EUVvOzlz25wkUs9D8STNAs5zGyIo5xEUpHQOUZ/iSg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pfcp/api/entity_interface.go b/pfcp/api/entity_interface.go index 41950c1..1a9fbb2 100644 --- a/pfcp/api/entity_interface.go +++ b/pfcp/api/entity_interface.go @@ -20,6 +20,6 @@ type PFCPEntityInterface interface { GetPFCPSessions() []PFCPSessionInterface GetPFCPSession(localIP string, seid SEID) (PFCPSessionInterface, error) AddEstablishedPFCPSession(session PFCPSessionInterface) error - PrintPFCPRules() + LogPFCPRules() Options() EntityOptionsInterface } diff --git a/pfcp/association.go b/pfcp/association.go index 6bd019c..14b8783 100644 --- a/pfcp/association.go +++ b/pfcp/association.go @@ -7,11 +7,11 @@ package pfcp_networking import ( "fmt" - "log" "net" "time" "github.com/nextmn/go-pfcp-networking/pfcp/api" + "github.com/sirupsen/logrus" "github.com/wmnsk/go-pfcp/ie" "github.com/wmnsk/go-pfcp/message" ) @@ -74,7 +74,7 @@ func (association *PFCPAssociation) SetupInitiatedByCP() error { } asres, ok := resp.(*message.AssociationSetupResponse) if !ok { - log.Printf("got unexpected message: %s\n", resp.MessageTypeName()) + logrus.WithFields(logrus.Fields{"message-type": resp.MessageTypeName()}).Debug("Got unexpected message") } cause, err := asres.Cause.Cause() if err != nil { @@ -86,7 +86,7 @@ func (association *PFCPAssociation) SetupInitiatedByCP() error { go association.heartMonitoring() return nil } - return fmt.Errorf("Associaton setup request rejected") + return fmt.Errorf("Association setup request rejected") default: return fmt.Errorf("Local PFCP entity is not a UP function, neither a CP function.") } diff --git a/pfcp/entity.go b/pfcp/entity.go index 028a8b7..fb473c6 100644 --- a/pfcp/entity.go +++ b/pfcp/entity.go @@ -8,12 +8,12 @@ package pfcp_networking import ( "context" "fmt" - "log" "net" "sync" "github.com/nextmn/go-pfcp-networking/pfcp/api" "github.com/nextmn/go-pfcp-networking/pfcputil" + "github.com/sirupsen/logrus" "github.com/wmnsk/go-pfcp/ie" "github.com/wmnsk/go-pfcp/message" ) @@ -234,11 +234,11 @@ func (e *PFCPEntity) Serve(ctx context.Context, conn *PFCPConn) error { } f, err := e.GetHandler(msg.MessageType()) if err != nil { - log.Println("No Handler for message of this type:", err) + logrus.WithFields(logrus.Fields{"message-type": msg.MessageType}).WithError(err).Debug("No Handler for message of this type") return } if resp, err := f(ctx, ReceivedMessage{Message: msg, SenderAddr: addr, Entity: e}); err != nil { - log.Println(err) + logrus.WithError(err).Debug("Handler raised an error") } else { select { case <-ctx.Done(): @@ -268,58 +268,55 @@ func (e *PFCPEntity) IsControlPlane() bool { return e.kind == "CP" } -func (e *PFCPEntity) PrintPFCPRules() { +func (e *PFCPEntity) LogPFCPRules() { for _, session := range e.GetPFCPSessions() { localIPAddress, err := session.LocalIPAddress() if err != nil { - log.Println(err) + logrus.WithError(err).Debug("Could not get local IP Address") continue } localSEID, err := session.LocalSEID() if err != nil { - log.Println(err) + logrus.WithError(err).Debug("Could not get local SEID") continue } remoteIPAddress, err := session.RemoteIPAddress() if err != nil { - log.Println(err) + logrus.WithError(err).Debug("Could not get remote IP Address") continue } remoteSEID, err := session.RemoteSEID() if err != nil { - log.Println(err) + logrus.WithError(err).Debug("Could not get remote SEID") continue } - log.Printf("PFCP Session: Local F-SEID [%s (%d)], Remote F-SEID [%s (%d)]\n", - localIPAddress.String(), localSEID, - remoteIPAddress.String(), remoteSEID) session.RLock() defer session.RUnlock() for _, pdrid := range session.GetSortedPDRIDs() { pdr, err := session.GetPDR(pdrid) if err != nil { - log.Println(err) + logrus.WithError(err).Debug("Could not get PDR") continue } precedence, err := pdr.Precedence() if err != nil { - log.Println(err) + logrus.WithError(err).Debug("Could not get Precedence") continue } farid, err := pdr.FARID() if err != nil { - log.Println(err) + logrus.WithError(err).Debug("Could not get FAR ID") continue } pdicontent, err := pdr.PDI() if err != nil { - log.Println(err) + logrus.WithError(err).Debug("Could not get PDI") continue } far, err := session.GetFAR(farid) if err != nil { - log.Println(err) + logrus.WithError(err).Debug("Could not get FAR") continue } pdi := ie.NewPDI(pdicontent...) @@ -422,12 +419,24 @@ func (e *PFCPEntity) PrintPFCPRules() { } } - log.Printf(" ↦ [PDR %d] (%d) Source interface: %s, OHR: %s, F-TEID: %s, UE IP: %s\n", pdrid, precedence, sourceInterfaceLabel, OuterHeaderRemovalLabel, fteidLabel, ueIpAddressLabel) - if SDFFilterLabel != "" { - log.Printf(" ↪ %s\n", SDFFilterLabel) - } - log.Printf(" ↪ [FAR %d] OHC: %s, ApplyAction: %s, Destination interface: %s\n", farid, OuterHeaderCreationLabel, ApplyActionLabel, DestinationInterfaceLabel) + logrus.WithFields(logrus.Fields{ + "session/local-fseid": localIPAddress.String(), + "session/local-seid": localSEID, + "session/remote-fseid": remoteIPAddress.String(), + "session/remote-seid": remoteSEID, + "pdr/id": pdrid, + "pdr/precedence": precedence, + "pdr/source-iface": sourceInterfaceLabel, + "pdr/outer-header-removal": OuterHeaderRemovalLabel, + "pdr/fteid": fteidLabel, + "pdr/ue-ip-addr": ueIpAddressLabel, + "pdr/sdf-filter": SDFFilterLabel, + "far/id": farid, + "far/ohc": OuterHeaderCreationLabel, + "far/apply-action": ApplyActionLabel, + "far/destination-iface": DestinationInterfaceLabel, + }).Info("PDR") + } - log.Printf("\n") } } diff --git a/pfcp/entity_cp.go b/pfcp/entity_cp.go index fe03152..0f34998 100644 --- a/pfcp/entity_cp.go +++ b/pfcp/entity_cp.go @@ -11,10 +11,10 @@ type PFCPEntityCP struct { PFCPEntity } -func NewPFCPEntityCP(nodeID string, listenAddr string) *PFCPEntityCP { +func NewPFCPEntityCP(nodeID string, listenAddr string) (*PFCPEntityCP, error) { return NewPFCPEntityCPWithOptions(nodeID, listenAddr, EntityOptions{}) } -func NewPFCPEntityCPWithOptions(nodeID string, listenAddr string, options api.EntityOptionsInterface) *PFCPEntityCP { - return &PFCPEntityCP{PFCPEntity: NewPFCPEntity(nodeID, listenAddr, "CP", options)} +func NewPFCPEntityCPWithOptions(nodeID string, listenAddr string, options api.EntityOptionsInterface) (*PFCPEntityCP, error) { + return &PFCPEntityCP{PFCPEntity: NewPFCPEntity(nodeID, listenAddr, "CP", options)}, nil } diff --git a/pfcp/entity_up.go b/pfcp/entity_up.go index 5af9609..c147948 100644 --- a/pfcp/entity_up.go +++ b/pfcp/entity_up.go @@ -6,8 +6,6 @@ package pfcp_networking import ( - "log" - "github.com/nextmn/go-pfcp-networking/pfcp/api" "github.com/wmnsk/go-pfcp/message" ) @@ -16,17 +14,17 @@ type PFCPEntityUP struct { PFCPEntity } -func NewPFCPEntityUP(nodeID string, listenAddr string) *PFCPEntityUP { +func NewPFCPEntityUP(nodeID string, listenAddr string) (*PFCPEntityUP, error) { return NewPFCPEntityUPWithOptions(nodeID, listenAddr, EntityOptions{}) } -func NewPFCPEntityUPWithOptions(nodeID string, listenAddr string, options api.EntityOptionsInterface) *PFCPEntityUP { +func NewPFCPEntityUPWithOptions(nodeID string, listenAddr string, options api.EntityOptionsInterface) (*PFCPEntityUP, error) { e := PFCPEntityUP{PFCPEntity: NewPFCPEntity(nodeID, listenAddr, "UP", options)} err := e.initDefaultHandlers() if err != nil { - log.Println(err) + return nil, err } - return &e + return &e, nil } func (e *PFCPEntityUP) initDefaultHandlers() error { diff --git a/pfcp/handlers.go b/pfcp/handlers.go index f08e99b..0819ccf 100644 --- a/pfcp/handlers.go +++ b/pfcp/handlers.go @@ -9,10 +9,10 @@ import ( "context" "fmt" "io" - "log" "net" "github.com/nextmn/go-pfcp-networking/pfcp/api" + "github.com/sirupsen/logrus" "github.com/wmnsk/go-pfcp/ie" "github.com/wmnsk/go-pfcp/message" ) @@ -20,13 +20,13 @@ import ( type PFCPMessageHandler = func(ctx context.Context, receivedMessage ReceivedMessage) (*OutcomingMessage, error) func DefaultHeartbeatRequestHandler(ctx context.Context, msg ReceivedMessage) (*OutcomingMessage, error) { - log.Println("Received Heartbeat Request") + logrus.Debug("Received Heartbeat Request") res := message.NewHeartbeatResponse(msg.Sequence(), msg.Entity.RecoveryTimeStamp()) return msg.NewResponse(res) } func DefaultAssociationSetupRequestHandler(ctx context.Context, msg ReceivedMessage) (*OutcomingMessage, error) { - log.Println("Received Association Setup Request") + logrus.Debug("Received Association Setup Request") m, ok := msg.Message.(*message.AssociationSetupRequest) if !ok { return nil, fmt.Errorf("Issue with Association Setup Request") @@ -43,18 +43,18 @@ func DefaultAssociationSetupRequestHandler(ctx context.Context, msg ReceivedMess } if _, err := msg.Entity.NewEstablishedPFCPAssociation(m.NodeID); err != nil { - log.Println("Rejected Association:", err) + logrus.WithError(err).Debug("Rejected Association") res := message.NewAssociationSetupResponse(msg.Sequence(), msg.Entity.NodeID(), ie.NewCause(ie.CauseRequestRejected), msg.Entity.RecoveryTimeStamp()) return msg.NewResponse(res) } - log.Println("Association Accepted") + logrus.Debug("Association Accepted") res := message.NewAssociationSetupResponse(msg.Sequence(), msg.Entity.NodeID(), ie.NewCause(ie.CauseRequestAccepted), msg.Entity.RecoveryTimeStamp()) return msg.NewResponse(res) } func DefaultSessionEstablishmentRequestHandler(ctx context.Context, msg ReceivedMessage) (*OutcomingMessage, error) { - log.Println("Received Session Establishment Request") + logrus.Debug("Received Session Establishment Request") m, ok := msg.Message.(*message.SessionEstablishmentRequest) if !ok { return nil, fmt.Errorf("Issue with Session Establishment Request") @@ -83,7 +83,7 @@ func DefaultSessionEstablishmentRequestHandler(ctx context.Context, msg Received // Sender must have established a PFCP Association with the Receiver Node if _, err := checkSenderAssociation(msg.Entity, msg.SenderAddr); err != nil { - log.Println(err) + logrus.WithError(err).Debug("No association") res := message.NewSessionEstablishmentResponse(0, 0, rseid, msg.Sequence(), 0, msg.Entity.NodeID(), ie.NewCause(ie.CauseNoEstablishedPFCPAssociation)) return msg.NewResponse(res) } @@ -155,7 +155,7 @@ func DefaultSessionEstablishmentRequestHandler(ctx context.Context, msg Received } func DefaultSessionModificationRequestHandler(ctx context.Context, msg ReceivedMessage) (*OutcomingMessage, error) { - log.Println("Received Session Modification Request") + logrus.Debug("Received Session Modification Request") m, ok := msg.Message.(*message.SessionModificationRequest) if !ok { return nil, fmt.Errorf("Issue with Session Modification Request") diff --git a/pfcp/session.go b/pfcp/session.go index f500d36..a58ea07 100644 --- a/pfcp/session.go +++ b/pfcp/session.go @@ -7,11 +7,11 @@ package pfcp_networking import ( "fmt" - "log" "net" "sync" "github.com/nextmn/go-pfcp-networking/pfcp/api" + "github.com/sirupsen/logrus" "github.com/wmnsk/go-pfcp/ie" "github.com/wmnsk/go-pfcp/message" ) @@ -277,7 +277,7 @@ func (s *PFCPSession) Setup() error { } ser, ok := resp.(*message.SessionEstablishmentResponse) if !ok { - log.Printf("got unexpected message: %s\n", resp.MessageTypeName()) + logrus.WithFields(logrus.Fields{"message-type": resp.MessageTypeName()}).Debug("got unexpected message") } remoteFseidFields, err := ser.UPFSEID.FSEID() diff --git a/pfcp/session_id_pool.go b/pfcp/session_id_pool.go index d607b9b..3d81006 100644 --- a/pfcp/session_id_pool.go +++ b/pfcp/session_id_pool.go @@ -6,10 +6,10 @@ package pfcp_networking import ( - "log" "sync" "github.com/nextmn/go-pfcp-networking/pfcp/api" + "github.com/sirupsen/logrus" ) // SessionIDPool is a generator of session IDs @@ -32,6 +32,6 @@ func (pool *SessionIDPool) GetNext() api.SEID { defer pool.muSessionID.Unlock() id := pool.currentSessionID pool.currentSessionID = id + 1 - log.Println("Returning next Session ID:", id) + logrus.WithFields(logrus.Fields{"next-session-id": id}).Debug("Returning next Session ID") return id } From 8f2fb3daf1a801bd17372b7a3f385ab4b48fd3d4 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 4 Sep 2024 16:37:23 +0200 Subject: [PATCH 4/8] Recovery timestamp --- pfcp/entity.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pfcp/entity.go b/pfcp/entity.go index fb473c6..2e72adf 100644 --- a/pfcp/entity.go +++ b/pfcp/entity.go @@ -10,6 +10,7 @@ import ( "fmt" "net" "sync" + "time" "github.com/nextmn/go-pfcp-networking/pfcp/api" "github.com/nextmn/go-pfcp-networking/pfcputil" @@ -111,7 +112,7 @@ func NewPFCPEntity(nodeID string, listenAddr string, kind string, options api.En return PFCPEntity{ nodeID: ie.NewNodeIDHeuristic(nodeID), listenAddr: listenAddr, - recoveryTimeStamp: nil, + recoveryTimeStamp: ie.NewRecoveryTimeStamp(time.Now()), handlers: newDefaultPFCPEntityHandlers(), associationsMap: NewAssociationsMap(), sessionsMap: NewSessionsMap(), From a6a79aa00fe766ad53245c9ae6443eaf80776555 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 4 Sep 2024 16:45:30 +0200 Subject: [PATCH 5/8] ListenAndServeContext --- pfcp/entity.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pfcp/entity.go b/pfcp/entity.go index 2e72adf..bd6fb66 100644 --- a/pfcp/entity.go +++ b/pfcp/entity.go @@ -195,6 +195,12 @@ func (e *PFCPEntity) NewEstablishedPFCPAssociation(nodeID *ie.IE) (association a // Listen PFCP and run the entity with the provided context. // Always return a non-nil error. func (e *PFCPEntity) ListenAndServe() error { + return e.ListenAndServeContext(context.Background()) +} + +// Listen PFCP and run the entity with the provided context. +// Always return a non-nil error. +func (e *PFCPEntity) ListenAndServeContext(ctx context.Context) error { // TODO: listen on both ipv4 and ipv6 ipaddr, err := net.ResolveIPAddr("ip", e.listenAddr) if err != nil { @@ -203,7 +209,7 @@ func (e *PFCPEntity) ListenAndServe() error { if conn, err := ListenPFCP("udp", ipaddr); err != nil { return err } else { - e.Serve(context.Background(), conn) + e.Serve(ctx, conn) return fmt.Errorf("Server closed") } } From d09ddba289c35cdd8bf3ad7de884c3fa6ceff5d1 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 4 Sep 2024 17:01:23 +0200 Subject: [PATCH 6/8] New without error --- README.md | 4 ++-- pfcp/entity.go | 7 +++++-- pfcp/entity_cp.go | 6 +++--- pfcp/entity_up.go | 29 ++++++++++------------------- 4 files changed, 20 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 555efeb..42ce48b 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ ### UPF ```golang -upNode, _ := NewPFCPEntityUP(UPF_NODE_ID, UPF_IP_ADDR) // node id can be an IP Address or a FQDN +upNode := NewPFCPEntityUP(UPF_NODE_ID, UPF_IP_ADDR) // node id can be an IP Address or a FQDN upNode.ListenAndServe() // starts the node in a new goroutine defer upNode.Close() // Access list of associations @@ -22,7 +22,7 @@ sessions := upNode.GetPFCPSessions() ### SMF ```golang -cpNode, _ := NewPFCPEntityCP(SMF_NODE_ID, SMF_IP_ADDR) // node id can be an IP Address or a FQDN +cpNode := NewPFCPEntityCP(SMF_NODE_ID, SMF_IP_ADDR) // node id can be an IP Address or a FQDN cpNode.ListenAndServe() // starts the node in a new goroutine defer cpNode.Close() association, _ := cpNode.NewEstablishedPFCPAssociation(ie.NewNodeIDHeuristic(UPFADDR)) diff --git a/pfcp/entity.go b/pfcp/entity.go index bd6fb66..8bf0c20 100644 --- a/pfcp/entity.go +++ b/pfcp/entity.go @@ -108,12 +108,15 @@ func newDefaultPFCPEntityHandlers() map[pfcputil.MessageType]PFCPMessageHandler return m } -func NewPFCPEntity(nodeID string, listenAddr string, kind string, options api.EntityOptionsInterface) PFCPEntity { +func NewPFCPEntity(nodeID string, listenAddr string, kind string, handlers map[pfcputil.MessageType]PFCPMessageHandler, options api.EntityOptionsInterface) PFCPEntity { + if handlers == nil { + handlers = newDefaultPFCPEntityHandlers() + } return PFCPEntity{ nodeID: ie.NewNodeIDHeuristic(nodeID), listenAddr: listenAddr, recoveryTimeStamp: ie.NewRecoveryTimeStamp(time.Now()), - handlers: newDefaultPFCPEntityHandlers(), + handlers: handlers, associationsMap: NewAssociationsMap(), sessionsMap: NewSessionsMap(), kind: kind, diff --git a/pfcp/entity_cp.go b/pfcp/entity_cp.go index 0f34998..dab87b1 100644 --- a/pfcp/entity_cp.go +++ b/pfcp/entity_cp.go @@ -11,10 +11,10 @@ type PFCPEntityCP struct { PFCPEntity } -func NewPFCPEntityCP(nodeID string, listenAddr string) (*PFCPEntityCP, error) { +func NewPFCPEntityCP(nodeID string, listenAddr string) *PFCPEntityCP { return NewPFCPEntityCPWithOptions(nodeID, listenAddr, EntityOptions{}) } -func NewPFCPEntityCPWithOptions(nodeID string, listenAddr string, options api.EntityOptionsInterface) (*PFCPEntityCP, error) { - return &PFCPEntityCP{PFCPEntity: NewPFCPEntity(nodeID, listenAddr, "CP", options)}, nil +func NewPFCPEntityCPWithOptions(nodeID string, listenAddr string, options api.EntityOptionsInterface) *PFCPEntityCP { + return &PFCPEntityCP{PFCPEntity: NewPFCPEntity(nodeID, listenAddr, "CP", nil, options)} } diff --git a/pfcp/entity_up.go b/pfcp/entity_up.go index c147948..6d49433 100644 --- a/pfcp/entity_up.go +++ b/pfcp/entity_up.go @@ -7,6 +7,7 @@ package pfcp_networking import ( "github.com/nextmn/go-pfcp-networking/pfcp/api" + "github.com/nextmn/go-pfcp-networking/pfcputil" "github.com/wmnsk/go-pfcp/message" ) @@ -14,28 +15,18 @@ type PFCPEntityUP struct { PFCPEntity } -func NewPFCPEntityUP(nodeID string, listenAddr string) (*PFCPEntityUP, error) { +func NewPFCPEntityUP(nodeID string, listenAddr string) *PFCPEntityUP { return NewPFCPEntityUPWithOptions(nodeID, listenAddr, EntityOptions{}) } -func NewPFCPEntityUPWithOptions(nodeID string, listenAddr string, options api.EntityOptionsInterface) (*PFCPEntityUP, error) { - e := PFCPEntityUP{PFCPEntity: NewPFCPEntity(nodeID, listenAddr, "UP", options)} - err := e.initDefaultHandlers() - if err != nil { - return nil, err - } - return &e, nil +func NewPFCPEntityUPWithOptions(nodeID string, listenAddr string, options api.EntityOptionsInterface) *PFCPEntityUP { + return &PFCPEntityUP{PFCPEntity: NewPFCPEntity(nodeID, listenAddr, "UP", newDefaultPFCPEntityUPHandlers(), options)} } -func (e *PFCPEntityUP) initDefaultHandlers() error { - if err := e.AddHandler(message.MsgTypeAssociationSetupRequest, DefaultAssociationSetupRequestHandler); err != nil { - return err - } - if err := e.AddHandler(message.MsgTypeSessionEstablishmentRequest, DefaultSessionEstablishmentRequestHandler); err != nil { - return err - } - if err := e.AddHandler(message.MsgTypeSessionModificationRequest, DefaultSessionModificationRequestHandler); err != nil { - return err - } - return nil +func newDefaultPFCPEntityUPHandlers() map[pfcputil.MessageType]PFCPMessageHandler { + m := newDefaultPFCPEntityHandlers() + m[message.MsgTypeAssociationSetupRequest] = DefaultAssociationSetupRequestHandler + m[message.MsgTypeSessionEstablishmentRequest] = DefaultSessionEstablishmentRequestHandler + m[message.MsgTypeSessionModificationRequest] = DefaultSessionModificationRequestHandler + return m } From 26017aa17348ec78a590e35e66475d96b7b736d3 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 4 Sep 2024 17:21:37 +0200 Subject: [PATCH 7/8] update doc --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 42ce48b..37ad774 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ ```golang upNode := NewPFCPEntityUP(UPF_NODE_ID, UPF_IP_ADDR) // node id can be an IP Address or a FQDN -upNode.ListenAndServe() // starts the node in a new goroutine +go upNode.ListenAndServe() defer upNode.Close() // Access list of associations associations := upNode.GetPFCPAssociations() @@ -23,7 +23,7 @@ sessions := upNode.GetPFCPSessions() ```golang cpNode := NewPFCPEntityCP(SMF_NODE_ID, SMF_IP_ADDR) // node id can be an IP Address or a FQDN -cpNode.ListenAndServe() // starts the node in a new goroutine +go cpNode.ListenAndServe() defer cpNode.Close() association, _ := cpNode.NewEstablishedPFCPAssociation(ie.NewNodeIDHeuristic(UPFADDR)) session, _ := a.CreateSession(pdrs, fars) From 60beb02394e767f8fc810dec8cf4e5e495e77dba Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Wed, 4 Sep 2024 17:50:24 +0200 Subject: [PATCH 8/8] Recoverytimestamp only at serve time --- pfcp/entity.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pfcp/entity.go b/pfcp/entity.go index 8bf0c20..0d79760 100644 --- a/pfcp/entity.go +++ b/pfcp/entity.go @@ -115,7 +115,7 @@ func NewPFCPEntity(nodeID string, listenAddr string, kind string, handlers map[p return PFCPEntity{ nodeID: ie.NewNodeIDHeuristic(nodeID), listenAddr: listenAddr, - recoveryTimeStamp: ie.NewRecoveryTimeStamp(time.Now()), + recoveryTimeStamp: nil, handlers: handlers, associationsMap: NewAssociationsMap(), sessionsMap: NewSessionsMap(), @@ -228,6 +228,7 @@ func (e *PFCPEntity) Serve(ctx context.Context, conn *PFCPConn) error { serveCtx, cancel := context.WithCancel(ctx) e.closeFunc = cancel defer newconn.Close() + e.recoveryTimeStamp = ie.NewRecoveryTimeStamp(time.Now()) for { select { case <-serveCtx.Done():