Fix consumer progress (#296)

crimson-gao authored Nov 19, 2024
1 parent d62a872 commit e75712d

Showing 6 changed files with 119 additions and 102 deletions.
4 changes: 2 additions & 2 deletions consumer/shard_worker.go

@@ -84,8 +84,8 @@ func (consumer *ShardConsumerWorker) consume() {
                 consumer.updateStatus(false)
                 return
             }
-            err := consumer.nextFetchTask()
-            consumer.updateStatus(err == nil && consumer.lastFetchGroupCount > 0)
+            hasProgress, err := consumer.nextFetchTask()
+            consumer.updateStatus(err == nil && hasProgress)
         }()
     case PROCESSING:
         go func() {
22 changes: 12 additions & 10 deletions consumer/tasks.go

@@ -15,7 +15,7 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
     if err != nil {
         return "", err
     }
-    if checkpoint != "" && err == nil {
+    if checkpoint != "" {
         consumer.consumerCheckPointTracker.initCheckPoint(checkpoint)
         return checkpoint, nil
     }
@@ -45,23 +45,23 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
     return "", errors.New("CursorPositionError")
 }
 
-func (consumer *ShardConsumerWorker) nextFetchTask() error {
+func (consumer *ShardConsumerWorker) nextFetchTask() (hasProgress bool, err error) {
     // update the last fetch time, used to control fetch frequency
     consumer.lastFetchTime = time.Now()
 
-    logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
+    cursor := consumer.nextFetchCursor
+    logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, cursor)
     if err != nil {
-        return err
+        return false, err
     }
     // record the current cursor; the user decides whether to save it based on how `process` executes
     consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor)
     consumer.lastFetchLogGroupList = logGroup
     consumer.nextFetchCursor = pullLogMeta.NextCursor
     consumer.lastFetchRawSize = pullLogMeta.RawSize
-    consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList)
+    consumer.lastFetchGroupCount = pullLogMeta.Count
     if consumer.client.option.Query != "" {
         consumer.lastFetchRawSizeBeforeQuery = pullLogMeta.RawSizeBeforeQuery
-        consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.RawDataCountBeforeQuery
+        consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.DataCountBeforeQuery
         if consumer.lastFetchRawSizeBeforeQuery == -1 {
             consumer.lastFetchRawSizeBeforeQuery = 0
         }
@@ -74,13 +74,15 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error {
         "shardId", consumer.shardId,
         "fetch log count", consumer.lastFetchGroupCount,
     )
-    if consumer.lastFetchGroupCount == 0 {
+
+    // if cursor == nextCursor, no progress was made on this fetch
+    if cursor == pullLogMeta.NextCursor {
         consumer.lastFetchLogGroupList = nil
         // there may be no new data to pull, so no process func will trigger checkpoint saving
         consumer.saveCheckPointIfNeeded()
+        return false, nil
     }
 
-    return nil
+    return true, nil
 }
 
 func (consumer *ShardConsumerWorker) consumerProcessTask() (rollBackCheckpoint string, err error) {
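Why the cursor comparison fixes progress detection: with a server-side query configured, a fetch can return zero matching logs even though the shard advanced, so the old `lastFetchGroupCount > 0` check read that as "no progress". Comparing the request cursor with the returned next cursor distinguishes the two cases. A minimal, self-contained sketch of the idea — type and field names here are illustrative, not the SDK's:

package main

import "fmt"

// pullResult mirrors just the bits of PullLogMeta that matter here
// (illustrative names, not the SDK's).
type pullResult struct {
    count      int    // logGroups returned after any query filtering
    nextCursor string // cursor to use for the next fetch
}

// hasProgress applies the commit's rule: the shard made progress iff the
// server handed back a different cursor, regardless of the log count.
func hasProgress(requestCursor string, res pullResult) bool {
    return requestCursor != res.nextCursor
}

func main() {
    // A query filtered out every log, yet the cursor advanced: progress.
    fmt.Println(hasProgress("cursor-A", pullResult{count: 0, nextCursor: "cursor-B"})) // true
    // Nothing new to pull: the same cursor comes back, no progress.
    fmt.Println(hasProgress("cursor-A", pullResult{count: 0, nextCursor: "cursor-A"})) // false
}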
36 changes: 30 additions & 6 deletions consumer/worker_test.go

@@ -23,15 +23,16 @@ func TestStartAndStop(t *testing.T) {
         CursorPosition: BEGIN_CURSOR,
     }
 
-    worker := InitConsumerWorker(option, process)
+    worker := InitConsumerWorkerWithCheckpointTracker(option, process)
 
     worker.Start()
     worker.StopAndWait()
 }
 
-func process(shardId int, logGroupList *sls.LogGroupList) string {
-    fmt.Printf("shardId %d processed successfully", shardId)
-    return ""
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) {
+    fmt.Printf("shardId %d processed successfully, logGroupSize: %d\n", shardId, len(logGroupList.LogGroups))
+    checkpointTracker.SaveCheckPoint(true)
+    return "", nil
 }
 
 func TestStartAndStopCredentialsProvider(t *testing.T) {
@@ -46,12 +47,35 @@ func TestStartAndStopCredentialsProvider(t *testing.T) {
         ConsumerName: "test-consumer-1",
         // This option is only used for initialization and is ignored once the consumer group is created and each shard is being consumed.
         // It can be "begin", "end", or a specific timestamp; it refers to the log receiving time.
-        CursorPosition: BEGIN_CURSOR,
+        CursorPosition:     BEGIN_CURSOR,
+        AutoCommitDisabled: false,
     }
 
-    worker := InitConsumerWorker(option, process)
+    worker := InitConsumerWorkerWithCheckpointTracker(option, process)
 
     worker.Start()
     time.Sleep(time.Second * 20)
     worker.StopAndWait()
 }
+
+func TestConsumerQueryNoData(t *testing.T) {
+    option := LogHubConfig{
+        Endpoint: os.Getenv("LOG_TEST_ENDPOINT"),
+        CredentialsProvider: sls.NewStaticCredentialsProvider(
+            os.Getenv("LOG_TEST_ACCESS_KEY_ID"),
+            os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), ""),
+        Project:           os.Getenv("LOG_TEST_PROJECT"),
+        Logstore:          os.Getenv("LOG_TEST_LOGSTORE"),
+        ConsumerGroupName: "test-consumer",
+        ConsumerName:      "test-consumer-1",
+        CursorPosition:    END_CURSOR,
+        Query:             "* | where \"request_method\" = 'GET'",
+    }
+
+    worker := InitConsumerWorkerWithCheckpointTracker(option, process)
+
+    worker.Start()
+    time.Sleep(time.Second * 2000)
+    worker.StopAndWait()
+}
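For reference, a hedged sketch of a process callback under the new three-argument signature, written as if it lived in the same package as the tests above. `handle` is a hypothetical per-logGroup handler, and treating the returned string as a rollback checkpoint is an assumption based on consumerProcessTask's rollBackCheckpoint return; verify against the SDK before relying on it:

// processOnce consumes one batch and commits only after the whole batch
// succeeded; on error it returns without saving, so the shard is retried
// from the last saved checkpoint.
func processOnce(shardId int, logGroupList *sls.LogGroupList, tracker CheckPointTracker) (string, error) {
    for _, lg := range logGroupList.LogGroups {
        if err := handle(lg); err != nil {
            return "", err
        }
    }
    // force=true flushes the checkpoint immediately instead of waiting
    // for the auto-commit interval (cf. AutoCommitDisabled above).
    tracker.SaveCheckPoint(true)
    return "", nil
}

// handle is a hypothetical stub standing in for real per-logGroup work.
func handle(lg *sls.LogGroup) error { return nil }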
132 changes: 57 additions & 75 deletions log_store.go

@@ -496,7 +496,7 @@ func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) ([]byte, string, error) {
 // GetLogsBytes gets logs binary data from the shard specified by shardId, according to cursor and endCursor.
 // The logGroupMaxCount is the max number of logGroups that can be returned.
 // The nextCursor is the next cursor, which can be used to read logs the next time.
-func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error) {
+func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) ([]byte, *PullLogMeta, error) {
     h := map[string]string{
         "x-log-bodyrawsize": "0",
         "Accept":            "application/x-protobuf",
@@ -514,102 +514,84 @@ func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error) {
 
     r, err := request(s.project, "GET", uri, h, nil)
     if err != nil {
-        return
+        return nil, nil, err
     }
     defer r.Body.Close()
     buf, err := ioutil.ReadAll(r.Body)
     if err != nil {
-        return
+        return nil, nil, err
     }
-    pullLogMeta = &PullLogMeta{}
-    pullLogMeta.Netflow = len(buf)
     if r.StatusCode != http.StatusOK {
         errMsg := &Error{}
         err = json.Unmarshal(buf, errMsg)
         if err != nil {
-            err = fmt.Errorf("failed to get cursor")
             dump, _ := httputil.DumpResponse(r, true)
             if IsDebugLevelMatched(1) {
                 level.Error(Logger).Log("msg", string(dump))
             }
-            return
+            return nil, nil, fmt.Errorf("failed parse errorCode json: %w", err)
         }
-        err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
-        return
+        return nil, nil, fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
     }
-    v, ok := r.Header["X-Log-Compresstype"]
-    if !ok || len(v) == 0 {
-        err = fmt.Errorf("can't find 'x-log-compresstype' header")
-        return
-    }
-    var compressType = Compress_None
-    if v[0] == "lz4" {
-        compressType = Compress_LZ4
-    } else if v[0] == "zstd" {
-        compressType = Compress_ZSTD
-    } else {
-        err = fmt.Errorf("unexpected compress type:%v", compressType)
-        return
-    }
-
-    v, ok = r.Header["X-Log-Cursor"]
-    if !ok || len(v) == 0 {
-        err = fmt.Errorf("can't find 'x-log-cursor' header")
-        return
-    }
-    pullLogMeta.NextCursor = v[0]
-    pullLogMeta.RawSize, err = ParseHeaderInt(r, "X-Log-Bodyrawsize")
-    if err != nil {
-        return
-    }
+    netflow := len(buf)
+
+    nextCursor, err := parseHeaderString(r.Header, "X-Log-Cursor")
+    if err != nil {
+        return nil, nil, err
+    }
+    rawSize, err := ParseHeaderInt(r, "X-Log-Bodyrawsize")
+    if err != nil {
+        return nil, nil, err
+    }
+    count, err := ParseHeaderInt(r, "X-Log-Count")
+    if err != nil {
+        return nil, nil, err
+    }
+    pullMeta := &PullLogMeta{
+        RawSize:    rawSize,
+        NextCursor: nextCursor,
+        Netflow:    netflow,
+        Count:      count,
+    }
+    // If query is not empty, extract more headers
+    if plr.Query != "" {
+        pullMeta.RawSizeBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatasize")
+        pullMeta.DataCountBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatacount")
+        pullMeta.Lines, _ = ParseHeaderInt(r, "X-Log-Resultlines")
+        pullMeta.LinesBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatalines")
+        pullMeta.FailedLines, _ = ParseHeaderInt(r, "X-Log-Failedlines")
+    }
+    if rawSize == 0 {
+        return make([]byte, 0), pullMeta, nil
+    }
+
+    // decompress data
+    out := make([]byte, rawSize)
+    compressType, err := parseHeaderString(r.Header, "X-Log-Compresstype")
+    if err != nil {
+        return nil, nil, err
+    }
-    if pullLogMeta.RawSize > 0 {
-        out = make([]byte, pullLogMeta.RawSize)
-        switch compressType {
-        case Compress_LZ4:
-            uncompressedSize := 0
-            if uncompressedSize, err = lz4.UncompressBlock(buf, out); err != nil {
-                return
-            }
-            if uncompressedSize != pullLogMeta.RawSize {
-                return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, pullLogMeta.RawSize)
-            }
-        case Compress_ZSTD:
-            out, err = slsZstdCompressor.Decompress(buf, out)
-            if err != nil {
-                return nil, nil, err
-            }
-            if len(out) != pullLogMeta.RawSize {
-                return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", len(out), pullLogMeta.RawSize)
-            }
-        default:
-            return nil, nil, fmt.Errorf("unexpected compress type: %d", compressType)
-        }
-    }
+    switch compressType {
+    case "lz4":
+        uncompressedSize := 0
+        if uncompressedSize, err = lz4.UncompressBlock(buf, out); err != nil {
+            return nil, nil, err
+        }
+        if uncompressedSize != rawSize {
+            return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, rawSize)
+        }
+    case "zstd":
+        out, err = slsZstdCompressor.Decompress(buf, out)
+        if err != nil {
+            return nil, nil, err
+        }
+        if len(out) != rawSize {
+            return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", len(out), rawSize)
+        }
+    default:
+        return nil, nil, fmt.Errorf("unexpected compress type: %s", compressType)
+    }
-    // todo: add query meta
-    // If query is not nil, extract more headers
-    // if plr.Query != "" {
-    //     pullLogMeta.RawSizeBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatasize")
-    //     if err != nil {
-    //         return
-    //     }
-    //     pullLogMeta.DataCountBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatacount")
-    //     if err != nil {
-    //         return
-    //     }
-    //     pullLogMeta.Lines, err = ParseHeaderInt(r, "X-Log-Resultlines")
-    //     if err != nil {
-    //         return
-    //     }
-    //     pullLogMeta.LinesBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatalines")
-    //     if err != nil {
-    //         return
-    //     }
-    //     pullLogMeta.FailedLines, err = ParseHeaderInt(r, "X-Log-Failedlines")
-    //     if err != nil {
-    //         return
-    //     }
-    // }
-    return
+    return out, pullMeta, nil
 }
 
 // LogsBytesDecode decodes logs binary data returned by GetLogsBytes API
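A hedged usage sketch of the reworked method. The return values and meta fields come straight from this diff; the PullLogRequest field names (ShardID, Cursor) and the sls import alias are assumptions to check against the SDK:

package example

import (
    "fmt"

    sls "github.com/aliyun/aliyun-log-go-sdk"
)

// pullOnce performs a single pull and returns the cursor for the next one.
func pullOnce(store *sls.LogStore, shardID int, cursor, query string) (string, error) {
    plr := &sls.PullLogRequest{
        ShardID: shardID, // assumed field name
        Cursor:  cursor,  // assumed field name
        Query:   query,
    }
    buf, meta, err := store.GetLogsBytesWithQuery(plr)
    if err != nil {
        return "", err
    }
    // When rawSize == 0 the method now returns early with an empty
    // (non-nil) buffer and a fully populated meta, so the next cursor
    // and the query statistics stay available to the caller.
    fmt.Printf("pulled %d logGroups (%d bytes), netflow %d\n",
        meta.Count, len(buf), meta.Netflow)
    if query != "" {
        fmt.Printf("before query: %d logGroups, %d raw bytes\n",
            meta.DataCountBeforeQuery, meta.RawSizeBeforeQuery)
    }
    return meta.NextCursor, nil
}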
19 changes: 10 additions & 9 deletions model.go

@@ -75,15 +75,16 @@ func (plr *PullLogRequest) ToURLParams() url.Values {
 }
 
 type PullLogMeta struct {
-    NextCursor              string
-    Netflow                 int
-    RawSize                 int
-    RawDataCountBeforeQuery int
-    RawSizeBeforeQuery      int
-    Lines                   int
-    LinesBeforeQuery        int
-    FailedLines             int
-    DataCountBeforeQuery    int
+    NextCursor string
+    Netflow    int
+    RawSize    int
+    Count      int
+    // these fields are only present when a query is set
+    RawSizeBeforeQuery   int // processed raw size before the query
+    Lines                int // result lines after the query
+    LinesBeforeQuery     int // processed lines before the query
+    FailedLines          int // failed lines during the query
+    DataCountBeforeQuery int // processed logGroup count before the query
 }
 
 // GetHistogramsResponse defines response from GetHistograms call
8 changes: 8 additions & 0 deletions utils.go

@@ -42,3 +42,11 @@ func ParseHeaderInt(r *http.Response, headerName string) (int, error) {
     }
     return -1, fmt.Errorf("can't find '%s' header", strings.ToLower(headerName))
 }
+
+func parseHeaderString(header http.Header, headerName string) (string, error) {
+    v, ok := header[headerName]
+    if !ok || len(v) == 0 {
+        return "", fmt.Errorf("can't find '%s' header", strings.ToLower(headerName))
+    }
+    return v[0], nil
+}
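One behavior of the new helper worth noting: it indexes the http.Header map directly, so the caller must pass the key in canonical MIME form (e.g. "X-Log-Cursor"), which is how net/http stores parsed headers. A small runnable demo, with the helper copied verbatim from the diff above:

package main

import (
    "fmt"
    "net/http"
    "strings"
)

// parseHeaderString as added by this commit: direct map indexing means
// the lookup is case-sensitive on the canonical header key.
func parseHeaderString(header http.Header, headerName string) (string, error) {
    v, ok := header[headerName]
    if !ok || len(v) == 0 {
        return "", fmt.Errorf("can't find '%s' header", strings.ToLower(headerName))
    }
    return v[0], nil
}

func main() {
    h := http.Header{}
    h.Set("x-log-cursor", "MTcwMDAwMDAwMA==") // Set canonicalizes the key to "X-Log-Cursor"

    cursor, err := parseHeaderString(h, "X-Log-Cursor")
    fmt.Println(cursor, err) // MTcwMDAwMDAwMA== <nil>

    _, err = parseHeaderString(h, "X-Log-Count")
    fmt.Println(err) // can't find 'x-log-count' header
}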
