forked from googleapis/java-pubsublite-kafka
.readme-partials.yaml
custom_content: |
#### Publishing messages
With Pub/Sub Lite, you can use a `Producer<byte[], byte[]>` to publish messages:
```java
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.*;
...
private static final String ZONE = "us-central1-b";
private static final long PROJECT_NUM = 123L;
...
TopicPath topic = TopicPath.newBuilder()
    .setLocation(CloudZone.parse(ZONE))
    .setProject(ProjectNumber.of(PROJECT_NUM))
    .setName(TopicName.of("my-topic"))
    .build();
ProducerSettings settings = ProducerSettings.newBuilder()
    .setTopicPath(topic)
    .build();
try (Producer<byte[], byte[]> producer = settings.instantiate()) {
  Future<RecordMetadata> sent = producer.send(new ProducerRecord<>(
      topic.toString(),  // Must be the same topic the producer was built for.
      "key".getBytes(StandardCharsets.UTF_8),
      "value".getBytes(StandardCharsets.UTF_8)));
  // Block until the publish completes.
  RecordMetadata meta = sent.get();
}
```
#### Receiving messages
With Pub/Sub Lite, you can use a `Consumer<byte[], byte[]>` to receive messages:
```java
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
...
private static final String ZONE = "us-central1-b";
private static final long PROJECT_NUM = 123L;
...
SubscriptionPath subscription = SubscriptionPath.newBuilder()
    .setLocation(CloudZone.parse(ZONE))
    .setProject(ProjectNumber.of(PROJECT_NUM))
    .setName(SubscriptionName.of("my-sub"))
    .build();
ConsumerSettings settings = ConsumerSettings.newBuilder()
    .setSubscriptionPath(subscription)
    .setPerPartitionFlowControlSettings(FlowControlSettings.builder()
        .setBytesOutstanding(10_000_000)  // 10 MB
        .setMessagesOutstanding(Long.MAX_VALUE)
        .build())
    .setAutocommit(true)
    .build();
try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
  while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
    for (ConsumerRecord<byte[], byte[]> record : records) {
      // Values are raw bytes; decode them before printing.
      System.out.println(
          record.offset() + ": " + new String(record.value(), StandardCharsets.UTF_8));
    }
  }
} catch (WakeupException e) {
  // Thrown by poll() after consumer.wakeup() is called from another thread; exit the loop.
}
```
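Because keys and values are typed as `byte[]`, the application is responsible for encoding and decoding its own payloads. The following is a minimal sketch, assuming UTF-8 text payloads; `PayloadCodec` and its method names are hypothetical helpers, not part of the Pub/Sub Lite or Kafka libraries.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of any library: converts between text and
// the raw byte[] payloads carried by records.
public final class PayloadCodec {
  private PayloadCodec() {}

  // Encode text with an explicit charset so the resulting bytes do not
  // depend on the platform default.
  public static byte[] encode(String text) {
    return text.getBytes(StandardCharsets.UTF_8);
  }

  // Decode bytes that were produced by encode().
  public static String decode(byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8);
  }
}
```

A record value could then be built with `PayloadCodec.encode("value")` and read back with `PayloadCodec.decode(record.value())`. Concatenating a `byte[]` into a string directly prints only the array's identity, not its contents.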
about: |
Because [Google Cloud Pub/Sub Lite][product-docs] provides partitioned zonal data storage with
predefined capacity, a large portion of the Kafka Producer/Consumer API can
be implemented using Pub/Sub Lite as a backend. The key differences are:
- Pub/Sub Lite does not support transactions. All transaction methods on
`Producer<byte[], byte[]>` will raise an exception.
- Producers operate on a single topic, and Consumers on a single subscription.
- A `ProducerRecord` may not specify a partition explicitly.
- Consumers may not dynamically create consumer groups (subscriptions).
- `Consumer.offsetsForTimes` and `Consumer.endOffsets` will raise an
exception.
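
Code shared between a Kafka-backed and a Pub/Sub Lite-backed client may need to tolerate the unsupported calls listed above. The following is a minimal sketch, assuming the failure surfaces as an unchecked exception. The exact exception type is not stated here, so the sketch catches `RuntimeException` broadly. `OptionalCall` and `attempt` are hypothetical names, not part of either library.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper, not part of any library.
public final class OptionalCall {
  private OptionalCall() {}

  // Runs the call and returns its result, or Optional.empty() if the call
  // throws an unchecked exception or returns null.
  public static <T> Optional<T> attempt(Supplier<T> call) {
    try {
      return Optional.ofNullable(call.get());
    } catch (RuntimeException e) {
      // Assumption: unsupported operations fail with an unchecked exception.
      return Optional.empty();
    }
  }
}
```

For example, `OptionalCall.attempt(() -> consumer.endOffsets(partitions))`, where `partitions` is a hypothetical collection of `TopicPartition`, would yield an empty `Optional` on a Pub/Sub Lite consumer and let the caller fall back to other logic. Catching `RuntimeException` this broadly also hides unrelated failures, so narrow the catch once the concrete exception type is known.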