Skip to content

Commit

Permalink
HUAWEI Push Kit
Browse files Browse the repository at this point in the history
  • Loading branch information
atyutin90 committed Apr 14, 2021
1 parent 0df5d7e commit 7db1d36
Show file tree
Hide file tree
Showing 26 changed files with 1,750 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/autolabeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependency-change: "/project/Dependencies.scala"
'p:google-fcm': ["/google-fcm"]
'p:hbase': ["/hbase"]
'p:hdfs': ["/hdfs"]
'p:huawei-push-kit': ["/huawei-push-kit"]
'p:influxdb': ["/influxdb"]
'p:ironmq': ["/ironmq"]
'p:jms': ["/jms"]
Expand Down
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ lazy val alpakka = project
googleFcm,
hbase,
hdfs,
huaweiPushKit,
influxdb,
ironmq,
jms,
Expand Down Expand Up @@ -237,6 +238,10 @@ lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, Test / for

lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs, fatalWarnings := true)

lazy val huaweiPushKit =
alpakkaProject("huawei-push-kit", "huawei.pushkit", Dependencies.HuaweiPushKit, fatalWarnings := true)
.disablePlugins(MimaPlugin)

lazy val influxdb = alpakkaProject("influxdb", "influxdb", Dependencies.InfluxDB, fatalWarnings := false)

lazy val ironmq = alpakkaProject(
Expand Down
82 changes: 82 additions & 0 deletions docs/src/main/paradox/huawei-push-kit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# HUAWEI Push Kit

@@@ note { title="HUAWEI Push Kit" }

HUAWEI Push Kit is a messaging service provided for you. It establishes a messaging channel from the cloud to devices. By integrating Push Kit, you can send messages to your apps on users' devices in real time.

@@@

The Alpakka HUAWEI Push Kit connector provides a way to send notifications with [HUAWEI Push Kit](https://developer.huawei.com/consumer/en/hms/huawei-pushkit).

@@project-info{ projectId="huawei-push-kit" }

## Artifacts

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-stream-alpakka-huawei-push-kit_$scala.binary.version$
version=$project.version$
symbol2=AkkaVersion
value2=$akka.version$
group2=com.typesafe.akka
artifact2=akka-stream_$scala.binary.version$
version2=AkkaVersion
symbol3=AkkaHttpVersion
value3=$akka-http.version$
group3=com.typesafe.akka
artifact3=akka-http_$scala.binary.version$
version3=AkkaHttpVersion
group4=com.typesafe.akka
artifact4=akka-http-spray-json_$scala.binary.version$
version4=AkkaHttpVersion
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

@@dependencies { projectId="huawei-push-kit" }

## Settings

All of the configuration settings for HUAWEI Push Kit can be found in the @github[reference.conf](/huawei-push-kit/src/main/resources/reference.conf).

@@snip [snip](/huawei-push-kit/src/test/resources/application.conf) { #init-credentials }

The `test` and `maxConcurrentConnections` parameters in [HmsSettings](akka.stream.alpakka.huawei.pushkit.HmsSettings) are the predefined values.
You can send test notifications [(so called validate only).](https://developer.huawei.com/consumer/en/doc/development/HMSCore-References-V5/https-send-api-0000001050986197-V5)
And you can set the number of maximum concurrent connections.

## Sending notifications

To send a notification message create your notification object, and send it!

Scala
: @@snip [snip](/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala) { #imports #asFlow-send }

Java
: @@snip [snip](/huawei-push-kit/src/test/java/docs/javadsl/PushKitExamples.java) { #imports #asFlow-send }

With this type of send you can get responses from the server.
These responses can be @scaladoc[PushKitResponse](akka.stream.alpakka.huawei.pushkit.PushKitResponse) or @scaladoc[ErrorResponse](akka.stream.alpakka.huawei.pushkit.ErrorResponse).
You can choose what you want to do with this information, but keep in mind
if you try to resend the failed messages you will need to use exponential backoff! (see [Akka docs `RestartFlow.onFailuresWithBackoff`](https://doc.akka.io/docs/akka/current/stream/operators/RestartFlow/onFailuresWithBackoff.html))

If you don't care if the notification was sent successfully, you may use `fireAndForget`.

Scala
: @@snip [snip](/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala) { #imports #simple-send }

Java
: @@snip [snip](/huawei-push-kit/src/test/java/docs/javadsl/PushKitExamples.java) { #imports #simple-send }

With fire and forget you will just send messages and ignore all the errors.

To help the integration and error handling or logging, there is a variation of the flow where you can send data beside your notification.

## Scala only

You can build notification described in the original documentation.
It can be done by hand, or using some builder method.
Example is condition builder.

Scala
: @@snip [snip](/huawei-push-kit/src/test/scala/docs/scaladsl/PushKitExamples.scala) { #condition-builder }
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The [Alpakka project](https://doc.akka.io/docs/alpakka/current/) is an open sour
* [gRPC](external/grpc.md)
* [Hadoop Distributed File System](hdfs.md)
* [HBase](hbase.md)
* [HUAWEI Push Kit](huawei-push-kit.md)
* [HTTP](external/http.md)
* [IBM Bluemix Cloud Object storage](bluemix-cos.md)
* [IBM DB2 Event Store](external/db2-event-store.md)
Expand Down
17 changes: 17 additions & 0 deletions huawei-push-kit/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
alpakka.huawei.pushkit {
app-id: ""
app-secret: ""
test: false
max-concurrent-connections: 50

# An address of a proxy that will be used for all connections using HTTP CONNECT tunnel.
# forward-proxy {
# host = "proxy"
# port = 8080
# credentials {
# username = "username"
# password = "password"
# }
# trust-pem = "/path/to/file.pem"
# }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.huawei.pushkit

import akka.actor.ActorSystem
import akka.http.scaladsl.{Http, HttpsConnectionContext}

import java.io.FileInputStream
import java.security.KeyStore
import java.security.cert.{CertificateFactory, X509Certificate}
import javax.net.ssl.{SSLContext, TrustManagerFactory}

private[pushkit] object ForwardProxyHttpsContext {

val SSL = "SSL"
val X509 = "X509"

implicit class ForwardProxyHttpsContext(forwardProxy: ForwardProxy) {

def httpsContext(system: ActorSystem) = {
forwardProxy.trustPem match {
case Some(trustPem) => createHttpsContext(trustPem)
case None => defaultHttpsContext(system)
}
}
}

private def defaultHttpsContext(implicit system: ActorSystem) = {
Http().createDefaultClientHttpsContext()
}

private def createHttpsContext(trustPem: ForwardProxyTrustPem) = {
val certificate = x509Certificate(trustPem)
val sslContext = SSLContext.getInstance(SSL)

val alias = certificate.getIssuerDN.getName
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(null, null)
trustStore.setCertificateEntry(alias, certificate)

val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(trustStore)
val trustManagers = tmf.getTrustManagers
sslContext.init(null, trustManagers, null)
new HttpsConnectionContext(sslContext)
}

private def x509Certificate(trustPem: ForwardProxyTrustPem) = {
val stream = new FileInputStream(trustPem.pemPath)
var result: X509Certificate = null
try result = CertificateFactory.getInstance(X509).generateCertificate(stream).asInstanceOf[X509Certificate]
finally if (stream != null) stream.close()
result
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.huawei.pushkit

import akka.actor.ActorSystem
import akka.http.scaladsl.ClientTransport
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}

import java.net.InetSocketAddress

private[pushkit] object ForwardProxyPoolSettings {

implicit class ForwardProxyPoolSettings(forwardProxy: ForwardProxy) {

def poolSettings(system: ActorSystem) = {
val address = InetSocketAddress.createUnresolved(forwardProxy.host, forwardProxy.port)
val transport = forwardProxy.credentials.fold(ClientTransport.httpsProxy(address))(
c => ClientTransport.httpsProxy(address, BasicHttpCredentials(c.username, c.password))
)

ConnectionPoolSettings(system)
.withConnectionSettings(
ClientConnectionSettings(system)
.withTransport(transport)
)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.huawei.pushkit

import akka.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import akka.annotation.InternalApi

import scala.collection.immutable.ListMap

/**
* Manages one [[HmsSettings]] per `ActorSystem`.
*/
@InternalApi
private[pushkit] final class HmsSettingExt private (sys: ExtendedActorSystem) extends Extension {
private var cachedSettings: Map[String, HmsSettings] = ListMap.empty
val settings: HmsSettings = settings(HmsSettings.ConfigPath)

def settings(path: String): HmsSettings =
cachedSettings.getOrElse(path, {
val settings = HmsSettings(sys.settings.config.getConfig(path))
cachedSettings += path -> settings
settings
})
}

@InternalApi
private[pushkit] object HmsSettingExt extends ExtensionId[HmsSettingExt] with ExtensionIdProvider {

def apply()(implicit system: ActorSystem): HmsSettingExt = super.apply(system)

override def lookup = HmsSettingExt
override def createExtension(system: ExtendedActorSystem) = new HmsSettingExt(system)

/**
* Java API.
* Get the HmsSettings extension with the new actors API.
*/
override def get(system: ClassicActorSystemProvider): HmsSettingExt = super.apply(system)
}
Loading

0 comments on commit 7db1d36

Please sign in to comment.