Skip to content

Commit

Permalink
enhancement(chronicle-endpoints): add support for all Google SecOps r…
Browse files Browse the repository at this point in the history
…egions endpoints in the chronicle unstructured log sink
  • Loading branch information
ChocPanda committed Dec 13, 2024
1 parent 7e93489 commit afeea1b
Show file tree
Hide file tree
Showing 4 changed files with 363 additions and 3 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions changelog.d/add_chronicle_regional_endpoints.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add support for more chronicle regional endpoints as listed - https://cloud.google.com/chronicle/docs/reference/ingestion-api#regional_endpoints

authors: chocpanda
280 changes: 280 additions & 0 deletions src/sinks/gcp_chronicle/chronicle_udm_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
//! This sink sends data to Google Chronicles UDM Events log entries endpoint.
//! See <https://cloud.google.com/chronicle/docs/reference/ingestion-api#udmevents>
//! for more information.
use bytes::{Bytes, BytesMut};

use futures_util::{future::BoxFuture, task::Poll};
use goauth::scopes::Scope;
use http::header::{self, HeaderName, HeaderValue};
use http::{Request, StatusCode, Uri};
use hyper::Body;
use indoc::indoc;
use serde::Serialize;
use serde_json::json;
use snafu::Snafu;
use std::collections::HashMap;
use std::io;
use tokio_util::codec::Encoder as _;
use tower::{Service, ServiceBuilder};
use vector_lib::configurable::configurable_component;
use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::{
config::{telemetry, AcknowledgementsConfig, Input},
event::{Event, EventFinalizers, Finalizable},
sink::VectorSink,
EstimatedJsonEncodedSizeOf,
};
use vrl::value::Kind;

use crate::sinks::util::service::TowerRequestConfigDefaults;
use crate::{
codecs::{self, EncodingConfig},
config::{GenerateConfig, SinkConfig, SinkContext},
gcp::{GcpAuthConfig, GcpAuthenticator},
http::HttpClient,
schema,
sinks::{
gcp_chronicle::{
compression::ChronicleCompression,
partitioner::{ChroniclePartitionKey, ChroniclePartitioner},
sink::ChronicleSink,
},
gcs_common::{
config::{healthcheck_response, GcsRetryLogic},
service::GcsResponse,
},
util::{
encoding::{as_tracked_write, Encoder},
metadata::RequestMetadataBuilder,
request_builder::EncodeResult,
BatchConfig, Compression, RequestBuilder, SinkBatchSettings, TowerRequestConfig,
},
Healthcheck,
},
template::{Template, TemplateParseError},
tls::{TlsConfig, TlsSettings},
};

#[derive(Clone, Copy, Debug, Default)]
pub struct ChronicleUDMEventsDefaultBatchSettings;

// Chronicle Ingestion API has a 1MB limit[1] for UDMEvents log entries. We're also using a
// conservatively low batch timeout to ensure events make it to Chronicle in a timely fashion, but
// high enough that it allows for reasonable batching.
//
// [1]: https://cloud.google.com/chronicle/docs/reference/ingestion-api#UDMEventslogentries
impl SinkBatchSettings for ChronicleUDMEventsDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = None;
const MAX_BYTES: Option<usize> = Some(1_000_000);
const TIMEOUT_SECS: f64 = 15.0;
}

#[derive(Clone, Copy, Debug)]
pub struct ChronicleUDMEventsTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for ChronicleUDMEventsTowerRequestConfigDefaults {
const RATE_LIMIT_NUM: u64 = 1_000;
}

/// Configuration for the `gcp_chronicle_UDMEvents` sink.
#[configurable_component(sink(
"gcp_chronicle_UDMEvents",
"Store UDMEvents log events in Google Chronicle."
))]
#[derive(Clone, Debug)]
pub struct ChronicleUDMEventsConfig {
/// The endpoint to send data to.
#[configurable(metadata(
docs::examples = "127.0.0.1:8080",
docs::examples = "example.com:12345"
))]
pub endpoint: Option<String>,

/// The GCP region to use.
#[configurable(derived)]
pub region: Option<Region>,

/// The Unique identifier (UUID) corresponding to the Chronicle instance.
#[configurable(validation(format = "uuid"))]
#[configurable(metadata(docs::examples = "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c"))]
pub customer_id: String,

/// User-configured environment namespace to identify the data domain the logs originated from.
#[configurable(metadata(docs::templateable))]
#[configurable(metadata(
docs::examples = "production",
docs::examples = "production-{{ namespace }}",
))]
#[configurable(metadata(docs::advanced))]
pub namespace: Option<Template>,

/// A set of labels that are attached to each batch of events.
#[configurable(metadata(docs::examples = "chronicle_labels_examples()"))]
#[configurable(metadata(docs::additional_props_description = "A Chronicle label."))]
pub labels: Option<HashMap<String, String>>,

#[serde(flatten)]
pub auth: GcpAuthConfig,

#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<ChronicleUDMEventsDefaultBatchSettings>,

#[configurable(derived)]
pub encoding: EncodingConfig,

#[serde(default)]
#[configurable(derived)]
pub compression: ChronicleCompression,

#[configurable(derived)]
#[serde(default)]
pub request: TowerRequestConfig<ChronicleUDMEventsTowerRequestConfigDefaults>,

#[configurable(derived)]
pub tls: Option<TlsConfig>,

/// The type of log entries in a request.
///
/// This must be one of the [supported log types][UDMEvents_log_types_doc], otherwise
/// Chronicle rejects the entry with an error.
///
/// [UDMEvents_log_types_doc]: https://cloud.google.com/chronicle/docs/ingestion/parser-list/supported-default-parsers
#[configurable(metadata(docs::examples = "WINDOWS_DNS", docs::examples = "{{ log_type }}"))]
pub log_type: Template,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,
}

fn chronicle_labels_examples() -> HashMap<String, String> {
let mut examples = HashMap::new();
examples.insert("source".to_string(), "vector".to_string());
examples.insert("tenant".to_string(), "marketing".to_string());
examples
}

impl GenerateConfig for ChronicleUDMEventsConfig {
fn generate_config() -> toml::Value {
toml::from_str(indoc! {r#"
credentials_path = "/path/to/credentials.json"
customer_id = "customer_id"
namespace = "namespace"
compression = "gzip"
log_type = "log_type"
encoding.codec = "text"
"#})
.unwrap()
}
}

pub fn build_healthcheck(
client: HttpClient,
base_url: &str,
auth: GcpAuthenticator,
) -> crate::Result<Healthcheck> {
let uri = base_url.parse::<Uri>()?;

let healthcheck = async move {
let mut request = http::Request::get(&uri).body(Body::empty())?;
auth.apply(&mut request);

let response = client.send(request).await?;
healthcheck_response(response, GcsHealthcheckError::NotFound.into())
};

Ok(Box::pin(healthcheck))
}

#[derive(Debug, Snafu)]
pub enum ChronicleError {
#[snafu(display("Region or endpoint not defined"))]
RegionOrEndpoint,
#[snafu(display("You can only specify one of region or endpoint"))]
BothRegionAndEndpoint,
}

#[async_trait::async_trait]
#[typetag::serde(name = "gcp_chronicle_UDMEvents")]
impl SinkConfig for ChronicleUDMEventsConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let creds = self.auth.build(Scope::MalachiteIngestion).await?;

let tls = TlsSettings::from_options(&self.tls)?;
let client = HttpClient::new(tls, cx.proxy())?;

let endpoint = self.create_endpoint("v2/UDMEventslogentries:batchCreate")?;

// For the healthcheck we see if we can fetch the list of available log types.
let healthcheck_endpoint = self.create_endpoint("v2/logtypes")?;

let healthcheck = build_healthcheck(client.clone(), &healthcheck_endpoint, creds.clone())?;
creds.spawn_regenerate_token();
let sink = self.build_sink(client, endpoint, creds)?;

Ok((sink, healthcheck))
}

fn input(&self) -> Input {
let requirement =
schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());

Input::log().with_schema_requirement(requirement)
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

impl ChronicleUDMEventsConfig {
fn build_sink(
&self,
client: HttpClient,
base_url: String,
creds: GcpAuthenticator,
) -> crate::Result<VectorSink> {
use crate::sinks::util::service::ServiceBuilderExt;

let request = self.request.into_settings();

let batch_settings = self.batch.into_batcher_settings()?;

let partitioner = self.partitioner()?;

let svc = ServiceBuilder::new()
.settings(request, GcsRetryLogic)
.service(ChronicleService::new(client, base_url, creds));

let request_settings = ChronicleRequestBuilder::new(self)?;

let sink = ChronicleSink::new(svc, request_settings, partitioner, batch_settings, "http");

Ok(VectorSink::from_event_streamsink(sink))
}

fn partitioner(&self) -> crate::Result<ChroniclePartitioner> {
Ok(ChroniclePartitioner::new(
self.log_type.clone(),
self.namespace.clone(),
))
}

fn create_endpoint(&self, path: &str) -> Result<String, ChronicleError> {
Ok(format!(
"{}/{}",
match (&self.endpoint, self.region) {
(Some(endpoint), None) => endpoint.trim_end_matches('/'),
(None, Some(region)) => region.endpoint(),
(Some(_), Some(_)) => return Err(ChronicleError::BothRegionAndEndpoint),
(None, None) => return Err(ChronicleError::RegionOrEndpoint),
},
path
))
}
}
Loading

0 comments on commit afeea1b

Please sign in to comment.