From f41669f36546d8aa54935bf581f2f7975195f39c Mon Sep 17 00:00:00 2001 From: "scala-steward-asf[bot]" <147768647+scala-steward-asf[bot]@users.noreply.github.com> Date: Sun, 16 Jun 2024 00:04:19 +0000 Subject: [PATCH 1/3] Update scalafmt-core to 3.8.2 --- .scalafmt.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.scalafmt.conf b/.scalafmt.conf index fe495bf2145..ff3b34a3eb7 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version = 3.8.1 +version = 3.8.2 runner.dialect = scala213 project.git = true style = defaultWithAlign From 38b03829b4e38e983521f3efd7e589d315658f18 Mon Sep 17 00:00:00 2001 From: "scala-steward-asf[bot]" <147768647+scala-steward-asf[bot]@users.noreply.github.com> Date: Sun, 16 Jun 2024 00:05:16 +0000 Subject: [PATCH 2/3] Reformat with scalafmt 3.8.2 Executed command: scalafmt --non-interactive --- .../pekko/pattern/BackoffSupervisorSpec.scala | 9 +++------ .../apache/pekko/pattern/CircuitBreakerMTSpec.scala | 3 ++- .../org/apache/pekko/routing/TailChoppingSpec.scala | 3 ++- .../actor/typed/delivery/DurableProducerQueue.scala | 3 ++- .../delivery/internal/ProducerControllerImpl.scala | 3 ++- .../scala/org/apache/pekko/dispatch/Mailboxes.scala | 3 ++- .../scala/org/apache/pekko/event/EventStream.scala | 3 ++- .../pekko/io/dns/internal/AsyncDnsResolver.scala | 3 ++- .../metrics/ClusterMetricsExtensionSpec.scala | 13 +++---------- .../internal/ShardingProducerControllerImpl.scala | 3 ++- .../pekko/cluster/sharding/ShardCoordinator.scala | 3 ++- .../ddata/typed/internal/ReplicatorBehavior.scala | 3 ++- .../org/apache/pekko/cluster/ClusterDaemon.scala | 3 ++- .../PersistenceTestKitDurableStateStore.scala | 6 ++++-- .../pekko/persistence/journal/ReplayFilter.scala | 3 ++- project/SbtMultiJvmPlugin.scala | 3 ++- project/StreamOperatorsIndexGenerator.scala | 8 ++------ .../scala/org/apache/pekko/remote/Endpoint.scala | 3 ++- .../stream/impl/LinearTraversalBuilderSpec.scala | 3 +-- .../org/apache/pekko/stream/impl/io/TcpStages.scala | 3 ++- .../org/apache/pekko/stream/javadsl/Source.scala | 3 ++- .../org/apache/pekko/stream/scaladsl/Source.scala | 3 ++- 22 files changed, 47 insertions(+), 43 deletions(-) diff --git a/actor-tests/src/test/scala/org/apache/pekko/pattern/BackoffSupervisorSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/pattern/BackoffSupervisorSpec.scala index 92ac422e399..96df572af5c 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/pattern/BackoffSupervisorSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/pattern/BackoffSupervisorSpec.scala @@ -273,12 +273,9 @@ class BackoffSupervisorSpec extends PekkoSpec with ImplicitSender with Eventuall val delayTable = Table( ("restartCount", "minBackoff", "maxBackoff", "randomFactor", "expectedResult"), - (0, 0.minutes, 0.minutes, 0d, 0.minutes), - (0, 5.minutes, 7.minutes, 0d, 5.minutes), - (2, 5.seconds, 7.seconds, 0d, 7.seconds), - (2, 5.seconds, 7.days, 0d, 20.seconds), - (29, 5.minutes, 10.minutes, 0d, 10.minutes), - (29, 10000.days, 10000.days, 0d, 10000.days), + (0, 0.minutes, 0.minutes, 0d, 0.minutes), (0, 5.minutes, 7.minutes, 0d, 5.minutes), + (2, 5.seconds, 7.seconds, 0d, 7.seconds), (2, 5.seconds, 7.days, 0d, 20.seconds), + (29, 5.minutes, 10.minutes, 0d, 10.minutes), (29, 10000.days, 10000.days, 0d, 10000.days), (Int.MaxValue, 10000.days, 10000.days, 0d, 10000.days)) forAll(delayTable) { ( diff --git a/actor-tests/src/test/scala/org/apache/pekko/pattern/CircuitBreakerMTSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/pattern/CircuitBreakerMTSpec.scala index 1d82aa5a2f2..f1e83caad4f 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/pattern/CircuitBreakerMTSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/pattern/CircuitBreakerMTSpec.scala @@ -32,7 +32,8 @@ class CircuitBreakerMTSpec extends PekkoSpec { def openBreaker(breaker: CircuitBreaker): Unit = { // returns true if the breaker is open def failingCall(): Boolean = - Await.result(breaker.withCircuitBreaker(Future.failed(new RuntimeException("FAIL"))).recover { + Await.result( + breaker.withCircuitBreaker(Future.failed(new RuntimeException("FAIL"))).recover { case _: CircuitBreakerOpenException => true case _ => false }, remainingOrDefault) diff --git a/actor-tests/src/test/scala/org/apache/pekko/routing/TailChoppingSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/routing/TailChoppingSpec.scala index e22321f09e2..c9d996a6b08 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/routing/TailChoppingSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/routing/TailChoppingSpec.scala @@ -26,7 +26,8 @@ import pekko.testkit._ object TailChoppingSpec { def newActor(id: Int, sleepTime: Duration)(implicit system: ActorSystem) = - system.actorOf(Props(new Actor { + system.actorOf( + Props(new Actor { var times: Int = _ def receive = { diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/DurableProducerQueue.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/DurableProducerQueue.scala index 713b52e282e..cd69940cd81 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/DurableProducerQueue.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/DurableProducerQueue.scala @@ -174,7 +174,8 @@ object DurableProducerQueue { override def equals(obj: Any): Boolean = { obj match { case other: MessageSent[_] => - seqNr == other.seqNr && message == other.message && ack == other.ack && confirmationQualifier == other.confirmationQualifier && timestampMillis == other.timestampMillis + seqNr == other.seqNr && message == other.message && ack == other.ack && confirmationQualifier == other + .confirmationQualifier && timestampMillis == other.timestampMillis case _ => false } } diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala index 6358740f159..aab18c1e1aa 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala @@ -711,7 +711,8 @@ private class ProducerControllerImpl[A: ClassTag]( } def receiveSendChunk(): Behavior[InternalCommand] = { - if (s.remainingChunks.nonEmpty && s.remainingChunks.head.seqNr <= s.requestedSeqNr && s.storeMessageSentInProgress == 0) { + if (s.remainingChunks.nonEmpty && s.remainingChunks.head.seqNr <= s.requestedSeqNr && s + .storeMessageSentInProgress == 0) { if (traceEnabled) context.log.trace("Send next chunk seqNr [{}].", s.remainingChunks.head.seqNr) if (durableQueue.isEmpty) { diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Mailboxes.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Mailboxes.scala index 9a7aa116711..428cba3c18b 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Mailboxes.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Mailboxes.scala @@ -194,7 +194,8 @@ private[pekko] class Mailboxes( if (deploy.mailbox != Deploy.NoMailboxGiven) { verifyRequirements(lookup(deploy.mailbox)) - } else if (deploy.dispatcher != Deploy.NoDispatcherGiven && deploy.dispatcher != Deploy.DispatcherSameAsParent && hasMailboxType) { + } else if (deploy.dispatcher != Deploy.NoDispatcherGiven && deploy.dispatcher != Deploy + .DispatcherSameAsParent && hasMailboxType) { verifyRequirements(lookup(dispatcherConfig.getString("id"))) } else if (hasRequiredType(actorClass)) { try verifyRequirements(lookupByQueueType(getRequiredType(actorClass))) diff --git a/actor/src/main/scala/org/apache/pekko/event/EventStream.scala b/actor/src/main/scala/org/apache/pekko/event/EventStream.scala index 93c63263515..90ace42e0e1 100644 --- a/actor/src/main/scala/org/apache/pekko/event/EventStream.scala +++ b/actor/src/main/scala/org/apache/pekko/event/EventStream.scala @@ -103,7 +103,8 @@ class EventStream(sys: ActorSystem, private val debug: Boolean) extends LoggingB Logging.Debug( simpleName(this), this.getClass, - "initialized unsubscriber to: " + unsubscriber + ", registering " + subscribers.size + " initial subscribers with it")) + "initialized unsubscriber to: " + unsubscriber + ", registering " + subscribers + .size + " initial subscribers with it")) subscribers.foreach(registerWithUnsubscriber) true } else { diff --git a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala index 2086700d38f..853980be88c 100644 --- a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala +++ b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala @@ -252,7 +252,8 @@ private[pekko] object AsyncDnsResolver { """^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$""".r private val ipv6Address = - """^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$""".r + """^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$""" + .r private[pekko] def isIpv4Address(name: String): Boolean = ipv4Address.findAllMatchIn(name).nonEmpty diff --git a/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/ClusterMetricsExtensionSpec.scala b/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/ClusterMetricsExtensionSpec.scala index 29aa48dee75..f9532a2e7a7 100644 --- a/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/ClusterMetricsExtensionSpec.scala +++ b/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/ClusterMetricsExtensionSpec.scala @@ -70,16 +70,9 @@ class ClusterMetricsExtensionSpec val history = metricsView.metricsHistory.reverse.map { _.head } val expected = List( - (0.700, 0.000, 0.000), - (0.700, 0.018, 0.007), - (0.700, 0.051, 0.020), - (0.700, 0.096, 0.038), - (0.700, 0.151, 0.060), - (0.700, 0.214, 0.085), - (0.700, 0.266, 0.106), - (0.700, 0.309, 0.123), - (0.700, 0.343, 0.137), - (0.700, 0.372, 0.148)) + (0.700, 0.000, 0.000), (0.700, 0.018, 0.007), (0.700, 0.051, 0.020), (0.700, 0.096, 0.038), + (0.700, 0.151, 0.060), (0.700, 0.214, 0.085), (0.700, 0.266, 0.106), (0.700, 0.309, 0.123), + (0.700, 0.343, 0.137), (0.700, 0.372, 0.148)) expected.size should ===(sampleCount) diff --git a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala index c8805b008de..d30b1df016d 100644 --- a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala +++ b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala @@ -504,7 +504,8 @@ private class ShardingProducerControllerImpl[A: ClassTag]( s.out.flatMap { case (outKey: OutKey, outState) => val idleDurationMillis = (now - outState.usedNanoTime) / 1000 / 1000 - if (outState.unconfirmed.isEmpty && outState.buffered.isEmpty && idleDurationMillis >= settings.cleanupUnusedAfter.toMillis) { + if (outState.unconfirmed.isEmpty && outState.buffered.isEmpty && idleDurationMillis >= settings + .cleanupUnusedAfter.toMillis) { context.log.debug("Cleanup unused [{}], because it was idle for [{} ms]", outKey, idleDurationMillis) context.stop(outState.producerController) Some(outKey) diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala index 94573467118..f687553dd13 100644 --- a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala +++ b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala @@ -1651,7 +1651,8 @@ private[pekko] class DDataShardCoordinator( updateStateRetries += 1 val template = - s"$typeName: The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': ${stateWriteConsistency.timeout.toMillis} millis (${if (terminating) "terminating" + s"$typeName: The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': ${stateWriteConsistency + .timeout.toMillis} millis (${if (terminating) "terminating" else "retrying"}). Attempt $updateStateRetries. " + s"Perhaps the ShardRegion has not started on all active nodes yet? event=$evt" diff --git a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala index 06b670ec689..8f4bdfe8950 100644 --- a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala +++ b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala @@ -58,7 +58,8 @@ import pekko.util.Timeout def withState( subscribeAdapters: Map[ - ActorRef[JReplicator.SubscribeResponse[ReplicatedData]], ActorRef[dd.Replicator.SubscribeResponse[ + ActorRef[JReplicator.SubscribeResponse[ReplicatedData]], + ActorRef[dd.Replicator.SubscribeResponse[ ReplicatedData]]]): Behavior[SReplicator.Command] = { def stopSubscribeAdapter( diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index e7f4fca5ddd..34320067be1 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -882,7 +882,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def leaving(address: Address): Unit = { // only try to update if the node is available (in the member ring) latestGossip.members.find(_.address == address).foreach { existingMember => - if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up || existingMember.status == PreparingForShutdown || existingMember.status == ReadyForShutdown) { + if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember + .status == Up || existingMember.status == PreparingForShutdown || existingMember.status == ReadyForShutdown) { // mark node as LEAVING val newMembers = latestGossip.members - existingMember + existingMember.copy(status = Leaving) val newGossip = latestGossip.copy(members = newMembers) diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala index 143cefd4489..9af65831d55 100644 --- a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala @@ -122,7 +122,8 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], pekko.NotUsed] = this.synchronized { val currentGlobalOffset = lastGlobalOffset.get() - changes(tag, offset).takeWhile(_.offset match { + changes(tag, offset).takeWhile( + _.offset match { case Sequence(fromOffset) => fromOffset < currentGlobalOffset case offset => @@ -137,7 +138,8 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) offset: Offset): Source[DurableStateChange[A], NotUsed] = this.synchronized { val currentGlobalOffset = lastGlobalOffset.get() - changesBySlices(entityType, minSlice, maxSlice, offset).takeWhile(_.offset match { + changesBySlices(entityType, minSlice, maxSlice, offset).takeWhile( + _.offset match { case Sequence(fromOffset) => fromOffset < currentGlobalOffset case offset => diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala b/persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala index 7eeefd7df17..2ed0ac960e1 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala @@ -138,7 +138,8 @@ private[pekko] class ReplayFilter( if (msg.persistent.sequenceNr >= seqNo) { val errMsg = s"Invalid replayed event [sequenceNr=${r.persistent.sequenceNr}, writerUUID=${r.persistent.writerUuid}] from a new writer. " + - s"An older writer already sent an event [sequenceNr=${msg.persistent.sequenceNr}, writerUUID=${msg.persistent.writerUuid}] whose sequence number was equal or greater for the same persistenceId [${r.persistent.persistenceId}]. " + + s"An older writer already sent an event [sequenceNr=${msg.persistent.sequenceNr}, writerUUID=${msg.persistent + .writerUuid}] whose sequence number was equal or greater for the same persistenceId [${r.persistent.persistenceId}]. " + "Perhaps, the new writer journaled the event out of sequence, or duplicate persistenceId for different entities?" logIssue(errMsg) mode match { diff --git a/project/SbtMultiJvmPlugin.scala b/project/SbtMultiJvmPlugin.scala index ffd2febc964..7f270ae0c03 100644 --- a/project/SbtMultiJvmPlugin.scala +++ b/project/SbtMultiJvmPlugin.scala @@ -248,7 +248,8 @@ object MultiJvmPlugin extends AutoPlugin { .foreach(classpathFile => IO.copyFile(classpathFile, new File(multiRunCopiedClassDir, classpathFile.getName), true)) val cp = - directoryBasedClasspathEntries.absString + File.pathSeparator + multiRunCopiedClassDir.getAbsolutePath + File.separator + "*" + directoryBasedClasspathEntries.absString + File.pathSeparator + multiRunCopiedClassDir.getAbsolutePath + File + .separator + "*" (testClass: String) => { Seq("-cp", cp, runner, "-s", testClass) ++ options } } diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index e2aada17694..6def2eef025 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -187,12 +187,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { .map(_.replaceAll("Mat$", "")) .map(method => (element, method)) } ++ List( - (noElement, "Partition"), - (noElement, "MergeSequence"), - (noElement, "Broadcast"), - (noElement, "Balance"), - (noElement, "Unzip"), - (noElement, "UnzipWith")) + (noElement, "Partition"), (noElement, "MergeSequence"), (noElement, "Broadcast"), (noElement, "Balance"), + (noElement, "Unzip"), (noElement, "UnzipWith")) val sourceAndFlow = defs.collect { case ("Source", method) => method }.intersect(defs.collect { case ("Flow", method) => method }) diff --git a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala index ffcbef77cac..81f3c06d445 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala @@ -912,7 +912,8 @@ private[remote] class EndpointWriter( if (pduSize > transport.maximumPayloadBytes) { val reasonText = - s"Discarding oversized payload sent to ${s.recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${s.message.getClass} was ${pdu.size} bytes." + s"Discarding oversized payload sent to ${s.recipient}: max allowed size ${transport + .maximumPayloadBytes} bytes, actual size of encoded ${s.message.getClass} was ${pdu.size} bytes." log.error( new OversizedPayloadException(reasonText), "Transient association error (association remains live)") diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/LinearTraversalBuilderSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/LinearTraversalBuilderSpec.scala index e252cc0144e..cccb0868d97 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/LinearTraversalBuilderSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/LinearTraversalBuilderSpec.scala @@ -817,8 +817,7 @@ class LinearTraversalBuilderSpec extends PekkoSpec { mat.islandAssignments should ===( List( - (sink, Attributes.none, TestDefaultIsland), - (flow2, Attributes.none, TestDefaultIsland), + (sink, Attributes.none, TestDefaultIsland), (flow2, Attributes.none, TestDefaultIsland), (flow1, Attributes.name("island2"), TestIsland2), (source, Attributes.name("island2") and Attributes.name("island1"), TestIsland1))) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala index d9cfeabfa00..2054d5392fc 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala @@ -50,7 +50,8 @@ import pekko.util.ByteString val halfClose: Boolean, val idleTimeout: Duration, val bindShutdownTimeout: FiniteDuration) - extends GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection], Future[ + extends GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection], + Future[ StreamTcp.ServerBinding]] { import ConnectionSourceStage._ diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 10637adac0e..d3fbdfdfb63 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -546,7 +546,8 @@ object Source { */ @deprecated("Use variant accepting completion and failure matchers", "Akka 2.6.0") def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = - new Source(scaladsl.Source.actorRef({ + new Source(scaladsl.Source.actorRef( + { case pekko.actor.Status.Success(s: CompletionStrategy) => s case pekko.actor.Status.Success(_) => CompletionStrategy.Draining case pekko.actor.Status.Success => CompletionStrategy.Draining diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 623bd2a2fb8..a39a15df59b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -710,7 +710,8 @@ object Source { */ @deprecated("Use variant accepting completion and failure matchers instead", "Akka 2.6.0") def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = - actorRef({ + actorRef( + { case pekko.actor.Status.Success(s: CompletionStrategy) => s case pekko.actor.Status.Success(_) => CompletionStrategy.Draining case pekko.actor.Status.Success => CompletionStrategy.Draining From 38b5309d8e383193dcb2f45f668b0723494feaae Mon Sep 17 00:00:00 2001 From: "scala-steward-asf[bot]" <147768647+scala-steward-asf[bot]@users.noreply.github.com> Date: Sun, 16 Jun 2024 00:05:16 +0000 Subject: [PATCH 3/3] Add 'Reformat with scalafmt 3.8.2' to .git-blame-ignore-revs --- .git-blame-ignore-revs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 807c9747364..63318f3144b 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -73,3 +73,6 @@ bed11ccbb1bd6b31592e490c36d68cb428296b46 #sort imports of stream modules c44c0b7cbdab11d85176cfe062288fdcba16c56a + +# Scala Steward: Reformat with scalafmt 3.8.2 +38b03829b4e38e983521f3efd7e589d315658f18