From 81257d63297b67418abc7ec340c9b1a12a7d02c0 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Tue, 26 Apr 2022 10:49:34 +0200 Subject: [PATCH 01/21] [BEAM-11828] => Fix read message queue implementation --- .../jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 9fa4492cf235..9830d74953a1 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -22,6 +22,7 @@ import com.google.auto.value.AutoValue; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; @@ -463,6 +464,7 @@ static class UnboundedJmsReader extends UnboundedReader { private T currentMessage; private Instant currentTimestamp; + private final java.util.Queue wait4cpQueue; public UnboundedJmsReader(UnboundedJmsSource source, JmsCheckpointMark checkpointMark) { this.source = source; @@ -472,6 +474,7 @@ public UnboundedJmsReader(UnboundedJmsSource source, JmsCheckpointMark checkp this.checkpointMark = new JmsCheckpointMark(); } this.currentMessage = null; + wait4cpQueue = new ArrayDeque<>(); } @Override @@ -525,7 +528,7 @@ public boolean advance() throws IOException { currentMessage = null; return false; } - + wait4cpQueue.add(message); checkpointMark.add(message); currentMessage = this.source.spec.getMessageMapper().mapMessage(message); From d5d9bcddc3729dc21f76b22c5d52b2c40907bc85 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Fri, 3 Jun 2022 14:30:03 +0200 Subject: [PATCH 02/21] [BEAM-11828] => New implementation to fix acknowledgment --- .../beam/sdk/io/jms/JmsCheckpointMark.java | 79 ++++------ .../org/apache/beam/sdk/io/jms/JmsIO.java | 137 +++++++++++++----- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 3 +- 3 files changed, 131 insertions(+), 88 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java index 0d4a60d29385..94280738f66e 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java @@ -21,8 +21,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.Objects; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.jms.JMSException; import javax.jms.Message; import org.apache.beam.sdk.io.UnboundedSource; import org.checkerframework.checker.nullness.qual.Nullable; @@ -38,33 +37,21 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class); - private Instant oldestMessageTimestamp = Instant.now(); - private transient List messages = new ArrayList<>(); + private transient JmsIO.UnboundedJmsReader reader; + private transient List messagesToAck; + private final int readerHash; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - JmsCheckpointMark() {} - - void add(Message message) throws Exception { - lock.writeLock().lock(); - try { - Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); - if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) { - oldestMessageTimestamp = currentMessageTimestamp; - } - messages.add(message); - } finally { - lock.writeLock().unlock(); - } + JmsCheckpointMark(JmsIO.UnboundedJmsReader reader, @Nullable List messagesToAck) { + this.reader = reader; + this.messagesToAck = messagesToAck; + this.readerHash = System.identityHashCode(reader); } - Instant getOldestMessageTimestamp() { - lock.readLock().lock(); - try { - return this.oldestMessageTimestamp; - } finally { - lock.readLock().unlock(); - } + // set an empty list to messages when deserialize + private void readObject(java.io.ObjectInputStream stream) + throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + messagesToAck = new ArrayList<>(); } /** @@ -73,47 +60,37 @@ Instant getOldestMessageTimestamp() { * batch is a good bound for future messages. */ @Override - public void finalizeCheckpoint() { - lock.writeLock().lock(); + public void finalizeCheckpoint() throws IOException { try { - for (Message message : messages) { - try { - message.acknowledge(); - Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); - if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) { - oldestMessageTimestamp = currentMessageTimestamp; - } - } catch (Exception e) { - LOG.error("Exception while finalizing message: ", e); - } - } - messages.clear(); - } finally { - lock.writeLock().unlock(); + LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size()); + drainMessages(); + } catch (JMSException e) { + throw new IOException("Exception while finalizing message ", e); } } - // set an empty list to messages when deserialize - private void readObject(java.io.ObjectInputStream stream) - throws IOException, ClassNotFoundException { - stream.defaultReadObject(); - messages = new ArrayList<>(); + protected void drainMessages() throws JMSException { + for (Message message : messagesToAck) { + message.acknowledge(); + Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); + reader.watermark.updateAndGet(prev -> Math.max(currentMessageTimestamp.getMillis(), prev)); + } } @Override - public boolean equals(@Nullable Object o) { + public boolean equals(Object o) { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { + if (!(o instanceof JmsCheckpointMark)) { return false; } JmsCheckpointMark that = (JmsCheckpointMark) o; - return oldestMessageTimestamp.equals(that.oldestMessageTimestamp); + return readerHash == that.readerHash; } @Override public int hashCode() { - return Objects.hash(oldestMessageTimestamp); + return readerHash; } } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index d26c55e4806f..882c7e9fc8f1 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -22,17 +22,21 @@ import com.google.auto.value.AutoValue; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -58,6 +62,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -447,7 +452,7 @@ public List> split(int desiredNumSplits, PipelineOptions o @Override public UnboundedJmsReader createReader( PipelineOptions options, JmsCheckpointMark checkpointMark) { - return new UnboundedJmsReader(this, checkpointMark); + return new UnboundedJmsReader(this); } @Override @@ -464,25 +469,24 @@ public Coder getOutputCoder() { static class UnboundedJmsReader extends UnboundedReader { private UnboundedJmsSource source; - private JmsCheckpointMark checkpointMark; private Connection connection; private Session session; private MessageConsumer consumer; private AutoScaler autoScaler; private T currentMessage; + private Message currentJmsMessage; private Instant currentTimestamp; - private final java.util.Queue wait4cpQueue; - public UnboundedJmsReader(UnboundedJmsSource source, JmsCheckpointMark checkpointMark) { + Set messagesToAck; + AtomicBoolean active = new AtomicBoolean(true); + AtomicLong watermark = new AtomicLong(0); + + public UnboundedJmsReader(UnboundedJmsSource source) { this.source = source; - if (checkpointMark != null) { - this.checkpointMark = checkpointMark; - } else { - this.checkpointMark = new JmsCheckpointMark(); - } this.currentMessage = null; - wait4cpQueue = new ArrayDeque<>(); + this.messagesToAck = new HashSet<>(); + watermark.getAndSet(System.currentTimeMillis()); } @Override @@ -504,6 +508,7 @@ public boolean start() throws IOException { this.autoScaler = spec.getAutoScaler(); } this.autoScaler.start(); + } catch (Exception e) { throw new IOException("Error connecting to JMS", e); } @@ -523,28 +528,32 @@ public boolean start() throws IOException { } catch (Exception e) { throw new IOException("Error creating JMS consumer", e); } + this.active.set(true); return advance(); } @Override public boolean advance() throws IOException { - try { - Message message = this.consumer.receiveNoWait(); + if (active.get()) { + try { + Message message = this.consumer.receiveNoWait(); - if (message == null) { - currentMessage = null; - return false; + if (message == null) { + currentMessage = null; + return false; + } + currentJmsMessage = message; + messagesToAck.add(message); + currentMessage = this.source.spec.getMessageMapper().mapMessage(message); + currentTimestamp = new Instant(message.getJMSTimestamp()); + return true; + + } catch (Exception e) { + throw new IOException(e); } - wait4cpQueue.add(message); - checkpointMark.add(message); - - currentMessage = this.source.spec.getMessageMapper().mapMessage(message); - currentTimestamp = new Instant(message.getJMSTimestamp()); - - return true; - } catch (Exception e) { - throw new IOException(e); + } else { + return false; } } @@ -558,7 +567,10 @@ public T getCurrent() throws NoSuchElementException { @Override public Instant getWatermark() { - return checkpointMark.getOldestMessageTimestamp(); + if (watermark == null) { + return new Instant(0); + } + return new Instant(watermark.get()); } @Override @@ -571,7 +583,9 @@ public Instant getCurrentTimestamp() { @Override public CheckpointMark getCheckpointMark() { - return checkpointMark; + List msgToAcks = Lists.newArrayList(messagesToAck); + messagesToAck.clear(); + return new JmsCheckpointMark(this, msgToAcks); } @Override @@ -586,28 +600,81 @@ public long getTotalBacklogBytes() { @Override public void close() throws IOException { + active.set(false); + maybeCloseClient(); + } + + void maybeCloseClient() throws IOException { try { - if (consumer != null) { - consumer.close(); - consumer = null; - } - if (session != null) { - session.close(); - session = null; + doClose(); + } catch (Exception e) { + throw new IOException(e); + } + } + + private void doClose() { + if (currentJmsMessage != null) { + try { + currentJmsMessage.acknowledge(); + } catch (JMSException e) { + LOG.error("Impossible to acknowledge last message", e); } + } + closeAutoscaler(); + closeConsumer(); + closeSession(); + closeConnection(); + } + + private void closeConnection() { + try { if (connection != null) { connection.stop(); connection.close(); connection = null; } + } catch (Exception e) { + LOG.error("Error closing connection", e); + } + } + + private void closeSession() { + try { + if (session != null) { + session.close(); + session = null; + } + } catch (Exception e) { + LOG.error("Error closing session", e); + } + } + + private void closeConsumer() { + try { + if (consumer != null) { + consumer.close(); + consumer = null; + } + } catch (Exception e) { + LOG.error("Error closing consumer", e); + } + } + + private void closeAutoscaler() { + try { if (autoScaler != null) { autoScaler.stop(); autoScaler = null; } } catch (Exception e) { - throw new IOException(e); + LOG.error("Error closing autoscaler", e); } } + + @Override + protected void finalize() throws Throwable { + doClose(); + } } /** diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 2fc816b0f595..4fd7b4c4216a 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -504,8 +504,7 @@ public void testCheckpointMarkSafety() throws Exception { /** Test the checkpoint mark default coder, which is actually AvroCoder. */ @Test public void testCheckpointMarkDefaultCoder() throws Exception { - JmsCheckpointMark jmsCheckpointMark = new JmsCheckpointMark(); - jmsCheckpointMark.add(new ActiveMQMessage()); + JmsCheckpointMark jmsCheckpointMark = new JmsCheckpointMark(null, null); Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder(); CoderProperties.coderSerializable(coder); CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark); From c5ab8efc9f60f4db59355d778f54c4a10341cc8b Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Tue, 21 Jun 2022 10:22:18 +0200 Subject: [PATCH 03/21] [BEAM-11828] => Some refactoring (remove drainMessage method) --- .../beam/sdk/io/jms/JmsCheckpointMark.java | 19 +++++++++--------- .../org/apache/beam/sdk/io/jms/JmsIO.java | 20 ++++++++++++------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java index 94280738f66e..894ef76b9c86 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java @@ -63,17 +63,18 @@ private void readObject(java.io.ObjectInputStream stream) public void finalizeCheckpoint() throws IOException { try { LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size()); - drainMessages(); + if (reader.active.get() && reader != null) { + for (Message message : messagesToAck) { + message.acknowledge(); + Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); + reader.watermark.updateAndGet( + prev -> Math.max(currentMessageTimestamp.getMillis(), prev)); + } + } } catch (JMSException e) { throw new IOException("Exception while finalizing message ", e); - } - } - - protected void drainMessages() throws JMSException { - for (Message message : messagesToAck) { - message.acknowledge(); - Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); - reader.watermark.updateAndGet(prev -> Math.max(currentMessageTimestamp.getMillis(), prev)); + } finally { + reader = null; } } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 882c7e9fc8f1..d3ee9694a2f0 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -604,15 +604,25 @@ public void close() throws IOException { maybeCloseClient(); } - void maybeCloseClient() throws IOException { + private void maybeCloseClient() throws IOException { try { - doClose(); + if (!active.get()) { + doClose(); + } } catch (Exception e) { throw new IOException(e); } } private void doClose() { + acknowledgeLastMessage(); + closeAutoscaler(); + closeConsumer(); + closeSession(); + closeConnection(); + } + + private void acknowledgeLastMessage() { if (currentJmsMessage != null) { try { currentJmsMessage.acknowledge(); @@ -620,10 +630,6 @@ private void doClose() { LOG.error("Impossible to acknowledge last message", e); } } - closeAutoscaler(); - closeConsumer(); - closeSession(); - closeConnection(); } private void closeConnection() { @@ -672,7 +678,7 @@ private void closeAutoscaler() { } @Override - protected void finalize() throws Throwable { + protected void finalize() { doClose(); } } From d16d154ad70a75019eaf4cb0ad072cb0804c7a50 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Mon, 29 Aug 2022 16:16:33 +0200 Subject: [PATCH 04/21] Update CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 76f765104525..9c1c3014e9b5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -43,6 +43,7 @@ ## Bugfixes +* Fixed JmsIO acknowledgment issue (https://github.com/apache/beam/issues/20814) * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Known Issues From 5b14193716bfe4bbfa6f5f98f99272c589f49876 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Thu, 29 Sep 2022 10:07:11 +0200 Subject: [PATCH 05/21] Pull request review: - Upgrade equals and hashcode method in JmsCheckpointMark - Add a serciveExecutor.schedule method in order to close the JMS session after a tiemout and discard all the related checkpointd --- .../beam/sdk/io/jms/JmsCheckpointMark.java | 8 +- .../org/apache/beam/sdk/io/jms/JmsIO.java | 98 ++++++++++--------- 2 files changed, 55 insertions(+), 51 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java index 894ef76b9c86..dc3d9e182e0d 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java @@ -39,12 +39,10 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable private transient JmsIO.UnboundedJmsReader reader; private transient List messagesToAck; - private final int readerHash; JmsCheckpointMark(JmsIO.UnboundedJmsReader reader, @Nullable List messagesToAck) { this.reader = reader; this.messagesToAck = messagesToAck; - this.readerHash = System.identityHashCode(reader); } // set an empty list to messages when deserialize @@ -68,7 +66,7 @@ public void finalizeCheckpoint() throws IOException { message.acknowledge(); Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); reader.watermark.updateAndGet( - prev -> Math.max(currentMessageTimestamp.getMillis(), prev)); + prev -> Math.min(currentMessageTimestamp.getMillis(), prev)); } } } catch (JMSException e) { @@ -87,11 +85,11 @@ public boolean equals(Object o) { return false; } JmsCheckpointMark that = (JmsCheckpointMark) o; - return readerHash == that.readerHash; + return reader == that.reader; } @Override public int hashCode() { - return readerHash; + return System.identityHashCode(reader); } } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 39318371487f..be8f38870bea 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -30,13 +30,15 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -129,11 +131,13 @@ public class JmsIO { private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class); + private static final long DEFAULT_CLOSE_TIMEOUT = 1000L; public static Read read() { return new AutoValue_JmsIO_Read.Builder() .setMaxNumRecords(Long.MAX_VALUE) .setCoder(SerializableCoder.of(JmsRecord.class)) + .setCloseTimeout(DEFAULT_CLOSE_TIMEOUT) .setMessageMapper( (MessageMapper) new MessageMapper() { @@ -168,7 +172,10 @@ public JmsRecord mapMessage(Message message) throws Exception { } public static Read readMessage() { - return new AutoValue_JmsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build(); + return new AutoValue_JmsIO_Read.Builder() + .setMaxNumRecords(Long.MAX_VALUE) + .setCloseTimeout(DEFAULT_CLOSE_TIMEOUT) + .build(); } public static Write write() { @@ -212,6 +219,8 @@ public abstract static class Read extends PTransform> abstract @Nullable AutoScaler getAutoScaler(); + abstract long getCloseTimeout(); + abstract Builder builder(); @AutoValue.Builder @@ -236,6 +245,8 @@ abstract static class Builder { abstract Builder setAutoScaler(AutoScaler autoScaler); + abstract Builder setCloseTimeout(long closeTimeout); + abstract Read build(); } @@ -370,6 +381,10 @@ public Read withAutoScaler(AutoScaler autoScaler) { return builder().setAutoScaler(autoScaler).build(); } + public Read withCloseTimeout(long closeTimeout) { + return builder().setCloseTimeout(closeTimeout).build(); + } + @Override public PCollection expand(PBegin input) { checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required"); @@ -475,7 +490,6 @@ static class UnboundedJmsReader extends UnboundedReader { private AutoScaler autoScaler; private T currentMessage; - private Message currentJmsMessage; private Instant currentTimestamp; Set messagesToAck; @@ -535,25 +549,20 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { - if (active.get()) { - try { - Message message = this.consumer.receiveNoWait(); - - if (message == null) { - currentMessage = null; - return false; - } - currentJmsMessage = message; - messagesToAck.add(message); - currentMessage = this.source.spec.getMessageMapper().mapMessage(message); - currentTimestamp = new Instant(message.getJMSTimestamp()); - return true; + try { + Message message = this.consumer.receiveNoWait(); - } catch (Exception e) { - throw new IOException(e); + if (message == null) { + currentMessage = null; + return false; } - } else { - return false; + messagesToAck.add(message); + currentMessage = this.source.spec.getMessageMapper().mapMessage(message); + currentTimestamp = new Instant(message.getJMSTimestamp()); + return true; + + } catch (Exception e) { + throw new IOException(e); } } @@ -599,35 +608,32 @@ public long getTotalBacklogBytes() { } @Override - public void close() throws IOException { - active.set(false); - maybeCloseClient(); - } - - private void maybeCloseClient() throws IOException { - try { - if (!active.get()) { - doClose(); - } - } catch (Exception e) { - throw new IOException(e); - } + public void close() { + doClose(); } + @SuppressWarnings("FutureReturnValueIgnored") private void doClose() { - acknowledgeLastMessage(); - closeAutoscaler(); - closeConsumer(); - closeSession(); - closeConnection(); - } - - private void acknowledgeLastMessage() { - if (currentJmsMessage != null) { + if (active.get()) { try { - currentJmsMessage.acknowledge(); - } catch (JMSException e) { - LOG.error("Impossible to acknowledge last message", e); + closeAutoscaler(); + closeConsumer(); + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + executorService.schedule( + () -> { + LOG.debug( + "Closing session and connection after delay {}", source.spec.getCloseTimeout()); + // Discard the checkpoints + active.set(false); + closeSession(); + closeConnection(); + }, + source.spec.getCloseTimeout(), + TimeUnit.MILLISECONDS); + + } catch (Exception e) { + LOG.error("Error closing reader", e); } } } @@ -651,7 +657,7 @@ private void closeSession() { session = null; } } catch (Exception e) { - LOG.error("Error closing session", e); + LOG.error("Error closing session" + e.getMessage(), e); } } From 0a83c78e61d56581552d38c985ceaed729d18d53 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Fri, 30 Sep 2022 18:48:07 +0200 Subject: [PATCH 06/21] Pull request review: - Get back to the initial implementation of JmsCheckpointMark - Add the discard attribute and discard() method to JmsCheckpointMark --- .../beam/sdk/io/jms/JmsCheckpointMark.java | 89 +++++++++++++------ .../org/apache/beam/sdk/io/jms/JmsIO.java | 72 ++++++--------- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 2 +- 3 files changed, 92 insertions(+), 71 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java index dc3d9e182e0d..4f26d7cb4cbd 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java @@ -21,7 +21,8 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import javax.jms.JMSException; +import java.util.Objects; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.Message; import org.apache.beam.sdk.io.UnboundedSource; import org.checkerframework.checker.nullness.qual.Nullable; @@ -37,19 +38,43 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class); - private transient JmsIO.UnboundedJmsReader reader; - private transient List messagesToAck; + private Instant oldestMessageTimestamp = Instant.now(); + private transient List messages = new ArrayList<>(); + private transient boolean discarded = false; - JmsCheckpointMark(JmsIO.UnboundedJmsReader reader, @Nullable List messagesToAck) { - this.reader = reader; - this.messagesToAck = messagesToAck; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + JmsCheckpointMark() {} + + void add(Message message) throws Exception { + lock.writeLock().lock(); + try { + Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); + if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) { + oldestMessageTimestamp = currentMessageTimestamp; + } + messages.add(message); + } finally { + lock.writeLock().unlock(); + } } - // set an empty list to messages when deserialize - private void readObject(java.io.ObjectInputStream stream) - throws IOException, ClassNotFoundException { - stream.defaultReadObject(); - messagesToAck = new ArrayList<>(); + Instant getOldestMessageTimestamp() { + lock.readLock().lock(); + try { + return this.oldestMessageTimestamp; + } finally { + lock.readLock().unlock(); + } + } + + void discard() { + lock.writeLock().lock(); + try { + this.discarded = true; + } finally { + lock.writeLock().unlock(); + } } /** @@ -58,38 +83,50 @@ private void readObject(java.io.ObjectInputStream stream) * batch is a good bound for future messages. */ @Override - public void finalizeCheckpoint() throws IOException { + public void finalizeCheckpoint() { + lock.writeLock().lock(); try { - LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size()); - if (reader.active.get() && reader != null) { - for (Message message : messagesToAck) { - message.acknowledge(); - Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); - reader.watermark.updateAndGet( - prev -> Math.min(currentMessageTimestamp.getMillis(), prev)); + if (!discarded) { + for (Message message : messages) { + try { + message.acknowledge(); + Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); + if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) { + oldestMessageTimestamp = currentMessageTimestamp; + } + } catch (Exception e) { + LOG.error("Exception while finalizing message: ", e); + } } } - } catch (JMSException e) { - throw new IOException("Exception while finalizing message ", e); + messages.clear(); } finally { - reader = null; + lock.writeLock().unlock(); } } + // set an empty list to messages when deserialize + private void readObject(java.io.ObjectInputStream stream) + throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + messages = new ArrayList<>(); + discarded = false; + } + @Override - public boolean equals(Object o) { + public boolean equals(@Nullable Object o) { if (this == o) { return true; } - if (!(o instanceof JmsCheckpointMark)) { + if (o == null || getClass() != o.getClass()) { return false; } JmsCheckpointMark that = (JmsCheckpointMark) o; - return reader == that.reader; + return oldestMessageTimestamp.equals(that.oldestMessageTimestamp); } @Override public int hashCode() { - return System.identityHashCode(reader); + return Objects.hash(oldestMessageTimestamp); } } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index be8f38870bea..afd339854044 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -25,16 +25,12 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -64,7 +60,6 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -131,7 +126,7 @@ public class JmsIO { private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class); - private static final long DEFAULT_CLOSE_TIMEOUT = 1000L; + private static final long DEFAULT_CLOSE_TIMEOUT = 60000L; public static Read read() { return new AutoValue_JmsIO_Read.Builder() @@ -484,6 +479,7 @@ public Coder getOutputCoder() { static class UnboundedJmsReader extends UnboundedReader { private UnboundedJmsSource source; + private JmsCheckpointMark checkpointMark; private Connection connection; private Session session; private MessageConsumer consumer; @@ -492,15 +488,10 @@ static class UnboundedJmsReader extends UnboundedReader { private T currentMessage; private Instant currentTimestamp; - Set messagesToAck; - AtomicBoolean active = new AtomicBoolean(true); - AtomicLong watermark = new AtomicLong(0); - public UnboundedJmsReader(UnboundedJmsSource source) { this.source = source; + this.checkpointMark = new JmsCheckpointMark(); this.currentMessage = null; - this.messagesToAck = new HashSet<>(); - watermark.getAndSet(System.currentTimeMillis()); } @Override @@ -522,7 +513,6 @@ public boolean start() throws IOException { this.autoScaler = spec.getAutoScaler(); } this.autoScaler.start(); - } catch (Exception e) { throw new IOException("Error connecting to JMS", e); } @@ -542,7 +532,6 @@ public boolean start() throws IOException { } catch (Exception e) { throw new IOException("Error creating JMS consumer", e); } - this.active.set(true); return advance(); } @@ -556,11 +545,13 @@ public boolean advance() throws IOException { currentMessage = null; return false; } - messagesToAck.add(message); + + checkpointMark.add(message); + currentMessage = this.source.spec.getMessageMapper().mapMessage(message); currentTimestamp = new Instant(message.getJMSTimestamp()); - return true; + return true; } catch (Exception e) { throw new IOException(e); } @@ -576,10 +567,7 @@ public T getCurrent() throws NoSuchElementException { @Override public Instant getWatermark() { - if (watermark == null) { - return new Instant(0); - } - return new Instant(watermark.get()); + return checkpointMark.getOldestMessageTimestamp(); } @Override @@ -592,9 +580,7 @@ public Instant getCurrentTimestamp() { @Override public CheckpointMark getCheckpointMark() { - List msgToAcks = Lists.newArrayList(messagesToAck); - messagesToAck.clear(); - return new JmsCheckpointMark(this, msgToAcks); + return checkpointMark; } @Override @@ -614,27 +600,25 @@ public void close() { @SuppressWarnings("FutureReturnValueIgnored") private void doClose() { - if (active.get()) { - try { - closeAutoscaler(); - closeConsumer(); - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - - executorService.schedule( - () -> { - LOG.debug( - "Closing session and connection after delay {}", source.spec.getCloseTimeout()); - // Discard the checkpoints - active.set(false); - closeSession(); - closeConnection(); - }, - source.spec.getCloseTimeout(), - TimeUnit.MILLISECONDS); - - } catch (Exception e) { - LOG.error("Error closing reader", e); - } + + try { + closeAutoscaler(); + closeConsumer(); + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + executorService.schedule( + () -> { + LOG.debug( + "Closing session and connection after delay {}", source.spec.getCloseTimeout()); + // Discard the checkpoints and set the reader as inactive + checkpointMark.discard(); + closeSession(); + closeConnection(); + }, + source.spec.getCloseTimeout(), + TimeUnit.MILLISECONDS); + + } catch (Exception e) { + LOG.error("Error closing reader", e); } } diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index b6abc1bc0caf..bf79371a828e 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -504,7 +504,7 @@ public void testCheckpointMarkSafety() throws Exception { /** Test the checkpoint mark default coder, which is actually AvroCoder. */ @Test public void testCheckpointMarkDefaultCoder() throws Exception { - JmsCheckpointMark jmsCheckpointMark = new JmsCheckpointMark(null, null); + JmsCheckpointMark jmsCheckpointMark = new JmsCheckpointMark(); Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder(); CoderProperties.coderSerializable(coder); CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark); From 81583637841f9401fa177c0b1c4ac863ee1aeb12 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Tue, 4 Oct 2022 09:37:08 +0200 Subject: [PATCH 07/21] Adding some unit tests to new JmsCHeckpointMark discard method --- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index bf79371a828e..7697cff7cd9d 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -31,6 +32,7 @@ import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.Field; import java.lang.reflect.Proxy; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -554,6 +556,101 @@ public void testCustomAutoscaler() throws IOException { verify(autoScaler, times(1)).stop(); } + @Test + public void testCloseWithTimeout() + throws IOException, NoSuchFieldException, IllegalAccessException { + + int closeTimeout = 2000; + JmsIO.Read spec = + JmsIO.read() + .withConnectionFactory(connectionFactory) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withQueue(QUEUE) + .withCloseTimeout(closeTimeout); + + JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec); + JmsIO.UnboundedJmsReader reader = source.createReader(null, null); + + reader.start(); + reader.close(); + + boolean discarded = getDiscardedValue(reader); + assertFalse(discarded); + try { + Thread.sleep(closeTimeout + 1000); + } catch (InterruptedException ignored) { + } + discarded = getDiscardedValue(reader); + assertTrue(discarded); + } + + @Test + public void testDiscardCheckpointMark() throws Exception { + + Connection connection = + connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE)); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("test " + i)); + } + producer.close(); + session.close(); + connection.close(); + + JmsIO.Read spec = + JmsIO.read() + .withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withQueue(QUEUE); + JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec); + JmsIO.UnboundedJmsReader reader = source.createReader(null, null); + + // start the reader and move to the first record + assertTrue(reader.start()); + + // consume 3 messages (NB: start already consumed the first message) + for (int i = 0; i < 3; i++) { + assertTrue(reader.advance()); + } + + // the messages are still pending in the queue (no ACK yet) + assertEquals(10, count(QUEUE)); + + // we finalize the checkpoint + reader.getCheckpointMark().finalizeCheckpoint(); + + // the checkpoint finalize ack the messages, and so they are not pending in the queue anymore + assertEquals(6, count(QUEUE)); + + // we read the 6 pending messages + for (int i = 0; i < 6; i++) { + assertTrue(reader.advance()); + } + + // still 6 pending messages as we didn't finalize the checkpoint + assertEquals(6, count(QUEUE)); + + // But here we discard the checkpoint + ((JmsCheckpointMark) reader.getCheckpointMark()).discard(); + // we finalize the checkpoint: no more message in the queue + reader.getCheckpointMark().finalizeCheckpoint(); + + assertEquals(6, count(QUEUE)); + } + + private boolean getDiscardedValue(JmsIO.UnboundedJmsReader reader) + throws NoSuchFieldException, IllegalAccessException { + JmsCheckpointMark checkpoint = (JmsCheckpointMark) reader.getCheckpointMark(); + Field privateField = JmsCheckpointMark.class.getDeclaredField("discarded"); + privateField.setAccessible(true); + boolean discarded = (boolean) privateField.get(checkpoint); + return discarded; + } + private int count(String queue) throws Exception { Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); connection.start(); From 490b849292cae90103204c6ceddb2cbae2f8919f Mon Sep 17 00:00:00 2001 From: rvballada Date: Fri, 14 Oct 2022 09:18:05 +0200 Subject: [PATCH 08/21] Code review: discard checkpoint and clear messages at beginning of finalizeCheckpoint method Co-authored-by: Lukasz Cwik --- .../beam/sdk/io/jms/JmsCheckpointMark.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java index 4f26d7cb4cbd..76044faa705a 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java @@ -86,16 +86,19 @@ void discard() { public void finalizeCheckpoint() { lock.writeLock().lock(); try { - if (!discarded) { - for (Message message : messages) { - try { - message.acknowledge(); - Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); - if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) { - oldestMessageTimestamp = currentMessageTimestamp; - } - } catch (Exception e) { - LOG.error("Exception while finalizing message: ", e); + if (discarded) { + messages.clear(); + return; + } + for (Message message : messages) { + try { + message.acknowledge(); + Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); + if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) { + oldestMessageTimestamp = currentMessageTimestamp; + } + } catch (Exception e) { + LOG.error("Exception while finalizing message: ", e); } } } From b16e9f6bf1cb6b88538329875d7b569c7e51a948 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Fri, 14 Oct 2022 09:30:30 +0200 Subject: [PATCH 09/21] Throw an IllegalStateException when adding message when checkpoint is discarded --- .../java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java index 76044faa705a..0a8ffbb86b64 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java @@ -49,6 +49,11 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable void add(Message message) throws Exception { lock.writeLock().lock(); try { + if (discarded) { + throw new IllegalStateException( + String.format( + "Attempting to add message %s to checkpoint that is discarded.", message)); + } Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) { oldestMessageTimestamp = currentMessageTimestamp; @@ -99,7 +104,6 @@ public void finalizeCheckpoint() { } } catch (Exception e) { LOG.error("Exception while finalizing message: ", e); - } } } messages.clear(); From 4b1d6ada4b3d4e9cfd01828880c1ea22c7a76818 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Fri, 14 Oct 2022 09:57:10 +0200 Subject: [PATCH 10/21] Change closeTimeout from long to Duration --- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 16 +++++++++++----- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 5 +++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index afd339854044..4f2e2143a917 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -126,7 +126,7 @@ public class JmsIO { private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class); - private static final long DEFAULT_CLOSE_TIMEOUT = 60000L; + private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.millis(60000L); public static Read read() { return new AutoValue_JmsIO_Read.Builder() @@ -214,7 +214,7 @@ public abstract static class Read extends PTransform> abstract @Nullable AutoScaler getAutoScaler(); - abstract long getCloseTimeout(); + abstract Duration getCloseTimeout(); abstract Builder builder(); @@ -240,7 +240,7 @@ abstract static class Builder { abstract Builder setAutoScaler(AutoScaler autoScaler); - abstract Builder setCloseTimeout(long closeTimeout); + abstract Builder setCloseTimeout(Duration closeTimeout); abstract Read build(); } @@ -376,7 +376,13 @@ public Read withAutoScaler(AutoScaler autoScaler) { return builder().setAutoScaler(autoScaler).build(); } - public Read withCloseTimeout(long closeTimeout) { + /** + * Sets the amount of time to wait for callbacks from the runner stating that the output has + * been durably persisted before closing the connection to the JMS broker. Any callbacks that do + * not occur will cause any unacknowledged messages to be returned to the JMS broker and + * redelivered to other clients. + */ + public Read withCloseTimeout(Duration closeTimeout) { return builder().setCloseTimeout(closeTimeout).build(); } @@ -614,7 +620,7 @@ private void doClose() { closeSession(); closeConnection(); }, - source.spec.getCloseTimeout(), + source.spec.getCloseTimeout().getMillis(), TimeUnit.MILLISECONDS); } catch (Exception e) { diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 7697cff7cd9d..10f30f0c98f6 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; +import org.joda.time.Duration; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -560,7 +561,7 @@ public void testCustomAutoscaler() throws IOException { public void testCloseWithTimeout() throws IOException, NoSuchFieldException, IllegalAccessException { - int closeTimeout = 2000; + Duration closeTimeout = Duration.millis(2000L); JmsIO.Read spec = JmsIO.read() .withConnectionFactory(connectionFactory) @@ -578,7 +579,7 @@ public void testCloseWithTimeout() boolean discarded = getDiscardedValue(reader); assertFalse(discarded); try { - Thread.sleep(closeTimeout + 1000); + Thread.sleep(closeTimeout.getMillis() + 1000); } catch (InterruptedException ignored) { } discarded = getDiscardedValue(reader); From aa24d45621b0275da3a2c662eaec94ee87c260e0 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Fri, 14 Oct 2022 10:24:14 +0200 Subject: [PATCH 11/21] CHeck that closeTimeout is non negative --- .../io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 4f2e2143a917..fa795edba53e 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -383,6 +383,8 @@ public Read withAutoScaler(AutoScaler autoScaler) { * redelivered to other clients. */ public Read withCloseTimeout(Duration closeTimeout) { + checkArgument(closeTimeout != null, "closeTimeout can not be null"); + checkArgument(closeTimeout.getMillis() >= 0, "Close timeout must be non-negative."); return builder().setCloseTimeout(closeTimeout).build(); } From d91bc235828d809f19fb3af8914ec7a843ab9ccb Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Mon, 17 Oct 2022 13:20:41 +0200 Subject: [PATCH 12/21] Add private package fields to perform testd --- .../beam/sdk/io/jms/JmsCheckpointMark.java | 6 +++-- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 23 +++++++++---------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java index 0a8ffbb86b64..244c3cbabb20 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.Message; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -40,9 +41,10 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable private Instant oldestMessageTimestamp = Instant.now(); private transient List messages = new ArrayList<>(); - private transient boolean discarded = false; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + @VisibleForTesting transient boolean discarded = false; + + @VisibleForTesting final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); JmsCheckpointMark() {} diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 10f30f0c98f6..991c2469072e 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Field; import java.lang.reflect.Proxy; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -558,8 +557,7 @@ public void testCustomAutoscaler() throws IOException { } @Test - public void testCloseWithTimeout() - throws IOException, NoSuchFieldException, IllegalAccessException { + public void testCloseWithTimeout() throws IOException { Duration closeTimeout = Duration.millis(2000L); JmsIO.Read spec = @@ -586,6 +584,16 @@ public void testCloseWithTimeout() assertTrue(discarded); } + private boolean getDiscardedValue(JmsIO.UnboundedJmsReader reader) { + JmsCheckpointMark checkpoint = (JmsCheckpointMark) reader.getCheckpointMark(); + checkpoint.lock.readLock().lock(); + try { + return checkpoint.discarded; + } finally { + checkpoint.lock.readLock().unlock(); + } + } + @Test public void testDiscardCheckpointMark() throws Exception { @@ -643,15 +651,6 @@ public void testDiscardCheckpointMark() throws Exception { assertEquals(6, count(QUEUE)); } - private boolean getDiscardedValue(JmsIO.UnboundedJmsReader reader) - throws NoSuchFieldException, IllegalAccessException { - JmsCheckpointMark checkpoint = (JmsCheckpointMark) reader.getCheckpointMark(); - Field privateField = JmsCheckpointMark.class.getDeclaredField("discarded"); - privateField.setAccessible(true); - boolean discarded = (boolean) privateField.get(checkpoint); - return discarded; - } - private int count(String queue) throws Exception { Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); connection.start(); From 803b964b57c2271fd5236ffab9ffbb44afeb6a06 Mon Sep 17 00:00:00 2001 From: rvballada Date: Tue, 18 Oct 2022 08:47:11 +0200 Subject: [PATCH 13/21] Code review: update comment Co-authored-by: Lukasz Cwik --- .../jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 991c2469072e..914e358f8ced 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -621,7 +621,7 @@ public void testDiscardCheckpointMark() throws Exception { // start the reader and move to the first record assertTrue(reader.start()); - // consume 3 messages (NB: start already consumed the first message) + // consume 3 more messages (NB: start already consumed the first message) for (int i = 0; i < 3; i++) { assertTrue(reader.advance()); } From 602cb2cdf778006f5fbec476024016f61292f99b Mon Sep 17 00:00:00 2001 From: rvballada Date: Tue, 18 Oct 2022 08:47:38 +0200 Subject: [PATCH 14/21] Code review: add comment Co-authored-by: Lukasz Cwik --- .../jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 914e358f8ced..71f0414e8399 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -645,7 +645,7 @@ public void testDiscardCheckpointMark() throws Exception { // But here we discard the checkpoint ((JmsCheckpointMark) reader.getCheckpointMark()).discard(); - // we finalize the checkpoint: no more message in the queue + // we finalize the checkpoint: no messages should be acked reader.getCheckpointMark().finalizeCheckpoint(); assertEquals(6, count(QUEUE)); From b7bb8970b3d792feb5b11ca80f616cb470f80e5d Mon Sep 17 00:00:00 2001 From: rvballada Date: Tue, 18 Oct 2022 08:48:05 +0200 Subject: [PATCH 15/21] Code review: remove empty space Co-authored-by: Lukasz Cwik --- .../jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 71f0414e8399..3429e5fa11e8 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -596,7 +596,6 @@ private boolean getDiscardedValue(JmsIO.UnboundedJmsReader reader) { @Test public void testDiscardCheckpointMark() throws Exception { - Connection connection = connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD); connection.start(); From 497b191a9a9fe26849fd70678f95a587626c1e58 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Tue, 18 Oct 2022 09:22:34 +0200 Subject: [PATCH 16/21] Code review: use ExecutorOptions to get an instance of ShceduleExecutorService --- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 11 +++++++---- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 18 ++++++++++++++++-- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index fa795edba53e..898eb1454c93 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -48,6 +47,7 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -470,7 +470,7 @@ public List> split(int desiredNumSplits, PipelineOptions o @Override public UnboundedJmsReader createReader( PipelineOptions options, JmsCheckpointMark checkpointMark) { - return new UnboundedJmsReader(this); + return new UnboundedJmsReader(this, options); } @Override @@ -495,11 +495,13 @@ static class UnboundedJmsReader extends UnboundedReader { private T currentMessage; private Instant currentTimestamp; + private PipelineOptions options; - public UnboundedJmsReader(UnboundedJmsSource source) { + public UnboundedJmsReader(UnboundedJmsSource source, PipelineOptions options) { this.source = source; this.checkpointMark = new JmsCheckpointMark(); this.currentMessage = null; + this.options = options; } @Override @@ -612,7 +614,8 @@ private void doClose() { try { closeAutoscaler(); closeConsumer(); - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + ScheduledExecutorService executorService = + options.as(ExecutorOptions.class).getScheduledExecutorService(); executorService.schedule( () -> { LOG.debug( diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 3429e5fa11e8..4ec47f10955d 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -37,6 +37,8 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -58,6 +60,7 @@ import org.apache.activemq.util.Callback; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.CoderProperties; @@ -94,7 +97,8 @@ public class JmsIOTest { private ConnectionFactory connectionFactory; private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch; - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule + public final transient TestPipeline pipeline = TestPipeline.fromOptions(createExecutorOptions()); @Before public void startBroker() throws Exception { @@ -569,7 +573,10 @@ public void testCloseWithTimeout() throws IOException { .withCloseTimeout(closeTimeout); JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec); - JmsIO.UnboundedJmsReader reader = source.createReader(null, null); + + ExecutorOptions options = createExecutorOptions(); + + JmsIO.UnboundedJmsReader reader = source.createReader(options, null); reader.start(); reader.close(); @@ -584,6 +591,13 @@ public void testCloseWithTimeout() throws IOException { assertTrue(discarded); } + private ExecutorOptions createExecutorOptions() { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + ExecutorOptions options = PipelineOptionsFactory.create().as(ExecutorOptions.class); + options.setScheduledExecutorService(executorService); + return options; + } + private boolean getDiscardedValue(JmsIO.UnboundedJmsReader reader) { JmsCheckpointMark checkpoint = (JmsCheckpointMark) reader.getCheckpointMark(); checkpoint.lock.readLock().lock(); From c3888bbd19c9129e959d672f6582e998c3622a97 Mon Sep 17 00:00:00 2001 From: "vincent.ballada" Date: Tue, 18 Oct 2022 13:50:55 +0200 Subject: [PATCH 17/21] Code review: avoid Thread.sleep with ExecutorService.awaitTermination (don't know if it is better) --- .../src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 4ec47f10955d..cc522b8b1696 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -564,6 +565,7 @@ public void testCustomAutoscaler() throws IOException { public void testCloseWithTimeout() throws IOException { Duration closeTimeout = Duration.millis(2000L); + long waitTimeout = closeTimeout.getMillis() + 1000L; JmsIO.Read spec = JmsIO.read() .withConnectionFactory(connectionFactory) @@ -584,7 +586,7 @@ public void testCloseWithTimeout() throws IOException { boolean discarded = getDiscardedValue(reader); assertFalse(discarded); try { - Thread.sleep(closeTimeout.getMillis() + 1000); + options.getScheduledExecutorService().awaitTermination(waitTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { } discarded = getDiscardedValue(reader); From c5dd19f1ed8d47591d7929d1172eb0f4b4f4bb04 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Tue, 18 Oct 2022 10:35:59 -0700 Subject: [PATCH 18/21] Apply suggestions from code review --- .../main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 4 ++-- .../java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 10 +++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 898eb1454c93..c77c70820b27 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -379,8 +379,8 @@ public Read withAutoScaler(AutoScaler autoScaler) { /** * Sets the amount of time to wait for callbacks from the runner stating that the output has * been durably persisted before closing the connection to the JMS broker. Any callbacks that do - * not occur will cause any unacknowledged messages to be returned to the JMS broker and - * redelivered to other clients. + * not occur will cause unacknowledged messages to be returned to the JMS broker and redelivered + * to other clients. */ public Read withCloseTimeout(Duration closeTimeout) { checkArgument(closeTimeout != null, "closeTimeout can not be null"); diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index cc522b8b1696..fb8df27ccf83 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -98,8 +98,7 @@ public class JmsIOTest { private ConnectionFactory connectionFactory; private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch; - @Rule - public final transient TestPipeline pipeline = TestPipeline.fromOptions(createExecutorOptions()); + @Rule public final transient TestPipeline pipeline = TestPipeline.fromOptions(); @Before public void startBroker() throws Exception { @@ -563,7 +562,6 @@ public void testCustomAutoscaler() throws IOException { @Test public void testCloseWithTimeout() throws IOException { - Duration closeTimeout = Duration.millis(2000L); long waitTimeout = closeTimeout.getMillis() + 1000L; JmsIO.Read spec = @@ -583,14 +581,12 @@ public void testCloseWithTimeout() throws IOException { reader.start(); reader.close(); - boolean discarded = getDiscardedValue(reader); - assertFalse(discarded); + assertFalse(getDiscardedValue(reader)); try { options.getScheduledExecutorService().awaitTermination(waitTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { } - discarded = getDiscardedValue(reader); - assertTrue(discarded); + assertTrue(getDiscardedValue(reader)); } private ExecutorOptions createExecutorOptions() { From 0a8c6330fc8e9978e150216c3bbb66d95b8a8cd4 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Tue, 18 Oct 2022 13:25:10 -0700 Subject: [PATCH 19/21] Apply suggestions from code review --- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index fb8df27ccf83..526f4cc24030 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -25,6 +25,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -37,7 +41,6 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -76,6 +79,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Rule; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -98,7 +103,7 @@ public class JmsIOTest { private ConnectionFactory connectionFactory; private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch; - @Rule public final transient TestPipeline pipeline = TestPipeline.fromOptions(); + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @Before public void startBroker() throws Exception { @@ -563,7 +568,6 @@ public void testCustomAutoscaler() throws IOException { @Test public void testCloseWithTimeout() throws IOException { Duration closeTimeout = Duration.millis(2000L); - long waitTimeout = closeTimeout.getMillis() + 1000L; JmsIO.Read spec = JmsIO.read() .withConnectionFactory(connectionFactory) @@ -574,26 +578,25 @@ public void testCloseWithTimeout() throws IOException { JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec); - ExecutorOptions options = createExecutorOptions(); + ScheduledExecutorService mockScheduledExecutorService = + Mockito.mock(ScheduledExecutorService.class); + ExecutorOptions options = PipelineOptionsFactory.as(ExecutorOptions.class); + options.setScheduledExecutorService(mockScheduledExecutorService); + ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + when(mockScheduledExecutorService.schedule( + runnableArgumentCaptor.capture(), anyLong(), any(TimeUnit.class))) + .thenReturn(null /* unused */); JmsIO.UnboundedJmsReader reader = source.createReader(options, null); - - reader.start(); + reader.start(); + assertFalse(getDiscardedValue(reader)); reader.close(); - assertFalse(getDiscardedValue(reader)); - try { - options.getScheduledExecutorService().awaitTermination(waitTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { - } + verify(mockScheduledExecutorService) + .schedule(any(Runnable.class), eq(closeTimeout.getMillis()), eq(TimeUnit.MILLISECONDS)); + runnableArgumentCaptor.getValue().run(); assertTrue(getDiscardedValue(reader)); - } - - private ExecutorOptions createExecutorOptions() { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - ExecutorOptions options = PipelineOptionsFactory.create().as(ExecutorOptions.class); - options.setScheduledExecutorService(executorService); - return options; + verifyNoMoreInteractions(mockScheduledExecutorService); } private boolean getDiscardedValue(JmsIO.UnboundedJmsReader reader) { From a8df80bb52030a71a1ad7d5575159c6caf833f6a Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Tue, 18 Oct 2022 13:26:20 -0700 Subject: [PATCH 20/21] Apply suggestions from code review --- .../src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 526f4cc24030..124032690ec1 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -28,10 +28,10 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.io.IOException; @@ -79,11 +79,11 @@ import org.junit.After; import org.junit.Before; import org.junit.Rule; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; /** Tests of {@link JmsIO}. */ @RunWith(JUnit4.class) From 26113315daa4a982005309b611af8f51bdec908e Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Tue, 18 Oct 2022 14:34:49 -0700 Subject: [PATCH 21/21] Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java --- .../jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 124032690ec1..1979d7b4ff60 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -588,7 +588,7 @@ public void testCloseWithTimeout() throws IOException { .thenReturn(null /* unused */); JmsIO.UnboundedJmsReader reader = source.createReader(options, null); - reader.start(); + reader.start(); assertFalse(getDiscardedValue(reader)); reader.close(); assertFalse(getDiscardedValue(reader));