
Commit

Merge branch 'mailgun:master' into master
ozon1234 authored Aug 4, 2022
2 parents a54fca0 + 46483ae commit ba25851
Showing 17 changed files with 600 additions and 421 deletions.
28 changes: 28 additions & 0 deletions .github/Dockerfile
@@ -0,0 +1,28 @@
# syntax=docker/dockerfile:1.3

# This is a buildx compatible Dockerfile. Documentation can be found at
# https://github.com/moby/buildkit/blob/master/frontend/dockerfile/docs/syntax.md

# This Dockerfile uses a base golang image with our pinned golang version
# See https://github.com/mailgun/dockerworld/images for current pinned version

# This creates a cached layer of our dependencies for subsequent builds to use
FROM golang:1.18.3 AS deps
WORKDIR /go/src
RUN --mount=type=bind,target=/go/src,rw go mod download

# ==========================================================================
# NOTE: Since tests are run in travis, we just build the container image
# ==========================================================================
# Run tests
#FROM deps as test
#RUN --mount=type=bind,target=/go/src,rw \
#go fmt ./... && \
#go vet ./... && \
#go test -v -p 1 -race -parallel=1 -tags holster_test_mode ./...

# Build cmds
FROM deps as build
RUN --mount=type=bind,target=/go/src,rw \
go version && \
CGO_ENABLED=0 go install -a -v ./...
9 changes: 9 additions & 0 deletions .github/create_topics.sh
@@ -0,0 +1,9 @@
#!/bin/sh

set -ex

# wait for kafka server to load
sleep 5
docker exec -e JMX_PORT=5557 -i kafka-pixy_kafka_1 bin/kafka-topics.sh --create --zookeeper=zookeeper:2181 --topic test.1 --partitions 1 --replication-factor 1
docker exec -e JMX_PORT=5557 -i kafka-pixy_kafka_1 bin/kafka-topics.sh --create --zookeeper=zookeeper:2181 --topic test.4 --partitions 4 --replication-factor 1
docker exec -e JMX_PORT=5557 -i kafka-pixy_kafka_1 bin/kafka-topics.sh --create --zookeeper=zookeeper:2181 --topic test.64 --partitions 64 --replication-factor 1
72 changes: 72 additions & 0 deletions .github/workflows/build_and_push.yml
@@ -0,0 +1,72 @@
# ==============================================================
# NOTE: This workflow only builds a deployable container,
# See travis config for running tests.
# ==============================================================

name: Build Deployable Container

on:
  pull_request:
    branches: [ master, main ]

jobs:
  build-and-publish:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout the repo
        uses: actions/checkout@v3
      # Create a build container which buildx will use as a driver when building the container.
      # See https://github.com/docker/buildx/blob/master/docs/reference/buildx_create.md#driver
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
        with:
          # Enables host networking, which allows tests run by buildx to access
          # the containers started by docker compose on localhost
          driver-opts: network=host
      # Login to the registry
      - name: Login to GitHub Container Registry
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.PKG_WRITER_PAT }}
      # This creates a cache for the current PR; if none exists, then
      # the cache from the most recent PR will be used.
      - name: Setup Docker layer Cache
        uses: actions/cache@v3
        with:
          path: /tmp/.buildx-cache
          key: ${{ runner.os }}-docker-${{ github.event.number }}
          restore-keys: ${{ runner.os }}-docker-
      # Automagically extracts useful information from the current github context and creates
      # a set of labels for use by build-push-action to be attached to the final image.
      - name: Extract Metadata for Docker
        uses: docker/metadata-action@v4
        with:
          images: ${{ github.repository }}
        id: meta
      # This action runs Buildx, which allows for a more complex Dockerfile.
      # We use a buildx Dockerfile to run tests as well as build the final image.
      - name: Build and push
        uses: docker/build-push-action@v2
        with:
          # We change the path context here since the github context does not include
          # changes to local files, like when we download `Dockerfile`.
          # See https://github.com/docker/build-push-action#git-context
          context: .
          file: .github/Dockerfile
          tags: ghcr.io/${{ github.repository }}:PR${{ github.event.number }}
          # We use the local cache type, so we can clean up the cache
          # https://github.com/docker/build-push-action/blob/master/docs/advanced/cache.md#local-cache
          cache-from: type=local,src=/tmp/.buildx-cache
          cache-to: type=local,mode=max,dest=/tmp/.buildx-cache-new
          labels: ${{ steps.meta.outputs.labels }}
          push: true
      # This keeps the cache from continually growing as new image layers
      # are created. See the following issues:
      # https://github.com/docker/build-push-action/issues/252
      # https://github.com/moby/buildkit/issues/1896
      - name: Retire old cache
        run: |
          rm -rf /tmp/.buildx-cache
          mv /tmp/.buildx-cache-new /tmp/.buildx-cache
60 changes: 60 additions & 0 deletions .github/workflows/test_on_push.yml
@@ -0,0 +1,60 @@
name: Run tests

on:
  pull_request:
    branches: [ master, main ]
  push:
    branches: [ master, main ]

env:
  KAFKA_PEERS: 127.0.0.1:9092
  ZOOKEEPER_PEERS: 127.0.0.1:2181
  KAFKA_HOSTNAME: 127.0.0.1

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@master

      - name: Set up Go
        uses: actions/setup-go@v2
        with:
          go-version: 1.17

      - name: Login to GitHub Container Registry
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.PKG_WRITER_PAT }}

      - name: Cache deps
        uses: actions/cache@v2
        with:
          path: ~/go/pkg/mod
          key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
          restore-keys: |
            ${{ runner.os }}-go-

      - name: Install deps
        run: go mod download

      - name: Add hosts to /etc/hosts
        run: |
          echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts
          echo "127.0.0.1 zookeeper" | sudo tee -a /etc/hosts

      - name: Start containers
        run: docker-compose up -d

      - name: Create test topics
        run: .github/create_topics.sh

      - name: Run tests
        run: make test

      - name: Stop containers
        if: always()
        run: docker-compose down
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,6 +1,6 @@
language: go
go:
-  - 1.12
+  - 1.18.3

env:
  global:
4 changes: 2 additions & 2 deletions cmd/kafka-pixy-cli/main.go
@@ -179,7 +179,7 @@ func printOffsets(opts *args.Options, client pb.KafkaPixyClient) (int, error) {
            Lag: lag,
        }

-       data, err := json.MarshalIndent(offset, "", " ")
+       data, err := json.MarshalIndent(&offset, "", " ")
        if err != nil {
            return 1, errors.Wrap(err, "during JSON marshal")
        }
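One place the `&offset` change above can be observable: encoding/json only finds a MarshalJSON method declared on a pointer receiver when it is handed a pointer, so marshaling a value and marshaling its address can produce different output. A standalone sketch under that assumption, using a hypothetical Offset type rather than the repo's actual one:

package main

import (
    "encoding/json"
    "fmt"
)

// Hypothetical type for illustration only; not the struct used by
// printOffsets. MarshalJSON is declared on the pointer receiver, so it is
// invisible when a plain Offset value is marshaled.
type Offset struct {
    Partition int32
    Offset    int64
}

func (o *Offset) MarshalJSON() ([]byte, error) {
    return []byte(fmt.Sprintf(`{"partition":%d,"offset":%d}`, o.Partition, o.Offset)), nil
}

func main() {
    off := Offset{Partition: 3, Offset: 42}

    byValue, _ := json.Marshal(off)    // reflection only: {"Partition":3,"Offset":42}
    byPointer, _ := json.Marshal(&off) // custom method:   {"partition":3,"offset":42}

    fmt.Println(string(byValue))
    fmt.Println(string(byPointer))
}

If the real type has no custom marshaler, both calls produce identical JSON and the change is purely cosmetic.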
@@ -345,7 +345,7 @@ func ListConsumers(parser *args.ArgParser, cast interface{}) (int, error) {
        return 1, nil
    }

-   ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
+   ctx, _ := context.WithTimeout(context.Background(), time.Second*60)
    resp, err := client.ListConsumers(ctx, &pb.ListConsumersRq{
        Group: opts.String("group"),
        Topic: opts.String("topic"),
102 changes: 51 additions & 51 deletions consumer/multiplexer/multiplexer.go
@@ -4,6 +4,7 @@ import (
    "reflect"
    "sort"
    "sync"
+   "sync/atomic"
    "time"

    "github.com/mailgun/kafka-pixy/actor"
@@ -20,7 +21,7 @@ type T struct {
    spawnInFn SpawnInFn
    inputs    map[int32]*input
    output    Out
-   isRunning bool
+   isRunning int64
    stopCh    chan none.T
    wg        sync.WaitGroup

@@ -75,7 +76,7 @@ type input struct {
// IsRunning returns `true` if the multiplexer is running, pumping events from
// the inputs to the output.
func (m *T) IsRunning() bool {
-   return m.isRunning
+   return atomic.LoadInt64(&m.isRunning) == 1
}

// IsSafe2Stop returns true if it is safe to stop all of the multiplexer
@@ -163,14 +164,14 @@ func (m *T) Stop() {

func (m *T) start() {
    actor.Spawn(m.actDesc, &m.wg, m.run)
-   m.isRunning = true
+   atomic.StoreInt64(&m.isRunning, 1)
}

func (m *T) stopIfRunning() {
-   if m.isRunning {
+   if atomic.LoadInt64(&m.isRunning) == 1 {
        m.stopCh <- none.V
        m.wg.Wait()
-       m.isRunning = false
+       atomic.StoreInt64(&m.isRunning, 0)
    }
}
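The bool-to-int64 change in this file is the standard sync/atomic flag pattern: IsRunning() may be called from a different goroutine than start() and stopIfRunning(), and unsynchronized reads and writes of a plain bool are a data race. A minimal sketch of the pattern with simplified names (not the multiplexer's real API); on Go 1.19+ atomic.Bool would express the same thing more directly, but the toolchain pinned by this PR predates it:

package main

import (
    "fmt"
    "sync/atomic"
)

type worker struct {
    isRunning int64 // 1 while the run loop is alive, 0 otherwise
}

func (w *worker) start() { atomic.StoreInt64(&w.isRunning, 1) }
func (w *worker) stop()  { atomic.StoreInt64(&w.isRunning, 0) }

// IsRunning is safe to call from any goroutine because reads go through
// the same atomic that the writers use.
func (w *worker) IsRunning() bool {
    return atomic.LoadInt64(&w.isRunning) == 1
}

func main() {
    w := &worker{}
    w.start()
    fmt.Println(w.IsRunning()) // true
    w.stop()
    fmt.Println(w.IsRunning()) // false
}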

@@ -197,51 +198,79 @@ reset:
    }
    selectCases[inputCount] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(m.stopCh)}

-   inputIdx := -1
+   // NOTE: When Stop() or WireUp() occurs the current `msg` might not
+   // be delivered before `case <-m.stopCh` is evaluated. In this case
+   // we store the message in m.sortedIns[X].msg for possible delivery
+   // on the next iteration after a `reset` has occurred. The exception
+   // to this is if KP shuts down the multiplexer while we have a message
+   // waiting; this message will be lost, however the offset tracker should
+   // eventually retry that message again some time in the future.

    for {
        // Collect next messages from inputs that have them available.
        isAtLeastOneAvailable := false
        for _, in := range m.sortedIns {
+
+           // If we have a message to deliver from a previous iteration,
+           // and we were interrupted by a WireUp(), send that message first.
            if in.msgOk {
                isAtLeastOneAvailable = true
+
+               select {
+               case m.output.Messages() <- in.msg:
+                   in.msgOk = false
+               case <-m.stopCh:
+                   return
+               }
                continue
            }

            select {
            case msg, ok := <-in.Messages():
                // If a channel of an input is closed, then the input should be
                // removed from the list of multiplexed inputs.
                if !ok {
                    m.actDesc.Log().Infof("input channel closed: partition=%d", in.partition)
                    delete(m.inputs, in.partition)
                    m.refreshSortedIns()
                    goto reset
                }
-               in.msg = msg
-               in.msgOk = true
                isAtLeastOneAvailable = true
+
+               select {
+               case m.output.Messages() <- msg:
+               case <-m.stopCh:
+                   // Store the message in case stopCh eval is due to a WireUp() call, and we
+                   // need to provide this message later
+                   in.msgOk = true
+                   in.msg = msg
+                   return
+               }
            default:
            }
        }

+       if isAtLeastOneAvailable {
+           continue
+       }
+
        // If none of the inputs has a message available, then wait until
        // a message is fetched on any of them or a stop signal is received.
-       if !isAtLeastOneAvailable {
-           idx, value, _ := reflect.Select(selectCases)
-           // Check if it is a stop signal.
-           if idx == inputCount {
-               return
-           }
-           m.sortedIns[idx].msg = value.Interface().(consumer.Message)
-           m.sortedIns[idx].msgOk = true
+       idx, value, _ := reflect.Select(selectCases)
+       // Check if it is a stop signal.
+       if idx == inputCount {
+           return
        }
-       // At this point there is at least one message available.
-       inputIdx = selectInput(inputIdx, m.sortedIns)

        // Block until the output reads the next message of the selected input
        // or a stop signal is received.
+       msg := value.Interface().(consumer.Message)
        select {
+       case m.output.Messages() <- msg:
        case <-m.stopCh:
+           // Store the message in case stopCh eval is due to a WireUp() call, and we
+           // need to provide this message later
+           m.sortedIns[idx].msgOk = true
+           m.sortedIns[idx].msg = msg
            return
-       case m.output.Messages() <- m.sortedIns[inputIdx].msg:
-           m.sortedIns[inputIdx].msgOk = false
        }
    }
}
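The NOTE comment and the msgOk bookkeeping above implement a small park-and-redeliver protocol: a message that loses the select race against stopCh is stashed back on its input, so a restarted run loop delivers it before reading anything new. A stripped-down sketch of just that race, with simplified types (the real loop multiplexes many inputs via reflect.Select):

package main

import "fmt"

type input struct {
    msg   string
    msgOk bool // true when msg holds a message accepted but not yet delivered
}

// pump tries to forward msg to out; if stop wins the select instead, the
// message is parked on in so a later restart can deliver it first.
func pump(in *input, msg string, out chan<- string, stop <-chan struct{}) (stopped bool) {
    select {
    case out <- msg:
        return false
    case <-stop:
        in.msg, in.msgOk = msg, true
        return true
    }
}

func main() {
    out := make(chan string) // unbuffered and unread, so the send blocks
    stop := make(chan struct{})
    close(stop) // a Stop()/WireUp() has already been requested

    in := &input{}
    if pump(in, "m1", out, stop) {
        // The message was not lost; it waits for the next run of the loop.
        fmt.Printf("parked: msgOk=%v msg=%q\n", in.msgOk, in.msg)
    }
}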
@@ -268,32 +297,3 @@ func hasPartition(partition int32, partitions []int32) bool {
    }
    return partitions[0] <= partition && partition <= partitions[count-1]
}
-
-// selectInput picks an input that should be multiplexed next. It prefers the
-// inputs with the largest lag. If there is more than one input with the same
-// largest lag, then it picks the one that has index following prevSelectedIdx.
-func selectInput(prevSelectedIdx int, sortedIns []*input) int {
-   maxLag := int64(-1)
-   selectedIdx := -1
-   for i, input := range sortedIns {
-       if !input.msgOk {
-           continue
-       }
-       lag := input.msg.HighWaterMark - input.msg.Offset
-       if lag > maxLag {
-           maxLag = lag
-           selectedIdx = i
-           continue
-       }
-       if lag < maxLag {
-           continue
-       }
-       if selectedIdx > prevSelectedIdx {
-           continue
-       }
-       if i > prevSelectedIdx {
-           selectedIdx = i
-       }
-   }
-   return selectedIdx
-}
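For contrast with the new first-ready ordering, the deleted selectInput implemented lag-preferring round-robin: choose the pending input with the largest HighWaterMark minus Offset gap, breaking ties by advancing past the previously selected index. A tiny standalone illustration of that policy, using trimmed-down types but the same selection logic:

package main

import "fmt"

type msg struct{ offset, highWaterMark int64 }

type input struct {
    msg   msg
    msgOk bool
}

// selectInput mirrors the removed function: prefer the largest lag
// (highWaterMark-offset); on ties, round-robin past prevSelectedIdx.
func selectInput(prevSelectedIdx int, sortedIns []*input) int {
    maxLag := int64(-1)
    selectedIdx := -1
    for i, in := range sortedIns {
        if !in.msgOk {
            continue
        }
        lag := in.msg.highWaterMark - in.msg.offset
        if lag > maxLag {
            maxLag = lag
            selectedIdx = i
            continue
        }
        if lag < maxLag {
            continue
        }
        if selectedIdx > prevSelectedIdx {
            continue
        }
        if i > prevSelectedIdx {
            selectedIdx = i
        }
    }
    return selectedIdx
}

func main() {
    ins := []*input{
        {msg: msg{offset: 10, highWaterMark: 15}, msgOk: true}, // lag 5
        {msg: msg{offset: 10, highWaterMark: 15}, msgOk: true}, // lag 5 (tie)
        {msg: msg{offset: 10, highWaterMark: 12}, msgOk: true}, // lag 2
    }
    fmt.Println(selectInput(-1, ins)) // 0: first input with the max lag
    fmt.Println(selectInput(0, ins))  // 1: tie broken by moving past index 0
}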
