Skip to content

Commit

Permalink
[spanChunk ] common library
Browse files Browse the repository at this point in the history
- pack pinpoint-common
- add cpp example
- add get depth
- remove unused code
- fixed CI
- test_pinpoint.cpp and test_pinpoint.c
- common library support spanChunk
- rename calls to fols
- add thread_local pool
- support "Async Invocation .."
  • Loading branch information
eeliu committed Oct 10, 2024
1 parent bc936c9 commit bc1486f
Show file tree
Hide file tree
Showing 139 changed files with 4,808 additions and 3,831 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ jobs:
Collector-agent:
strategy:
matrix:
go-version: [1.18.x]
go-version: [1.19.x]
os: [ubuntu-latest,windows-latest,macos-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
14 changes: 12 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
md5=($(md5sum ${{ env.MODULE_PAK }} ))
PACK_NAME=${{ env.MODULE_PAK }}-${md5}-$(date '+%Y-%m-%d').tar.gz
mv ${{ env.MODULE_PAK }} $PACK_NAME
echo "PACK=$PACK_NAME" >> $GITHUB_OUTPUT
echo "PACK=$PACK_NAME" >> $GITHUB_OUTPUT
- name: Release php_package
id: rel_php_pak
uses: softprops/action-gh-release@v2
Expand All @@ -39,4 +39,14 @@ jobs:
uses: softprops/action-gh-release@v2
if: startsWith(github.ref, 'refs/tags/')
with:
files: install_pinpoint_php.sh
files: install_pinpoint_php.sh

- name: pack pinpoint-common
run: |
tar zcvf pinpoint-common.tar.gz common
- name: Release pinpoint-common
uses: softprops/action-gh-release@v2
if: startsWith(github.ref, 'refs/tags/')
with:
files: pinpoint-common.tar.gz
Binary file added collector-agent/1.cap
Binary file not shown.
2 changes: 1 addition & 1 deletion collector-agent/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.18-alpine3.17 as builder
FROM golang:1.19-alpine3.17 as builder
WORKDIR /apps
USER root
RUN apk add --no-cache make protobuf-dev git
Expand Down
68 changes: 43 additions & 25 deletions collector-agent/agent/AgentRouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/pinpoint-apm/pinpoint-c-agent/collector-agent/common"

log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)

type RawPacket struct {
Expand All @@ -27,13 +27,25 @@ type I_PacketRouter interface {
type AgentRouter struct {
AgentMap map[string]*GrpcAgent
PingId int32
Quit chan bool
WG *sync.WaitGroup
rwMutex sync.RWMutex
Config *common.Config
Log *logrus.Logger
}

func CreateAgentRouter(config *common.Config) *AgentRouter {
router := &AgentRouter{
PingId: 1,
AgentMap: make(map[string]*GrpcAgent),
Config: config,
Log: config.Log,
}
return router
}

type TSpanEvent struct {
Name string `json:"name"`
Sequence int32 `json:":seq"`
Depth int32 `json:":depth"`
ExceptionInfo string `json:"EXP,omitempty"`
ExceptionInfoV2 *TExceptionInfo `json:"EXP_V2,omitempty"`
DestinationId string `json:"dst,omitempty"`
Expand All @@ -44,9 +56,9 @@ type TSpanEvent struct {
StartElapsedV2 int32 `json:":S"`
EndElapsedV2 int32 `json:":E"`
ServiceType int32 `json:"stp,string"`
Clues []string `json:"clues,omitempty"`
Calls []*TSpanEvent `json:"calls,omitempty"`
SqlMeta string `json:"SQL,omitempty"`
Annotations []string `json:"anno,omitempty"`
SqlMeta *string `json:"SQL,omitempty"`
AsyId int32 `json:"asyId,string,omitempty"`
}

func (spanEv *TSpanEvent) GetEndElapsed() int32 {
Expand Down Expand Up @@ -77,6 +89,11 @@ type TExceptionInfo struct {
StartTime int64 `json:":S"`
}

type TAsyncId struct {
AsyncId int32 `json:"id"`
Sequence int32 `json:"seq"`
}

type TSpan struct {
AppServerType int32 `json:"FT"`
AppServerTypeV2 int32 `json:":FT"`
Expand All @@ -91,8 +108,8 @@ type TSpan struct {
AppIdV2 string `json:":appid"`
AppName string `json:"appname"`
AppNameV2 string `json:":appname"`
Calls []*TSpanEvent `json:"calls"`
Clues []string `json:"clues,omitempty"`
Follows []*TSpanEvent `json:"event"`
Annotations []string `json:"anno,omitempty"`
SpanName string `json:"name"`
SpanId int64 `json:"sid,string"`
ServerType int32 `json:"stp,string"`
Expand All @@ -108,6 +125,7 @@ type TSpan struct {
ErrorMarked int32 `json:"EA,omitempty"`
NginxHeader string `json:"NP,omitempty"`
ApacheHeader string `json:"AP,omitempty"`
LocalAsyncId *TAsyncId `json:"asy,omitempty"`
}

func (span *TSpan) IsFailed() bool {
Expand All @@ -117,7 +135,7 @@ func (span *TSpan) IsFailed() bool {
return false
}

//note
// note
// FindHistogramLevel must come with histogramSize
func (span *TSpan) FindHistogramLevel() int {
if span.GetElapsedTime() <= 100 {
Expand Down Expand Up @@ -180,12 +198,11 @@ func (span *TSpan) GetAppName() string {
}

func (manager *AgentRouter) Clean() {
config := common.GetConfig()
ctime := time.Now().Unix()
manager.rwMutex.RLock()
for id, agent := range manager.AgentMap {
if agent.GetLastBusyTime()+int64(config.AgentFreeOnlineSurvivalTimeSec) < ctime {
log.Warnf("agent:%s expired after:%d sec. busyTime:%d", agent, config.AgentFreeOnlineSurvivalTimeSec, agent.GetLastBusyTime())
if agent.GetLastBusyTime()+int64(manager.Config.AgentReTryTimeout) < ctime {
manager.Log.Warnf("agent:%s expired after:%d sec. busyTime:%d", agent, manager.Config.AgentReTryTimeout, agent.GetLastBusyTime())
manager.rwMutex.RUnlock()
manager.rwMutex.Lock()
delete(manager.AgentMap, id)
Expand All @@ -198,13 +215,13 @@ func (manager *AgentRouter) Clean() {
manager.rwMutex.RUnlock()
}

func GetAgentInfo(span *TSpan) (appid, name string, appServerType int32, startTime string, err error) {
func (manager *AgentRouter) GetAgentInfo(span *TSpan) (appid, name string, appServerType int32, startTime string, err error) {

// new feat: get current startTime
startTime = strconv.FormatInt(common.GetConfig().StartTime, 10) + "000"
startTime = strconv.FormatInt(manager.Config.StartTime, 10) + "000"
holder := strings.Split(span.TransactionId, "^")
if len(holder) < 3 {
log.Warn("tid in wrong format")
manager.Log.Warn("tid in wrong format")
} else if len(holder[1]) <= 10 { // seconds format
startTime = holder[1] + "000"
} else { // milliseconds format
Expand Down Expand Up @@ -240,41 +257,42 @@ func (manager *AgentRouter) DispatchPacket(packet *RawPacket) error {
}

if err := json.Unmarshal(packet.RawData, span); err != nil {
log.Warnf("json.Unmarshal err:%v", err)
manager.Log.Warnf("json.Unmarshal err:%v", err)
goto PACKET_INVALID
}

if appid, appName, serverType, startTime, err := GetAgentInfo(span); err == nil {
if appid, appName, serverType, startTime, err := manager.GetAgentInfo(span); err == nil {
manager.rwMutex.RLock()
log.Debug("Read-lock is holding")
manager.Log.Debug("Read-lock is holding")
agent, OK := manager.AgentMap[appid]
if !OK {
// create a new agent
manager.rwMutex.RUnlock()
log.Infof("agent:%s not find, create a new agent.", appid)
log.Debug("Try to get write-lock")
manager.Log.Infof("agent:%s not find, create a new agent.", appid)
manager.Log.Debug("Try to get write-lock")
manager.rwMutex.Lock()
log.Debug("Write-lock is holding")
manager.Log.Debug("Write-lock is holding")
if _t, OK := manager.AgentMap[appid]; OK {
agent = _t
} else {
agent = createGrpcAgent(appid, appName, serverType, manager.PingId, startTime)
agent = CreateGrpcAgent(appid, appName, serverType, manager.PingId, startTime, manager.Config)
agent.StartServe()
manager.PingId += 1
}
manager.AgentMap[appid] = agent
manager.rwMutex.Unlock()
log.Debug("Write-lock is release")
manager.Log.Debug("Write-lock is release")
} else {
manager.rwMutex.RUnlock()
log.Debug("Read-lock is release")
manager.Log.Debug("Read-lock is release")
}

agent.CheckValid(span)
agent.SendSpan(span)
return nil

} else {
log.Warn(err)
manager.Log.Warn(err)
return err
}

Expand Down
47 changes: 22 additions & 25 deletions collector-agent/agent/AgentRouter_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package agent

import (
"context"
"encoding/json"
"testing"

"github.com/pinpoint-apm/pinpoint-c-agent/collector-agent/common"
"google.golang.org/grpc/metadata"
)

func TestGetAgentInfo(t *testing.T) {
// spanMap := map[string]interface{}{
// "appid": "sfdaefe",
// "appname": "sfdaefe",
// "FT": float64(23412),
// "tid": "234123424^41234^2333",
// }

spanMap := &TSpan{
AppId: "sfdaefe",
AppName: "sfdaefe",
Expand All @@ -23,7 +18,9 @@ func TestGetAgentInfo(t *testing.T) {
TransactionId: "234123424^41234^2333",
}

id, name, ft, startTime, err := GetAgentInfo(spanMap)
router := CreateAgentRouter(common.CreateTestConfig())

id, name, ft, startTime, err := router.GetAgentInfo(spanMap)

if id != "sfdaefe" && name != "sfdaefe" && ft != 23412 && startTime != "234123424" && err != nil {
t.Error(spanMap)
Expand All @@ -48,7 +45,8 @@ func Test_EASpan(t *testing.T) {
"test": "2",
"test2": "string",
})
ea := createErrorAnalysisFilter(md)
config := common.CreateTestConfig()
ea := createErrorAnalysisFilter(context.Background(), md, config, config.LogEntry)

meta := ea.scanTSpanTree(&tSpan)
if len(meta.Exceptions) == 0 {
Expand All @@ -57,53 +55,52 @@ func Test_EASpan(t *testing.T) {
}

func TestTspan(t *testing.T) {
msg := `{"E":1,"FT":1500,":FT":1500,"ptype":"1500","pname":"abc_d","psid":"23563","NP":"t=1617083759.535 D=0.000","S":1617083759798,"appid":"app-2",":appid":"app-2",
":appname":"APP-2","appname":"APP-2","calls":[{"E":1,"calls":[{"E":1,"S":0,"clues":["-1:input parameters","14:return value"],"name":"abc"}],"S":0,"clues":["-1:input parameters","14:return value"],"name":"app\\AppDate::abc","SQL":"select* from abc"}],"client":"10.34.135.145","clues":["46:200"],"name":"PHP Request: fpm-fcgi","server":"10.34.130.152:8000","sid":"726125302","stp":"1500","tid":"app-2^1617083747^5506","uri":"/index.php?type=get_date","Ah":"123.35.36.3/host","EXP":"exp","ERR":{"msg":"error_msg","file":"file.cc","line":123}}`
msg := `{":E":1,":FT":1500,":S":1728466073494,"appid":"cd.dev.test.run","appname":"cd.dev.test.php","client":"localhost","event":[{":E":0,":S":1,":depth":1,":seq":0,"name":"SimplePHP\\MessageHandler::handle_message_in_kafka","stp":"1501"}],"name":"RdKafka\\KafkaConsumer::consume","server":"localhost","sid":"2057154795","stp":"1500","tid":"cd.dev.test.run^1728466064272^0","uri":"abc","EXP":"xxxxx","ERR":{"msg":"asbc","file":"files","line":233}} `
var tspan TSpan

err := json.Unmarshal([]byte(msg), &tspan)
if err != nil {
t.Error(err)
}

if tspan.GetStartTime() != 1617083759798 {
if tspan.GetStartTime() != 1728466073494 {
t.Error(tspan.GetStartTime())
}

if tspan.GetAppName() != "APP-2" {
if tspan.GetAppName() != "cd.dev.test.php" {
t.Error(tspan.GetAppName())

}

if tspan.SpanId != 726125302 {
if tspan.SpanId != 2057154795 {
t.Error(tspan.SpanId)
}

if tspan.ServerType != 1500 {
t.Error(tspan.ServerType)
}

if tspan.ExceptionInfo != "exp" {
if tspan.ExceptionInfo != "xxxxx" {
t.Error(tspan.ExceptionInfo)
}

if tspan.ErrorInfo == nil {
t.Error("no error info")
}

if len(tspan.Calls) == 0 {
if len(tspan.Follows) == 0 {
t.Error("no calls")
}

for _, ev := range tspan.Calls {
evCalls := ev.Calls
if len(evCalls) == 0 {
t.Error("no calls")
}
if evCalls[0].Name == "app\\AppDate::abc" {
t.Error("calls no name")
}
// for _, ev := range tspan.Calls {
// evCalls := ev.Calls
// if len(evCalls) == 0 {
// t.Error("no calls")
// }
// if evCalls[0].Name == "app\\AppDate::abc" {
// t.Error("calls no name")
// }

}
// }

}
Loading

0 comments on commit bc1486f

Please sign in to comment.