diff --git a/go.mod b/go.mod index 5b24425..87b5c77 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,19 @@ go 1.22.4 require ( github.com/docker/docker v27.0.0+incompatible + github.com/mackerelio/go-osstat v0.2.5 + github.com/ncruces/go-sqlite3 v0.18.4 github.com/opencontainers/image-spec v1.1.0 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.14.0 + github.com/prometheus/common v0.42.0 github.com/stretchr/testify v1.9.0 ) require ( github.com/Microsoft/go-winio v0.4.14 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/containerd/log v0.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.6.0 // indirect @@ -20,19 +26,27 @@ require ( github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/term v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect + github.com/ncruces/julianday v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/procfs v0.8.0 // indirect + github.com/tetratelabs/wazero v1.8.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 // indirect go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.27.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect go.opentelemetry.io/otel/trace v1.27.0 // indirect - golang.org/x/sys v0.20.0 // indirect + golang.org/x/sys v0.25.0 // indirect golang.org/x/time v0.5.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gotest.tools/v3 v3.5.1 // indirect ) diff --git a/go.sum b/go.sum index 1792c49..c65eb2d 100644 --- a/go.sum +++ b/go.sum @@ -2,10 +2,15 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= @@ -25,6 +30,12 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= @@ -32,21 +43,44 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= +github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/ncruces/go-sqlite3 v0.18.4 h1:Je8o3y33MDwPYY/Cacas8yCsuoUzpNY/AgoSlN2ekyE= +github.com/ncruces/go-sqlite3 v0.18.4/go.mod h1:4HLag13gq1k10s4dfGBhMfRVsssJRT9/5hYqVM9RUYo= +github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= +github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw= +github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= +github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= @@ -54,6 +88,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tetratelabs/wazero v1.8.0 h1:iEKu0d4c2Pd+QSRieYbnQC9yiFlMS9D+Jr0LsRmcF4g= +github.com/tetratelabs/wazero v1.8.0/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 h1:9l89oX4ba9kHbBol3Xin3leYJ+252h0zszDtBwyKe2A= @@ -83,6 +119,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -91,12 +128,12 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -113,10 +150,13 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= diff --git a/internal/app/ptah-agent/agent_upgrade.go b/internal/app/ptah-agent/agent_upgrade.go index bbc8d98..cf4a94f 100644 --- a/internal/app/ptah-agent/agent_upgrade.go +++ b/internal/app/ptah-agent/agent_upgrade.go @@ -3,14 +3,15 @@ package ptah_agent import ( "context" "fmt" - "github.com/pkg/errors" - t "github.com/ptah-sh/ptah-agent/internal/pkg/ptah-client" "io" "math" "net/http" "os" "path" "time" + + "github.com/pkg/errors" + t "github.com/ptah-sh/ptah-agent/internal/pkg/ptah-client" ) func (e *taskExecutor) downloadAgentUpgrade(ctx context.Context, req *t.DownloadAgentUpgradeReq) (*t.DownloadAgentUpgradeRes, error) { @@ -87,7 +88,7 @@ func (e *taskExecutor) updateAgentSymlink(ctx context.Context, req *t.UpdateAgen return nil, err } - e.stop() + e.agent.Stop() return &t.UpdateAgentSymlinkRes{}, nil } diff --git a/internal/app/ptah-agent/metrics.go b/internal/app/ptah-agent/metrics.go new file mode 100644 index 0000000..c6254e8 --- /dev/null +++ b/internal/app/ptah-agent/metrics.go @@ -0,0 +1,125 @@ +package ptah_agent + +import ( + "bytes" + "context" + "fmt" + "log" + "slices" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" + + caddyClient "github.com/ptah-sh/ptah-agent/internal/pkg/caddy-client" +) + +type MetricsAgent struct { + client *SafeClient + caddyClient *caddyClient.Client + interval time.Duration + stopChan chan struct{} + wg sync.WaitGroup +} + +func NewMetricsAgent(client *SafeClient, caddyClient *caddyClient.Client, interval time.Duration) *MetricsAgent { + return &MetricsAgent{ + client: client, + caddyClient: caddyClient, + interval: interval, + stopChan: make(chan struct{}), + } +} + +func (m *MetricsAgent) ScrapeMetrics(ctx context.Context) ([]string, error) { + // TODO: adjust the timestamp value to stick to a resolution of 5 seconds (or whatever else server wants) + // something like time.Now().Truncate(5 * time.Second).UnixMilli() + timestamp := time.Now().UnixMilli() + + caddyMetrics, err := m.caddyClient.GetMetrics(ctx) + if err != nil { + log.Printf("failed to get caddy metrics: %v", err) + } + + err = scrapeSystemMetrics() + if err != nil { + log.Printf("failed to scrape system metrics: %v", err) + } + + sysMetrics, err := prometheus.DefaultGatherer.Gather() + if err != nil { + return nil, errors.Wrapf(err, "failed to gather metrics") + } + + buf := bytes.NewBuffer([]byte{}) + + for _, mf := range sysMetrics { + _, err = expfmt.MetricFamilyToText(buf, mf) + if err != nil { + return nil, errors.Wrapf(err, "failed to expfmt to temporary buffer") + } + } + + sysMetricsSlice := strings.Split(buf.String(), "\n") + allMetrics := slices.Concat(caddyMetrics, sysMetricsSlice) + + result := make([]string, 0, len(allMetrics)) + for _, s := range allMetrics { + if strings.Contains(s, "ptah_") { + if s[0] != '#' { + s = fmt.Sprintf("%s %d", s, timestamp) + } + + result = append(result, s) + } + } + + return result, nil +} + +func (m *MetricsAgent) Start(ctx context.Context) error { + metrics, err := m.ScrapeMetrics(ctx) + if err != nil { + log.Printf("failed to collect metrics: %v", err) + } + + err = m.client.SendMetrics(ctx, metrics) + if err != nil { + return err + } + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(m.interval): + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + metrics, err := m.ScrapeMetrics(ctx) + if err != nil { + log.Printf("failed to collect metrics: %v", err) + + continue + } + + err = m.client.SendMetrics(ctx, metrics) + if err != nil { + log.Printf("failed to send metrics: %v", err) + } + } + } + }() + + return nil +} + +func (m *MetricsAgent) Stop() { + close(m.stopChan) + m.wg.Wait() +} diff --git a/internal/app/ptah-agent/metrics_system.go b/internal/app/ptah-agent/metrics_system.go new file mode 100644 index 0000000..5be6f5a --- /dev/null +++ b/internal/app/ptah-agent/metrics_system.go @@ -0,0 +1,308 @@ +package ptah_agent + +import ( + "log" + "syscall" + + "github.com/mackerelio/go-osstat/cpu" + "github.com/mackerelio/go-osstat/loadavg" + "github.com/mackerelio/go-osstat/memory" + "github.com/mackerelio/go-osstat/network" + "github.com/mackerelio/go-osstat/uptime" + "github.com/prometheus/client_golang/prometheus" +) + +var metrics struct { + diskUsageTotal *prometheus.GaugeVec + diskUsageFree *prometheus.GaugeVec + diskUsageUsed *prometheus.GaugeVec + + diskIO *prometheus.GaugeVec + + cpuUser prometheus.Gauge + cpuSystem prometheus.Gauge + cpuIdle prometheus.Gauge + cpuNice prometheus.Gauge + cpuTotal prometheus.Gauge + + memoryUsed prometheus.Gauge + memoryFree prometheus.Gauge + memoryTotal prometheus.Gauge + + swapUsed prometheus.Gauge + swapFree prometheus.Gauge + swapTotal prometheus.Gauge + + loadAvg1 prometheus.Gauge + loadAvg5 prometheus.Gauge + loadAvg15 prometheus.Gauge + + networkTxBytes *prometheus.GaugeVec + networkRxBytes *prometheus.GaugeVec + + uptime prometheus.Gauge +} + +func init() { + namespace := "ptah" + subsystem := "node" + + metrics.diskUsageTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "disk_total_bytes", + Help: "Total disk usage in bytes", + }, []string{"path"}) + + metrics.diskUsageFree = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "disk_free_bytes", + Help: "Free disk usage in bytes", + }, []string{"path"}) + + metrics.diskUsageUsed = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "disk_used_bytes", + Help: "Used disk usage in bytes", + }, []string{"path"}) + + metrics.diskIO = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "disk_io_ops_count", + Help: "Number of disk operations completed", + }, []string{"device", "operation"}) + + metrics.cpuUser = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "cpu_user", + Help: "User CPU time in nanoseconds", + }) + + metrics.cpuSystem = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "cpu_system", + Help: "System CPU time in nanoseconds", + }) + + metrics.cpuIdle = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "cpu_idle", + Help: "Idle CPU time in nanoseconds", + }) + + metrics.cpuNice = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "cpu_nice", + Help: "Nice CPU time in nanoseconds", + }) + + metrics.cpuTotal = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "cpu_total", + Help: "Total CPU time in nanoseconds", + }) + + metrics.memoryUsed = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "memory_used_bytes", + Help: "Used memory in bytes", + }) + + metrics.memoryFree = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "memory_free_bytes", + Help: "Free memory in bytes", + }) + + metrics.memoryTotal = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "memory_total_bytes", + Help: "Total memory in bytes", + }) + + metrics.swapUsed = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "swap_used_bytes", + Help: "Used swap in bytes", + }) + + metrics.swapFree = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "swap_free_bytes", + Help: "Free swap in bytes", + }) + + metrics.swapTotal = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "swap_total_bytes", + Help: "Total swap in bytes", + }) + + metrics.loadAvg1 = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "load_avg_1m", + Help: "Load average over 1 minute", + }) + + metrics.loadAvg5 = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "load_avg_5m", + Help: "Load average over 5 minutes", + }) + + metrics.loadAvg15 = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "load_avg_15m", + Help: "Load average over 15 minutes", + }) + + metrics.networkTxBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "network_tx_bytes", + Help: "Network transmit bytes", + }, []string{"interface"}) + + metrics.networkRxBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "network_rx_bytes", + Help: "Network receive bytes", + }, []string{"interface"}) + + metrics.uptime = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "uptime_seconds", + Help: "System uptime in seconds", + }) + + prometheus.MustRegister(metrics.diskUsageTotal, metrics.diskUsageFree, metrics.diskUsageUsed, + metrics.diskIO, metrics.cpuUser, metrics.cpuSystem, + metrics.cpuIdle, metrics.cpuNice, metrics.cpuTotal, metrics.memoryUsed, metrics.memoryFree, + metrics.memoryTotal, metrics.swapUsed, metrics.swapFree, metrics.swapTotal, metrics.loadAvg1, metrics.loadAvg5, metrics.loadAvg15, + metrics.networkTxBytes, metrics.networkRxBytes, metrics.uptime) +} + +type DiskUsageRaw struct { + Path string + Total uint64 + Free uint64 + Used uint64 +} + +type DiskIOStatsRaw struct { + Name string + ReadsCompleted uint64 + WritesCompleted uint64 +} + +func getDiskUsage(path string) (*DiskUsageRaw, error) { + fs := syscall.Statfs_t{} + err := syscall.Statfs(path, &fs) + if err != nil { + return nil, err + } + total := fs.Blocks * uint64(fs.Bsize) + free := fs.Bfree * uint64(fs.Bsize) + used := total - free + return &DiskUsageRaw{ + Path: path, + Total: total, + Free: free, + Used: used, + }, nil +} + +func scrapeSystemMetrics() error { + diskUsage, err := getDiskUsage("/") + if err != nil { + log.Printf("failed to get disk usage: %v", err) + } + + metrics.diskUsageTotal.WithLabelValues(diskUsage.Path).Set(float64(diskUsage.Total)) + metrics.diskUsageFree.WithLabelValues(diskUsage.Path).Set(float64(diskUsage.Free)) + metrics.diskUsageUsed.WithLabelValues(diskUsage.Path).Set(float64(diskUsage.Used)) + + memStats, err := memory.Get() + if err != nil { + log.Printf("failed to get memory stats: %v", err) + } + + metrics.memoryUsed.Set(float64(memStats.Used)) + metrics.memoryFree.Set(float64(memStats.Free)) + metrics.memoryTotal.Set(float64(memStats.Total)) + metrics.swapUsed.Set(float64(memStats.SwapUsed)) + metrics.swapFree.Set(float64(memStats.SwapFree)) + metrics.swapTotal.Set(float64(memStats.SwapTotal)) + cpuStats, err := cpu.Get() + if err != nil { + log.Printf("failed to get cpu stats: %v", err) + } + + metrics.cpuUser.Set(float64(cpuStats.User)) + metrics.cpuSystem.Set(float64(cpuStats.System)) + metrics.cpuIdle.Set(float64(cpuStats.Idle)) + metrics.cpuNice.Set(float64(cpuStats.Nice)) + metrics.cpuTotal.Set(float64(cpuStats.Total)) + + diskIOStats, err := getDiskIOStats() + if err != nil { + log.Printf("failed to get disk io stats: %v", err) + } + + for _, stats := range diskIOStats { + metrics.diskIO.WithLabelValues(stats.Name, "reads").Set(float64(stats.ReadsCompleted)) + metrics.diskIO.WithLabelValues(stats.Name, "writes").Set(float64(stats.WritesCompleted)) + } + + loadAvgStats, err := loadavg.Get() + if err != nil { + log.Printf("failed to get load avg stats: %v", err) + } + + metrics.loadAvg1.Set(loadAvgStats.Loadavg1) + metrics.loadAvg5.Set(loadAvgStats.Loadavg5) + metrics.loadAvg15.Set(loadAvgStats.Loadavg15) + + networkStats, err := network.Get() + if err != nil { + log.Printf("failed to get network stats: %v", err) + } + + for _, netStat := range networkStats { + if netStat.TxBytes > 0 { + metrics.networkTxBytes.WithLabelValues(netStat.Name).Set(float64(netStat.TxBytes)) + } + + if netStat.RxBytes > 0 { + metrics.networkRxBytes.WithLabelValues(netStat.Name).Set(float64(netStat.RxBytes)) + } + } + + uptime, err := uptime.Get() + if err != nil { + log.Printf("failed to get uptime: %v", err) + } + + metrics.uptime.Set(float64(uptime.Seconds())) + + return nil +} diff --git a/internal/app/ptah-agent/metrics_system_darwin.go b/internal/app/ptah-agent/metrics_system_darwin.go new file mode 100644 index 0000000..b92fa96 --- /dev/null +++ b/internal/app/ptah-agent/metrics_system_darwin.go @@ -0,0 +1,7 @@ +package ptah_agent + +func getDiskIOStats() ([]*DiskIOStatsRaw, error) { + result := make([]*DiskIOStatsRaw, 0) + + return result, nil +} diff --git a/internal/app/ptah-agent/metrics_system_linux.go b/internal/app/ptah-agent/metrics_system_linux.go new file mode 100644 index 0000000..39baf8f --- /dev/null +++ b/internal/app/ptah-agent/metrics_system_linux.go @@ -0,0 +1,22 @@ +package ptah_agent + +import ( + "github.com/mackerelio/go-osstat/disk" +) + +func getDiskIOStats() ([]*DiskIOStatsRaw, error) { + diskIOStats, err := disk.Get() + if err != nil { + return nil, err + } + + result := make([]*DiskIOStatsRaw, 0, len(diskIOStats)) + for _, stat := range diskIOStats { + result = append(result, &DiskIOStatsRaw{ + Name: stat.Name, + ReadsCompleted: stat.ReadsCompleted, + WritesCompleted: stat.WritesCompleted, + }) + } + return result, nil +} diff --git a/internal/app/ptah-agent/ptah_client.go b/internal/app/ptah-agent/ptah_client.go index a4117bc..81d490e 100644 --- a/internal/app/ptah-agent/ptah_client.go +++ b/internal/app/ptah-agent/ptah_client.go @@ -17,12 +17,15 @@ import ( ) type Agent struct { - Version string - ptah *ptahClient.Client - rootDir string - docker *dockerClient.Client - caddy *caddyClient.Client - executor *taskExecutor + Version string + ptah *ptahClient.Client + safeClient *SafeClient + rootDir string + docker *dockerClient.Client + caddy *caddyClient.Client + executor *taskExecutor + metricsAgent *MetricsAgent + cancel context.CancelFunc } func New(version string, baseUrl string, ptahToken string, rootDir string) (*Agent, error) { @@ -33,24 +36,36 @@ func New(version string, baseUrl string, ptahToken string, rootDir string) (*Age // Create a background context for API version negotiation ctx := context.Background() + docker.NegotiateAPIVersion(ctx) caddy := caddyClient.New("http://127.0.0.1:2019", http.DefaultClient) + ptah := ptahClient.New(baseUrl, ptahToken) + + safeClient, err := NewSafeClient(ptah, rootDir) + if err != nil { + return nil, err + } + + metricsAgent := NewMetricsAgent(safeClient, caddy, 5*time.Second) + // TODO: refactor to avoid duplication and circular dependency? agent := &Agent{ Version: version, - ptah: ptahClient.New(baseUrl, ptahToken), - rootDir: rootDir, - caddy: caddy, - docker: docker, + // TODO: replace ptah with safeClient + ptah: ptah, + safeClient: safeClient, + rootDir: rootDir, + caddy: caddy, + docker: docker, executor: &taskExecutor{ - docker: docker, - caddy: caddy, - rootDir: rootDir, - // TODO: use channel instead? - stopAgentFlag: false, + docker: docker, + caddy: caddy, + rootDir: rootDir, + safeClient: safeClient, }, + metricsAgent: metricsAgent, } agent.executor.agent = agent @@ -142,6 +157,16 @@ func (a *Agent) sendStartedEvent(ctx context.Context) (*ptahClient.StartedRes, e } func (a *Agent) Start(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + a.cancel = cancel + + defer a.safeClient.Close() + + a.metricsAgent.Start(ctx) + a.safeClient.StartBackgroundRequestsProcessing(ctx) + settings, err := a.sendStartedEvent(ctx) if err != nil { return err @@ -153,54 +178,71 @@ func (a *Agent) Start(ctx context.Context) error { maxConsecutiveFailures := 5 for { - taskID, task, err := a.getNextTask(ctx) - if err != nil { - log.Println("can't get the next task", err) - consecutiveFailures++ + select { + case <-ctx.Done(): + log.Println("received stop signal, shutting down gracefully") - if taskID == 0 { - if consecutiveFailures >= maxConsecutiveFailures { - return fmt.Errorf("shutting down due to %d consecutive failures to get next task", maxConsecutiveFailures) - } - } else { - if err = a.ptah.FailTask(ctx, taskID, &ptahClient.TaskError{ - Message: err.Error(), - }); err != nil { - log.Println("can't fail task", err) - } + return nil + case <-time.After(time.Duration(settings.Settings.PollInterval) * time.Second): + err = a.safeClient.PerformForegroundRequests(ctx) + if err != nil { + log.Println("can't perform calls", err) + consecutiveFailures++ } - } else { - consecutiveFailures = 0 - } - if task == nil { - time.Sleep(time.Duration(settings.Settings.PollInterval) * time.Second) + if consecutiveFailures >= maxConsecutiveFailures { + return fmt.Errorf("shutting down due to %d consecutive failures performing calls", maxConsecutiveFailures) + } - continue - } + if err != nil { + continue + } - result, err := a.executor.executeTask(ctx, task) - // TODO: store the result to re-send it once connection to the ptah server is restored - if err == nil { - if err = a.ptah.CompleteTask(ctx, taskID, result); err != nil { - log.Println("can't complete task", err) + taskID, task, err := a.getNextTask(ctx) + if err != nil { + log.Println("can't get the next task", err) + consecutiveFailures++ + + if taskID == 0 { + if consecutiveFailures >= maxConsecutiveFailures { + return fmt.Errorf("shutting down due to %d consecutive failures to get next task", maxConsecutiveFailures) + } + } else { + if err = a.safeClient.FailTask(ctx, taskID, &ptahClient.TaskError{ + Message: err.Error(), + }); err != nil { + log.Println("can't fail task", err) + } + } + } else { + consecutiveFailures = 0 } - } else { - if err = a.ptah.FailTask(ctx, taskID, &ptahClient.TaskError{ - Message: err.Error(), - }); err != nil { - log.Println("can't fail task", err) + + if task == nil { + continue } - } - if a.executor.stopAgentFlag { - log.Println("received stop signal, shutting down gracefully") + result, err := a.executor.executeTask(ctx, task) - break + if err == nil { + if err = a.safeClient.CompleteTask(ctx, taskID, result); err != nil { + log.Println("can't complete task", err) + } + } else { + if err = a.safeClient.FailTask(ctx, taskID, &ptahClient.TaskError{ + Message: err.Error(), + }); err != nil { + log.Println("can't fail task", err) + } + } } } +} - return nil +func (a *Agent) Stop() { + if a.cancel != nil { + a.cancel() + } } func (a *Agent) getNextTask(ctx context.Context) (taskId int, task interface{}, err error) { diff --git a/internal/app/ptah-agent/safe_client.go b/internal/app/ptah-agent/safe_client.go new file mode 100644 index 0000000..c17a921 --- /dev/null +++ b/internal/app/ptah-agent/safe_client.go @@ -0,0 +1,225 @@ +package ptah_agent + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "log" + "time" + + _ "github.com/ncruces/go-sqlite3/driver" + _ "github.com/ncruces/go-sqlite3/embed" + + ptahClient "github.com/ptah-sh/ptah-agent/internal/pkg/ptah-client" +) + +type SafeClient struct { + client *ptahClient.Client + db *sql.DB +} + +func NewSafeClient(client *ptahClient.Client, ptahRootDir string) (*SafeClient, error) { + db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s/ptah-agent.db", ptahRootDir)) + if err != nil { + return nil, err + } + + db.Exec("PRAGMA journal_mode=WAL;") + db.Exec("PRAGMA synchronous=normal;") + db.Exec("PRAGMA temp_store=memory;") + db.Exec("PRAGMA cache_size=1000000;") + + rows, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='requests';") + if err != nil { + return nil, err + } + defer rows.Close() + + if !rows.Next() { + _, err = db.Exec("CREATE TABLE requests (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, payload TEXT);") + if err != nil { + return nil, err + } + } + + return &SafeClient{client: client, db: db}, nil +} + +func (c *SafeClient) CompleteTask(ctx context.Context, taskID int, result interface{}) error { + jsonResult, err := json.Marshal(struct { + TaskID int + Result interface{} + }{ + TaskID: taskID, + Result: result, + }) + + if err != nil { + return err + } + + _, err = c.db.Exec("INSERT INTO requests (name, payload) VALUES (?, ?)", "CompleteTask", jsonResult) + if err != nil { + return err + } + + return nil +} + +func (c *SafeClient) FailTask(ctx context.Context, taskID int, taskError *ptahClient.TaskError) error { + jsonTaskError, err := json.Marshal(struct { + TaskID int + TaskError *ptahClient.TaskError + }{ + TaskID: taskID, + TaskError: taskError, + }) + if err != nil { + return err + } + + _, err = c.db.Exec("INSERT INTO requests (name, payload) VALUES (?, ?)", "FailTask", jsonTaskError) + if err != nil { + return err + } + + return nil +} + +func (c *SafeClient) SendMetrics(ctx context.Context, metrics []string) error { + jsonMetrics, err := json.Marshal(metrics) + if err != nil { + return err + } + + _, err = c.db.Exec("INSERT INTO requests (name, payload) VALUES (?, ?)", "SendMetrics", jsonMetrics) + if err != nil { + return err + } + + return nil +} + +// TODO: split into "sequential" and "parallel". One for tasks, another for metrics. Metrics should be sent always in parallel, in background. +func (c *SafeClient) PerformForegroundRequests(ctx context.Context) error { + rows, err := c.db.Query("SELECT id, name, payload FROM requests WHERE name in ('CompleteTask', 'FailTask');") + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var id int + var name string + var rowPayload []byte + + err = rows.Scan(&id, &name, &rowPayload) + if err != nil { + return err + } + + switch name { + case "CompleteTask": + var payload struct { + TaskID int + Result interface{} + } + + err = json.Unmarshal(rowPayload, &payload) + if err != nil { + return err + } + + // TODO: if the http status is 409 (Conflict) - it is ok, than the task result is already saved and we are executing the same task again due to some error/crash + err = c.client.CompleteTask(ctx, payload.TaskID, payload.Result) + if err != nil { + return err + } + case "FailTask": + var payload struct { + TaskID int + TaskError *ptahClient.TaskError + } + + err = json.Unmarshal(rowPayload, &payload) + if err != nil { + return err + } + + // TODO: if the http status is 409 (Conflict) - it is ok, than the task result is already saved and we are executing the same task again due to some error/crash + err = c.client.FailTask(ctx, payload.TaskID, payload.TaskError) + if err != nil { + return err + } + default: + return fmt.Errorf("unknown request name: %s", name) + } + + _, err = c.db.Exec("DELETE FROM requests WHERE id = ?", id) + if err != nil { + return err + } + } + + return nil +} + +func (c *SafeClient) PerformBackgroundRequests(ctx context.Context) error { + rows, err := c.db.Query("SELECT id, name, payload FROM requests WHERE name in ('SendMetrics');") + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var id int + var name string + var rowPayload []byte + + err = rows.Scan(&id, &name, &rowPayload) + if err != nil { + return err + } + + var metrics []string + err = json.Unmarshal(rowPayload, &metrics) + if err != nil { + return err + } + + err = c.client.SendMetrics(ctx, metrics) + if err != nil { + return err + } + + _, err = c.db.Exec("DELETE FROM requests WHERE id = ?", id) + if err != nil { + return err + } + } + + return nil +} + +func (c *SafeClient) StartBackgroundRequestsProcessing(ctx context.Context) error { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + err := c.PerformBackgroundRequests(ctx) + if err != nil { + log.Println("error performing background requests:", err) + } + } + } + }() + + return nil +} + +func (c *SafeClient) Close() error { + return c.db.Close() +} diff --git a/internal/app/ptah-agent/swarm.go b/internal/app/ptah-agent/swarm.go index a0dc8e2..b389c28 100644 --- a/internal/app/ptah-agent/swarm.go +++ b/internal/app/ptah-agent/swarm.go @@ -23,6 +23,11 @@ func (e *taskExecutor) initSwarm(ctx context.Context, req *t.InitSwarmReq) (*t.I return nil, errors.Wrapf(err, "init swarm") } + _, err = e.agent.sendStartedEvent(ctx) + if err != nil { + log.Println("init swarm: failed to send started event:", err) + } + res.Docker.ID = swarmId return &res, nil diff --git a/internal/app/ptah-agent/task_executor.go b/internal/app/ptah-agent/task_executor.go index f5f1880..3799ee8 100644 --- a/internal/app/ptah-agent/task_executor.go +++ b/internal/app/ptah-agent/task_executor.go @@ -11,11 +11,11 @@ import ( ) type taskExecutor struct { - agent *Agent - docker *dockerClient.Client - caddy *caddyClient.Client - rootDir string - stopAgentFlag bool + safeClient *SafeClient + agent *Agent + docker *dockerClient.Client + caddy *caddyClient.Client + rootDir string } func (e *taskExecutor) executeTask(ctx context.Context, task interface{}) (interface{}, error) { @@ -68,7 +68,3 @@ func (e *taskExecutor) executeTask(ctx context.Context, task interface{}) (inter return nil, fmt.Errorf("execute task: unknown task type %T", task) } } - -func (e *taskExecutor) stop() { - e.stopAgentFlag = true -} diff --git a/internal/pkg/caddy-client/client.go b/internal/pkg/caddy-client/client.go index cdc6e1e..95e48be 100644 --- a/internal/pkg/caddy-client/client.go +++ b/internal/pkg/caddy-client/client.go @@ -5,9 +5,11 @@ import ( "context" "encoding/json" "fmt" - "github.com/pkg/errors" "io" "net/http" + "strings" + + "github.com/pkg/errors" ) type Client struct { @@ -22,6 +24,31 @@ func New(url string, http *http.Client) *Client { } } +func (c *Client) GetMetrics(ctx context.Context) ([]string, error) { + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/metrics", c.url), nil) + if err != nil { + return nil, errors.Wrapf(err, "caddy: failed to create metrics request") + } + + resp, err := c.http.Do(req) + if err != nil { + return nil, errors.Wrapf(err, "caddy: failed to get metrics") + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("caddy: failed to get metrics, status code: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrapf(err, "caddy: failed to read metrics response body") + } + + return strings.Split(string(body), "\n"), nil +} + func (c *Client) PostConfig(ctx context.Context, config map[string]interface{}) error { if config == nil { return fmt.Errorf("caddy: config is nil") diff --git a/internal/pkg/networks/networks.go b/internal/pkg/networks/networks.go index 5616248..45e1a03 100644 --- a/internal/pkg/networks/networks.go +++ b/internal/pkg/networks/networks.go @@ -28,7 +28,8 @@ func List() ([]Network, error) { isDown := i.Flags&net.FlagUp == 0 isNotBroadcast := i.Flags&net.FlagBroadcast == 0 isDockerNetwork := strings.Contains(i.Name, "docker") - if isLoopback || isDown || isNotBroadcast || isDockerNetwork { + isBridgeNetwork := strings.Contains(i.Name, "bridge") || strings.Contains(i.Name, "br-") + if isLoopback || isDown || isNotBroadcast || isDockerNetwork || isBridgeNetwork { continue } diff --git a/internal/pkg/ptah-client/client.go b/internal/pkg/ptah-client/client.go index d9140ee..a8f4bb7 100644 --- a/internal/pkg/ptah-client/client.go +++ b/internal/pkg/ptah-client/client.go @@ -17,6 +17,30 @@ func (e *ServiceError) Error() string { return fmt.Sprintf("ptah error: %s", e.Message) } +type HttpConflictError struct { + Message string `json:"error"` +} + +func (e *HttpConflictError) Is(target error) bool { + _, ok := target.(*HttpConflictError) + + return ok +} + +func (e *HttpConflictError) Error() string { + return fmt.Sprintf("ptah error: %s", e.Message) +} + +func HttpConflictErrorFromJson(body []byte) error { + var serviceError HttpConflictError + err := json.Unmarshal(body, &serviceError) + if err != nil { + return nil + } + + return &serviceError +} + type Client struct { BaseUrl string ptahToken string @@ -79,7 +103,7 @@ func (c *Client) send(ctx context.Context, method, url string, req interface{}, } if response.StatusCode == http.StatusConflict { - return nil, fmt.Errorf("ptah error: %s", string(body)) + return nil, HttpConflictErrorFromJson(body) } var serviceError ServiceError diff --git a/internal/pkg/ptah-client/metrics.go b/internal/pkg/ptah-client/metrics.go new file mode 100644 index 0000000..75a2af5 --- /dev/null +++ b/internal/pkg/ptah-client/metrics.go @@ -0,0 +1,14 @@ +package ptah_client + +import ( + "context" + "strings" +) + +func (c *Client) SendMetrics(ctx context.Context, metrics []string) error { + body := strings.Join(metrics, "\n") + + _, err := c.send(ctx, "POST", "/metrics", body, nil) + + return err +} diff --git a/internal/pkg/ptah-client/tasks.go b/internal/pkg/ptah-client/tasks.go index e5fd695..0d0c37a 100644 --- a/internal/pkg/ptah-client/tasks.go +++ b/internal/pkg/ptah-client/tasks.go @@ -3,6 +3,8 @@ package ptah_client import ( "context" "fmt" + "log" + "github.com/pkg/errors" ) @@ -28,9 +30,15 @@ func (c *Client) GetNextTask(ctx context.Context) (*GetNextTaskRes, error) { return &result, nil } -func (c *Client) CompleteTask(ctx context.Context, taskID int, result interface{}) error { - _, err := c.send(ctx, "POST", fmt.Sprintf("/tasks/%d/complete", taskID), result, nil) +func (c *Client) CompleteTask(ctx context.Context, taskID int, taskResult interface{}) error { + _, err := c.send(ctx, "POST", fmt.Sprintf("/tasks/%d/complete", taskID), taskResult, nil) if err != nil { + if errors.Is(err, &HttpConflictError{}) { + log.Printf("task %d already completed", taskID) + + return nil + } + return errors.Wrapf(err, "POST /tasks/%d/complete failed", taskID) }