Skip to content

Commit

Permalink
add:kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
CocaineCong committed Aug 26, 2023
1 parent 06be933 commit ca227a1
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 3 deletions.
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
Services map[string]*Service `yaml:"services"`
Domain map[string]*Domain `yaml:"domain"`
SeConfig *SeConfig `yaml:"SeConfig"`
Kafka *Kafka `yaml:"kafka"`
}

type SeConfig struct {
Expand Down Expand Up @@ -66,6 +67,10 @@ type Service struct {
Addr []string `yaml:"addr"`
}

type Kafka struct {
Address []string `yaml:"address"`
}

type Domain struct {
Name string `yaml:"name"`
}
Expand Down
3 changes: 3 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ SeConfig:
- "./"
MergeChannelSize:

kafka:
address:
- 127.0.0.1:9200

domain:
user:
Expand Down
4 changes: 4 additions & 0 deletions consts/search_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ const (
EngineBufSize = 10000
ForwardCountInitValue = "0"
)

const (
KafkaCrawlTopic = "kafka-crawl-topic"
)
19 changes: 19 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,24 @@ services:
- elasticsearch
ports:
- "5601:5601"
networks:
- search_engine

zookeeper:
image: zookeeper
restart: on-failure
ports:
- "2181:2181"
networks:
- search_engine

kafka:
image: wurstmeister/kafka
restart: on-failure
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
networks:
- search_engine
23 changes: 20 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/xtgo/set v1.0.0
go.etcd.io/bbolt v1.3.7
go.etcd.io/etcd/client/v3 v3.5.9
golang.org/x/crypto v0.9.0
golang.org/x/crypto v0.12.0
gonum.org/v1/gonum v0.13.0
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
Expand All @@ -29,9 +29,14 @@ require (
)

require (
github.com/IBM/sarama v1.41.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
Expand All @@ -41,14 +46,24 @@ require (
github.com/goccy/go-json v0.10.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/securecookie v1.1.1 // indirect
github.com/gorilla/sessions v1.2.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -60,6 +75,8 @@ require (
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/samber/lo v1.38.1 // indirect
github.com/smartystreets/goconvey v1.8.0 // indirect
github.com/spf13/afero v1.9.3 // indirect
Expand All @@ -76,9 +93,9 @@ require (
go.uber.org/zap v1.21.0 // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/text v0.12.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
58 changes: 58 additions & 0 deletions go.sum

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions pkg/kfk/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package kfk

import (
"github.com/IBM/sarama"
)

func KafkaConsumer(topic string) (msg <-chan *sarama.ConsumerMessage, err error) {
consumer, err := sarama.NewConsumerFromClient(GobalKafka)
if err != nil {
return
}
partition := int32(-1)
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
if err != nil {
return
}

return partitionConsumer.Messages(), nil
}
36 changes: 36 additions & 0 deletions pkg/kfk/crawl/sync_crawl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package crawl

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/CocaineCong/tangseng/consts"
"github.com/CocaineCong/tangseng/pkg/kfk"
)

type SyncCrawl struct {
}

func (s *SyncCrawl) RunSyncCrawl(ctx context.Context) (err error) {
topic := consts.KafkaCrawlTopic
msgs, err := kfk.KafkaConsumer(topic)
if err != nil {
return
}

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

for {
select {
case msg := <-msgs:
fmt.Println(msg)

case <-sigs:
return
}
}
}
17 changes: 17 additions & 0 deletions pkg/kfk/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kfk

import (
"github.com/IBM/sarama"

"github.com/CocaineCong/tangseng/config"
)

var GobalKafka sarama.Client

func InitKafka() {
kafkaClient, err := sarama.NewClient(config.Conf.Kafka.Address, nil)
if err != nil {
return
}
GobalKafka = kafkaClient
}
36 changes: 36 additions & 0 deletions pkg/kfk/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package kfk

import (
"fmt"

"github.com/IBM/sarama"
)

func KafkaProducer(topic, msg string) (err error) {
producer, err := sarama.NewSyncProducerFromClient(GobalKafka)
if err != nil {
return
}
message := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(msg),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
return
}
fmt.Println(offset, partition)
return
}

func KafkaProducers(messages []*sarama.ProducerMessage) (err error) {
producer, err := sarama.NewSyncProducerFromClient(GobalKafka)
if err != nil {
return
}
err = producer.SendMessages(messages)
if err != nil {
return
}
return
}

0 comments on commit ca227a1

Please sign in to comment.