Skip to content

Commit

Permalink
Small improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
danog committed Sep 21, 2023
1 parent ba25851 commit 81f6d44
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 21 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Dockerfile
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ _testmain.go

kafka-pixy
.vagrant/
main
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ RUN go build -v -o /go/bin/kafka-pixy
FROM alpine:3.11
LABEL maintainer="Maxim Vladimirskiy <[email protected]>"
COPY --from=builder /go/bin/kafka-pixy /usr/bin/kafka-pixy
COPY ./entrypoint.sh /entrypoint.sh
EXPOSE 19091 19092
ENTRYPOINT ["/usr/bin/kafka-pixy"]
ENTRYPOINT ["/entrypoint.sh"]
11 changes: 2 additions & 9 deletions consumer/multiplexer/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/mailgun/kafka-pixy/actor"
"github.com/mailgun/kafka-pixy/consumer"
"github.com/mailgun/kafka-pixy/none"
"golang.org/x/exp/slices"
)

// T fetches messages from inputs and multiplexes them to the output, giving
Expand Down Expand Up @@ -123,7 +124,7 @@ func (m *T) WireUp(output Out, assigned []int32) {

// Stop inputs that are not assigned anymore.
for p, in := range m.inputs {
if !hasPartition(p, assigned) {
if !slices.Contains(assigned, p) {
wg.Add(1)
go func(in *input) {
defer wg.Done()
Expand Down Expand Up @@ -289,11 +290,3 @@ func makeSortedIns(inputs map[int32]*input) []*input {
}
return sortedIns
}

func hasPartition(partition int32, partitions []int32) bool {
count := len(partitions)
if count == 0 {
return false
}
return partitions[0] <= partition && partition <= partitions[count-1]
}
5 changes: 5 additions & 0 deletions entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/sh -e

/usr/bin/kafka-pixy --config /etc/kafka-pixy.yml


43 changes: 35 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,29 +1,56 @@
module github.com/mailgun/kafka-pixy

go 1.16
go 1.21

require (
github.com/Shopify/sarama v1.23.1
github.com/davecgh/go-spew v1.1.1
github.com/fatih/structs v1.1.0 // indirect
github.com/go-ini/ini v1.46.0 // indirect
github.com/gorilla/mux v1.8.0
github.com/mailgun/holster/v4 v4.0.0
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e
github.com/onsi/ginkgo v1.9.0 // indirect
github.com/onsi/gomega v1.6.0 // indirect
github.com/pkg/errors v0.9.1
github.com/samuel/go-zookeeper v0.0.0-20190810000440-0ceca61e4d75
github.com/sirupsen/logrus v1.8.1
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/thrawn01/args v0.3.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
google.golang.org/grpc v1.39.0
google.golang.org/protobuf v1.27.1
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 // indirect
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/go-ini/ini v1.46.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 // indirect
github.com/kr/pretty v0.2.0 // indirect
github.com/kr/text v0.1.0 // indirect
github.com/onsi/ginkgo v1.9.0 // indirect
github.com/onsi/gomega v1.6.0 // indirect
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
github.com/spf13/cast v1.3.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.3.5 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
gopkg.in/ini.v1 v1.46.0 // indirect
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0
gopkg.in/jcmturner/gokrb5.v7 v7.2.3 // indirect
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
Expand Down Expand Up @@ -256,6 +257,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down Expand Up @@ -323,8 +326,9 @@ golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand All @@ -346,7 +350,6 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down

0 comments on commit 81f6d44

Please sign in to comment.