Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HUAWEI Push Kit #2643

Merged
merged 4 commits into from
Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/autolabeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependency-change: "/project/Dependencies.scala"
'p:google-fcm': ["/google-fcm"]
'p:hbase': ["/hbase"]
'p:hdfs': ["/hdfs"]
'p:huawei-push-kit': ["/huawei-push-kit"]
'p:influxdb': ["/influxdb"]
'p:ironmq': ["/ironmq"]
'p:jms': ["/jms"]
Expand Down
4 changes: 2 additions & 2 deletions .github/release-drafter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ categories:
label: 'p:google-fcm'
- title: 'HBase'
label: 'p:hbase'
- title: 'Huawei Push Kit'
label: 'p:huawei-push-kit'
- title: 'InfluxDB'
label: 'p:influxdb'
- title: 'IronMQ'
Expand Down Expand Up @@ -99,8 +101,6 @@ change-template: '- $TITLE [#$NUMBER](https://github.com/akka/alpakka/issues/$NU
template: |
# :mega: Alpakka $NEXT_PATCH_VERSION released!

### See the full Release Notes in the [Alpakka documentation](https://doc.akka.io/docs/alpakka/3.0/release-notes/3.0.x.html).

## Changes

$CHANGES
Expand Down
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ jobs:
- name: hdfs
env:
- PRE_CMD="file /home/travis/.cache/coursier/v1/https/repo1.maven.org/maven2/org/typelevel/cats-kernel_2.13/2.0.0/cats-kernel_2.13-2.0.0.jar"
- name: huawei-push-kit
- name: influxdb
env:
- PRE_CMD="docker-compose up -d influxdb"
Expand Down
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ lazy val alpakka = project
googleFcm,
hbase,
hdfs,
huaweiPushKit,
influxdb,
ironmq,
jms,
Expand Down Expand Up @@ -220,6 +221,10 @@ lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, Test / for

lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs)

lazy val huaweiPushKit =
alpakkaProject("huawei-push-kit", "huawei.pushkit", Dependencies.HuaweiPushKit)
.disablePlugins(MimaPlugin)

lazy val influxdb = alpakkaProject("influxdb", "influxdb", Dependencies.InfluxDB)

lazy val ironmq = alpakkaProject(
Expand Down
82 changes: 82 additions & 0 deletions docs/src/main/paradox/huawei-push-kit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Huawei Push Kit

@@@ note { title="Huawei Push Kit" }

Huawei Push Kit is a messaging service provided for you. It establishes a messaging channel from the cloud to devices. By integrating Push Kit, you can send messages to your apps on users' devices in real time.

@@@

The Alpakka Huawei Push Kit connector provides a way to send notifications with [Huawei Push Kit](https://developer.huawei.com/consumer/en/hms/huawei-pushkit).

@@project-info{ projectId="huawei-push-kit" }

## Artifacts

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-stream-alpakka-huawei-push-kit_$scala.binary.version$
version=$project.version$
symbol2=AkkaVersion
value2=$akka.version$
group2=com.typesafe.akka
artifact2=akka-stream_$scala.binary.version$
version2=AkkaVersion
symbol3=AkkaHttpVersion
value3=$akka-http.version$
group3=com.typesafe.akka
artifact3=akka-http_$scala.binary.version$
version3=AkkaHttpVersion
group4=com.typesafe.akka
artifact4=akka-http-spray-json_$scala.binary.version$
version4=AkkaHttpVersion
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

@@dependencies { projectId="huawei-push-kit" }

## Settings

All of the configuration settings for Huawei Push Kit can be found in the @github[reference.conf](/huawei-push-kit/src/main/resources/reference.conf).

@@snip [snip](/huawei-push-kit/src/test/resources/application.conf) { #init-credentials }

The `test` and `maxConcurrentConnections` parameters in @scaladoc[HmsSettings](akka.stream.alpakka.huawei.pushkit.HmsSettings) are the predefined values.
You can send test notifications [(so called validate only).](https://developer.huawei.com/consumer/en/doc/development/HMSCore-References-V5/https-send-api-0000001050986197-V5)
And you can set the number of maximum concurrent connections.

## Sending notifications

To send a notification message create your notification object, and send it!

Scala
: @@snip [snip](/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala) { #imports #asFlow-send }

Java
: @@snip [snip](/huawei-push-kit/src/test/java/docs/javadsl/PushKitExamples.java) { #imports #asFlow-send }

With this type of send you can get responses from the server.
These responses can be @scaladoc[PushKitResponse](akka.stream.alpakka.huawei.pushkit.PushKitResponse) or @scaladoc[ErrorResponse](akka.stream.alpakka.huawei.pushkit.ErrorResponse).
You can choose what you want to do with this information, but keep in mind
if you try to resend the failed messages you will need to use exponential backoff! (see @extref[[Akka docs `RestartFlow.onFailuresWithBackoff`](akka:stream/operators/RestartFlow/onFailuresWithBackoff.html))

If you don't care if the notification was sent successfully, you may use `fireAndForget`.

Scala
: @@snip [snip](/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala) { #imports #simple-send }

Java
: @@snip [snip](/huawei-push-kit/src/test/java/docs/javadsl/PushKitExamples.java) { #imports #simple-send }

With fire and forget you will just send messages and ignore all the errors.

To help the integration and error handling or logging, there is a variation of the flow where you can send data beside your notification.

## Scala only

You can build notification described in the original documentation.
It can be done by hand, or using some builder method.
See an example of the condition builder below.

Scala
: @@snip [snip](/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala) { #condition-builder }
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The [Alpakka project](https://doc.akka.io/docs/alpakka/current/) is an open sour
* [gRPC](external/grpc.md)
* [Hadoop Distributed File System](hdfs.md)
* [HBase](hbase.md)
* [Huawei Push Kit](huawei-push-kit.md)
* [HTTP](external/http.md)
* [IBM Bluemix Cloud Object storage](bluemix-cos.md)
* [IBM DB2 Event Store](external/db2-event-store.md)
Expand Down
17 changes: 17 additions & 0 deletions huawei-push-kit/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
alpakka.huawei.pushkit {
app-id: ""
app-secret: ""
test: false
max-concurrent-connections: 50

# An address of a proxy that will be used for all connections using HTTP CONNECT tunnel.
# forward-proxy {
# host = "proxy"
# port = 8080
# credentials {
# username = "username"
# password = "password"
# }
# trust-pem = "/path/to/file.pem"
# }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.huawei.pushkit

import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.http.scaladsl.{Http, HttpsConnectionContext}

import java.io.FileInputStream
import java.security.KeyStore
import java.security.cert.{CertificateFactory, X509Certificate}
import javax.net.ssl.{SSLContext, TrustManagerFactory}

/**
* INTERNAL API
*/
@InternalApi
private[pushkit] object ForwardProxyHttpsContext {

val SSL = "SSL"
val X509 = "X509"

implicit class ForwardProxyHttpsContext(forwardProxy: ForwardProxy) {

def httpsContext(system: ActorSystem) = {
forwardProxy.trustPem match {
case Some(trustPem) => createHttpsContext(trustPem)
case None => defaultHttpsContext(system)
}
}
}

private def defaultHttpsContext(implicit system: ActorSystem) = {
Http().createDefaultClientHttpsContext()
}

private def createHttpsContext(trustPem: ForwardProxyTrustPem) = {
val certificate = x509Certificate(trustPem)
val sslContext = SSLContext.getInstance(SSL)

val alias = certificate.getIssuerDN.getName
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(null, null)
trustStore.setCertificateEntry(alias, certificate)

val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(trustStore)
val trustManagers = tmf.getTrustManagers
sslContext.init(null, trustManagers, null)
new HttpsConnectionContext(sslContext)
}

private def x509Certificate(trustPem: ForwardProxyTrustPem) = {
val stream = new FileInputStream(trustPem.pemPath)
var result: X509Certificate = null
try result = CertificateFactory.getInstance(X509).generateCertificate(stream).asInstanceOf[X509Certificate]
finally if (stream != null) stream.close()
result
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.huawei.pushkit

import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.http.scaladsl.ClientTransport
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}

import java.net.InetSocketAddress

/**
* INTERNAL API
*/
@InternalApi
private[pushkit] object ForwardProxyPoolSettings {

implicit class ForwardProxyPoolSettings(forwardProxy: ForwardProxy) {

def poolSettings(system: ActorSystem) = {
val address = InetSocketAddress.createUnresolved(forwardProxy.host, forwardProxy.port)
val transport = forwardProxy.credentials.fold(ClientTransport.httpsProxy(address))(
c => ClientTransport.httpsProxy(address, BasicHttpCredentials(c.username, c.password))
)

ConnectionPoolSettings(system)
.withConnectionSettings(
ClientConnectionSettings(system)
.withTransport(transport)
)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.huawei.pushkit

import akka.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import akka.annotation.InternalApi

import scala.collection.immutable.ListMap

/**
* INTERNAL API.
* Manages one [[HmsSettings]] per `ActorSystem`.
*/
@InternalApi
private[pushkit] final class HmsSettingExt private (sys: ExtendedActorSystem) extends Extension {
private var cachedSettings: Map[String, HmsSettings] = ListMap.empty
val settings: HmsSettings = settings(HmsSettings.ConfigPath)

def settings(path: String): HmsSettings =
cachedSettings.getOrElse(path, {
val settings = HmsSettings(sys.settings.config.getConfig(path))
cachedSettings += path -> settings
settings
})
}

/**
* INTERNAL API
*/
@InternalApi
private[pushkit] object HmsSettingExt extends ExtensionId[HmsSettingExt] with ExtensionIdProvider {

def apply()(implicit system: ActorSystem): HmsSettingExt = super.apply(system)

override def lookup = HmsSettingExt
override def createExtension(system: ExtendedActorSystem) = new HmsSettingExt(system)

/**
* Java API.
* Get the HmsSettings extension with the new actors API.
*/
override def get(system: ClassicActorSystemProvider): HmsSettingExt = super.apply(system)
}
Loading