From 798489e021b26d942f4f4e42776fa62318b1a7ba Mon Sep 17 00:00:00 2001 From: Mehmet TOSUN <93265833+middt@users.noreply.github.com> Date: Wed, 13 Nov 2024 14:27:25 +0300 Subject: [PATCH] Added clickhouse state store Signed-off-by: Mehmet TOSUN <93265833+middt@users.noreply.github.com> --- go.mod | 24 ++- go.sum | 35 ++++ state/clickhouse/clickhouse.go | 311 ++++++++++++++++++++++++++++ state/clickhouse/clickhouse_test.go | 246 ++++++++++++++++++++++ 4 files changed, 607 insertions(+), 9 deletions(-) create mode 100644 state/clickhouse/clickhouse.go create mode 100644 state/clickhouse/clickhouse_test.go diff --git a/go.mod b/go.mod index 13a6af3ab6..c1b19da5b4 100644 --- a/go.mod +++ b/go.mod @@ -118,13 +118,13 @@ require ( github.com/xdg-go/scram v1.1.2 go.etcd.io/etcd/client/v3 v3.5.9 go.mongodb.org/mongo-driver v1.14.0 - go.uber.org/goleak v1.2.1 + go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/ratelimit v0.3.0 - golang.org/x/crypto v0.26.0 + golang.org/x/crypto v0.28.0 golang.org/x/exp v0.0.0-20240119083558-1b970713d09a golang.org/x/mod v0.17.0 - golang.org/x/net v0.28.0 + golang.org/x/net v0.30.0 golang.org/x/oauth2 v0.20.0 google.golang.org/api v0.180.0 google.golang.org/grpc v1.64.0 @@ -155,6 +155,8 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.0.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect + github.com/ClickHouse/ch-go v0.61.5 // indirect + github.com/ClickHouse/clickhouse-go/v2 v2.30.0 // indirect github.com/Code-Hex/go-generics-cache v1.3.1 // indirect github.com/DataDog/zstd v1.5.2 // indirect github.com/OneOfOne/xxhash v1.2.8 // indirect @@ -171,7 +173,7 @@ require ( github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/aliyun/credentials-go v1.1.2 // indirect github.com/aliyunmq/mq-http-go-sdk v1.0.3 // indirect - github.com/andybalholm/brotli v1.0.5 // indirect + github.com/andybalholm/brotli v1.1.1 // indirect github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc // indirect github.com/apache/rocketmq-client-go v1.2.5 // indirect github.com/ardielle/ardielle-go v1.5.2 // indirect @@ -228,6 +230,8 @@ require ( github.com/gage-technologies/mistral-go v1.0.0 // indirect github.com/gavv/httpexpect v2.0.0+incompatible // indirect github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.7.1 // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/go-kit/kit v0.10.0 // indirect github.com/go-kit/log v0.2.1 // indirect @@ -331,6 +335,7 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/panjf2000/ants/v2 v2.8.1 // indirect + github.com/paulmach/orb v0.11.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect @@ -354,6 +359,7 @@ require ( github.com/sergi/go-diff v1.2.0 // indirect github.com/shirou/gopsutil/v3 v3.23.12 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/shopspring/decimal v1.4.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/sony/gobreaker v0.5.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect @@ -390,13 +396,13 @@ require ( go.opentelemetry.io/otel/metric v1.26.0 // indirect go.opentelemetry.io/otel/sdk v1.26.0 // indirect go.opentelemetry.io/otel/trace v1.26.0 // indirect - go.uber.org/atomic v1.10.0 // indirect - go.uber.org/zap v1.24.0 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.10.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.23.0 // indirect - golang.org/x/term v0.23.0 // indirect - golang.org/x/text v0.17.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/term v0.25.0 // indirect + golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect diff --git a/go.sum b/go.sum index 54c28416d8..3b936d0f56 100644 --- a/go.sum +++ b/go.sum @@ -113,6 +113,12 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4= +github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg= +github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0= +github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= +github.com/ClickHouse/clickhouse-go/v2 v2.30.0 h1:AG4D/hW39qa58+JHQIFOSnxyL46H6h2lrmGGk17dhFo= +github.com/ClickHouse/clickhouse-go/v2 v2.30.0/go.mod h1:i9ZQAojcayW3RsdCb3YR+n+wC2h65eJsZCscZ1Z1wyo= github.com/Code-Hex/go-generics-cache v1.3.1 h1:i8rLwyhoyhaerr7JpjtYjJZUcCbWOdiYO3fZXLiEC4g= github.com/Code-Hex/go-generics-cache v1.3.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= @@ -206,6 +212,8 @@ github.com/aliyunmq/mq-http-go-sdk v1.0.3/go.mod h1:JYfRMQoPexERvnNNBcal0ZQ2TVQ5 github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc h1:NZRon3MDqT4vddR3UIRBnwbbhEerghAimCSBsiESs3g= github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc/go.mod h1:cPJlbcHUTNTpiboMQjMHhE9XBni11LiBiG8FdrDuVzk= @@ -594,6 +602,10 @@ github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkPro github.com/go-chi/chi/v5 v5.0.7/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-co-op/gocron v1.9.0/go.mod h1:DbJm9kdgr1sEvWpHCA7dFFs/PGHPMil9/97EXCRPr4k= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= +github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g= github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks= github.com/go-fonts/liberation v0.1.1/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY= @@ -1316,6 +1328,9 @@ github.com/pashagolub/pgxmock/v2 v2.12.0 h1:IVRmQtVFNCoq7NOZ+PdfvB6fwnLJmEuWDhnc github.com/pashagolub/pgxmock/v2 v2.12.0/go.mod h1:D3YslkN/nJ4+umVqWmbwfSXugJIjPMChkGBG47OJpNw= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= +github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= @@ -1466,6 +1481,8 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sijms/go-ora/v2 v2.7.18 h1:xl9CUeBlFi261AOKekiiFnfcp3ojHFEedLxIzsj909E= github.com/sijms/go-ora/v2 v2.7.18/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk= @@ -1571,6 +1588,7 @@ github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0 github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= @@ -1616,8 +1634,10 @@ github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= @@ -1628,6 +1648,7 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1: github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 h1:6fRhSjgLCkTD3JnJxvaJ4Sj+TYblw757bqYgZaOq5ZY= github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI= github.com/yashtewari/glob-intersection v0.2.0 h1:8iuHdN88yYuCzCdjt0gDe+6bAhUwBeEWqThExu54RFg= @@ -1675,6 +1696,7 @@ go.etcd.io/etcd/client/v3 v3.5.9/go.mod h1:i/Eo5LrZ5IKqpbtpPDuaUnDOUv471oDg8cjQa go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY= go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0/go.mod h1:FAwse6Zlm5v4tEWZaTjmNhe17Int4Oxbu7+2r0DiD3w= go.etcd.io/etcd/server/v3 v3.5.0-alpha.0/go.mod h1:tsKetYpt980ZTpzl/gb+UOJj9RkIyCb1u4wjzMg90BQ= +go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.mongodb.org/mongo-driver v1.12.0/go.mod h1:AZkxhPnFJUoH7kZlFkVKucV20K387miPfm7oimrSmK0= go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= @@ -1739,12 +1761,14 @@ go.uber.org/atomic v1.8.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= @@ -1765,6 +1789,7 @@ go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= goji.io v2.0.2+incompatible h1:uIssv/elbKRLznFUy3Xj4+2Mz/qKhek/9aZQDUMae7c= goji.io v2.0.2+incompatible/go.mod h1:sbqFwrtqZACxLBTQcdgVjFh54yGVCvwq8+w49MVMMIk= golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4= @@ -1798,6 +1823,8 @@ golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58 golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1932,6 +1959,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2078,6 +2107,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2090,6 +2121,8 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo= golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= +golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= +golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2109,6 +2142,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/state/clickhouse/clickhouse.go b/state/clickhouse/clickhouse.go new file mode 100644 index 0000000000..4d51811e89 --- /dev/null +++ b/state/clickhouse/clickhouse.go @@ -0,0 +1,311 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clickhouse + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "reflect" + "time" + + _ "github.com/ClickHouse/clickhouse-go/v2" + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/utils" + "github.com/dapr/kit/logger" + "github.com/google/uuid" +) + +type StateStore struct { + db *sql.DB + logger logger.Logger + config clickhouseMetadata +} + +type clickhouseMetadata struct { + ClickHouseURL string + Database string + Table string +} + +func NewClickHouseStateStore(logger logger.Logger) state.Store { + return &StateStore{ + logger: logger, + } +} + +func (c *StateStore) Init(ctx context.Context, metadata state.Metadata) error { + config, err := parseAndValidateMetadata(metadata) + if err != nil { + return err + } + c.config = config + + db, err := sql.Open("clickhouse", c.config.ClickHouseURL) + if err != nil { + return fmt.Errorf("error opening connection: %v", err) + } + + if err := db.Ping(); err != nil { + return fmt.Errorf("error connecting to database: %v", err) + } + + // Create database if not exists + createDBQuery := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", c.config.Database) + if _, err := db.ExecContext(ctx, createDBQuery); err != nil { + return fmt.Errorf("error creating database: %v", err) + } + + // Create table if not exists with ReplacingMergeTree + createTableQuery := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s ( + key String, + value String, + etag String, + expire DateTime64(3) NULL, + PRIMARY KEY(key) + ) ENGINE = ReplacingMergeTree() + ORDER BY key + `, c.config.Database, c.config.Table) + + if _, err := db.ExecContext(ctx, createTableQuery); err != nil { + return fmt.Errorf("error creating table: %v", err) + } + + c.db = db + return nil +} + +func (c *StateStore) Features() []state.Feature { + return []state.Feature{ + state.FeatureETag, + state.FeatureTTL, + } +} + +func (c *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) { + if req.Key == "" { + return nil, errors.New("key is empty") + } + + query := fmt.Sprintf(` + SELECT value, etag, expire + FROM %s.%s FINAL -- Add FINAL to get the latest version + WHERE key = ? AND (expire IS NULL OR expire > now64()) + `, c.config.Database, c.config.Table) + + var value, etag string + var expire *time.Time + err := c.db.QueryRowContext(ctx, query, req.Key).Scan(&value, &etag, &expire) + if err == sql.ErrNoRows { + return &state.GetResponse{}, nil + } + if err != nil { + return nil, err + } + + var metadata map[string]string + if expire != nil { + metadata = map[string]string{ + state.GetRespMetaKeyTTLExpireTime: expire.UTC().Format(time.RFC3339), + } + } + + return &state.GetResponse{ + Data: []byte(value), + ETag: &etag, + Metadata: metadata, + }, nil +} + +func (c *StateStore) Set(ctx context.Context, req *state.SetRequest) error { + if req.Key == "" { + return errors.New("key is empty") + } + + ttlInSeconds := 0 + if req.Metadata != nil { + var err error + ttlInSeconds, err = parseTTL(req.Metadata) + if err != nil { + return err + } + } + + etag := uuid.New().String() + value, err := c.marshal(req.Value) + if err != nil { + return err + } + + var expireTime *time.Time + if ttlInSeconds > 0 { + t := time.Now().Add(time.Duration(ttlInSeconds) * time.Second) + expireTime = &t + } + + // ClickHouse uses ALTER TABLE ... UPDATE instead of ON DUPLICATE KEY + // First try to insert + insertQuery := fmt.Sprintf(` + INSERT INTO %s.%s (key, value, etag, expire) + VALUES (?, ?, ?, ?) + `, c.config.Database, c.config.Table) + + _, err = c.db.ExecContext(ctx, insertQuery, req.Key, value, etag, expireTime) + if err != nil { + // If the key exists, update it + updateQuery := fmt.Sprintf(` + ALTER TABLE %s.%s + UPDATE value = ?, etag = ?, expire = ? + WHERE key = ? + `, c.config.Database, c.config.Table) + + _, err = c.db.ExecContext(ctx, updateQuery, value, etag, expireTime, req.Key) + if err != nil { + return fmt.Errorf("error updating value: %v", err) + } + } + + return nil +} + +func (c *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error { + if req.Key == "" { + return errors.New("key is empty") + } + + query := fmt.Sprintf("DELETE FROM %s.%s WHERE key = ?", c.config.Database, c.config.Table) + _, err := c.db.ExecContext(ctx, query, req.Key) + return err +} + +func (c *StateStore) marshal(v any) (string, error) { + var value string + switch v := v.(type) { + case []byte: + value = string(v) + case string: + value = v + default: + bt, err := utils.Marshal(v, json.Marshal) + if err != nil { + return "", err + } + value = string(bt) + } + return value, nil +} + +func parseTTL(metadata map[string]string) (int, error) { + if metadata == nil { + return 0, nil + } + ttl, ok := metadata["ttlInSeconds"] + if !ok || ttl == "" { + return 0, nil + } + + ttlMetadata := map[string]string{ + "ttlInSeconds": ttl, + } + + ttlPtr, err := utils.ParseTTL(ttlMetadata) + if err != nil { + return 0, fmt.Errorf("error parsing TTL: %v", err) + } + + if ttlPtr == nil { + return 0, nil + } + + return *ttlPtr, nil +} + +func parseAndValidateMetadata(metadata state.Metadata) (clickhouseMetadata, error) { + config := clickhouseMetadata{} + + if val, ok := metadata.Properties["clickhouseURL"]; ok && val != "" { + config.ClickHouseURL = val + } else { + return config, errors.New("ClickHouse URL is missing") + } + + if val, ok := metadata.Properties["databaseName"]; ok && val != "" { + config.Database = val + } else { + return config, errors.New("ClickHouse database name is missing") + } + + if val, ok := metadata.Properties["tableName"]; ok && val != "" { + config.Table = val + } else { + return config, errors.New("ClickHouse table name is missing") + } + + return config, nil +} + +func (c *StateStore) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { + metadataStruct := clickhouseMetadata{} + metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType) + return +} + +func (c *StateStore) BulkGet(ctx context.Context, reqs []state.GetRequest, opts state.BulkGetOpts) ([]state.BulkGetResponse, error) { + responses := make([]state.BulkGetResponse, len(reqs)) + for i, req := range reqs { + response, err := c.Get(ctx, &req) + if err != nil { + return nil, err + } + responses[i] = state.BulkGetResponse{ + Key: req.Key, + Data: response.Data, + ETag: response.ETag, + Metadata: response.Metadata, + Error: "", + } + } + return responses, nil +} + +func (c *StateStore) BulkSet(ctx context.Context, reqs []state.SetRequest, opts state.BulkStoreOpts) error { + for _, req := range reqs { + err := c.Set(ctx, &req) + if err != nil { + return err + } + } + return nil +} + +func (c *StateStore) BulkDelete(ctx context.Context, reqs []state.DeleteRequest, opts state.BulkStoreOpts) error { + for _, req := range reqs { + err := c.Delete(ctx, &req) + if err != nil { + return err + } + } + return nil +} + +func (c *StateStore) Close() error { + if c.db != nil { + return c.db.Close() + } + return nil +} diff --git a/state/clickhouse/clickhouse_test.go b/state/clickhouse/clickhouse_test.go new file mode 100644 index 0000000000..d9523d3272 --- /dev/null +++ b/state/clickhouse/clickhouse_test.go @@ -0,0 +1,246 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + + +package clickhouse + +import ( + "context" + "database/sql" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + "github.com/dapr/kit/logger" +) + +const ( + testURL = "tcp://localhost:9000" + testDatabase = "dapr_test" + testTable = "state_test" + testUsername = "default" + testPassword = "" +) + +func TestClickHouseIntegration(t *testing.T) { + // Skip if not running integration tests + if testing.Short() { + t.Skip("Skipping integration test") + } + + store := NewClickHouseStateStore(logger.NewLogger("test")) + ctx := context.Background() + + // Initialize store with credentials + err := store.Init(ctx, state.Metadata{ + Base: metadata.Base{ + Properties: map[string]string{ + "clickhouseURL": testURL, + "databaseName": testDatabase, + "tableName": testTable, + "username": testUsername, + "password": testPassword, + }, + }, + }) + require.NoError(t, err) + + // Cleanup after tests + t.Cleanup(func() { + if s, ok := store.(*StateStore); ok { + // Drop test table + if s.db != nil { + _, _ = s.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", testDatabase, testTable)) + // Drop test database + _, _ = s.db.ExecContext(ctx, fmt.Sprintf("DROP DATABASE IF EXISTS %s", testDatabase)) + } + // Close the connection + _ = store.Close() + } + }) + + t.Run("Test CRUD operations", func(t *testing.T) { + testKey := "test-key" + testValue := []byte("test-value") + + // Test Set + err := store.Set(ctx, &state.SetRequest{ + Key: testKey, + Value: testValue, + }) + require.NoError(t, err) + + // Test Get + response, err := store.Get(ctx, &state.GetRequest{ + Key: testKey, + }) + require.NoError(t, err) + assert.Equal(t, testValue, response.Data) + + // Test Delete + err = store.Delete(ctx, &state.DeleteRequest{ + Key: testKey, + }) + require.NoError(t, err) + + // Verify deletion + response, err = store.Get(ctx, &state.GetRequest{ + Key: testKey, + }) + require.NoError(t, err) + assert.Nil(t, response.Data) + }) + + t.Run("Test ETag support", func(t *testing.T) { + testKey := "etag-key" + testValue := []byte("etag-value") + + // Set initial value + err := store.Set(ctx, &state.SetRequest{ + Key: testKey, + Value: testValue, + }) + require.NoError(t, err) + + // Get value and ETag + response, err := store.Get(ctx, &state.GetRequest{ + Key: testKey, + }) + require.NoError(t, err) + assert.NotEmpty(t, response.ETag) + + // Update with correct ETag + err = store.Set(ctx, &state.SetRequest{ + Key: testKey, + Value: []byte("new-value"), + ETag: response.ETag, + }) + require.NoError(t, err) + + // Cleanup + err = store.Delete(ctx, &state.DeleteRequest{ + Key: testKey, + }) + require.NoError(t, err) + }) + + t.Run("Test empty key", func(t *testing.T) { + // Test Set with empty key + err := store.Set(ctx, &state.SetRequest{ + Key: "", + Value: []byte("test"), + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "key is empty") + + // Test Get with empty key + _, err = store.Get(ctx, &state.GetRequest{ + Key: "", + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "key is empty") + + // Test Delete with empty key + err = store.Delete(ctx, &state.DeleteRequest{ + Key: "", + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "key is empty") + }) + + t.Run("Test non-existent key", func(t *testing.T) { + response, err := store.Get(ctx, &state.GetRequest{ + Key: "non-existent-key", + }) + require.NoError(t, err) + assert.Nil(t, response.Data) + }) + + t.Run("Test Features", func(t *testing.T) { + features := store.Features() + assert.Contains(t, features, state.FeatureETag) + }) +} + +func TestParseAndValidateMetadata(t *testing.T) { + t.Run("With valid metadata", func(t *testing.T) { + properties := map[string]string{ + "clickhouseURL": "tcp://127.0.0.1:9000", + "databaseName": "default", + "tableName": "statestore", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + + metadata, err := parseAndValidateMetadata(m) + require.NoError(t, err) + assert.Equal(t, properties["clickhouseURL"], metadata.ClickHouseURL) + assert.Equal(t, properties["databaseName"], metadata.Database) + assert.Equal(t, properties["tableName"], metadata.Table) + }) + + t.Run("Missing clickhouseURL", func(t *testing.T) { + properties := map[string]string{ + "databaseName": "default", + "tableName": "statestore", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + + _, err := parseAndValidateMetadata(m) + require.Error(t, err) + assert.Equal(t, "ClickHouse URL is missing", err.Error()) + }) + + t.Run("Missing databaseName", func(t *testing.T) { + properties := map[string]string{ + "clickhouseURL": "tcp://127.0.0.1:9000", + "tableName": "statestore", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + + _, err := parseAndValidateMetadata(m) + require.Error(t, err) + assert.Equal(t, "ClickHouse database name is missing", err.Error()) + }) + + t.Run("Missing tableName", func(t *testing.T) { + properties := map[string]string{ + "clickhouseURL": "tcp://127.0.0.1:9000", + "databaseName": "default", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + + _, err := parseAndValidateMetadata(m) + require.Error(t, err) + assert.Equal(t, "ClickHouse table name is missing", err.Error()) + }) +} + +// Helper function to create a test database connection +func createTestConnection(t *testing.T) *sql.DB { + db, err := sql.Open("clickhouse", testURL) + require.NoError(t, err) + require.NoError(t, db.Ping()) + return db +}