Skip to content

Commit

Permalink
Example for pubsub authenticated push (#1407)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajaaym authored and anguillanneuf committed May 7, 2019
1 parent ee82760 commit 5173029
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 48 deletions.
24 changes: 24 additions & 0 deletions appengine-java8/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ gcloud beta pubsub subscriptions create <your-subscription-name> \
--ack-deadline 30
```

- Create a subscription for authenticated pushes to send messages to a Google Cloud Project URL such as https://<your-project-id>.appspot.com/authenticated-push.

The push auth service account must have Service Account Token Creator Role assigned, which can be done in the Cloud Console [IAM & admin](https://console.cloud.google.com/iam-admin/iam) UI.
`--push-auth-token-audience` is optional. If set, remember to modify the audience field check in [PubSubAuthenticatedPush.java](src/main/java/com/example/appengine/pubsub/PubSubAuthenticatedPush.java#L36).

```
gcloud beta pubsub subscriptions create <your-subscription-name> \
--topic <your-topic-name> \
--push-endpoint \
https://<your-project-id>.appspot.com/pubsub/authenticated-push?token=<your-verification-token> \
--ack-deadline 30 \
--push-auth-service-account=[your-service-account-email] \
--push-auth-token-audience=example.com
```

## Run locally
Set the following environment variables and run using shown Maven command. You can then
direct your browser to `http://localhost:8080/`
Expand All @@ -70,6 +85,15 @@ mvn appengine:run
"localhost:8080/pubsub/push?token=<your-token>"
```

### Authenticated push notifications

Simulating authenticated push requests will fail because requests need to contain a Cloud Pub/Sub-generated JWT in the "Authorization" header.

```
curl -H "Content-Type: application/json" -i --data @sample_message.json
"localhost:8080/pubsub/authenticated-push?token=<your-token>"
```

## Deploy

Update the environment variables `PUBSUB_TOPIC` and `PUBSUB_VERIFICATION_TOKEN` in
Expand Down
5 changes: 5 additions & 0 deletions appengine-java8/pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
<type>jar</type>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.googlecode.jatl</groupId>
<artifactId>jatl</artifactId>
<version>0.2.2</version>
</dependency>

<!-- [START dependencies] -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/


package com.example.appengine.pubsub;

import java.util.List;
Expand All @@ -26,8 +25,31 @@ public interface MessageRepository {

/**
* Retrieve most recent stored messages.
*
* @param limit number of messages
* @return list of messages
*/
List<Message> retrieve(int limit);

/** Save claim to persistent storage. */
void saveClaim(String claim);

/**
* Retrieve most recent stored claims.
*
* @param limit number of messages
* @return list of claims
*/
List<String> retrieveClaims(int limit);

/** Save token to persistent storage. */
void saveToken(String token);

/**
* Retrieve most recent stored tokens.
*
* @param limit number of messages
* @return list of tokens
*/
List<String> retrieveTokens(int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/


package com.example.appengine.pubsub;

import com.google.cloud.datastore.Datastore;
Expand All @@ -35,15 +34,21 @@ public class MessageRepositoryImpl implements MessageRepository {

private String messagesKind = "messages";
private KeyFactory keyFactory = getDatastoreInstance().newKeyFactory().setKind(messagesKind);
private String claimsKind = "claims";
private KeyFactory claimsKindKeyFactory =
getDatastoreInstance().newKeyFactory().setKind(claimsKind);
private String tokensKind = "tokens";
private KeyFactory tokensKindKeyFactory =
getDatastoreInstance().newKeyFactory().setKind(tokensKind);

@Override
public void save(Message message) {
// Save message to "messages"
Datastore datastore = getDatastoreInstance();
Key key = datastore.allocateId(keyFactory.newKey());

Entity.Builder messageEntityBuilder = Entity.newBuilder(key)
.set("messageId", message.getMessageId());
Entity.Builder messageEntityBuilder =
Entity.newBuilder(key).set("messageId", message.getMessageId());

if (message.getData() != null) {
messageEntityBuilder = messageEntityBuilder.set("data", message.getData());
Expand Down Expand Up @@ -84,15 +89,72 @@ public List<Message> retrieve(int limit) {
return messages;
}

@Override
public void saveClaim(String claim) {
// Save message to "messages"
Datastore datastore = getDatastoreInstance();
Key key = datastore.allocateId(claimsKindKeyFactory.newKey());

Entity.Builder claimEntityBuilder = Entity.newBuilder(key).set("claim", claim);

datastore.put(claimEntityBuilder.build());
}

@Override
public List<String> retrieveClaims(int limit) {
// Get claim saved in Datastore
Datastore datastore = getDatastoreInstance();
Query<Entity> query = Query.newEntityQueryBuilder().setKind(claimsKind).setLimit(limit).build();
QueryResults<Entity> results = datastore.run(query);

List<String> claims = new ArrayList<>();
while (results.hasNext()) {
Entity entity = results.next();
String claim = entity.getString("claim");
if (claim != null) {
claims.add(claim);
}
}
return claims;
}

@Override
public void saveToken(String token) {
// Save message to "messages"
Datastore datastore = getDatastoreInstance();
Key key = datastore.allocateId(tokensKindKeyFactory.newKey());

Entity.Builder tokenEntityBuilder = Entity.newBuilder(key).set("token", token);

datastore.put(tokenEntityBuilder.build());
}

@Override
public List<String> retrieveTokens(int limit) {
// Get token saved in Datastore
Datastore datastore = getDatastoreInstance();
Query<Entity> query = Query.newEntityQueryBuilder().setKind(tokensKind).setLimit(limit).build();
QueryResults<Entity> results = datastore.run(query);

List<String> tokens = new ArrayList<>();
while (results.hasNext()) {
Entity entity = results.next();
String token = entity.getString("token");
if (token != null) {
tokens.add(token);
}
}
return tokens;
}

private Datastore getDatastoreInstance() {
return DatastoreOptions.getDefaultInstance().getService();
}

private MessageRepositoryImpl() {
}
private MessageRepositoryImpl() {}

// retrieve a singleton instance
public static synchronized MessageRepositoryImpl getInstance() {
public static synchronized MessageRepositoryImpl getInstance() {
if (instance == null) {
instance = new MessageRepositoryImpl();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.appengine.pubsub;

import com.google.api.client.googleapis.auth.oauth2.GoogleIdToken;
import com.google.api.client.googleapis.auth.oauth2.GoogleIdTokenVerifier;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.Base64;
import java.util.Collections;
import java.util.stream.Collectors;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

// [START gae_standard_pubsub_auth_push]
@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
private final MessageRepository messageRepository;
private final GoogleIdTokenVerifier verifier =
new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
/**
* Please change example.com to match with value you are providing while creating
* subscription as provided in @see <a
* href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
*/
.setAudience(Collections.singletonList("example.com"))
.build();
private final Gson gson = new Gson();
private final JsonParser jsonParser = new JsonParser();

@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws IOException, ServletException {

// Verify that the request originates from the application.
if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
return;
}
// Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
String authorizationHeader = req.getHeader("Authorization");
if (authorizationHeader == null
|| authorizationHeader.isEmpty()
|| authorizationHeader.split(" ").length != 2) {
resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
return;
}
String authorization = authorizationHeader.split(" ")[1];

try {
// Verify and decode the JWT.
GoogleIdToken idToken = verifier.verify(authorization);
messageRepository.saveToken(authorization);
messageRepository.saveClaim(idToken.getPayload().toPrettyString());
// parse message object from "message" field in the request body json
// decode message data from base64
Message message = getMessage(req);
messageRepository.save(message);
// 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
resp.setStatus(102);
super.doPost(req, resp);
} catch (Exception e) {
resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
}
}

private Message getMessage(HttpServletRequest request) throws IOException {
String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
JsonElement jsonRoot = jsonParser.parse(requestBody);
String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
Message message = gson.fromJson(messageStr, Message.class);
// decode from base64
String decoded = decode(message.getData());
message.setData(decoded);
return message;
}

private String decode(String data) {
return new String(Base64.getDecoder().decode(data));
}

PubSubAuthenticatedPush(MessageRepository messageRepository) {
this.messageRepository = messageRepository;
}

public PubSubAuthenticatedPush() {
this(MessageRepositoryImpl.getInstance());
}
}
// [END gae_standard_pubsub_auth_push]
Loading

0 comments on commit 5173029

Please sign in to comment.