diff --git a/pkg/server/bmp.go b/pkg/server/bmp.go index a5882fbd3..aac357b6e 100644 --- a/pkg/server/bmp.go +++ b/pkg/server/bmp.go @@ -154,8 +154,8 @@ func (b *bmpClient) loop() { tickerCh = t.C } - write := func(msg *bmp.BMPMessage) error { - buf, _ := msg.Serialize() + write := func(msg *bmp.BMPMessage, options ...*bgp.MarshallingOption) error { + buf, _ := msg.Serialize(options...) _, err := conn.Write(buf) if err != nil { b.s.logger.Warn("failed to write to bmp server", @@ -197,14 +197,14 @@ func (b *bmpClient) loop() { } } for _, path := range pathList { - for _, u := range table.CreateUpdateMsgFromPaths([]*table.Path{path}) { - payload, _ := u.Serialize() - if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, true, info, path.GetTimestamp().Unix(), payload)); err != nil { + for _, u := range table.CreateUpdateMsgFromPaths([]*table.Path{path}, msg.MarshallingOptions...) { + payload, _ := u.Serialize(msg.MarshallingOptions...) + if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, true, info, path.GetTimestamp().Unix(), payload), msg.MarshallingOptions...); err != nil { return false } } } - } else if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, msg.FourBytesAs, info, msg.Timestamp.Unix(), msg.Payload)); err != nil { + } else if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, msg.FourBytesAs, info, msg.Timestamp.Unix(), msg.Payload), msg.MarshallingOptions...); err != nil { return false } case *watchEventBestPath: diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go index 6bf4a307d..5f50d50fd 100644 --- a/pkg/server/fsm.go +++ b/pkg/server/fsm.go @@ -191,7 +191,7 @@ type fsm struct { peerInfo *table.PeerInfo gracefulRestartTimer *time.Timer twoByteAsTrans bool - marshallingOptions *bgp.MarshallingOption + marshallingOptions []*bgp.MarshallingOption notification chan *bgp.BGPMessage logger log.Logger } @@ -985,7 +985,7 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) { options := h.fsm.marshallingOptions h.fsm.lock.RUnlock() - m, err := bgp.ParseBGPBody(hd, bodyBuf, options) + m, err := bgp.ParseBGPBody(hd, bodyBuf, options...) if err != nil { handling = h.handlingError(m, err, useRevisedError) h.fsm.bgpMessageStateUpdate(0, true) @@ -1324,9 +1324,9 @@ func (h *fsmHandler) opensent(ctx context.Context) (bgp.FSMState, *fsmStateReaso fsm.capMap, fsm.rfMap = open2Cap(body, fsm.pConf) if _, y := fsm.capMap[bgp.BGP_CAP_ADD_PATH]; y { - fsm.marshallingOptions = &bgp.MarshallingOption{ + fsm.marshallingOptions = []*bgp.MarshallingOption{{ AddPath: fsm.rfMap, - } + }} } else { fsm.marshallingOptions = nil } @@ -1616,7 +1616,7 @@ func (h *fsmHandler) sendMessageloop(ctx context.Context, wg *sync.WaitGroup) er table.UpdatePathAttrs2ByteAs(m.Body.(*bgp.BGPUpdate)) table.UpdatePathAggregator2ByteAs(m.Body.(*bgp.BGPUpdate)) } - b, err := m.Serialize(h.fsm.marshallingOptions) + b, err := m.Serialize(h.fsm.marshallingOptions...) fsm.lock.RUnlock() if err != nil { fsm.lock.RLock() @@ -1720,7 +1720,7 @@ func (h *fsmHandler) sendMessageloop(ctx context.Context, wg *sync.WaitGroup) er h.fsm.lock.RLock() options := h.fsm.marshallingOptions h.fsm.lock.RUnlock() - for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) { + for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options...) { if err := send(msg); err != nil { return nil } @@ -1789,7 +1789,7 @@ func (h *fsmHandler) established(ctx context.Context) (bgp.FSMState, *fsmStateRe case <-ctx.Done(): select { case m := <-fsm.notification: - b, _ := m.Serialize(h.fsm.marshallingOptions) + b, _ := m.Serialize(h.fsm.marshallingOptions...) h.conn.Write(b) default: // nothing to do diff --git a/pkg/server/server.go b/pkg/server/server.go index b9e221930..6d6fe5d92 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -866,18 +866,19 @@ func (s *BgpServer) notifyPrePolicyUpdateWatcher(peer *peer, pathList []*table.P _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &watchEventUpdate{ - Message: msg, - PeerAS: peer.fsm.peerInfo.AS, - LocalAS: peer.fsm.peerInfo.LocalAS, - PeerAddress: peer.fsm.peerInfo.Address, - LocalAddress: net.ParseIP(l), - PeerID: peer.fsm.peerInfo.ID, - FourBytesAs: y, - Timestamp: timestamp, - Payload: payload, - PostPolicy: false, - PathList: cloned, - Neighbor: n, + Message: msg, + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Timestamp: timestamp, + Payload: payload, + PostPolicy: false, + PathList: cloned, + Neighbor: n, + MarshallingOptions: peer.fsm.marshallingOptions, } peer.fsm.lock.RUnlock() s.notifyWatcher(watchEventTypePreUpdate, ev) @@ -897,16 +898,17 @@ func (s *BgpServer) notifyPostPolicyUpdateWatcher(peer *peer, pathList []*table. _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &watchEventUpdate{ - PeerAS: peer.fsm.peerInfo.AS, - LocalAS: peer.fsm.peerInfo.LocalAS, - PeerAddress: peer.fsm.peerInfo.Address, - LocalAddress: net.ParseIP(l), - PeerID: peer.fsm.peerInfo.ID, - FourBytesAs: y, - Timestamp: cloned[0].GetTimestamp(), - PostPolicy: true, - PathList: cloned, - Neighbor: n, + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Timestamp: cloned[0].GetTimestamp(), + PostPolicy: true, + PathList: cloned, + Neighbor: n, + MarshallingOptions: peer.fsm.marshallingOptions, } peer.fsm.lock.RUnlock() s.notifyWatcher(watchEventTypePostUpdate, ev) @@ -4143,19 +4145,20 @@ type watchEvent interface { } type watchEventUpdate struct { - Message *bgp.BGPMessage - PeerAS uint32 - LocalAS uint32 - PeerAddress net.IP - LocalAddress net.IP - PeerID net.IP - FourBytesAs bool - Timestamp time.Time - Payload []byte - PostPolicy bool - Init bool - PathList []*table.Path - Neighbor *config.Neighbor + Message *bgp.BGPMessage + PeerAS uint32 + LocalAS uint32 + PeerAddress net.IP + LocalAddress net.IP + PeerID net.IP + FourBytesAs bool + Timestamp time.Time + Payload []byte + PostPolicy bool + Init bool + PathList []*table.Path + Neighbor *config.Neighbor + MarshallingOptions []*bgp.MarshallingOption } type PeerEventType uint32 @@ -4442,16 +4445,17 @@ func (s *BgpServer) watch(opts ...watchOption) (w *watcher) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() update := &watchEventUpdate{ - PeerAS: peer.fsm.peerInfo.AS, - LocalAS: peer.fsm.peerInfo.LocalAS, - PeerAddress: peer.fsm.peerInfo.Address, - LocalAddress: net.ParseIP(l), - PeerID: peer.fsm.peerInfo.ID, - FourBytesAs: y, - Init: true, - PostPolicy: false, - Neighbor: configNeighbor, - PathList: peer.adjRibIn.PathList([]bgp.RouteFamily{rf}, false), + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Init: true, + PostPolicy: false, + Neighbor: configNeighbor, + PathList: peer.adjRibIn.PathList([]bgp.RouteFamily{rf}, false), + MarshallingOptions: peer.fsm.marshallingOptions, } peer.fsm.lock.RUnlock() w.notify(update) @@ -4460,18 +4464,19 @@ func (s *BgpServer) watch(opts ...watchOption) (w *watcher) { eorBuf, _ := eor.Serialize() peer.fsm.lock.RLock() update = &watchEventUpdate{ - Message: eor, - PeerAS: peer.fsm.peerInfo.AS, - LocalAS: peer.fsm.peerInfo.LocalAS, - PeerAddress: peer.fsm.peerInfo.Address, - LocalAddress: net.ParseIP(l), - PeerID: peer.fsm.peerInfo.ID, - FourBytesAs: y, - Timestamp: time.Now(), - Init: true, - Payload: eorBuf, - PostPolicy: false, - Neighbor: configNeighbor, + Message: eor, + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Timestamp: time.Now(), + Init: true, + Payload: eorBuf, + PostPolicy: false, + Neighbor: configNeighbor, + MarshallingOptions: peer.fsm.marshallingOptions, } peer.fsm.lock.RUnlock() w.notify(update) @@ -4506,6 +4511,7 @@ func (s *BgpServer) watch(opts ...watchOption) (w *watcher) { Neighbor: configNeighbor, PathList: paths, Init: true, + // TODO: MarshallingOptions: peer.fsm.marshallingOptions, }) eor := bgp.NewEndOfRib(rf)