Allow Ingestion with Time Partition - Till 1 month older events #790

Merged · 5 commits · May 5, 2024
39 changes: 34 additions & 5 deletions server/src/catalog.rs
@@ -23,6 +23,7 @@ use crate::handlers::http::base_path_without_preceding_slash;
use crate::option::CONFIG;
use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
query::PartialTimeFilter,
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
};
@@ -41,8 +42,11 @@ pub trait Snapshot {
}

pub trait ManifestFile {
#[allow(unused)]
fn file_name(&self) -> &str;
#[allow(unused)]
fn ingestion_size(&self) -> u64;
#[allow(unused)]
fn file_size(&self) -> u64;
fn num_rows(&self) -> u64;
fn columns(&self) -> &[Column];
@@ -70,14 +74,17 @@ impl ManifestFile for manifest::File {
}
}

fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
fn get_file_bounds(
file: &manifest::File,
partition_column: String,
) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.find(|col| col.name == partition_column)
.unwrap()
.stats
.clone()
.as_ref()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
@@ -95,8 +102,19 @@ pub async fn update_snapshot(
) -> Result<(), ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_clone = meta.clone();
let manifests = &mut meta.snapshot.manifest_list;
let (lower_bound, _) = get_file_bounds(&change);
let time_partition: Option<String> = meta_clone.time_partition;
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(&change, time_partition);
lower_bound
}
None => {
let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
let pos = manifests.iter().position(|item| {
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});
@@ -239,6 +257,7 @@ pub async fn get_first_event(
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
let time_partition = meta.time_partition;
if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
@@ -257,7 +276,17 @@
));
};
if let Some(first_event) = manifest.files.first() {
let (lower_bound, _) = get_file_bounds(first_event);
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
lower_bound
}
None => {
let (lower_bound, _) =
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
}
}
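Both call sites above apply the same fallback: use the stream's configured time partition column for the bounds lookup when one exists, otherwise fall back to DEFAULT_TIMESTAMP_KEY. A minimal, self-contained sketch of that selection (hypothetical helper, not code from this PR; "event_time" is an illustrative partition column name):

const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

// Pick the column whose statistics define a manifest file's time bounds.
fn bound_column(time_partition: Option<&str>) -> &str {
    time_partition.unwrap_or(DEFAULT_TIMESTAMP_KEY)
}

fn main() {
    assert_eq!(bound_column(None), "p_timestamp");
    assert_eq!(bound_column(Some("event_time")), "event_time");
}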
23 changes: 18 additions & 5 deletions server/src/event.rs
@@ -29,6 +29,7 @@ use std::sync::Arc;
use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::metadata;
use chrono::NaiveDateTime;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -41,19 +42,30 @@ pub struct Event {
pub origin_format: &'static str,
pub origin_size: u64,
pub is_first_event: bool,
pub parsed_timestamp: NaiveDateTime,
pub time_partition: Option<String>,
}

// Event holds the schema related to each event for a single log stream
impl Event {
pub async fn process(self) -> Result<(), EventError> {
let key = get_schema_key(&self.rb.schema().fields);
let num_rows = self.rb.num_rows() as u64;
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
key = format!("{key}{parsed_timestamp_to_min}");
}

let num_rows = self.rb.num_rows() as u64;
if self.is_first_event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

Self::process_event(&self.stream_name, &key, self.rb.clone())?;
Self::process_event(
&self.stream_name,
&key,
self.rb.clone(),
self.parsed_timestamp,
)?;

metadata::STREAM_INFO.update_stats(
&self.stream_name,
@@ -80,8 +92,9 @@
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
Ok(())
}
}
@@ -90,7 +103,7 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
// Fields must be sorted
let mut hasher = xxhash_rust::xxh3::Xxh3::new();
for field in fields.iter().sorted_by_key(|v| v.name()) {
hasher.update(field.name().as_bytes())
hasher.update(field.name().as_bytes());
}
let hash = hasher.digest();
format!("{hash:x}")
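The key change in Event::process above: when a time partition is configured, the schema key gains a per-minute suffix derived from parsed_timestamp, so record batches from different minutes are routed to separate writers. A small sketch of how that suffix is built (chrono crate assumed; values are illustrative):

use chrono::NaiveDate;

fn main() {
    let key = String::from("a1b2c3d4"); // schema hash as produced by get_schema_key
    // parsed_timestamp would come from the event's time-partition field
    let parsed_timestamp = NaiveDate::from_ymd_opt(2024, 5, 1)
        .unwrap()
        .and_hms_opt(13, 45, 7)
        .unwrap();
    let suffix = parsed_timestamp.format("%Y%m%dT%H%M").to_string();
    let key = format!("{key}{suffix}");
    assert_eq!(key, "a1b2c3d420240501T1345"); // truncated to the minute, seconds dropped
}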
36 changes: 32 additions & 4 deletions server/src/event/format.rs
@@ -41,20 +41,20 @@ pub trait EventFormat: Sized {
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
fn into_recordbatch(
self,
storage_schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
storage_schema.clone(),
time_partition,
static_schema_flag.clone(),
time_partition.clone(),
)?;

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
@@ -96,10 +96,11 @@ pub trait EventFormat: Sized {
)));

// prepare the record batch and new fields to be added
let new_schema = Arc::new(Schema::new(schema));
let mut new_schema = Arc::new(Schema::new(schema));
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
return Err(anyhow!("Schema mismatch"));
}
new_schema = update_field_type_in_schema(new_schema, time_partition);
let rb = Self::decode(data, new_schema.clone())?;
let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
let metadata_arr =
@@ -143,3 +144,30 @@ pub trait EventFormat: Sized {
true
}
}

pub fn update_field_type_in_schema(
schema: Arc<Schema>,
time_partition: Option<String>,
) -> Arc<Schema> {
if time_partition.is_none() {
return schema;
}
let field_name = time_partition.unwrap();
let new_schema: Vec<Field> = schema
.fields()
.iter()
.map(|field| {
if *field.name() == field_name {
if field.data_type() == &DataType::Utf8 {
let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
Field::new(field.name().clone(), new_data_type, true)
} else {
Field::new(field.name(), field.data_type().clone(), true)
}
} else {
Field::new(field.name(), field.data_type().clone(), true)
}
})
.collect();
Arc::new(Schema::new(new_schema))
}
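A hedged usage sketch of the new update_field_type_in_schema helper (arrow-schema crate; assumes the function defined above is in scope, and "event_time" is an illustrative partition column): a Utf8 time-partition column is promoted to a millisecond timestamp, while other columns keep their types.

use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema, TimeUnit};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("event_time", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
    ]));
    // "event_time" is the configured time partition, so its type is promoted
    let updated = update_field_type_in_schema(schema, Some("event_time".to_string()));
    assert_eq!(
        updated.field_with_name("event_time").unwrap().data_type(),
        &DataType::Timestamp(TimeUnit::Millisecond, None)
    );
    // non-partition columns are left untouched
    assert_eq!(
        updated.field_with_name("message").unwrap().data_type(),
        &DataType::Utf8
    );
}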
11 changes: 8 additions & 3 deletions server/src/event/format/json.rs
@@ -45,10 +45,10 @@ impl EventFormat for Event {
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data, time_partition)?;
let data = flatten_json_body(self.data, None, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
@@ -68,7 +68,12 @@
let schema = match derive_arrow_schema(&stream_schema, fields) {
Ok(schema) => schema,
Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
Ok(infer_schema) => {
Ok(mut infer_schema) => {
let new_infer_schema = super::super::format::update_field_type_in_schema(
Arc::new(infer_schema),
time_partition,
);
infer_schema = Schema::new(new_infer_schema.fields().clone());
if let Err(err) = Schema::try_merge(vec![
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
infer_schema.clone(),
31 changes: 19 additions & 12 deletions server/src/event/writer.rs
@@ -25,11 +25,11 @@ use std::{
sync::{Arc, Mutex, RwLock},
};

use crate::utils;

use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
use crate::utils;
use arrow_array::{RecordBatch, TimestampMillisecondArray};
use arrow_schema::Schema;
use chrono::NaiveDateTime;
use chrono::Utc;
use derive_more::{Deref, DerefMut};
use once_cell::sync::Lazy;
@@ -48,6 +48,7 @@ impl Writer {
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let rb = utils::arrow::replace_columns(
rb.schema(),
Expand All @@ -56,7 +57,8 @@ impl Writer {
&[Arc::new(get_timestamp_array(rb.num_rows()))],
);

self.disk.push(stream_name, schema_key, &rb)?;
self.disk
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
self.mem.push(schema_key, rb);
Ok(())
}
@@ -72,29 +74,34 @@ impl WriterTable {
stream_name: &str,
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let hashmap_guard = self.read().unwrap();

match hashmap_guard.get(stream_name) {
Some(stream_writer) => {
stream_writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
stream_writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
}
None => {
drop(hashmap_guard);
let mut map = self.write().unwrap();
// check for race condition
// if map contains entry then just use it
if let Some(writer) = map.get(stream_name) {
writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
} else {
let mut writer = Writer::default();
writer.push(stream_name, schema_key, record)?;
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
map.insert(stream_name.to_owned(), Mutex::new(writer));
}
}
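The locking above follows a read-then-write pattern: the common case appends under the shared read lock, and only a missing stream entry takes the exclusive write lock, re-checking the map before inserting so two racing ingestors don't overwrite each other's writer. A simplified, self-contained sketch of that pattern with hypothetical types (not the actual Writer):

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

#[derive(Default)]
struct Writer {
    rows: u64,
}

fn append(table: &RwLock<HashMap<String, Mutex<Writer>>>, stream: &str) {
    let guard = table.read().unwrap();
    if let Some(writer) = guard.get(stream) {
        // fast path: the writer exists, a shared read lock is enough
        writer.lock().unwrap().rows += 1;
        return;
    }
    // slow path: release the read lock, take the write lock, and re-check,
    // since another thread may have inserted the writer in between
    drop(guard);
    let mut map = table.write().unwrap();
    let writer = map.entry(stream.to_owned()).or_default();
    writer.lock().unwrap().rows += 1;
}

fn main() {
    let table = RwLock::new(HashMap::new());
    append(&table, "demo");
    append(&table, "demo");
    assert_eq!(table.read().unwrap()["demo"].lock().unwrap().rows, 2);
}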
11 changes: 7 additions & 4 deletions server/src/event/writer/file_writer.rs
@@ -24,9 +24,9 @@ use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::path::PathBuf;

use crate::storage::staging::StorageDir;

use super::errors::StreamWriterError;
use crate::storage::staging::StorageDir;
use chrono::NaiveDateTime;

pub struct ArrowWriter {
pub file_path: PathBuf,
@@ -43,6 +43,7 @@ impl FileWriter {
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
match self.get_mut(schema_key) {
Some(writer) => {
@@ -54,7 +55,8 @@
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
let (path, writer) =
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
@@ -79,9 +81,10 @@ fn init_new_stream_writer_file(
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
let dir = StorageDir::new(stream_name);
let path = dir.path_by_current_time(schema_key);
let path = dir.path_by_current_time(schema_key, parsed_timestamp);
std::fs::create_dir_all(dir.data_path)?;

let file = OpenOptions::new().create(true).append(true).open(&path)?;
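init_new_stream_writer_file now forwards parsed_timestamp into StorageDir::path_by_current_time, so the staging file name can reflect the event's own time rather than only the wall clock. The actual path format lives in StorageDir (not shown in this diff); the following is a purely hypothetical illustration of folding a parsed timestamp into a staging path (chrono crate assumed):

use chrono::{NaiveDate, NaiveDateTime};
use std::path::PathBuf;

// Hypothetical helper, not the real StorageDir::path_by_current_time.
fn staging_path(stream: &str, schema_key: &str, parsed: NaiveDateTime) -> PathBuf {
    let minute = parsed.format("%Y%m%dT%H%M").to_string();
    PathBuf::from(format!("staging/{stream}/{minute}.{schema_key}.arrows"))
}

fn main() {
    let ts = NaiveDate::from_ymd_opt(2024, 5, 1)
        .unwrap()
        .and_hms_opt(9, 30, 0)
        .unwrap();
    println!("{}", staging_path("demo", "a1b2c3", ts).display());
}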