Caravan is a set of in-process message streaming tools for Go applications. Think "Kafka", but for the internal workings of your software. Caravan Streaming includes basic features for building Message Streams and Tables.
This is a work in progress. The basics are in place, but the library is not yet ready for production use. Use at your own risk.
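The following example builds a Stream that consumes from two Topics, filters each side, joins matching pairs by multiplying them, and publishes the results to a third Topic. One goroutine produces random integers into both input Topics, while another consumes from the output Topic and prints the first ten joined results.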
package main
import (
"fmt"
"math/rand"
"github.com/caravan/essentials"
"github.com/caravan/streaming"
"github.com/caravan/streaming/stream/node"
)
// main wires two integer topics through filters into a join, publishes the
// joined values to a third topic, and prints the first ten values that
// arrive on that output topic.
func main() {
	// Two input topics feed the join; its results land on the output topic.
	left := essentials.NewTopic[int]()
	right := essentials.NewTopic[int]()
	out := essentials.NewTopic[int]()

	// The left side only lets values below 200 through.
	below200 := func(v int) bool { return v < 200 }
	// The right side only lets values above 100 through.
	above100 := func(v int) bool { return v > 100 }
	// A pair is joined only when the left value is even and the right is odd.
	evenWithOdd := func(l, r int) bool { return l%2 == 0 && r%2 == 1 }
	// Joined pairs are combined by multiplying them.
	product := func(l, r int) int { return l * r }

	leftSide := node.Bind(node.TopicConsumer(left), node.Filter(below200))
	rightSide := node.Bind(node.TopicConsumer(right), node.Filter(above100))
	joined := node.Join(leftSide, rightSide, evenWithOdd, product)

	s := streaming.NewStream(joined, node.TopicProducer(out)).Start()

	finished := make(chan struct{})

	// Feeder: pushes 10000 random integers in [0, 1000) into each input
	// topic, alternating left then right, and closes both producers afterwards.
	go func() {
		leftIn := left.NewProducer()
		rightIn := right.NewProducer()
		for remaining := 10000; remaining > 0; remaining-- {
			leftIn.Send() <- rand.Intn(1000)
			rightIn.Send() <- rand.Intn(1000)
		}
		leftIn.Close()
		rightIn.Close()
	}()

	// Printer: shows the first ten joined values, then signals completion.
	go func() {
		results := out.NewConsumer()
		for shown := 0; shown < 10; shown++ {
			v := <-results.Receive()
			fmt.Println(v)
		}
		results.Close()
		close(finished)
	}()

	// Block until the printer has seen its ten values before shutting down.
	<-finished
	// The result of Stop is deliberately discarded; the program exits next.
	_ = s.Stop()
}