Skip to content

Commit

Permalink
feat(collector): migrate the collector from pegasus-kv/collector
Browse files Browse the repository at this point in the history
  • Loading branch information
neverchanje authored and acelyc111 committed Apr 26, 2023
1 parent e0d1920 commit 41f2897
Show file tree
Hide file tree
Showing 33 changed files with 2,692 additions and 9 deletions.
72 changes: 72 additions & 0 deletions .github/workflows/lint_and_test_collector.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
---
# workflow name
name: Golang Lint and Unit Test - collector

# on events
on:
# run on each pull request
pull_request:
types: [ synchronize, reopened, opened ]
branches:
- master
- 'v[0-9]+.*' # release branch
- ci-test # testing branch for github action
- '*dev'
paths:
- collector/**

# for manually triggering workflow
workflow_dispatch:

# workflow tasks
jobs:
lint:
name: Lint
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 1
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.14
cache: false
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.29
working-directory: ./collector

build:
name: Build
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 1
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.14
- name: Build
working-directory: ./collector
run: make
2 changes: 2 additions & 0 deletions .github/workflows/module_labeler_conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ github:
- .github/**/*
admin-cli:
- admin-cli/**/*
collector:
- collector/**/*
docker:
- docker/**/*
go-client:
Expand Down
19 changes: 10 additions & 9 deletions .github/workflows/standardization_lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,16 @@ jobs:
bodyRegex: '#(\d+)'
bodyURLRegex: 'http(s?):\/\/(github.com)(\/apache)(\/incubator-pegasus)(\/issues)\/\d+'

dockerfile_linter:
name: Lint Dockerfile
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: hadolint/[email protected]
with:
recursive: true
ignore: 'DL3033,DL3013,DL3059,SC2086,DL3003,SC2164,DL3008,DL3007,DL3006,DL4001'
# TODO(yingchun): hadolint/[email protected] is not allowed to be used in apache/incubator-pegasus.
# dockerfile_linter:
# name: Lint Dockerfile
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v3
# - uses: hadolint/[email protected]
# with:
# recursive: true
# ignore: 'DL3033,DL3013,DL3059,SC2086,DL3003,SC2164,DL3008,DL3007,DL3006,DL4001'

license_check:
name: Check License
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,6 @@ thirdparty/output/

#macOS
.DS_Store

#collector
collector/collector
7 changes: 7 additions & 0 deletions collector/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
build:
go mod tidy
go mod verify
go build -o collector

fmt:
go fmt ./...
9 changes: 9 additions & 0 deletions collector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Pegasus Collector

[中文文档]

Collector is a part of the Pegasus ecosystem that serves as:

1. the service availability detector
2. the hotkey detector
3. the capacity units recorder
75 changes: 75 additions & 0 deletions collector/aggregate/aggregatable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package aggregate

var v1Tov2MetricsConversion = map[string]string{
"replica*app.pegasus*get_qps": "get_qps",
"replica*app.pegasus*multi_get_qps": "multi_get_qps",
"replica*app.pegasus*put_qps": "put_qps",
"replica*app.pegasus*multi_put_qps": "multi_put_qps",
"replica*app.pegasus*remove_qps": "remove_qps",
"replica*app.pegasus*multi_remove_qps": "multi_remove_qps",
"replica*app.pegasus*incr_qps": "incr_qps",
"replica*app.pegasus*check_and_set_qps": "check_and_set_qps",
"replica*app.pegasus*check_and_mutate_qps": "check_and_mutate_qps",
"replica*app.pegasus*scan_qps": "scan_qps",
"replica*eon.replica*backup_request_qps": "backup_request_qps",
"replica*app.pegasus*duplicate_qps": "duplicate_qps",
"replica*app.pegasus*dup_shipped_ops": "dup_shipped_ops",
"replica*app.pegasus*dup_failed_shipping_ops": "dup_failed_shipping_ops",
"replica*app.pegasus*get_bytes": "get_bytes",
"replica*app.pegasus*multi_get_bytes": "multi_get_bytes",
"replica*app.pegasus*scan_bytes": "scan_bytes",
"replica*app.pegasus*put_bytes": "put_bytes",
"replica*app.pegasus*multi_put_bytes": "multi_put_bytes",
"replica*app.pegasus*check_and_set_bytes": "check_and_set_bytes",
"replica*app.pegasus*check_and_mutate_bytes": "check_and_mutate_bytes",
"replica*app.pegasus*recent.read.cu": "recent_read_cu",
"replica*app.pegasus*recent.write.cu": "recent_write_cu",
"replica*app.pegasus*recent.expire.count": "recent_expire_count",
"replica*app.pegasus*recent.filter.count": "recent_filter_count",
"replica*app.pegasus*recent.abnormal.count": "recent_abnormal_count",
"replica*eon.replica*recent.write.throttling.delay.count": "recent_write_throttling_delay_count",
"replica*eon.replica*recent.write.throttling.reject.count": "recent_write_throttling_reject_count",
"replica*app.pegasus*disk.storage.sst(MB)": "sst_storage_mb",
"replica*app.pegasus*disk.storage.sst.count": "sst_count",
"replica*app.pegasus*rdb.block_cache.hit_count": "rdb_block_cache_hit_count",
"replica*app.pegasus*rdb.block_cache.total_count": "rdb_block_cache_total_count",
"replica*app.pegasus*rdb.index_and_filter_blocks.memory_usage": "rdb_index_and_filter_blocks_mem_usage",
"replica*app.pegasus*rdb.memtable.memory_usage": "rdb_memtable_mem_usage",
"replica*app.pegasus*rdb.estimate_num_keys": "rdb_estimate_num_keys",
"replica*app.pegasus*rdb.bf_seek_negatives": "rdb_bf_seek_negatives",
"replica*app.pegasus*rdb.bf_seek_total": "rdb_bf_seek_total",
"replica*app.pegasus*rdb.bf_point_positive_true": "rdb_bf_point_positive_true",
"replica*app.pegasus*rdb.bf_point_positive_total": "rdb_bf_point_positive_total",
"replica*app.pegasus*rdb.bf_point_negatives": "rdb_bf_point_negatives",
}

var aggregatableSet = map[string]interface{}{
"read_qps": nil,
"write_qps": nil,
"read_bytes": nil,
"write_bytes": nil,
}

// aggregatable returns whether the counter is to be aggregated on collector,
// including v1Tov2MetricsConversion and aggregatableSet.
func aggregatable(pc *partitionPerfCounter) bool {
v2Name, found := v1Tov2MetricsConversion[pc.name]
if found { // ignored
pc.name = v2Name
return true // listed above are all aggregatable
}
_, found = aggregatableSet[pc.name]
return found
}

// AllMetrics returns metrics tracked within this collector.
// The sets of metrics from cluster level and table level are completely equal.
func AllMetrics() (res []string) {
for _, newName := range v1Tov2MetricsConversion {
res = append(res, newName)
}
for name := range aggregatableSet {
res = append(res, name)
}
return res
}
156 changes: 156 additions & 0 deletions collector/aggregate/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package aggregate

import (
"fmt"
"time"

"github.com/apache/incubator-pegasus/go-client/idl/admin"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gopkg.in/tomb.v2"
)

// TableStatsAggregator aggregates the metric on each partition into table-level metrics.
// It's reponsible for all tables in the pegasus cluster.
// After all TableStats have been collected, TableStatsAggregator sums them up into a
// ClusterStats. Users of this pacakage can use the hooks to watch every changes of the stats.
type TableStatsAggregator interface {
Aggregate() (map[int32]*TableStats, *ClusterStats, error)

Close()
}

// NewTableStatsAggregator returns a TableStatsAggregator instance.
func NewTableStatsAggregator(metaAddrs []string) TableStatsAggregator {
return &tableStatsAggregator{
tables: make(map[int32]*TableStats),
client: NewPerfClient(metaAddrs),
}
}

type tableStatsAggregator struct {
tables map[int32]*TableStats
allStats *ClusterStats

client *PerfClient
}

// Start looping for metrics aggregation
func Start(tom *tomb.Tomb) {
aggregateInterval := viper.GetDuration("metrics.report_interval")
ticker := time.NewTicker(aggregateInterval)

metaAddr := viper.GetString("meta_server")
iAg := NewTableStatsAggregator([]string{metaAddr})
ag := iAg.(*tableStatsAggregator)

for {
select {
case <-tom.Dying(): // check if context cancelled
return
case <-ticker.C:
}

_, _, err := ag.Aggregate()
if err != nil {
log.Error(err)
}

// produce stats for the hooks
var batchTableStats []TableStats
for _, table := range ag.tables {
batchTableStats = append(batchTableStats, *table)
}
ag.aggregateClusterStats()
hooksManager.afterTableStatsEmitted(batchTableStats, *ag.allStats)
}
}

func (ag *tableStatsAggregator) Aggregate() (map[int32]*TableStats, *ClusterStats, error) {
err := ag.updateTableMap()
if err != nil {
return nil, nil, fmt.Errorf("failed to aggregate: %s", err)
}

// TODO(wutao1): reduce meta queries for listing nodes
partitions, err := ag.client.GetPartitionStats()
if err != nil {
return nil, nil, fmt.Errorf("failed to aggregate: %s", err)
}
for _, p := range partitions {
ag.updatePartitionStat(p)
}

for _, table := range ag.tables {
table.aggregate()
}

return ag.tables, ag.allStats, nil
}

func (ag *tableStatsAggregator) Close() {
ag.client.Close()
}

func (ag *tableStatsAggregator) aggregateClusterStats() {
ag.allStats = &ClusterStats{
Stats: make(map[string]float64),
Timestamp: time.Now(),
}
for _, table := range ag.tables {
for k, v := range table.Stats {
ag.allStats.Stats[k] += v
}
}
}

// Some tables may disappear (be dropped) or first show up.
// This function maintains the local table map
// to keep consistent with the pegasus cluster.
func (ag *tableStatsAggregator) updateTableMap() error {
tables, err := ag.client.listTables()
if err != nil {
return err
}
ag.doUpdateTableMap(tables)
return nil
}

func (ag *tableStatsAggregator) doUpdateTableMap(tables []*admin.AppInfo) {
currentTableSet := make(map[int32]*struct{})
for _, tb := range tables {
currentTableSet[tb.AppID] = nil
if _, found := ag.tables[tb.AppID]; !found {
// non-exisistent table, create it
ag.tables[tb.AppID] = newTableStats(tb)
log.Infof("found new table: %+v", tb)

// TODO(wutao1): some tables may have partitions splitted,
// recreate the tableStats then.
}
}
for appID, tb := range ag.tables {
// disappeared table, delete it
if _, found := currentTableSet[appID]; !found {
log.Infof("remove table from collector: {AppID: %d, PartitionCount: %d}", appID, len(tb.Partitions))
delete(ag.tables, appID)

hooksManager.afterTableDropped(appID)
}
}
}

// Update the counter value.
func (ag *tableStatsAggregator) updatePartitionStat(pc *PartitionStats) {
tb, found := ag.tables[pc.Gpid.Appid]
if !found {
// Ignore the perf-counter because there's currently no such table
return
}
part, found := tb.Partitions[int(pc.Gpid.PartitionIndex)]
if !found {
log.Errorf("no such partition %+v", pc.Gpid)
return
}
*part = *pc
}
Loading

0 comments on commit 41f2897

Please sign in to comment.