Skip to content

Commit

Permalink
fix: do not create or fetch the topic
Browse files Browse the repository at this point in the history
  • Loading branch information
rvcas committed Apr 13, 2022
1 parent e5f1570 commit 9fa192c
Showing 1 changed file with 11 additions and 24 deletions.
35 changes: 11 additions & 24 deletions src/sinks/gcp_pubsub/setup.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use google_cloud_rust_raw::pubsub::v1::{
pubsub::{GetTopicRequest, Topic},
pubsub_grpc::PublisherClient,
};
use google_cloud_rust_raw::pubsub::v1::{pubsub::Topic, pubsub_grpc::PublisherClient};
use grpcio::{Channel, ChannelBuilder, ChannelCredentials, EnvBuilder};
use serde::Deserialize;

Expand All @@ -28,9 +25,8 @@ impl SinkProvider for WithUtils<Config> {
let channel = connect("pubsub.googleapis.com");
let publisher = PublisherClient::new(channel);

// TODO: do we want to do this in the spawned thread instead?
let topic_full_name = format!("projects/{project_id}/topics/{topic_name}");
let topic = find_or_create_topic(&publisher, &topic_full_name).unwrap();
let topic = build_topic(&topic_full_name);

let utils = self.utils.clone();
let handle = std::thread::spawn(move || {
Expand All @@ -55,26 +51,17 @@ fn connect(endpoint: &str) -> Channel {
.secure_connect(endpoint, creds)
}

fn find_or_create_topic(client: &PublisherClient, topic_name: &str) -> grpcio::Result<Topic> {
// find topic
let mut request = GetTopicRequest::new();

request.set_topic(topic_name.to_string());

if let Ok(topic) = client.get_topic(&request) {
return Ok(topic);
}
fn build_topic(topic_name: &str) -> Topic {
let mut topic = Topic::new();

// otherwise create topic
let mut labels = HashMap::new();
topic.set_name(topic_name.to_string());

// TODO: do we need this?
labels.insert("environment".to_string(), "test".to_string());
// let mut labels = HashMap::new();

let mut topic = Topic::new();
// // TODO: do we need this?
// labels.insert("environment".to_string(), "test".to_string());

topic.set_name(topic_name.to_string());
topic.set_labels(labels);
// topic.set_labels(labels);

client.create_topic(&topic)
topic
}

0 comments on commit 9fa192c

Please sign in to comment.