Skip to content

Commit

Permalink
reports: add topic for TAP indexer fees (edgeandnode#938)
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus authored Sep 16, 2024
1 parent 7a403d0 commit ceeb53f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 27 deletions.
17 changes: 10 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ async fn main() {

let signer_address = Wallet::from_bytes(conf.scalar.signer.0.as_ref())
.expect("failed to prepare receipt wallet");
let gateway_id = format!("{:?}", Address::from(signer_address.address().0));
let tap_signer = Address::from(signer_address.address().0);

let conf_repr = format!("{conf:?}");

init_logging("graph-gateway", conf.log_json);
tracing::info!("gateway ID: {}", gateway_id);
tracing::info!("gateway ID: {:?}", tap_signer);
tracing::debug!(config = %conf_repr);

let http_client = reqwest::Client::builder()
Expand Down Expand Up @@ -139,12 +139,15 @@ async fn main() {
Box::leak(Box::new(Budgeter::new(USD(conf.query_fees_target))));

let reporter = reports::Reporter::create(
gateway_id.clone(),
tap_signer,
conf.graph_env_id,
conf.query_fees_target,
"gateway_client_query_results",
"gateway_indexer_attempts",
"gateway_attestations",
reports::Topics {
client_request: "gateway_client_query_results",
indexer_request: "gateway_indexer_attempts",
attestation: "gateway_attestations",
indexer_fees: "gateway_indexer_fees",
},
conf.kafka,
)
.unwrap();
Expand Down Expand Up @@ -229,7 +232,7 @@ async fn main() {
// Set up the query tracing span
.layer(RequestTracingLayer)
// Set the query ID on the request
.layer(SetRequestIdLayer::new(gateway_id))
.layer(SetRequestIdLayer::new(format!("{:?}", tap_signer)))
// Handle legacy in-path auth, and convert it into a header
.layer(middleware::from_fn(legacy_auth_adapter))
// Require the query to be authorized
Expand Down
76 changes: 56 additions & 20 deletions src/reports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,42 @@ pub struct IndexerRequest {
}

pub struct Reporter {
pub gateway_id: String,
pub tap_signer: Address,
pub graph_env: String,
pub budget: String,
pub client_request_topic: &'static str,
pub indexer_request_topic: &'static str,
pub attestation_topic: &'static str,
pub topics: Topics,
pub write_buf: Vec<u8>,
pub kafka_producer: rdkafka::producer::ThreadedProducer<
rdkafka::producer::DefaultProducerContext,
rdkafka::producer::NoCustomPartitioner,
>,
}

pub struct Topics {
pub client_request: &'static str,
pub indexer_request: &'static str,
pub attestation: &'static str,
pub indexer_fees: &'static str,
}

impl Reporter {
#[allow(clippy::too_many_arguments)]
pub fn create(
gateway_id: String,
tap_signer: Address,
graph_env: String,
budget: NotNan<f64>,
client_request_topic: &'static str,
indexer_request_topic: &'static str,
attestation_topic: &'static str,
topics: Topics,
kafka_config: impl Into<rdkafka::ClientConfig>,
) -> anyhow::Result<mpsc::UnboundedSender<ClientRequest>> {
let kafka_producer = kafka_config
.into()
.create()
.context("kafka producer error")?;
let mut reporter = Self {
gateway_id,
tap_signer,
graph_env,
budget: budget.to_string(),
client_request_topic,
indexer_request_topic,
attestation_topic,
topics,
write_buf: Default::default(),
kafka_producer,
};
Expand Down Expand Up @@ -119,8 +121,9 @@ impl Reporter {
.first()
.map(|i| i.subgraph_chain.as_str())
.unwrap_or("");
let gateway_id = format!("{:?}", self.tap_signer);
let client_request_payload = json!({
"gateway_id": &self.gateway_id,
"gateway_id": &gateway_id,
"query_id": &client_request.id,
"ray_id": &client_request.id,
"network_chain": &self.graph_env,
Expand Down Expand Up @@ -166,7 +169,7 @@ impl Reporter {
};

let indexer_request_payload = json!({
"gateway_id": &self.gateway_id,
"gateway_id": &gateway_id,
"query_id": &client_request.id,
"ray_id": &client_request.id,
"network_chain": &self.graph_env,
Expand All @@ -192,17 +195,38 @@ impl Reporter {
});
serde_json::to_writer(&mut self.write_buf, &indexer_request_payload).unwrap();
let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
rdkafka::producer::BaseRecord::to(self.indexer_request_topic)
rdkafka::producer::BaseRecord::to(self.topics.indexer_request)
.payload(&self.write_buf);
self.kafka_producer
.send(record)
.map_err(|(err, _)| err)
.context(anyhow!(
"failed to send to topic {}",
self.indexer_request_topic
self.topics.indexer_request
))?;
self.write_buf.clear();

if matches!(&indexer_request.receipt, Receipt::TAP(_)) {
IndexerFeesProtobuf {
signer: self.tap_signer.to_vec(),
receiver: indexer_request.indexer.to_vec(),
fee_grt: indexer_request.receipt.grt_value() as f64 * 1e-18,
}
.encode(&mut self.write_buf)
.unwrap();
let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
rdkafka::producer::BaseRecord::to(self.topics.indexer_fees)
.payload(&self.write_buf);
self.kafka_producer
.send(record)
.map_err(|(err, _)| err)
.context(anyhow!(
"failed to send to topic {}",
self.topics.indexer_fees
))?;
self.write_buf.clear();
}

if let Some((original_response, attestation)) = indexer_request
.result
.ok()
Expand All @@ -225,28 +249,28 @@ impl Reporter {
.encode(&mut self.write_buf)
.unwrap();
let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
rdkafka::producer::BaseRecord::to(self.attestation_topic)
rdkafka::producer::BaseRecord::to(self.topics.attestation)
.payload(&self.write_buf);
self.kafka_producer
.send(record)
.map_err(|(err, _)| err)
.context(anyhow!(
"failed to send to topic {}",
self.attestation_topic
self.topics.attestation
))?;
self.write_buf.clear();
}
}

serde_json::to_writer(&mut self.write_buf, &client_request_payload).unwrap();
let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
rdkafka::producer::BaseRecord::to(self.client_request_topic).payload(&self.write_buf);
rdkafka::producer::BaseRecord::to(self.topics.client_request).payload(&self.write_buf);
self.kafka_producer
.send(record)
.map_err(|(err, _)| err)
.context(anyhow!(
"failed to send to topic {}",
self.client_request_topic
self.topics.client_request
))?;
self.write_buf.clear();

Expand Down Expand Up @@ -276,3 +300,15 @@ pub struct AttestationProtobuf {
#[prost(bytes, tag = "7")]
signature: Vec<u8>,
}

#[derive(Clone, PartialEq, prost::Message)]
pub struct IndexerFeesProtobuf {
/// 20 bytes (address)
#[prost(bytes, tag = "1")]
signer: Vec<u8>,
/// 20 bytes (address)
#[prost(bytes, tag = "2")]
receiver: Vec<u8>,
#[prost(double, tag = "3")]
fee_grt: f64,
}

0 comments on commit ceeb53f

Please sign in to comment.