-
Notifications
You must be signed in to change notification settings - Fork 50
/
packet_pool.go
130 lines (109 loc) · 3.37 KB
/
packet_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package astits
import (
"sort"
)
// packetAccumulator keeps track of packets for a single PID and decides when to flush them
type packetAccumulator struct {
pid uint16
programMap *programMap
q []*Packet
}
// newPacketAccumulator creates a new packet queue for a single PID
func newPacketAccumulator(pid uint16, programMap *programMap) *packetAccumulator {
return &packetAccumulator{
pid: pid,
programMap: programMap,
}
}
// add adds a new packet for this PID to the queue
func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
mps := b.q
// Empty buffer if we detect a discontinuity
if hasDiscontinuity(mps, p) {
// Reset current slice or make new
if cap(mps) > 0 {
mps = mps[:0]
} else {
mps = make([]*Packet, 0, 10)
}
}
// Throw away packet if it's the same as the previous one
if isSameAsPrevious(mps, p) {
return
}
// Flush buffer if new payload starts here
if p.Header.PayloadUnitStartIndicator {
ps = mps
mps = make([]*Packet, 0, cap(mps))
}
mps = append(mps, p)
// Check if PSI payload is complete
if b.programMap != nil &&
(b.pid == PIDPAT || b.programMap.existsUnlocked(b.pid)) &&
isPSIComplete(mps) {
ps = mps
mps = nil
}
b.q = mps
return
}
// packetPool represents a queue of packets for each PID in the stream
type packetPool struct {
// We use map[uint32] instead map[uint16] as go runtime provide optimized hash functions for (u)int32/64 keys
b map[uint32]*packetAccumulator // Indexed by PID
programMap *programMap
}
// newPacketPool creates a new packet pool with an optional parser and programMap
func newPacketPool(programMap *programMap) *packetPool {
return &packetPool{
b: make(map[uint32]*packetAccumulator),
programMap: programMap,
}
}
// addUnlocked adds a new packet to the pool
func (b *packetPool) addUnlocked(p *Packet) (ps []*Packet) {
// Throw away packet if error indicator
if p.Header.TransportErrorIndicator {
return
}
// Throw away packets that don't have a payload until we figure out what we're going to do with them
// TODO figure out what we're going to do with them :D
if !p.Header.HasPayload {
return
}
// Make sure accumulator exists
acc, ok := b.b[uint32(p.Header.PID)]
if !ok {
acc = newPacketAccumulator(p.Header.PID, b.programMap)
b.b[uint32(p.Header.PID)] = acc
}
// Add to the accumulator
return acc.add(p)
}
// dumpUnlocked dumps the packet pool by looking for the first item with packets inside
func (b *packetPool) dumpUnlocked() (ps []*Packet) {
var keys []int
for k := range b.b {
keys = append(keys, int(k))
}
sort.Ints(keys)
for _, k := range keys {
ps = b.b[uint32(k)].q
delete(b.b, uint32(k))
if len(ps) > 0 {
return
}
}
return
}
// hasDiscontinuity checks whether a packet is discontinuous with a set of packets
func hasDiscontinuity(ps []*Packet, p *Packet) bool {
l := len(ps)
return (p.Header.HasAdaptationField && p.AdaptationField.DiscontinuityIndicator) || (l > 0 && ((p.Header.HasPayload && p.Header.ContinuityCounter != (ps[l-1].Header.ContinuityCounter+1)%16) ||
(!p.Header.HasPayload && p.Header.ContinuityCounter != ps[l-1].Header.ContinuityCounter)))
}
// isSameAsPrevious checks whether a packet is the same as the last packet of a set of packets
func isSameAsPrevious(ps []*Packet, p *Packet) bool {
l := len(ps)
return l > 0 && p.Header.HasPayload && p.Header.ContinuityCounter == ps[l-1].Header.ContinuityCounter
}