Skip to content

Commit

Permalink
Merge branch 'master' into 3253_azure_storage
Browse files Browse the repository at this point in the history
  • Loading branch information
sfali committed Sep 30, 2024
2 parents d6d9ed0 + 8932e8f commit 1c9c3cf
Show file tree
Hide file tree
Showing 247 changed files with 1,652 additions and 1,141 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check-build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ jobs:
- { connector: hdfs, pre_cmd: 'file ${HOME}/.cache/coursier/v1/https/repo1.maven.org/maven2/org/typelevel/cats-kernel_2.13/2.0.0/cats-kernel_2.13-2.0.0.jar' }
- { connector: huawei-push-kit }
- { connector: influxdb, pre_cmd: 'docker compose up -d influxdb' }
- { connector: ironmq, pre_cmd: 'docker compose up -d ironauth ironmq' }
# - { connector: ironmq, pre_cmd: 'docker compose up -d ironauth ironmq' }
- { connector: jakarta-jms }
- { connector: jms, pre_cmd: 'docker compose up -d ibmmq' }
- { connector: json-streaming }
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/link-validator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ jobs:
run: sbt docs/makeSite

- name: Run Link Validator
run: cs launch net.runne::site-link-validator:0.2.4 -- scripts/link-validator.conf
run: cs launch net.runne::site-link-validator:0.2.5 -- scripts/link-validator.conf
4 changes: 2 additions & 2 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ updates.ignore = [
]

updates.pin = [
{ groupId = "com.fasterxml.jackson.core", version = "2.15." }
{ groupId = "com.fasterxml.jackson.datatype", version = "2.15." }
{ groupId = "com.fasterxml.jackson.core", version = "2.17." }
{ groupId = "com.fasterxml.jackson.datatype", version = "2.17." }
// v10 switches to Play 3
{ groupId = "com.github.jwt-scala", version = "9.4." }
]
8 changes: 0 additions & 8 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@ We follow the standard GitHub [fork & pull](https://help.github.com/articles/usi

You're always welcome to submit your PR straight away and start the discussion (without reading the rest of this wonderful doc, or the README.md). The goal of these notes is to make your experience contributing to Alpakka as smooth and pleasant as possible. We're happy to guide you through the process once you've submitted your PR.

# The Akka Community

In case of questions about the contribution process or for discussion of specific issues please visit the [akka/dev gitter chat](https://gitter.im/akka/dev).

You may also check out these [other resources](https://akka.io/get-involved/).

# Contributing to Alpakka

## Development Setup
Expand Down Expand Up @@ -44,8 +38,6 @@ This is the process for committing code into main.

1. If the branch merge conflicts with its target, rebase your branch onto the target branch.

In case of questions about the contribution process or for discussion of specific issues please visit the [akka/dev gitter chat](https://gitter.im/akka/dev).


## Alpakka specific advice

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ If you find an issue that you'd like to see fixed, the quickest way to make that

Refer to the [CONTRIBUTING.md](CONTRIBUTING.md) file for more details about the workflow, and general hints on how to prepare your pull request. If you're planning to implement a new module within Alpakka, look at our [contributor advice](contributor-advice.md).

You can also ask for clarifications or guidance in GitHub issues directly, or in the [akka/dev](https://gitter.im/akka/dev) chat if a more real time communication would be of benefit.
You can also ask for clarifications or guidance in GitHub issues directly.

Caveat Emptor
-------------
Expand All @@ -58,4 +58,4 @@ License
-------
Alpakka is licensed under the [Business Source License (BSL) 1.1](https://github.com/akka/alpakka/blob/main/LICENSE), please see the [Akka License FAQ](https://www.lightbend.com/akka/license-faq).

Tests and documentation are under a separate license, see the LICENSE file in each documentation and test root directory for details.
Tests and documentation are under a separate license, see the LICENSE file in each documentation and test root directory for details.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import javax.net.ssl.{SSLContext, TrustManager}

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

/**
* Only for internal implementations
Expand Down Expand Up @@ -121,7 +121,7 @@ final class AmqpDetailsConnectionProvider private (
copy(connectionName = Option(name))

override def get: Connection = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
val factory = new ConnectionFactory
credentials.foreach { credentials =>
factory.setUsername(credentials.username)
Expand Down Expand Up @@ -331,7 +331,6 @@ final class AmqpConnectionFactoryConnectionProvider private (val factory: Connec
copy(hostAndPorts = hostAndPorts.asScala.map(_.toScala).toIndexedSeq)

override def get: Connection = {
import scala.collection.JavaConverters._
factory.newConnection(hostAndPortList.map(hp => new Address(hp._1, hp._2)).asJava)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package akka.stream.alpakka.amqp

import akka.annotation.InternalApi
import akka.util.JavaDurationConverters._

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._

/**
* Internal API
Expand Down Expand Up @@ -221,8 +221,9 @@ final class AmqpWriteSettings private (
/**
* Java API
*/
def withConfirmationTimeout(confirmationTimeout: java.time.Duration): AmqpWriteSettings =
copy(confirmationTimeout = confirmationTimeout.asScala)
def withConfirmationTimeout(confirmationTimeout: java.time.Duration): AmqpWriteSettings = {
copy(confirmationTimeout = confirmationTimeout.toScala)
}

private def copy(connectionProvider: AmqpConnectionProvider = connectionProvider,
exchange: Option[String] = exchange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private trait AmqpConnectorLogic { this: GraphStageLogic =>
connection.addShutdownListener(shutdownListener)
channel.addShutdownListener(shutdownListener)

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

settings.declarations.foreach {
case d: QueueDeclaration =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
private var unackedMessages = 0

override def whenConnected(): Unit = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
channel.basicQos(bufferSize)
val consumerCallback = getAsyncCallback(handleDelivery)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.japi.Pair
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl.Keep

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpFlow {

Expand All @@ -29,7 +29,7 @@ object AmqpFlow {
def create(
settings: AmqpWriteSettings
): akka.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpFlow` that accepts `WriteMessage` elements and emits `WriteResult`.
Expand All @@ -54,7 +54,7 @@ object AmqpFlow {
): akka.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlow
.withConfirm(settings = settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -80,7 +80,7 @@ object AmqpFlow {
): akka.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlow
.withConfirmUnordered(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -103,6 +103,6 @@ object AmqpFlow {
.withConfirmAndPassThroughUnordered[T](settings = settings)
)(Keep.right)
.map { case (writeResult, passThrough) => Pair(writeResult, passThrough) }
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.CompletionStage
import akka.Done
import akka.stream.alpakka.amqp._

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpFlowWithContext {

Expand All @@ -23,7 +23,7 @@ object AmqpFlowWithContext {
): akka.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlowWithContext
.apply(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -40,6 +40,6 @@ object AmqpFlowWithContext {
): akka.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlowWithContext
.withConfirm(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.stream.alpakka.amqp._
import akka.stream.javadsl.Flow
import akka.util.ByteString

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpRpcFlow {

Expand All @@ -27,7 +27,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[ByteString, ByteString, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.simple(settings, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -39,7 +39,7 @@ object AmqpRpcFlow {
bufferSize: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -52,7 +52,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -73,7 +73,7 @@ object AmqpRpcFlow {
): Flow[WriteMessage, CommittableReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.committableFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.map(cm => new CommittableReadResult(cm))
.asJava

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.Done
import akka.stream.alpakka.amqp._
import akka.util.ByteString

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpSink {

Expand All @@ -21,7 +21,7 @@ object AmqpSink {
* either normally or because of an amqp failure.
*/
def create(settings: AmqpWriteSettings): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpSink` that accepts `ByteString` elements.
Expand All @@ -30,7 +30,7 @@ object AmqpSink {
* either normally or because of an amqp failure.
*/
def createSimple(settings: AmqpWriteSettings): akka.stream.javadsl.Sink[ByteString, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
Expand All @@ -43,6 +43,6 @@ object AmqpSink {
def createReplyTo(
settings: AmqpReplyToSinkSettings
): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f => f.asJava).asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import akka.Done
import akka.stream.alpakka.amqp.ReadResult
import akka.stream.alpakka.amqp.scaladsl

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

final class CommittableReadResult(cm: scaladsl.CommittableReadResult) {
val message: ReadResult = cm.message

def ack(): CompletionStage[Done] = ack(false)
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).toJava
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).asJava

def nack(): CompletionStage[Done] = nack(false, true)
def nack(multiple: Boolean, requeue: Boolean): CompletionStage[Done] =
cm.nack(multiple, requeue).toJava
cm.nack(multiple, requeue).asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

package akka.stream.alpakka.amqp.scaladsl

import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl.{Flow, Keep}
import akka.util.ByteString

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

object AmqpRpcFlow {
Expand Down Expand Up @@ -39,7 +39,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int = 1): Flow[WriteMessage, ReadResult, Future[String]] =
committableFlow(settings, bufferSize, repliesPerMessage)
.mapAsync(1) { cm =>
cm.ack().map(_ => cm.message)(ExecutionContexts.parasitic)
cm.ack().map(_ => cm.message)(ExecutionContext.parasitic)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
package akka.stream.alpakka.amqp.scaladsl

import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.amqp.impl
import akka.stream.alpakka.amqp.{AmqpSourceSettings, ReadResult}
import akka.stream.scaladsl.Source

import scala.concurrent.ExecutionContext

object AmqpSource {
private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic
private implicit val executionContext: ExecutionContext = ExecutionContext.parasitic

/**
* Scala API: Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import akka.util.ByteString;
import com.rabbitmq.client.AuthenticationFailureException;
import org.junit.*;
import scala.collection.JavaConverters;
import scala.concurrent.duration.Duration;
import scala.jdk.javaapi.CollectionConverters;

import java.net.ConnectException;
import java.util.Arrays;
Expand Down Expand Up @@ -161,10 +161,7 @@ public void publishAndConsumeRpcWithoutAutoAck() throws Exception {
.to(amqpSink)
.run(system);

List<ReadResult> probeResult =
JavaConverters.seqAsJavaListConverter(
result.second().toStrict(Duration.create(3, TimeUnit.SECONDS)))
.asJava();
java.util.Collection<ReadResult> probeResult = CollectionConverters.asJavaCollection(result.second().toStrict(Duration.create(3, TimeUnit.SECONDS)));
assertEquals(
probeResult.stream().map(s -> s.bytes().utf8String()).collect(Collectors.toList()), input);
sourceToSink.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.util.ByteString;
import scala.collection.JavaConverters;

import scala.jdk.javaapi.CollectionConverters;

/** Needs a local running AMQP server on the default port with no password. */
public class AmqpFlowTest {
Expand Down Expand Up @@ -86,7 +87,7 @@ private void shouldEmitConfirmationForPublishedMessages(

result
.request(input.size())
.expectNextN(JavaConverters.asScalaBufferConverter(expectedOutput).asScala().toList());
.expectNextN(CollectionConverters.asScala(expectedOutput).toList());
}

@Test
Expand Down Expand Up @@ -120,7 +121,7 @@ private void shouldPropagateContext(

result
.request(input.size())
.expectNextN(JavaConverters.asScalaBufferConverter(expectedOutput).asScala().toList());
.expectNextN(CollectionConverters.asScala(expectedOutput).toList());
}

@Test
Expand All @@ -143,6 +144,6 @@ public void shouldPropagatePassThrough() {

result
.request(input.size())
.expectNextN(JavaConverters.asScalaBufferConverter(expectedOutput).asScala().toList());
.expectNextN(CollectionConverters.asScala(expectedOutput).toList());
}
}
Loading

0 comments on commit 1c9c3cf

Please sign in to comment.