-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added header values #51
Conversation
Signed-off-by: shubham <[email protected]>
Signed-off-by: shubham <[email protected]>
Signed-off-by: shubham <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add support to set headers inside user defined source.
Signed-off-by: shubham <[email protected]>
Signed-off-by: shubham <[email protected]>
examples/simple-source/src/main.rs
Outdated
let offset = self.read_idx.load(Ordering::Relaxed); | ||
|
||
let mut headers=HashMap::new(); | ||
headers.insert(String::from("key"),String::from("key")); | ||
// send the message to the transmitter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use some UUID instead of (key, key)
examples/simple-source/src/main.rs
Outdated
event_time: chrono::offset::Utc::now(), | ||
keys: vec![], | ||
headers:headers.clone() | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to clone?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah cloning is not good ,but sadly due to ownership rules i dnt have any other option except cloning or using Arc ,I have used Arc as of now its thread safe too @yhl25
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why can't we just give the ownership? are we using the headers anywhere else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the problem is every for loop iteration will be new scope ,if there was only one iteration it would have worked. @yhl25
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missed the for loop.
Signed-off-by: shubham <[email protected]>
Signed-off-by: shubham <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
run rustfmt please
examples/simple-source/src/main.rs
Outdated
@@ -52,7 +54,9 @@ pub(crate) mod simple_source { | |||
self.read_idx | |||
.store(self.read_idx.load(Ordering::Relaxed) + 1, Ordering::Relaxed); | |||
let offset = self.read_idx.load(Ordering::Relaxed); | |||
|
|||
let mut headers=HashMap::new(); | |||
headers.insert(String::from( Uuid::new_v4()),String::from( Uuid::new_v4())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is the both key and value, UUID?
src/source.rs
Outdated
@@ -320,6 +325,9 @@ mod tests { | |||
async fn read(&self, request: SourceReadRequest, transmitter: Sender<Message>) { | |||
let event_time = Utc::now(); | |||
let mut message_offsets = Vec::with_capacity(request.count); | |||
let mut headers=HashMap::new(); | |||
headers.insert(String::from( Uuid::new_v4()),String::from( Uuid::new_v4())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
src/source.rs
Outdated
@@ -332,6 +340,7 @@ mod tests { | |||
partition_id: 0, | |||
}, | |||
keys: vec![], | |||
headers:Arc::clone(&shared_headers), // Cloning the Arc, not the HashMap, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
headers:Arc::clone(&shared_headers), // Cloning the Arc, not the HashMap, | |
headers:Arc::clone(&shared_headers), |
no need of that comment since we can clearly see it is Arc::clone
Signed-off-by: shubham <[email protected]>
Signed-off-by: shubham <[email protected]>
Signed-off-by: shubham <[email protected]>
examples/simple-source/src/main.rs
Outdated
let mut headers = HashMap::new(); | ||
let header_key=String::from(Uuid::new_v4()); | ||
let header_value= String::from("numaflow"); | ||
headers.insert(header_key, header_value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let mut headers = HashMap::new(); | |
let header_key=String::from(Uuid::new_v4()); | |
let header_value= String::from("numaflow"); | |
headers.insert(header_key, header_value); | |
let mut headers = HashMap::new(); | |
let header_key=String::from("x-txn-id"); | |
let header_value= String::from(Uuid::new_v4()); | |
headers.insert(header_key, header_value); |
src/source.rs
Outdated
let mut headers = HashMap::new(); | ||
let header_key=String::from(Uuid::new_v4()); | ||
let header_value = String::from("numaflow"); | ||
headers.insert(header_key, header_value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let mut headers = HashMap::new(); | |
let header_key=String::from(Uuid::new_v4()); | |
let header_value = String::from("numaflow"); | |
headers.insert(header_key, header_value); | |
let mut headers = HashMap::new(); | |
let header_key=String::from("x-txn-id"); | |
let header_value = String::from(Uuid::new_v4()); | |
headers.insert(header_key, header_value); |
also move the headers inside the for loop, ideally each message will have different txn-id
Signed-off-by: shubham <[email protected]>
Signed-off-by: shubham <[email protected]>
This Pr Addresses this issue #48
map ,reduce and sourcetransform proto files have been modified to include header in incoming request.
How it is tested
Tested in local cluster