From d0c6c15d6708ecf82d2e33d0780857e84a2c270a Mon Sep 17 00:00:00 2001 From: Sam Date: Fri, 27 Oct 2023 11:58:47 +1000 Subject: [PATCH] Implement projection sub system Co-authored-by: Kevin Millar Co-authored-by: Troy Parker Co-authored-by: Ben Robey Co-authored-by: Danil Petrov Co-authored-by: Steve Stenzel --- go.sum | 72 ++++ .../internal/registrypb/registration.pb.go | 4 +- .../registrypb/registration_primo.pb.go | 2 +- .../internal/journalpb/record.pb.go | 4 +- .../internal/journalpb/record_primo.pb.go | 2 +- .../internal/journalpb/record.pb.go | 4 +- .../internal/journalpb/record_primo.pb.go | 2 +- internal/projection/consume_test.go | 355 ++++++++++++++++++ internal/projection/doc.go | 2 + internal/projection/eventconsumer.go | 17 + internal/projection/eventconsumer_test.go | 19 + internal/projection/packer_test.go | 36 ++ internal/projection/scope.go | 22 ++ internal/projection/supervisor.go | 49 +++ internal/projection/worker.go | 149 ++++++++ 15 files changed, 730 insertions(+), 9 deletions(-) create mode 100644 internal/projection/consume_test.go create mode 100644 internal/projection/doc.go create mode 100644 internal/projection/eventconsumer.go create mode 100644 internal/projection/eventconsumer_test.go create mode 100644 internal/projection/packer_test.go create mode 100644 internal/projection/scope.go create mode 100644 internal/projection/supervisor.go create mode 100644 internal/projection/worker.go diff --git a/go.sum b/go.sum index 24009b6b..0109b4bb 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,47 @@ +cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= +cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= +github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w= +github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo= +github.com/aws/aws-sdk-go-v2/config v1.27.4 h1:AhfWb5ZwimdsYTgP7Od8E9L1u4sKmDW2ZVeLcf2O42M= +github.com/aws/aws-sdk-go-v2/config v1.27.4/go.mod h1:zq2FFXK3A416kiukwpsd+rD4ny6JC7QSkp4QdN1Mp2g= +github.com/aws/aws-sdk-go-v2/credentials v1.17.4 h1:h5Vztbd8qLppiPwX+y0Q6WiwMZgpd9keKe2EAENgAuI= +github.com/aws/aws-sdk-go-v2/credentials v1.17.4/go.mod h1:+30tpwrkOgvkJL1rUZuRLoxcJwtI/OkeBLYnHxJtVe0= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 h1:AK0J8iYBFeUk2Ax7O8YpLtFsfhdOByh2QIkHmigpRYk= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2/go.mod h1:iRlGzMix0SExQEviAyptRWRGdYNo3+ufW/lCzvKVTUc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.1 h1:haLXE5R07oaq/UnvSyE43V4jp9gA2XRMYcxkFYHEpdU= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.1/go.mod h1:mM51J0CILKQjqIawPDM4g6E1nyxdlvk/qaCDyJkx0II= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 h1:EyBZibRTVAs6ECHZOw5/wlylS9OcTzwyjeQMudmREjE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1/go.mod h1:JKpmtYhhPs7D97NL/ltqz7yCkERFW5dOlHyVl66ZYF8= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.2 h1:3tS2g6P3N+Wz64e9aNx7X4BCWN/gT9MUvIuv5l2eoho= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.2/go.mod h1:1Pf5vPqk8t9pdYB3dmUMRE/0m8u0IHHg8ESSiutJd0I= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.2 h1:5ffmXjPtwRExp1zc7gENLgCPyHFbhEPwVTkTiH9niSk= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.2/go.mod h1:Ru7vg1iQ7cR4i7SZ/JTLYN9kaXtbL69UdgG0OQWQxW0= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 h1:utEGkfdQ4L6YW/ietH7111ZYglLJvS+sLriHJ1NBJEQ= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.1/go.mod h1:RsYqzYr2F2oPDdpy+PdhephuZxTfjHQe7SOBcZGoAU8= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 h1:9/GylMS45hGGFCcMrUZDVayQE1jYSIN6da9jo7RAYIw= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1/go.mod h1:YjAPFn4kGFqKC54VsHs5fn5B6d+PCY2tziEa3U/GB5Y= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.1 h1:3I2cBEYgKhrWlwyZgfpSO2BpaMY1LHPqXYk/QGlu2ew= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.1/go.mod h1:uQ7YYKZt3adCRrdCBREm1CD3efFLOUNH77MrUCvx5oA= +github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw= +github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= +github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe h1:QQ3GSy+MqSHxm/d8nCtnAiZdYFd45cYZPs8vOOIYKfk= +github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= +github.com/dave/jennifer v1.7.0 h1:uRbSBH9UTS64yXbh4FrMHfgfY762RD+C7bUPKODpSJE= +github.com/dave/jennifer v1.7.0/go.mod h1:nXbxhEmQfOZhWml3D1cDK5M1FLnMSozpbFN/m3RmGZc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dogmatiq/configkit v0.12.2 h1:3CgioafFI57yxreHouRjDmRP8eXKlQOQPIHH1N0VwKE= @@ -38,6 +80,12 @@ github.com/dogmatiq/sqltest v0.3.0 h1:DCwyLWfVk/ZHsqq5Itq3H/Lqsh/CIQ6nIRwI4YLywF github.com/dogmatiq/sqltest v0.3.0/go.mod h1:a8Da8NhU4m3lq5Sybhiv+ZQowSnGHWTIJHFNInVtffg= github.com/dogmatiq/testkit v0.13.11 h1:ikXg/Cxq58tzHL27JKCkVqUUElJCHcso7N/ymd3Wins= github.com/dogmatiq/testkit v0.13.11/go.mod h1:GDAEnCkfb8Chmbe+Dc4DfRaaCCV7eqOza61shb1hlE0= +github.com/emicklei/dot v1.6.0 h1:vUzuoVE8ipzS7QkES4UfxdpCwdU2U97m2Pb2tQCoYRY= +github.com/emicklei/dot v1.6.0/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= +github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= +github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -52,6 +100,8 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= +github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -72,6 +122,7 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= @@ -91,6 +142,8 @@ github.com/jackc/pgx/v4 v4.10.0 h1:xXTl+lSiF1eFQ4U7vUL493n/1q8ZhSDP962rSKhgRZo= github.com/jackc/pgx/v4 v4.10.0/go.mod h1:QlrWebbs3kqEZPHCTGyxecvzG6tvIsYu+A5b1raylkA= github.com/jmalloc/gomegax v0.0.0-20200507221434-64fca4c0e03a h1:Gk7Gkwl1KUJII/FiAjvBjRgEz/lpvTV8kNYp+9jdpuk= github.com/jmalloc/gomegax v0.0.0-20200507221434-64fca4c0e03a/go.mod h1:TZpc8ObQEKqTuy1/VXpPRfcMU80QFDU4zK3nchXts/k= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -118,6 +171,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -129,6 +184,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= +go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= @@ -143,12 +200,16 @@ golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU= golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ= +golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= @@ -163,6 +224,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -174,6 +237,12 @@ golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/bQXnr/ClcEMJ968gUXJQ9pwfSynuQ= +google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= +google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 h1:Lj5rbfG876hIAYFjqiJnPHfhXbv+nzTWfm04Fg/XSVU= +google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= @@ -191,12 +260,15 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= diff --git a/internal/cluster/internal/registrypb/registration.pb.go b/internal/cluster/internal/registrypb/registration.pb.go index 0609e971..a08e11ca 100644 --- a/internal/cluster/internal/registrypb/registration.pb.go +++ b/internal/cluster/internal/registrypb/registration.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.24.4 +// protoc-gen-go v1.33.0 +// protoc v4.25.3 // source: github.com/dogmatiq/veracity/internal/cluster/internal/registrypb/registration.proto package registrypb diff --git a/internal/cluster/internal/registrypb/registration_primo.pb.go b/internal/cluster/internal/registrypb/registration_primo.pb.go index 14283357..7f62e070 100644 --- a/internal/cluster/internal/registrypb/registration_primo.pb.go +++ b/internal/cluster/internal/registrypb/registration_primo.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-primo. DO NOT EDIT. // versions: // protoc-gen-go-primo v -// protoc v4.24.4 +// protoc v4.25.3 // source: github.com/dogmatiq/veracity/internal/cluster/internal/registrypb/registration.proto package registrypb diff --git a/internal/eventstream/internal/journalpb/record.pb.go b/internal/eventstream/internal/journalpb/record.pb.go index a847e870..9a9d9f3a 100644 --- a/internal/eventstream/internal/journalpb/record.pb.go +++ b/internal/eventstream/internal/journalpb/record.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.24.4 +// protoc-gen-go v1.33.0 +// protoc v4.25.3 // source: github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb/record.proto package journalpb diff --git a/internal/eventstream/internal/journalpb/record_primo.pb.go b/internal/eventstream/internal/journalpb/record_primo.pb.go index f78b6c3c..aca9fadf 100644 --- a/internal/eventstream/internal/journalpb/record_primo.pb.go +++ b/internal/eventstream/internal/journalpb/record_primo.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-primo. DO NOT EDIT. // versions: // protoc-gen-go-primo v -// protoc v4.24.4 +// protoc v4.25.3 // source: github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb/record.proto package journalpb diff --git a/internal/integration/internal/journalpb/record.pb.go b/internal/integration/internal/journalpb/record.pb.go index 41d55dc9..da4209a7 100644 --- a/internal/integration/internal/journalpb/record.pb.go +++ b/internal/integration/internal/journalpb/record.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.24.4 +// protoc-gen-go v1.33.0 +// protoc v4.25.3 // source: github.com/dogmatiq/veracity/internal/integration/internal/journalpb/record.proto package journalpb diff --git a/internal/integration/internal/journalpb/record_primo.pb.go b/internal/integration/internal/journalpb/record_primo.pb.go index f278fa1c..8078dc67 100644 --- a/internal/integration/internal/journalpb/record_primo.pb.go +++ b/internal/integration/internal/journalpb/record_primo.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-primo. DO NOT EDIT. // versions: // protoc-gen-go-primo v -// protoc v4.24.4 +// protoc v4.25.3 // source: github.com/dogmatiq/veracity/internal/integration/internal/journalpb/record.proto package journalpb diff --git a/internal/projection/consume_test.go b/internal/projection/consume_test.go new file mode 100644 index 00000000..3f0201a3 --- /dev/null +++ b/internal/projection/consume_test.go @@ -0,0 +1,355 @@ +package projection_test + +import ( + "bytes" + "context" + "errors" + "sync" + "sync/atomic" + "testing" + + "github.com/dogmatiq/dogma" + . "github.com/dogmatiq/dogma/fixtures" + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/veracity/internal/envelope" + "github.com/dogmatiq/veracity/internal/eventstream" + . "github.com/dogmatiq/veracity/internal/projection" + "github.com/dogmatiq/veracity/internal/test" +) + +func TestConsume(t *testing.T) { + t.Parallel() + + type dependencies struct { + Packer *envelope.Packer + Handler *ProjectionMessageHandler + EventConsumer *eventConsumerStub + Supervisor *Supervisor + } + + setup := func(t test.TestingT) (deps dependencies) { + deps.Packer = newPacker() + deps.Handler = &ProjectionMessageHandler{} + deps.EventConsumer = &eventConsumerStub{} + + deps.Supervisor = &Supervisor{ + Handler: deps.Handler, + EventConsumer: deps.EventConsumer, + Packer: deps.Packer, + } + + return deps + } + + t.Run("it applies events exactly once, in order regardless of errors", func(t *testing.T) { + t.Parallel() + + cases := []struct { + Desc string + InduceFailure func(*dependencies) + }{ + { + Desc: "no faults", + InduceFailure: func(*dependencies) { + }, + }, + { + Desc: "failure before handling event at offset 0", + InduceFailure: func(deps *dependencies) { + var done atomic.Bool + handle := deps.Handler.HandleEventFunc + + deps.Handler.HandleEventFunc = func( + ctx context.Context, + r, c, n []byte, + s dogma.ProjectionEventScope, + e dogma.Event, + ) (bool, error) { + if e == MessageE1 && done.CompareAndSwap(false, true) { + return false, errors.New("") + } + + return handle(ctx, r, c, n, s, e) + } + }, + }, + { + Desc: "failure after handling event at offset 0", + InduceFailure: func(deps *dependencies) { + var done atomic.Bool + handle := deps.Handler.HandleEventFunc + + deps.Handler.HandleEventFunc = func( + ctx context.Context, + r, c, n []byte, + s dogma.ProjectionEventScope, + e dogma.Event, + ) (bool, error) { + ok, err := handle(ctx, r, c, n, s, e) + if !ok || err != nil { + return ok, err + } + if e == MessageE1 && done.CompareAndSwap(false, true) { + return false, errors.New("") + } + + return true, nil + } + }, + }, + { + Desc: "failure before handling event at offset 1", + InduceFailure: func(deps *dependencies) { + var done atomic.Bool + handle := deps.Handler.HandleEventFunc + + deps.Handler.HandleEventFunc = func( + ctx context.Context, + r, c, n []byte, + s dogma.ProjectionEventScope, + e dogma.Event, + ) (bool, error) { + if e == MessageE2 && done.CompareAndSwap(false, true) { + return false, errors.New("") + } + + return handle(ctx, r, c, n, s, e) + } + }, + }, + { + Desc: "failure after handling event at offset 1", + InduceFailure: func(deps *dependencies) { + var done atomic.Bool + handle := deps.Handler.HandleEventFunc + + deps.Handler.HandleEventFunc = func( + ctx context.Context, + r, c, n []byte, + s dogma.ProjectionEventScope, + e dogma.Event, + ) (bool, error) { + ok, err := handle(ctx, r, c, n, s, e) + if !ok || err != nil { + return ok, err + } + if e == MessageE2 && done.CompareAndSwap(false, true) { + return false, errors.New("") + } + + return true, nil + } + }, + }, + { + Desc: "occ failure at offset 0", + InduceFailure: func(deps *dependencies) { + var done atomic.Bool + resourceVersionFunc := deps.Handler.ResourceVersionFunc + + deps.Handler.ResourceVersionFunc = func(ctx context.Context, r []byte) ([]byte, error) { + if done.CompareAndSwap(false, true) { + return []byte{0, 0, 0, 0, 0, 0, 0, 1}, nil + } + + return resourceVersionFunc(ctx, r) + } + }, + }, + { + Desc: "occ failure at offset 1", + InduceFailure: func(deps *dependencies) { + var done atomic.Bool + handle := deps.Handler.HandleEventFunc + + deps.Handler.HandleEventFunc = func( + ctx context.Context, + r, c, n []byte, + s dogma.ProjectionEventScope, + e dogma.Event, + ) (bool, error) { + if e == MessageE2 && done.CompareAndSwap(false, true) { + return false, nil + } + + return handle(ctx, r, c, n, s, e) + } + }, + }, + } + + for _, c := range cases { + c := c + + t.Run(c.Desc, func(t *testing.T) { + tctx := test.WithContext(t) + + deps := setup(tctx) + + var ( + mu sync.Mutex + appliedResources = map[string][]byte{} + appliedEvents = make(chan dogma.Event, 100) + ) + + deps.Handler.HandleEventFunc = func( + ctx context.Context, + r, c, n []byte, + s dogma.ProjectionEventScope, + e dogma.Event, + ) (bool, error) { + mu.Lock() + defer mu.Unlock() + + v := appliedResources[string(r)] + if !bytes.Equal(v, c) { + t.Logf("[%T] resource %x occ conflict: %x != %x", e, r, c, v) + return false, nil + } + + select { + case <-ctx.Done(): + return false, ctx.Err() + case appliedEvents <- e: + t.Logf("[%T] resource %x updated: %x -> %x", e, r, c, n) + appliedResources[string(r)] = n + return true, nil + } + } + + deps.Handler.ResourceVersionFunc = func(ctx context.Context, r []byte) ([]byte, error) { + mu.Lock() + defer mu.Unlock() + + v := appliedResources[string(r)] + t.Logf("resource %x loaded: %x", r, v) + + return v, nil + } + + expectedStreamID := uuidpb.Generate() + expectedEvents := []*envelopepb.Envelope{ + deps.Packer.Pack(MessageE1), + deps.Packer.Pack(MessageE2), + deps.Packer.Pack(MessageE3), + } + + deps.EventConsumer.ConsumeFunc = func( + ctx context.Context, + streamID *uuidpb.UUID, + offset eventstream.Offset, + events chan<- eventstream.Event, + ) error { + var matching []*envelopepb.Envelope + + if streamID.Equal(expectedStreamID) && offset < eventstream.Offset(len(expectedEvents)) { + matching = expectedEvents[offset:] + } + + for i, env := range matching { + ese := eventstream.Event{ + StreamID: streamID, + Offset: eventstream.Offset(i), + Envelope: env, + } + + select { + case <-ctx.Done(): + return ctx.Err() + case events <- ese: + } + } + + <-ctx.Done() + return ctx.Err() + } + + deps.Supervisor.StreamIDs = []*uuidpb.UUID{expectedStreamID} + + c.InduceFailure(&deps) + + supervisorTask := test. + RunInBackground(t, "supervisor", deps.Supervisor.Run). + RepeatedlyUntilSuccess() + + for _, env := range expectedEvents { + expected, err := deps.Packer.Unpack(env) + if err != nil { + t.Fatal(err) + } + + test. + ExpectChannelToReceive( + tctx, + appliedEvents, + expected, + ) + } + + test. + ExpectChannelWouldBlock( + tctx, + appliedEvents, + ) + deps.Supervisor.Shutdown() + supervisorTask.WaitForSuccess() + }) + } + }) + + t.Run("it makes the event type available via the scope", func(t *testing.T) { + tctx := test.WithContext(t) + + deps := setup(tctx) + + env := deps.Packer.Pack(MessageE1) + + var supervisorTask *test.Task + + deps.Handler.HandleEventFunc = func( + ctx context.Context, + r, c, n []byte, + s dogma.ProjectionEventScope, + e dogma.Event, + ) (bool, error) { + expected := env.GetCreatedAt().AsTime() + if !s.RecordedAt().Equal(expected) { + t.Fatalf("unexpected recorded at time: got %s, want %s", s.RecordedAt(), expected) + } + + supervisorTask.Stop() + + return true, nil + } + + deps.EventConsumer.ConsumeFunc = func( + ctx context.Context, + streamID *uuidpb.UUID, + offset eventstream.Offset, + events chan<- eventstream.Event, + ) error { + ese := eventstream.Event{ + StreamID: streamID, + Offset: 0, + Envelope: env, + } + + select { + case <-ctx.Done(): + return ctx.Err() + case events <- ese: + } + + <-ctx.Done() + return ctx.Err() + } + + deps.Supervisor.StreamIDs = []*uuidpb.UUID{uuidpb.Generate()} + + supervisorTask = test. + RunInBackground(t, "supervisor", deps.Supervisor.Run). + UntilStopped() + supervisorTask.WaitUntilStopped() + }) +} diff --git a/internal/projection/doc.go b/internal/projection/doc.go new file mode 100644 index 00000000..f393e022 --- /dev/null +++ b/internal/projection/doc.go @@ -0,0 +1,2 @@ +// Package projection dispatches events to projection message handlers. +package projection diff --git a/internal/projection/eventconsumer.go b/internal/projection/eventconsumer.go new file mode 100644 index 00000000..4a069517 --- /dev/null +++ b/internal/projection/eventconsumer.go @@ -0,0 +1,17 @@ +package projection + +import ( + "context" + + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/veracity/internal/eventstream" +) + +type EventConsumer interface { + Consume( + ctx context.Context, + streamID *uuidpb.UUID, + offset eventstream.Offset, + events chan<- eventstream.Event, + ) error +} diff --git a/internal/projection/eventconsumer_test.go b/internal/projection/eventconsumer_test.go new file mode 100644 index 00000000..ea451767 --- /dev/null +++ b/internal/projection/eventconsumer_test.go @@ -0,0 +1,19 @@ +package projection_test + +import ( + "context" + + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/veracity/internal/eventstream" +) + +type eventConsumerStub struct { + ConsumeFunc func(ctx context.Context, streamID *uuidpb.UUID, offset eventstream.Offset, events chan<- eventstream.Event) error +} + +func (s *eventConsumerStub) Consume(ctx context.Context, streamID *uuidpb.UUID, offset eventstream.Offset, events chan<- eventstream.Event) error { + if s.ConsumeFunc != nil { + return s.ConsumeFunc(ctx, streamID, offset, events) + } + return nil +} diff --git a/internal/projection/packer_test.go b/internal/projection/packer_test.go new file mode 100644 index 00000000..81742105 --- /dev/null +++ b/internal/projection/packer_test.go @@ -0,0 +1,36 @@ +package projection_test + +import ( + "sync/atomic" + "time" + + "github.com/dogmatiq/enginekit/protobuf/identitypb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + . "github.com/dogmatiq/marshalkit/fixtures" + "github.com/dogmatiq/veracity/internal/envelope" +) + +func newPacker() *envelope.Packer { + var counter atomic.Uint64 + return &envelope.Packer{ + Application: identitypb.New("", uuidpb.Generate()), + Marshaler: Marshaler, + Now: func() time.Time { + return time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + }, + GenerateID: func() *uuidpb.UUID { + return deterministicUUID(counter.Add(1)) + }, + } +} + +func deterministicUUID(counter uint64) *uuidpb.UUID { + var data [16]byte + data[6] = (data[6] & 0x0f) | 0x40 // Version 4 + data[8] = (data[8] & 0x3f) | 0x80 // Variant is 10 (RFC 4122) + + id := uuidpb.FromByteArray(data) + id.Lower |= counter + + return id +} diff --git a/internal/projection/scope.go b/internal/projection/scope.go new file mode 100644 index 00000000..8f264262 --- /dev/null +++ b/internal/projection/scope.go @@ -0,0 +1,22 @@ +package projection + +import ( + "time" +) + +type scope struct { + recordedAt time.Time +} + +func (s *scope) RecordedAt() time.Time { + return s.recordedAt +} + +func (s *scope) IsPrimaryDelivery() bool { + // TODO + return true +} + +func (s *scope) Log(string, ...any) { + // TODO +} diff --git a/internal/projection/supervisor.go b/internal/projection/supervisor.go new file mode 100644 index 00000000..fc823434 --- /dev/null +++ b/internal/projection/supervisor.go @@ -0,0 +1,49 @@ +package projection + +import ( + "context" + + "github.com/dogmatiq/dogma" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/veracity/internal/envelope" + "github.com/dogmatiq/veracity/internal/signaling" + "golang.org/x/sync/errgroup" +) + +// A Supervisor coordinates projection workers. +type Supervisor struct { + Handler dogma.ProjectionMessageHandler + Packer *envelope.Packer + EventConsumer EventConsumer + StreamIDs []*uuidpb.UUID + + shutdown signaling.Latch +} + +func (s *Supervisor) Run(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + + for _, streamID := range s.StreamIDs { + streamID := streamID // capture loop variable + + eg.Go( + func() error { + worker := &worker{ + Handler: s.Handler, + EventConsumer: s.EventConsumer, + Packer: s.Packer, + StreamID: streamID, + Shutdown: &s.shutdown, + } + return worker.Run(ctx) + }, + ) + } + + return eg.Wait() +} + +// Shutdown stops the supervisor when it next becomes idle. +func (s *Supervisor) Shutdown() { + s.shutdown.Signal() +} diff --git a/internal/projection/worker.go b/internal/projection/worker.go new file mode 100644 index 00000000..247f2c6e --- /dev/null +++ b/internal/projection/worker.go @@ -0,0 +1,149 @@ +package projection + +import ( + "context" + "encoding/binary" + "errors" + + "github.com/dogmatiq/dogma" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/veracity/internal/envelope" + "github.com/dogmatiq/veracity/internal/eventstream" + "github.com/dogmatiq/veracity/internal/fsm" + "github.com/dogmatiq/veracity/internal/signaling" +) + +// A worker dispatches events from a single eventstream to a projection handler. +type worker struct { + Handler dogma.ProjectionMessageHandler + Packer *envelope.Packer + EventConsumer EventConsumer + StreamID *uuidpb.UUID + Shutdown *signaling.Latch + + resource []byte + currentVersion []byte + events chan eventstream.Event + consumerError chan error + cancelConsumer context.CancelFunc +} + +// resourceFromStream returns the resource ID for a stream. +func resourceFromStream(streamID *uuidpb.UUID) []byte { + return streamID.AsBytes() +} + +// offsetFromVersion returns the offset for a version. +func offsetFromVersion(version []byte) (eventstream.Offset, error) { + switch len(version) { + case 0: + return 0, nil + case 8: + return eventstream.Offset(binary.BigEndian.Uint64(version)), nil + default: + return 0, errors.New("invalid version") + } +} + +// offsetToVersion returns the version for an offset. +func offsetToVersion(offset eventstream.Offset) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(offset)) + return b +} + +func (w *worker) Run(ctx context.Context) error { + defer func() { + if w.cancelConsumer != nil { + w.cancelConsumer() + } + }() + return fsm.Start(ctx, w.initState) +} + +func (w *worker) initState(ctx context.Context) fsm.Action { + if w.cancelConsumer != nil { + w.cancelConsumer() + + select { + case <-ctx.Done(): + case <-w.consumerError: + } + } + + w.resource = resourceFromStream(w.StreamID) + w.events = make(chan eventstream.Event) + w.consumerError = make(chan error, 1) + + var err error + w.currentVersion, err = w.Handler.ResourceVersion(ctx, w.resource) + if err != nil { + return fsm.Fail(err) + } + + offset, err := offsetFromVersion(w.currentVersion) + if err != nil { + return fsm.Fail(err) + } + + var consumeCtx context.Context + consumeCtx, w.cancelConsumer = context.WithCancel(ctx) + go func() { + w.consumerError <- w.EventConsumer.Consume(consumeCtx, w.StreamID, offset, w.events) + }() + + return fsm.EnterState(w.idleState) +} + +func (w *worker) idleState(ctx context.Context) fsm.Action { + select { + case <-ctx.Done(): + return fsm.Stop() + + case <-w.Shutdown.Signaled(): + return fsm.Stop() + + case err := <-w.consumerError: + if err == nil { + panic("consumer returned nil") + } + return fsm.Fail(err) + + case ese := <-w.events: + return fsm.With(ese).EnterState(w.handleEventState) + } +} + +// handleEventState handles events. +func (w *worker) handleEventState( + ctx context.Context, + ese eventstream.Event, +) fsm.Action { + e, err := w.Packer.Unpack(ese.Envelope) + if err != nil { + return fsm.Fail(err) + } + + n := offsetToVersion(ese.Offset + 1) + + ok, err := w.Handler.HandleEvent( + ctx, + w.resource, + w.currentVersion, + n, + &scope{ + recordedAt: ese.Envelope.GetCreatedAt().AsTime(), + }, + e, + ) + if err != nil { + return fsm.Fail(err) + } + if !ok { + return fsm.EnterState(w.initState) + } + + w.currentVersion = n + + return fsm.EnterState(w.idleState) +}