diff --git a/go.sum b/go.sum index c3575eeb..bf0534d5 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,45 @@ +cloud.google.com/go/compute v1.24.0 h1:phWcR2eWzRJaL/kOiJwfFsPs4BaKq1j6vnpZrc1YlVg= +cloud.google.com/go/compute v1.24.0/go.mod h1:kw1/T+h/+tK2LJK0wiPPx1intgdAM3j/g3hFDlscY40= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= +github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= +github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.10 h1:PS+65jThT0T/snC5WjyfHHyUgG+eBoupSDV+f838cro= +github.com/aws/aws-sdk-go-v2/config v1.27.10/go.mod h1:BePM7Vo4OBpHreKRUMuDXX+/+JWP38FLkzl5m27/Jjs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.10 h1:qDZ3EA2lv1KangvQB6y258OssCHD0xvaGiEDkG4X/10= +github.com/aws/aws-sdk-go-v2/credentials v1.17.10/go.mod h1:6t3sucOaYDwDssHQa0ojH1RpmVmF5/jArkye1b2FKMI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.31.1 
h1:dZXY07Dm59TxAjJcUfNMJHLDI/gLMxTRZefn2jFAVsw= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.31.1/go.mod h1:lVLqEtX+ezgtfalyJs7Peb0uv9dEpAQP5yuq2O26R44= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6 h1:6tayEze2Y+hiL3kdnEUxSPsP+pJsUfwLSFspFl1ru9Q= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6/go.mod h1:qVNb/9IOVsLCZh0x2lnagrBwQ9fxajUpXS7OZfIsKn0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.4 h1:WzFol5Cd+yDxPAdnzTA5LmpHYSWinhmSj4rQChV0ee8= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.4/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2KCuY61kzoCpvtvJJBtOE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= +github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= +github.com/dave/jennifer v1.7.0 h1:uRbSBH9UTS64yXbh4FrMHfgfY762RD+C7bUPKODpSJE= +github.com/dave/jennifer v1.7.0/go.mod h1:nXbxhEmQfOZhWml3D1cDK5M1FLnMSozpbFN/m3RmGZc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dogmatiq/configkit v0.13.0 h1:pV2Pz0iBUBnRfOm6tbWVRXvuh2bWHBScOh8KfVp/N68= @@ -42,6 +82,12 @@ github.com/dogmatiq/sqltest v0.3.0 h1:DCwyLWfVk/ZHsqq5Itq3H/Lqsh/CIQ6nIRwI4YLywF github.com/dogmatiq/sqltest v0.3.0/go.mod h1:a8Da8NhU4m3lq5Sybhiv+ZQowSnGHWTIJHFNInVtffg= github.com/dogmatiq/testkit v0.13.11 h1:ikXg/Cxq58tzHL27JKCkVqUUElJCHcso7N/ymd3Wins= github.com/dogmatiq/testkit v0.13.11/go.mod h1:GDAEnCkfb8Chmbe+Dc4DfRaaCCV7eqOza61shb1hlE0= +github.com/emicklei/dot v1.6.1 h1:ujpDlBkkwgWUY+qPId5IwapRW/xEoligRSYjioR6DFI= +github.com/emicklei/dot v1.6.1/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= +github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= +github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -56,6 +102,8 @@ github.com/go-sql-driver/mysql v1.5.0 
h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= +github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -63,6 +111,8 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -72,6 +122,7 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid 
v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= @@ -91,6 +142,8 @@ github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jmalloc/gomegax v0.0.0-20200507221434-64fca4c0e03a h1:Gk7Gkwl1KUJII/FiAjvBjRgEz/lpvTV8kNYp+9jdpuk= github.com/jmalloc/gomegax v0.0.0-20200507221434-64fca4c0e03a/go.mod h1:TZpc8ObQEKqTuy1/VXpPRfcMU80QFDU4zK3nchXts/k= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -118,6 +171,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= github.com/onsi/gomega v1.32.0 h1:JRYU78fJ1LPxlckP6Txi/EYqJvjtMrDC04/MM5XRHPk= github.com/onsi/gomega v1.32.0/go.mod h1:a4x4gW6Pz2yK1MAmvluYme5lvYTn61afQ2ETw/8n4Lg= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0/go.mod 
h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -129,6 +184,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= +go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= @@ -143,12 +200,16 @@ golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU= golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= +golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/oauth2 v0.17.0 
h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= +golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= @@ -163,6 +224,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -172,6 +235,14 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de 
h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= +google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0= +google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= @@ -187,12 +258,15 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 
h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= diff --git a/internal/eventstream/append.go b/internal/eventstream/append.go index 9344f0ca..045b7b8c 100644 --- a/internal/eventstream/append.go +++ b/internal/eventstream/append.go @@ -1,8 +1,13 @@ package eventstream import ( + "context" + "log/slog" + "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/persistencekit/journal" + "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" ) // AppendRequest is a request to append events to an event stream. @@ -31,3 +36,176 @@ type AppendResponse struct { // [AppendRequest] and hence deduplicated. AppendedByPriorAttempt bool } + +func (w *worker) handleAppend( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + if !req.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if len(req.Events) == 0 { + // We panic rather than just failing the exchange because we never want + // empty requests to occupy space in the worker's queue. The sender + // should simply not send empty requests. 
+ panic("received append request with no events") + } + + defer w.resetIdleTimer() + + if req.LowestPossibleOffset > w.nextOffset { + if err := w.catchUpWithJournal(ctx); err != nil { + return AppendResponse{}, err + } + } + + for { + res, err := w.findPriorAppend(ctx, req) + if err != nil { + return AppendResponse{}, err + } + + if res.AppendedByPriorAttempt { + for index, event := range req.Events { + w.Logger.Info( + "discarded duplicate event", + slog.Uint64("stream_offset", uint64(res.BeginOffset)+uint64(index)), + slog.String("message_id", event.MessageId.AsString()), + slog.String("description", event.Description), + ) + } + return res, nil + } + + res, err = w.writeEventsToJournal(ctx, req) + if err == nil { + w.publishEvents(res.BeginOffset, req.Events) + return res, nil + } + + if err != journal.ErrConflict { + return AppendResponse{}, err + } + + if err := w.catchUpWithJournal(ctx); err != nil { + return AppendResponse{}, err + } + } +} + +// findPriorAppend returns an [AppendResponse] if the given [AppendRequest] has +// already been handled. +func (w *worker) findPriorAppend( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + // If the lowest possible offset is ahead of the next offset the request is + // malformed. Either there's a bug in Veracity, or the journal has suffered + // catastrophic data loss. + if req.LowestPossibleOffset > w.nextOffset { + panic("lowest possible offset is greater than the next offset") + } + + // If the lowest possible offset is equal to the next offset, no events + // have been recorded since the request was created, and hence there + // can be no prior append attempt. + if req.LowestPossibleOffset == w.nextOffset { + return AppendResponse{}, nil + } + + // If the lowest possible offset is in the cache, we can check for + // duplicates without using the journal. We search using the last event in + // the request as it's the most likely to still be in the cache. 
+ lowestPossibleOffset := req.LowestPossibleOffset + Offset(len(req.Events)) + + if cacheIndex := w.findInCache(lowestPossibleOffset); cacheIndex != -1 { + lastMessageIndex := len(req.Events) - 1 + lastMessageID := req.Events[lastMessageIndex].MessageId + + for _, event := range w.recentEvents[cacheIndex:] { + if event.Envelope.MessageId.Equal(lastMessageID) { + return AppendResponse{ + // We know the offset of the last message in the request, so + // we can compute the offset of the first message, even if + // it's no longer in the cache. + BeginOffset: event.Offset - Offset(lastMessageIndex), + EndOffset: event.Offset + 1, + AppendedByPriorAttempt: true, + }, nil + } + } + } + + // Finally, we search the journal for the record containing the events. + rec, err := journal.ScanFromSearchResult( + ctx, + w.Journal, + 0, + w.nextPos, + eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)), + func( + _ context.Context, + _ journal.Position, + rec *eventstreamjournal.Record, + ) (*eventstreamjournal.Record, bool, error) { + if op := rec.GetEventsAppended(); op != nil { + targetID := req.Events[0].MessageId + candidateID := op.Events[0].MessageId + return rec, candidateID.Equal(targetID), nil + } + return nil, false, nil + }, + ) + if err != nil { + return AppendResponse{}, journal.IgnoreNotFound(err) + } + + return AppendResponse{ + BeginOffset: Offset(rec.StreamOffsetBefore), + EndOffset: Offset(rec.StreamOffsetAfter), + AppendedByPriorAttempt: true, + }, nil +} + +func (w *worker) writeEventsToJournal( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + before := w.nextOffset + after := w.nextOffset + Offset(len(req.Events)) + + if err := w.Journal.Append( + ctx, + w.nextPos, + eventstreamjournal. + NewRecordBuilder(). + WithStreamOffsetBefore(uint64(before)). + WithStreamOffsetAfter(uint64(after)). + WithEventsAppended(&eventstreamjournal.EventsAppended{ + Events: req.Events, + }). 
+ Build(), + ); err != nil { + return AppendResponse{}, err + } + + for index, event := range req.Events { + w.Logger.Info( + "appended event to the stream", + slog.Uint64("journal_position", uint64(w.nextPos)), + slog.Uint64("stream_offset", uint64(before)+uint64(index)), + slog.String("message_id", event.MessageId.AsString()), + slog.String("description", event.Description), + ) + } + + w.nextPos++ + w.nextOffset = after + + return AppendResponse{ + BeginOffset: before, + EndOffset: after, + AppendedByPriorAttempt: false, + }, nil +} diff --git a/internal/eventstream/append_test.go b/internal/eventstream/append_test.go index 4a791440..0e19c28e 100644 --- a/internal/eventstream/append_test.go +++ b/internal/eventstream/append_test.go @@ -26,6 +26,7 @@ func TestAppend(t *testing.T) { Journals *memoryjournal.BinaryStore Supervisor *Supervisor Packer *envelope.Packer + Barrier chan struct{} } setup := func(t test.TestingT) (deps dependencies) { @@ -41,6 +42,8 @@ func TestAppend(t *testing.T) { Marshaler: Marshaler, } + deps.Barrier = make(chan struct{}) + return deps } @@ -51,26 +54,26 @@ func TestAppend(t *testing.T) { cases := []struct { Desc string - InduceFailure func(*dependencies) + InduceFailure func(context.Context, *testing.T, *dependencies) }{ { Desc: "no faults", - InduceFailure: func(*dependencies) { - }, }, { Desc: "failure to open journal", - InduceFailure: func(deps *dependencies) { + InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) { test.FailOnJournalOpen( deps.Journals, eventstreamjournal.Name(streamID), errors.New(""), ) + t.Log("configured journal store to fail when opening the journal") + close(deps.Barrier) }, }, { Desc: "failure before appending to journal", - InduceFailure: func(deps *dependencies) { + InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) { test.FailBeforeJournalAppend( deps.Journals, eventstreamjournal.Name(streamID), @@ -79,11 +82,13 @@ func TestAppend(t *testing.T) { }, 
errors.New(""), ) + t.Log("configured journal store to fail before appending a record") + close(deps.Barrier) }, }, { Desc: "failure after appending to journal", - InduceFailure: func(deps *dependencies) { + InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) { test.FailAfterJournalAppend( deps.Journals, eventstreamjournal.Name(streamID), @@ -92,6 +97,56 @@ func TestAppend(t *testing.T) { }, errors.New(""), ) + t.Log("configured journal store to fail after appending a record") + close(deps.Barrier) + }, + }, + { + Desc: "optimistic concurrency conflict", + InduceFailure: func(ctx context.Context, t *testing.T, deps *dependencies) { + go func() { + if _, err := deps.Supervisor.AppendQueue.Do( + ctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + deps.Packer.Pack(MessageX1), + }, + }, + ); err != nil { + t.Error(err) + return + } + + t.Log("confirmed that the supervisor-under-test is running") + + s := &Supervisor{ + Journals: deps.Journals, + Logger: spruce.NewLogger(t), + } + + defer test. + RunInBackground(t, "conflict-generating-supervisor", s.Run). + UntilStopped(). + Stop() + + if _, err := s.AppendQueue.Do( + ctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + deps.Packer.Pack(MessageX2), + }, + }, + ); err != nil { + t.Error(err) + return + } + + t.Log("appended events using a different supervisor to induce a journal conflict") + + close(deps.Barrier) + }() }, }, } @@ -101,10 +156,10 @@ func TestAppend(t *testing.T) { tctx := test.WithContext(t) deps := setup(tctx) - t.Log("append some initial events to the stream") + t.Log("seeding the event stream with some initial events") supervisor := test. - RunInBackground(t, "supervisor", deps.Supervisor.Run). + RunInBackground(t, "event-seeding-supervisor", deps.Supervisor.Run). 
UntilStopped() res, err := deps.Supervisor.AppendQueue.Do( @@ -124,14 +179,35 @@ supervisor.StopAndWait() - t.Log("induce a failure") + // Open a journal that we can use for verifying results + // _before_ inducing any failure. + j, err := eventstreamjournal.Open(tctx, deps.Journals, streamID) + if err != nil { + t.Fatal(err) + } + defer j.Close() - c.InduceFailure(&deps) + if c.InduceFailure != nil { + c.InduceFailure(tctx, t, &deps) + } else { + close(deps.Barrier) + } supervisor = test. - RunInBackground(t, "supervisor", deps.Supervisor.Run). + RunInBackground(t, "supervisor-under-test", deps.Supervisor.Run). RepeatedlyUntilStopped() + <-deps.Barrier + + // Read the journal bounds as they exist before the test + // commences. + begin, end, err := j.Bounds(tctx) + if err != nil { + t.Fatal(err) + } + + t.Logf("proceeding with test, journal bounds are [%d, %d)", begin, end) + event := deps.Packer.Pack(MessageE1) req := AppendRequest{ @@ -159,16 +235,11 @@ t.Log("ensure that the event was appended to the stream exactly once") - j, err := eventstreamjournal.Open(tctx, deps.Journals, streamID) - if err != nil { - t.Fatal(err) - } - var events []*envelopepb.Envelope if err := j.Range( tctx, - 1, + end, // only read the records appended during the test func( ctx context.Context, _ journal.Position, diff --git a/internal/eventstream/workercache.go b/internal/eventstream/cache.go similarity index 85% rename from internal/eventstream/workercache.go rename to internal/eventstream/cache.go index 072ecb5e..a3064c2d 100644 --- a/internal/eventstream/workercache.go +++ b/internal/eventstream/cache.go @@ -30,7 +30,7 @@ func (w *worker) findInCache(offset Offset) int { // growCache grows the cache capacity to fit an additional n events. It removes // old events if necessary. // -// It returns the number of events that may be added to the cache. 
+// It returns the difference between n and the actual capacity of the cache. func (w *worker) growCache(n int) int { begin := 0 end := len(w.recentEvents) @@ -39,10 +39,12 @@ func (w *worker) growCache(n int) int { panic("cache is over capacity, always use appendToCache() to add events") } - if n >= maxCacheCapacity { + capacity := n + + if capacity >= maxCacheCapacity { // We've requested the entire cache, so just clear it entirely. end = 0 - n = maxCacheCapacity + capacity = maxCacheCapacity } else { // Otherwise, first remove any events that are older than the cache TTL. for index, event := range w.recentEvents[begin:end] { @@ -55,9 +57,8 @@ func (w *worker) growCache(n int) int { } // Then, if we still don't have enough space, remove the oldest events. - capacity := end - begin + n - if capacity > maxCacheCapacity { - begin += capacity - maxCacheCapacity + if c := end - begin + capacity; c > maxCacheCapacity { + begin += c - maxCacheCapacity } } @@ -66,9 +67,9 @@ func (w *worker) growCache(n int) int { copy(w.recentEvents, w.recentEvents[begin:end]) w.recentEvents = w.recentEvents[:end-begin] - w.recentEvents = slices.Grow(w.recentEvents, n) + w.recentEvents = slices.Grow(w.recentEvents, capacity) - return n + return n - capacity } // appendEventToCache appends the given event to the cache of recent events. 
diff --git a/internal/eventstream/workeridle.go b/internal/eventstream/idle.go similarity index 100% rename from internal/eventstream/workeridle.go rename to internal/eventstream/idle.go diff --git a/internal/eventstream/reader.go b/internal/eventstream/reader.go index 2d04baa2..17f3873a 100644 --- a/internal/eventstream/reader.go +++ b/internal/eventstream/reader.go @@ -3,6 +3,7 @@ package eventstream import ( "context" "fmt" + "log/slog" "time" "github.com/dogmatiq/enginekit/protobuf/envelopepb" @@ -169,3 +170,181 @@ func (r *Reader) unsubscribe(ctx context.Context, sub *Subscriber) error { return r.UnsubscribeQueue.Do(ctx, sub) } + +// handleSubscribe adds sub to the subscriber list. +// +// It delivers any cached events that the subscriber has not yet seen. If the +// subscriber's requested event is older than the events in the cache the +// subscription is canceled immediately. +func (w *worker) handleSubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if w.subscribers == nil { + w.subscribers = map[*Subscriber]struct{}{} + } + w.subscribers[sub] = struct{}{} + + w.Logger.Debug( + "subscription activated", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.Uint64("requested_stream_offset", uint64(sub.Offset)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + + if sub.Offset >= w.nextOffset { + return + } + + index := w.findInCache(sub.Offset) + + if index == -1 { + sub.canceled.Signal() + w.Logger.Warn( + "subscription canceled immediately due request for historical events", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", 
len(w.subscribers)), + slog.Int("cached_event_count", len(w.recentEvents)), + slog.Uint64("requested_stream_offset", uint64(sub.Offset)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + return + } + + for _, event := range w.recentEvents[index:] { + if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { + return + } + } +} + +// handleUnsubscribe removes sub from the subscriber list. +func (w *worker) handleUnsubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + before := len(w.subscribers) + delete(w.subscribers, sub) + after := len(w.subscribers) + + if before > after { + sub.canceled.Signal() + + w.Logger.Debug( + "subscription canceled by subscriber", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", after), + ) + } +} + +// deliverResult is an enumeration of the possible outcomes of delivering an +// event to a subscriber. +type deliverResult int + +const ( + // eventDelivered means that the event was sent to the subscriber's events + // channel, which may or may not be buffered. + eventDelivered deliverResult = iota + + // eventFiltered means that the event was filtered by the subscriber's + // filter function, and did not need to be delivered. + eventFiltered + + // subscriptionCanceled means that an attempt was made to send the event to + // the subscriber's event channel, but the channel buffer was full (or + // unbuffered and not ready to read), and so the subscription was canceled. + subscriptionCanceled +) + +// deliverEventToSubscriber attempts to deliver an event to a subscriber's event +// channel. 
+func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { + if event.Offset > sub.Offset { + panic("event is out of order") + } + + if event.Offset < sub.Offset { + return eventFiltered + } + + if !sub.Filter(event.Envelope) { + sub.Offset++ + return eventFiltered + } + + select { + case sub.Events <- event: + sub.Offset++ + return eventDelivered + + default: + delete(w.subscribers, sub) + sub.canceled.Signal() + + w.Logger.Warn( + "subscription canceled because the subscriber can not keep up with the event stream", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", 0), + slog.Int("subscriber_count", len(w.subscribers)), + slog.Uint64("stream_offset", uint64(event.Offset)), + ) + + return subscriptionCanceled + } +} + +// publishEvents publishes the events to both the recent event cache and any +// interested subscribers. +func (w *worker) publishEvents( + offset Offset, + events []*envelopepb.Envelope, +) { + skip := w.growCache(len(events)) + + for i, env := range events { + event := Event{w.StreamID, offset, env} + offset++ + + if i >= skip { + w.appendEventToCache(event) + } + + if len(w.subscribers) == 0 { + continue + } + + var delivered, filtered, canceled int + + for sub := range w.subscribers { + switch w.deliverEventToSubscriber(event, sub) { + case eventDelivered: + delivered++ + case eventFiltered: + filtered++ + case subscriptionCanceled: + canceled++ + } + } + + w.Logger.Debug( + "event published to subscribers", + slog.Uint64("stream_offset", uint64(event.Offset)), + slog.String("message_id", env.MessageId.AsString()), + slog.String("description", env.Description), + slog.Int("delivered_count", delivered), + slog.Int("filtered_count", filtered), + slog.Int("canceled_count", canceled), + ) + } +} diff --git a/internal/eventstream/worker.go b/internal/eventstream/worker.go index b8c51f60..4d6c66c6 100644 --- 
a/internal/eventstream/worker.go +++ b/internal/eventstream/worker.go @@ -48,7 +48,7 @@ type worker struct { // shutdown by the supervisor, or the idle timeout expires. func (w *worker) Run(ctx context.Context) (err error) { defer func() { - if err != nil { + if err != nil && err != context.Canceled { w.Logger.Debug( "event stream worker stopped due to an error", slog.String("error", err.Error()), diff --git a/internal/eventstream/workerappend.go b/internal/eventstream/workerappend.go deleted file mode 100644 index ad0d4158..00000000 --- a/internal/eventstream/workerappend.go +++ /dev/null @@ -1,226 +0,0 @@ -package eventstream - -import ( - "context" - "log/slog" - - "github.com/dogmatiq/enginekit/protobuf/envelopepb" - "github.com/dogmatiq/persistencekit/journal" - "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" -) - -func (w *worker) handleAppend( - ctx context.Context, - req AppendRequest, -) (AppendResponse, error) { - if !req.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") - } - - if len(req.Events) == 0 { - // We panic rather than just failing the exchange because we never want - // empty requests to occupy space in the worker's queue. The sender - // should simply not send empty requests. 
- panic("received append request with no events") - } - - defer w.resetIdleTimer() - - if req.LowestPossibleOffset > w.nextOffset { - if err := w.catchUpWithJournal(ctx); err != nil { - return AppendResponse{}, err - } - } - - for { - res, err := w.findPriorAppend(ctx, req) - if err != nil { - return AppendResponse{}, err - } - - if res.AppendedByPriorAttempt { - for index, event := range req.Events { - w.Logger.Info( - "discarded duplicate event", - slog.Uint64("stream_offset", uint64(res.BeginOffset)+uint64(index)), - slog.String("message_id", event.MessageId.AsString()), - slog.String("description", event.Description), - ) - } - return res, nil - } - - res, err = w.writeEventsToJournal(ctx, req) - if err == nil { - w.publishEvents(res.BeginOffset, req.Events) - return res, nil - } - - if err != journal.ErrConflict { - return AppendResponse{}, err - } - - if err := w.catchUpWithJournal(ctx); err != nil { - return AppendResponse{}, err - } - } -} - -// findPriorAppend returns an [AppendResponse] if the given [AppendRequest] has -// already been handled. -func (w *worker) findPriorAppend( - ctx context.Context, - req AppendRequest, -) (AppendResponse, error) { - // If the lowest possible offset is ahead of the next offset the request is - // malformed. Either theres a bug in Veracity, or the journal has suffered - // catastrophic data loss. - if req.LowestPossibleOffset > w.nextOffset { - panic("lowest possible offset is greater than the next offset") - } - - // If the lowest possible offset is equal to the next offset, no events - // have been recorded since the the request was created, and hence there - // can be no prior append attempt. - if req.LowestPossibleOffset == w.nextOffset { - return AppendResponse{}, nil - } - - // If the lowest possible offset is in the cache, we can check for - // duplicates without using the journal. We search using the last event in - // the request as it's the most likely to still be in the cache. 
- lowestPossibleOffset := req.LowestPossibleOffset + Offset(len(req.Events)) - - if cacheIndex := w.findInCache(lowestPossibleOffset); cacheIndex != -1 { - lastMessageIndex := len(req.Events) - 1 - lastMessageID := req.Events[lastMessageIndex].MessageId - - for _, event := range w.recentEvents[cacheIndex:] { - if event.Envelope.MessageId.Equal(lastMessageID) { - return AppendResponse{ - // We know the offset of the last message in the request, so - // we can compute the offset of the first message, even if - // it's no longer in the cache. - BeginOffset: event.Offset - Offset(lastMessageIndex), - EndOffset: event.Offset + 1, - AppendedByPriorAttempt: true, - }, nil - } - } - } - - // Finally, we search the journal for the record containing the events. - rec, err := journal.ScanFromSearchResult( - ctx, - w.Journal, - 0, - w.nextPos, - eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)), - func( - _ context.Context, - _ journal.Position, - rec *eventstreamjournal.Record, - ) (*eventstreamjournal.Record, bool, error) { - if op := rec.GetEventsAppended(); op != nil { - targetID := req.Events[0].MessageId - candidateID := op.Events[0].MessageId - return rec, candidateID.Equal(targetID), nil - } - return nil, false, nil - }, - ) - if err != nil { - return AppendResponse{}, journal.IgnoreNotFound(err) - } - - return AppendResponse{ - BeginOffset: Offset(rec.StreamOffsetBefore), - EndOffset: Offset(rec.StreamOffsetAfter), - AppendedByPriorAttempt: true, - }, nil -} - -func (w *worker) writeEventsToJournal( - ctx context.Context, - req AppendRequest, -) (AppendResponse, error) { - before := w.nextOffset - after := w.nextOffset + Offset(len(req.Events)) - - if err := w.Journal.Append( - ctx, - w.nextPos, - eventstreamjournal. - NewRecordBuilder(). - WithStreamOffsetBefore(uint64(before)). - WithStreamOffsetAfter(uint64(after)). - WithEventsAppended(&eventstreamjournal.EventsAppended{ - Events: req.Events, - }). 
- Build(), - ); err != nil { - return AppendResponse{}, err - } - - for index, event := range req.Events { - w.Logger.Info( - "appended event to the stream", - slog.Uint64("journal_position", uint64(w.nextPos)), - slog.Uint64("stream_offset", uint64(before)+uint64(index)), - slog.String("message_id", event.MessageId.AsString()), - slog.String("description", event.Description), - ) - } - - w.nextPos++ - w.nextOffset = after - - return AppendResponse{ - BeginOffset: before, - EndOffset: after, - AppendedByPriorAttempt: false, - }, nil -} - -// publishEvents publishes the events to both the recent event cache and any -// interested subscribers. -func (w *worker) publishEvents( - offset Offset, - events []*envelopepb.Envelope, -) { - w.growCache(len(events)) - - for _, env := range events { - event := Event{w.StreamID, offset, env} - offset++ - - w.appendEventToCache(event) - - if len(w.subscribers) == 0 { - continue - } - - var delivered, filtered, canceled int - - for sub := range w.subscribers { - switch w.deliverEventToSubscriber(event, sub) { - case eventDelivered: - delivered++ - case eventFiltered: - filtered++ - case subscriptionCanceled: - canceled++ - } - } - - w.Logger.Debug( - "event published to subscribers", - slog.Uint64("stream_offset", uint64(event.Offset)), - slog.String("message_id", env.MessageId.AsString()), - slog.String("description", env.Description), - slog.Int("delivered_count", delivered), - slog.Int("filtered_count", filtered), - slog.Int("canceled_count", canceled), - ) - } -} diff --git a/internal/eventstream/workersubscriber.go b/internal/eventstream/workersubscriber.go deleted file mode 100644 index eb8a2325..00000000 --- a/internal/eventstream/workersubscriber.go +++ /dev/null @@ -1,139 +0,0 @@ -package eventstream - -import ( - "fmt" - "log/slog" -) - -// handleSubscribe adds sub to the subscriber list. -// -// It delivers any cached events that the subscriber has not yet seen. 
If the -// subscriber's requested event is older than the events in the cache the -// subscription is canceled immediately. -func (w *worker) handleSubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") - } - - if w.subscribers == nil { - w.subscribers = map[*Subscriber]struct{}{} - } - w.subscribers[sub] = struct{}{} - - w.Logger.Debug( - "subscription activated", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) - - if sub.Offset >= w.nextOffset { - return - } - - index := w.findInCache(sub.Offset) - - if index == -1 { - sub.canceled.Signal() - w.Logger.Warn( - "subscription canceled immediately due request for historical events", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Int("cached_event_count", len(w.recentEvents)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) - return - } - - for _, event := range w.recentEvents[index:] { - if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { - return - } - } -} - -// handleUnsubscribe removes sub from the subscriber list. 
-func (w *worker) handleUnsubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") - } - - before := len(w.subscribers) - delete(w.subscribers, sub) - after := len(w.subscribers) - - if before > after { - sub.canceled.Signal() - - w.Logger.Debug( - "subscription canceled by subscriber", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", after), - ) - } -} - -// deliverResult is an enumeration of the possible outcomes of delivering an -// event to a subscriber. -type deliverResult int - -const ( - // eventDelivered means that the event was sent to the subscriber's events - // channel, which may or may not be buffered. - eventDelivered deliverResult = iota - - // eventFiltered means that the event was filtered by the subscriber's - // filter function, and did not need to be delivered. - eventFiltered - - // subscriptionCanceled means that an attempt was made to send the event to - // the subscriber's event channel, but the channel buffer was full (or - // unbuffered and not ready to read), and so the subscription was canceled. - subscriptionCanceled -) - -// deliverEventToSubscriber attempts to deliver an event to a subscriber's event -// channel. 
-func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { - if event.Offset > sub.Offset { - panic("event is out of order") - } - - if event.Offset < sub.Offset { - return eventFiltered - } - - if !sub.Filter(event.Envelope) { - sub.Offset++ - return eventFiltered - } - - select { - case sub.Events <- event: - sub.Offset++ - return eventDelivered - - default: - delete(w.subscribers, sub) - sub.canceled.Signal() - - w.Logger.Warn( - "subscription canceled because the subscriber can not keep up with the event stream", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", 0), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("stream_offset", uint64(event.Offset)), - ) - - return subscriptionCanceled - } -}