Skip to content

Commit

Permalink
feat(Rust): Implement TypeMeta part of the compatibility features (#1789
Browse files Browse the repository at this point in the history
)

## What does this PR do? 

### Refine code struct and implement some compatibility features
1. Split fury_meta file into individual files, as the file is too large
to be easily read. There are a lot of features should be implemented by
macro, split it is necessary.
2. Implement compatibility by the macro, which will now generate a
pattern match expression.
3. Write type meta to binary when serializing objects.
4. Read type meta for the pattern match which is generated at compile
time when deserializing.
5. Replace lazy_static by Once for it has been stabled in stdlib

### Some unimplemented features
1. Add a v-table for looking up generic methods by type_id, which is
used for support the Any type.
2. Support draining unused binary data when receiving an unregistered
type.
  • Loading branch information
theweipeng authored Aug 5, 2024
1 parent 6aa7686 commit 0e18130
Show file tree
Hide file tree
Showing 37 changed files with 847 additions and 570 deletions.
4 changes: 2 additions & 2 deletions rust/fury-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ rust-version.workspace = true
proc-macro2 = { default-features = false, version = "1.0" }
syn = { default-features = false, version = "2.0", features = ["full", "fold"] }
quote = { default-features = false, version = "1.0" }
lazy_static = { version = "1.4" }
byteorder = { version = "1.4" }
chrono = "0.4"
thiserror = { default-features = false, version = "1.0" }
num_enum = "0.5.1"


[[bench]]
Expand All @@ -38,4 +38,4 @@ harness = false

[dev-dependencies]
criterion = "0.5.1"
rand = "0.8.5"
rand = "0.8.5"
4 changes: 4 additions & 0 deletions rust/fury-core/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ impl<'bf> Reader<'bf> {
self.move_next(len as usize);
}

pub fn slice(&self) -> &[u8] {
self.bf
}

pub fn bytes(&mut self, len: usize) -> &'bf [u8] {
let result = &self.bf[self.cursor..self.cursor + len];
self.move_next(len);
Expand Down
3 changes: 3 additions & 0 deletions rust/fury-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub enum Error {
#[error("Unsupported Language Code; receive: {code:?}")]
UnsupportedLanguageCode { code: u8 },

#[error("Unsupported encoding of field name in type meta; receive: {code:?}")]
UnsupportedTypeMetaFieldNameEncoding { code: u8 },

#[error("encoded_data cannot be empty")]
EncodedDataEmpty,

Expand Down
49 changes: 37 additions & 12 deletions rust/fury-core/src/fury.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

use crate::buffer::{Reader, Writer};
use crate::error::Error;
use crate::read_state::ReadState;
use crate::resolver::context::ReadContext;
use crate::resolver::context::WriteContext;
use crate::serializer::Serializer;
use crate::types::Mode;
use crate::write_state::WriteState;
use crate::types::{config_flags, Language, Mode, SIZE_OF_REF_AND_TYPE};

pub struct Fury {
mode: Mode,
Expand All @@ -29,32 +29,57 @@ pub struct Fury {
impl Default for Fury {
fn default() -> Self {
Fury {
mode: Mode::Compatible,
mode: Mode::SchemaConsistent,
}
}
}

impl Fury {
pub fn mode(&mut self, mode: Mode) {
pub fn mode(mut self, mode: Mode) -> Self {
self.mode = mode;
self
}

pub fn get_mode(&self) -> &Mode {
&self.mode
}

pub fn write_head<T: Serializer>(&self, writer: &mut Writer) -> usize {
const HEAD_SIZE: usize = 10;
writer.reserve(<T as Serializer>::reserved_space() + SIZE_OF_REF_AND_TYPE + HEAD_SIZE);
let mut bitmap = 0;
bitmap |= config_flags::IS_LITTLE_ENDIAN_FLAG;
bitmap |= config_flags::IS_CROSS_LANGUAGE_FLAG;
writer.u8(bitmap);
writer.u8(Language::Rust as u8);
writer.skip(4); // meta offset
writer.len() - 4
}

fn read_head(&self, reader: &mut Reader) -> Result<u32, Error> {
let _bitmap = reader.u8();
let _language: Language = reader.u8().try_into()?;
Ok(reader.u32())
}

pub fn deserialize<T: Serializer>(&self, bf: &[u8]) -> Result<T, Error> {
let reader = Reader::new(bf);
let mut deserializer = ReadState::new(self, reader);
deserializer.head()?;
<T as Serializer>::deserialize(&mut deserializer)
let mut reader = Reader::new(bf);
let meta_offset = self.read_head(&mut reader)?;
let mut context = ReadContext::new(self, reader);
if meta_offset > 0 {
context.load_meta(meta_offset as usize);
}
<T as Serializer>::deserialize(&mut context)
}

pub fn serialize<T: Serializer>(&self, record: &T) -> Vec<u8> {
let mut writer = Writer::default();
let mut serializer = WriteState::new(self, &mut writer);
serializer.head::<T>();
<T as Serializer>::serialize(record, &mut serializer);
let meta_offset = self.write_head::<T>(&mut writer);
let mut context = WriteContext::new(self, &mut writer);
<T as Serializer>::serialize(record, &mut context);
if Mode::Compatible == self.mode {
context.write_meta(meta_offset);
}
writer.dump()
}
}
4 changes: 1 addition & 3 deletions rust/fury-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
pub mod buffer;
pub mod error;
pub mod fury;
pub mod internal;
pub mod meta;
pub mod read_state;
pub mod resolver;
pub mod row;
pub mod serializer;
pub mod types;
pub mod util;
pub mod write_state;
2 changes: 0 additions & 2 deletions rust/fury-core/src/meta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
// specific language governing permissions and limitations
// under the License.

mod meta_store;
mod meta_string;
mod string_util;
mod type_meta;

pub use meta_store::{MetaReaderStore, MetaWriterStore};
pub use meta_string::{Encoding, MetaString, MetaStringDecoder, MetaStringEncoder};
pub use type_meta::{FieldInfo, TypeMeta};
88 changes: 68 additions & 20 deletions rust/fury-core/src/meta/type_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
use super::meta_string::MetaStringEncoder;
use crate::buffer::{Reader, Writer};
use crate::error::Error;
use crate::meta::{Encoding, MetaStringDecoder};
use crate::types::FieldType;

//todo backward/forward compatibility
#[allow(dead_code)]
pub struct FieldInfo {
tag_id: u32,
field_name: String,
field_type: FieldType,
}
Expand All @@ -33,7 +33,34 @@ impl FieldInfo {
FieldInfo {
field_name: field_name.to_string(),
field_type,
tag_id: 0,
}
}

fn u8_to_encoding(value: u8) -> Result<Encoding, Error> {
match value {
0x00 => Ok(Encoding::Utf8),
0x01 => Ok(Encoding::AllToLowerSpecial),
0x02 => Ok(Encoding::LowerUpperDigitSpecial),
_ => Err(Error::UnsupportedTypeMetaFieldNameEncoding { code: value }),
}
}

fn from_bytes(reader: &mut Reader) -> FieldInfo {
let header = reader.u8();
let encoding = Self::u8_to_encoding((header & 0b11000) >> 3).unwrap();
let mut size = (header & 0b11100000) as i32 >> 5;
size = if size == 0b111 {
reader.var_int32() + 7
} else {
size
};
let type_id = reader.i16();
let field_name = MetaStringDecoder::new()
.decode(reader.bytes(size as usize), encoding)
.unwrap();
FieldInfo {
field_name,
field_type: FieldType::try_from(type_id).unwrap(),
}
}

Expand All @@ -42,7 +69,7 @@ impl FieldInfo {
let meta_string = MetaStringEncoder::new().encode(&self.field_name)?;
let mut header = 1 << 2;
let encoded = meta_string.bytes.as_slice();
let size = (encoded.len() - 1) as u32;
let size = encoded.len() as u32;
header |= (meta_string.encoding as u8) << 3;
let big_size = size >= 7;
if big_size {
Expand All @@ -53,17 +80,29 @@ impl FieldInfo {
header |= (size << 5) as u8;
writer.u8(header);
}
writer.i16(self.field_type as i16);
writer.bytes(encoded);
Ok(writer.dump())
}
}

struct TypeMetaLayer {
pub struct TypeMetaLayer {
type_id: u32,
field_info: Vec<FieldInfo>,
}

impl TypeMetaLayer {
pub fn new(type_id: u32, field_info: Vec<FieldInfo>) -> TypeMetaLayer {
TypeMetaLayer {
type_id,
field_info,
}
}

pub fn get_field_info(&self) -> &Vec<FieldInfo> {
&self.field_info
}

fn to_bytes(&self) -> Result<Vec<u8>, Error> {
let mut writer = Writer::default();
writer.var_int32(self.field_info.len() as i32);
Expand All @@ -73,6 +112,15 @@ impl TypeMetaLayer {
}
Ok(writer.dump())
}

fn from_bytes(reader: &mut Reader) -> TypeMetaLayer {
let field_num = reader.var_int32();
let type_id = reader.var_int32() as u32;
let field_info = (0..field_num)
.map(|_| FieldInfo::from_bytes(reader))
.collect();
TypeMetaLayer::new(type_id, field_info)
}
}

pub struct TypeMeta {
Expand All @@ -81,33 +129,33 @@ pub struct TypeMeta {
}

impl TypeMeta {
pub fn get_field_info(&self) -> &Vec<FieldInfo> {
self.layers.first().unwrap().get_field_info()
}

pub fn from_fields(type_id: u32, field_info: Vec<FieldInfo>) -> TypeMeta {
TypeMeta {
hash: 0,
layers: vec![TypeMetaLayer {
type_id,
field_info,
}],
layers: vec![TypeMetaLayer::new(type_id, field_info)],
}
}

pub fn from_bytes(reader: &mut Reader) -> TypeMeta {
let header = reader.u64();
let hash = header >> 8; // high 56bits indicate hash
let layer_count = header & 0b1111; // class count
let layers: Vec<TypeMetaLayer> = (0..layer_count)
.map(|_| TypeMetaLayer::from_bytes(reader))
.collect();
TypeMeta { hash, layers }
}

pub fn to_bytes(&self) -> Result<Vec<u8>, Error> {
let mut writer = Writer::default();
writer.u64((self.hash << 4) | (self.layers.len() as u64 & 0b1111));
writer.u64((self.hash << 8) | (self.layers.len() as u64 & 0b1111));
for layer in self.layers.iter() {
writer.bytes(layer.to_bytes()?.as_slice());
}
Ok(writer.dump())
}

pub fn read_hash_from_bytes(reader: &mut Reader) -> u64 {
reader.u64() >> 7
}

pub fn from_bytes(_reader: &mut Reader) -> TypeMeta {
TypeMeta {
hash: 0,
layers: Vec::new(),
}
}
}
66 changes: 0 additions & 66 deletions rust/fury-core/src/read_state.rs

This file was deleted.

Loading

0 comments on commit 0e18130

Please sign in to comment.