-
Notifications
You must be signed in to change notification settings - Fork 384
/
DemoProducerNewJava.java
94 lines (77 loc) · 3.33 KB
/
DemoProducerNewJava.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.shapira.examples.producer.simplecounter;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class DemoProducerNewJava implements DemoProducer {
String topic;
String sync;
private Properties kafkaProps = new Properties();
private Producer<String, String> producer;
public DemoProducerNewJava(String topic) {
this.topic = topic;
}
@Override
public void configure(String brokerList, String sync) {
this.sync = sync;
kafkaProps.put("bootstrap.servers", brokerList);
// This is mandatory, even though we don't send keys
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("acks", "1");
// how many times to retry when produce request fails?
kafkaProps.put("retries", "3");
kafkaProps.put("linger.ms", 5);
}
@Override
public void start() {
producer = new KafkaProducer<String, String>(kafkaProps);
}
@Override
public void produce(String value) throws ExecutionException, InterruptedException {
if (sync.equals("sync"))
produceSync(value);
else if (sync.equals("async"))
produceAsync(value);
else throw new IllegalArgumentException("Expected sync or async, got " + sync);
}
@Override
public void close() {
producer.close();
}
/* Produce a record and wait for server to reply. Throw an exception if something goes wrong */
private void produceSync(String value) throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
producer.send(record).get();
}
/* Produce a record without waiting for server. This includes a callback that will print an error if something goes wrong */
private void produceAsync(String value) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
producer.send(record, new DemoProducerCallback());
}
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("Error producing to topic " + recordMetadata.topic());
e.printStackTrace();
}
}
}
}