-
Notifications
You must be signed in to change notification settings - Fork 2
/
fetch.go
202 lines (173 loc) · 6.37 KB
/
fetch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package client
// FetchRequest is used to fetch a chunk of one or more logs for some topic-partitions.
type FetchRequest struct {
MaxWait int32
MinBytes int32
RequestInfo map[string][]*PartitionFetchInfo
}
// Write writes the FetchRequest to the given Encoder.
func (fr *FetchRequest) Write(encoder Encoder) {
//Normal client consumers should always specify ReplicaId as -1 as they have no node id
encoder.WriteInt32(-1)
encoder.WriteInt32(fr.MaxWait)
encoder.WriteInt32(fr.MinBytes)
encoder.WriteInt32(int32(len(fr.RequestInfo)))
for topic, partitionFetchInfos := range fr.RequestInfo {
encoder.WriteString(topic)
encoder.WriteInt32(int32(len(partitionFetchInfos)))
for _, info := range partitionFetchInfos {
encoder.WriteInt32(info.Partition)
encoder.WriteInt64(info.Offset)
encoder.WriteInt32(info.FetchSize)
}
}
}
// Key returns the Kafka API key for FetchRequest.
func (fr *FetchRequest) Key() int16 {
return 1
}
// Version returns the Kafka request version for backwards compatibility.
func (fr *FetchRequest) Version() int16 {
return 0
}
// AddFetch is a convenience method to add a PartitionFetchInfo.
func (fr *FetchRequest) AddFetch(topic string, partition int32, offset int64, fetchSize int32) {
if fr.RequestInfo == nil {
fr.RequestInfo = make(map[string][]*PartitionFetchInfo)
}
fr.RequestInfo[topic] = append(fr.RequestInfo[topic], &PartitionFetchInfo{Partition: partition, Offset: offset, FetchSize: fetchSize})
}
// FetchResponse contains FetchResponseData for all requested topics and partitions.
type FetchResponse struct {
Data map[string]map[int32]*FetchResponsePartitionData
}
func (fr *FetchResponse) Read(decoder Decoder) *DecodingError {
fr.Data = make(map[string]map[int32]*FetchResponsePartitionData)
blocksLength, err := decoder.GetInt32()
if err != nil {
return NewDecodingError(err, reasonInvalidBlocksLength)
}
for i := int32(0); i < blocksLength; i++ {
topic, err := decoder.GetString()
if err != nil {
return NewDecodingError(err, reasonInvalidBlockTopic)
}
fr.Data[topic] = make(map[int32]*FetchResponsePartitionData)
fetchResponseDataLength, err := decoder.GetInt32()
if err != nil {
return NewDecodingError(err, reasonInvalidFetchResponseDataLength)
}
for j := int32(0); j < fetchResponseDataLength; j++ {
partition, err := decoder.GetInt32()
if err != nil {
return NewDecodingError(err, reasonInvalidFetchResponseDataPartition)
}
fetchResponseData := new(FetchResponsePartitionData)
decodingErr := fetchResponseData.Read(decoder)
if decodingErr != nil {
return decodingErr
}
fr.Data[topic][partition] = fetchResponseData
}
}
return nil
}
// GetMessages traverses this FetchResponse and collects all messages.
// Returns an error if FetchResponse contains one.
// Messages should be ordered by offset.
func (fr *FetchResponse) GetMessages() ([]*MessageAndMetadata, error) {
var messages []*MessageAndMetadata
collector := func(topic string, partition int32, offset int64, key []byte, value []byte) error {
messages = append(messages, &MessageAndMetadata{
Topic: topic,
Partition: partition,
Offset: offset,
Key: key,
Value: value,
})
return nil
}
err := fr.CollectMessages(collector)
return messages, err
}
// Error returns the error message for a given topic and pertion of this FetchResponse
func (fr *FetchResponse) Error(topic string, partition int32) error {
t, ok := fr.Data[topic]
if !ok {
return nil
}
p, ok := t[partition]
if !ok {
return nil
}
return p.Error
}
// CollectMessages traverses this FetchResponse and applies a collector function to each message
// giving the possibility to avoid response -> kafka-client.Message -> other.Message conversion if necessary.
func (fr *FetchResponse) CollectMessages(collector func(topic string, partition int32, offset int64, key []byte, value []byte) error) error {
for topic, partitionAndData := range fr.Data {
for partition, data := range partitionAndData {
if data.Error != ErrNoError {
return data.Error
}
for _, messageAndOffset := range data.Messages {
if messageAndOffset.Message.Nested != nil {
for _, nested := range messageAndOffset.Message.Nested {
err := collector(topic, partition, nested.Offset, nested.Message.Key, nested.Message.Value)
if err != nil {
return err
}
}
} else {
err := collector(topic, partition, messageAndOffset.Offset, messageAndOffset.Message.Key, messageAndOffset.Message.Value)
if err != nil {
return err
}
}
}
}
}
return nil
}
// PartitionFetchInfo contains information about what partition to fetch, what offset to fetch from and the maximum bytes to include in the message set for this partition.
type PartitionFetchInfo struct {
Partition int32
Offset int64
FetchSize int32
}
// FetchResponsePartitionData contains fetched messages for a single partition, the offset at the end of the log for this partition and an error code.
type FetchResponsePartitionData struct {
Error error
HighwaterMarkOffset int64
Messages []*MessageAndOffset
}
func (frd *FetchResponsePartitionData) Read(decoder Decoder) *DecodingError {
errCode, err := decoder.GetInt16()
if err != nil {
return NewDecodingError(err, reasonInvalidFetchResponseDataErrorCode)
}
frd.Error = BrokerErrors[errCode]
highwaterMarkOffset, err := decoder.GetInt64()
if err != nil {
return NewDecodingError(err, reasonInvalidFetchResponseDataHighwaterMarkOffset)
}
frd.HighwaterMarkOffset = highwaterMarkOffset
if _, err = decoder.GetInt32(); err != nil {
return NewDecodingError(err, reasonInvalidMessageSetLength)
}
messages, decodingErr := ReadMessageSet(decoder)
if decodingErr != nil {
return decodingErr
}
frd.Messages = messages
return nil
}
const (
reasonInvalidBlocksLength = "Invalid length for Blocks field"
reasonInvalidBlockTopic = "Invalid topic in block"
reasonInvalidFetchResponseDataLength = "Invalid length for FetchResponseData field"
reasonInvalidFetchResponseDataPartition = "Invalid partition in FetchResponseData"
reasonInvalidFetchResponseDataErrorCode = "Invalid error code in FetchResponseData"
reasonInvalidFetchResponseDataHighwaterMarkOffset = "Invalid highwater mark offset in FetchResponseData"
reasonInvalidMessageSetLength = "Invalid MessageSet length"
)