Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Io jms fix ack message checkpoint #22932

Merged
merged 25 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
81257d6
[BEAM-11828] => Fix read message queue implementation
rvballada Apr 26, 2022
6c8cdbb
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Apr 26, 2022
d5d9bcd
[BEAM-11828] => New implementation to fix acknowledgment
rvballada Jun 3, 2022
c5ab8ef
[BEAM-11828] => Some refactoring (remove drainMessage method)
rvballada Jun 21, 2022
e93c31a
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Aug 25, 2022
3d3ce77
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Aug 29, 2022
d16d154
Update CHANGES.md
rvballada Aug 29, 2022
5b14193
Pull request review:
rvballada Sep 29, 2022
0a83c78
Pull request review:
rvballada Sep 30, 2022
8158363
Adding some unit tests to new JmsCheckpointMark discard method
rvballada Oct 4, 2022
490b849
Code review: discard checkpoint and clear messages at beginning of fi…
rvballada Oct 14, 2022
b16e9f6
Throw an IllegalStateException when adding message when checkpoint is…
rvballada Oct 14, 2022
4b1d6ad
Change closeTimeout from long to Duration
rvballada Oct 14, 2022
aa24d45
Check that closeTimeout is non-negative
rvballada Oct 14, 2022
5e3c4be
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Oct 17, 2022
d91bc23
Add package-private fields to perform tests
rvballada Oct 17, 2022
803b964
Code review: update comment
rvballada Oct 18, 2022
602cb2c
Code review: add comment
rvballada Oct 18, 2022
b7bb897
Code review: remove empty space
rvballada Oct 18, 2022
497b191
Code review: use ExecutorOptions to get an instance of ScheduledExecut…
rvballada Oct 18, 2022
c3888bb
Code review: avoid Thread.sleep with ExecutorService.awaitTermination…
rvballada Oct 18, 2022
c5dd19f
Apply suggestions from code review
lukecwik Oct 18, 2022
0a8c633
Apply suggestions from code review
lukecwik Oct 18, 2022
a8df80b
Apply suggestions from code review
lukecwik Oct 18, 2022
2611331
Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsI…
lukecwik Oct 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

## Bugfixes

* Fixed JmsIO acknowledgment issue (https://github.com/apache/beam/issues/20814)
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Known Issues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Message;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -41,13 +42,20 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
private Instant oldestMessageTimestamp = Instant.now();
private transient List<Message> messages = new ArrayList<>();

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@VisibleForTesting transient boolean discarded = false;

@VisibleForTesting final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

JmsCheckpointMark() {}

void add(Message message) throws Exception {
lock.writeLock().lock();
try {
if (discarded) {
throw new IllegalStateException(
String.format(
"Attempting to add message %s to checkpoint that is discarded.", message));
}
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
Expand All @@ -67,6 +75,15 @@ Instant getOldestMessageTimestamp() {
}
rvballada marked this conversation as resolved.
Show resolved Hide resolved
}

/**
 * Marks this checkpoint as discarded.
 *
 * <p>The flag is set while holding the write lock. Once it is set, {@code add} rejects further
 * messages with an {@link IllegalStateException}, and {@code finalizeCheckpoint} clears the
 * pending messages instead of acknowledging them.
 */
void discard() {
  final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
  writeLock.lock();
  try {
    discarded = true;
  } finally {
    writeLock.unlock();
  }
}

/**
* Acknowledge all outstanding messages. Since we believe that messages will be delivered in
* timestamp order, and acknowledged messages will not be retried, the newest message in this
Expand All @@ -76,6 +93,10 @@ Instant getOldestMessageTimestamp() {
public void finalizeCheckpoint() {
lock.writeLock().lock();
try {
if (discarded) {
messages.clear();
return;
}
for (Message message : messages) {
try {
message.acknowledge();
Expand All @@ -98,6 +119,7 @@ private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
messages = new ArrayList<>();
discarded = false;
}

@Override
Expand Down
111 changes: 94 additions & 17 deletions sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
Expand All @@ -45,6 +47,7 @@
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -123,11 +126,13 @@
public class JmsIO {

private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.millis(60000L);

public static Read<JmsRecord> read() {
return new AutoValue_JmsIO_Read.Builder<JmsRecord>()
.setMaxNumRecords(Long.MAX_VALUE)
.setCoder(SerializableCoder.of(JmsRecord.class))
.setCloseTimeout(DEFAULT_CLOSE_TIMEOUT)
.setMessageMapper(
(MessageMapper<JmsRecord>)
new MessageMapper<JmsRecord>() {
Expand Down Expand Up @@ -162,7 +167,10 @@ public JmsRecord mapMessage(Message message) throws Exception {
}

public static <T> Read<T> readMessage() {
return new AutoValue_JmsIO_Read.Builder<T>().setMaxNumRecords(Long.MAX_VALUE).build();
return new AutoValue_JmsIO_Read.Builder<T>()
.setMaxNumRecords(Long.MAX_VALUE)
.setCloseTimeout(DEFAULT_CLOSE_TIMEOUT)
.build();
}

public static <EventT> Write<EventT> write() {
Expand Down Expand Up @@ -206,6 +214,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract @Nullable AutoScaler getAutoScaler();

abstract Duration getCloseTimeout();

abstract Builder<T> builder();

@AutoValue.Builder
Expand All @@ -230,6 +240,8 @@ abstract static class Builder<T> {

abstract Builder<T> setAutoScaler(AutoScaler autoScaler);

abstract Builder<T> setCloseTimeout(Duration closeTimeout);

abstract Read<T> build();
}

Expand Down Expand Up @@ -364,6 +376,18 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
return builder().setAutoScaler(autoScaler).build();
}

/**
* Sets the amount of time to wait for callbacks from the runner stating that the output has
* been durably persisted before closing the connection to the JMS broker. Any callbacks that do
* not occur will cause any unacknowledged messages to be returned to the JMS broker and
* redelivered to other clients.
lukecwik marked this conversation as resolved.
Show resolved Hide resolved
*/
public Read<T> withCloseTimeout(Duration closeTimeout) {
// Reject null explicitly so the failure message names the offending parameter.
checkArgument(closeTimeout != null, "closeTimeout can not be null");
// Zero is accepted; only negative durations are rejected.
checkArgument(closeTimeout.getMillis() >= 0, "Close timeout must be non-negative.");
// Returns the Read produced by the builder with the timeout set.
// NOTE(review): presumably builder() copies this instance's other settings — confirm.
return builder().setCloseTimeout(closeTimeout).build();
rvballada marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public PCollection<T> expand(PBegin input) {
checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
Expand Down Expand Up @@ -446,7 +470,7 @@ public List<UnboundedJmsSource<T>> split(int desiredNumSplits, PipelineOptions o
@Override
public UnboundedJmsReader<T> createReader(
PipelineOptions options, JmsCheckpointMark checkpointMark) {
return new UnboundedJmsReader<T>(this, checkpointMark);
return new UnboundedJmsReader<T>(this, options);
}

@Override
Expand All @@ -471,15 +495,13 @@ static class UnboundedJmsReader<T> extends UnboundedReader<T> {

private T currentMessage;
private Instant currentTimestamp;
private PipelineOptions options;

public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
public UnboundedJmsReader(UnboundedJmsSource<T> source, PipelineOptions options) {
this.source = source;
if (checkpointMark != null) {
this.checkpointMark = checkpointMark;
} else {
this.checkpointMark = new JmsCheckpointMark();
}
this.checkpointMark = new JmsCheckpointMark();
this.currentMessage = null;
this.options = options;
}

@Override
Expand Down Expand Up @@ -582,29 +604,84 @@ public long getTotalBacklogBytes() {
}

@Override
public void close() throws IOException {
public void close() {
doClose();
}

@SuppressWarnings("FutureReturnValueIgnored")
private void doClose() {

try {
if (consumer != null) {
consumer.close();
consumer = null;
closeAutoscaler();
closeConsumer();
ScheduledExecutorService executorService =
options.as(ExecutorOptions.class).getScheduledExecutorService();
executorService.schedule(
() -> {
LOG.debug(
"Closing session and connection after delay {}", source.spec.getCloseTimeout());
// Discard the checkpoints and set the reader as inactive
checkpointMark.discard();
closeSession();
closeConnection();
},
source.spec.getCloseTimeout().getMillis(),
TimeUnit.MILLISECONDS);
rvballada marked this conversation as resolved.
Show resolved Hide resolved

} catch (Exception e) {
LOG.error("Error closing reader", e);
}
}

/**
 * Stops and closes the JMS connection, if one is open.
 *
 * <p>Stopping and closing are attempted independently: a failure in {@code stop()} is logged
 * and {@code close()} is still attempted, so a failed stop cannot leave the connection open.
 * Failures are logged and never propagated. The field is reset to {@code null} only after a
 * successful close, so a later call can retry.
 */
private void closeConnection() {
  if (connection == null) {
    return;
  }
  try {
    connection.stop();
  } catch (Exception e) {
    // Keep going: closing the connection is what actually releases its resources.
    LOG.error("Error stopping connection", e);
  }
  try {
    connection.close();
    connection = null;
  } catch (Exception e) {
    LOG.error("Error closing connection", e);
  }
}

private void closeSession() {
try {
if (session != null) {
session.close();
session = null;
}
if (connection != null) {
connection.stop();
connection.close();
connection = null;
} catch (Exception e) {
LOG.error("Error closing session" + e.getMessage(), e);
}
}

/**
 * Closes the JMS message consumer, if one is open.
 *
 * <p>Any exception thrown while closing is logged and not propagated. The field is reset to
 * {@code null} only after a successful close.
 */
private void closeConsumer() {
  if (consumer == null) {
    return;
  }
  try {
    consumer.close();
    consumer = null;
  } catch (Exception e) {
    LOG.error("Error closing consumer", e);
  }
}

private void closeAutoscaler() {
try {
if (autoScaler != null) {
autoScaler.stop();
autoScaler = null;
}
} catch (Exception e) {
throw new IOException(e);
LOG.error("Error closing autoscaler", e);
}
}

// Safety net: runs the same shutdown routine as close() when this reader is finalized.
// NOTE(review): Object.finalize() is deprecated since Java 9 — consider whether this
// fallback is still wanted.
// NOTE(review): doClose() appears to have no "already closed" guard, so a reader that was
// closed explicitly would presumably schedule the delayed shutdown a second time — confirm.
@Override
protected void finalize() {
doClose();
}
}

/**
Expand Down
Loading