forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka-console-producer.go
123 lines (105 loc) · 3.71 KB
/
kafka-console-producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"github.com/Shopify/sarama"
"github.com/rcrowley/go-metrics"
)
var (
brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
topic = flag.String("topic", "", "REQUIRED: the topic to produce to")
key = flag.String("key", "", "The key of the message to produce. Can be empty.")
value = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
partition = flag.Int("partition", -1, "The partition to produce to.")
verbose = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
silent = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
logger = log.New(os.Stderr, "", log.LstdFlags)
)
func main() {
flag.Parse()
if *brokerList == "" {
printUsageErrorAndExit("no -brokers specified. Alternatively, set the KAFKA_PEERS environment variable")
}
if *topic == "" {
printUsageErrorAndExit("no -topic specified")
}
if *verbose {
sarama.Logger = logger
}
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
switch *partitioner {
case "":
if *partition >= 0 {
config.Producer.Partitioner = sarama.NewManualPartitioner
} else {
config.Producer.Partitioner = sarama.NewHashPartitioner
}
case "hash":
config.Producer.Partitioner = sarama.NewHashPartitioner
case "random":
config.Producer.Partitioner = sarama.NewRandomPartitioner
case "manual":
config.Producer.Partitioner = sarama.NewManualPartitioner
if *partition == -1 {
printUsageErrorAndExit("-partition is required when partitioning manually")
}
default:
printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
}
message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}
if *key != "" {
message.Key = sarama.StringEncoder(*key)
}
if *value != "" {
message.Value = sarama.StringEncoder(*value)
} else if stdinAvailable() {
bytes, err := ioutil.ReadAll(os.Stdin)
if err != nil {
printErrorAndExit(66, "Failed to read data from the standard input: %s", err)
}
message.Value = sarama.ByteEncoder(bytes)
} else {
printUsageErrorAndExit("-value is required, or you have to provide the value on stdin")
}
producer, err := sarama.NewSyncProducer(strings.Split(*brokerList, ","), config)
if err != nil {
printErrorAndExit(69, "Failed to open Kafka producer: %s", err)
}
defer func() {
if err := producer.Close(); err != nil {
logger.Println("Failed to close Kafka producer cleanly:", err)
}
}()
partition, offset, err := producer.SendMessage(message)
if err != nil {
printErrorAndExit(69, "Failed to produce message: %s", err)
} else if !*silent {
fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, partition, offset)
}
if *showMetrics {
metrics.WriteOnce(config.MetricRegistry, os.Stderr)
}
}
func printErrorAndExit(code int, format string, values ...interface{}) {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
fmt.Fprintln(os.Stderr)
os.Exit(code)
}
func printUsageErrorAndExit(message string) {
fmt.Fprintln(os.Stderr, "ERROR:", message)
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, "Available command line options:")
flag.PrintDefaults()
os.Exit(64)
}
func stdinAvailable() bool {
stat, _ := os.Stdin.Stat()
return (stat.Mode() & os.ModeCharDevice) == 0
}