Skip to content

Commit

Permalink
Merge branch 'main' into subscription-doc
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Boayue committed Nov 8, 2024
2 parents 95445f8 + fffdde0 commit c453d0b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl Client {
/// }
/// ```
pub fn contract_details(&self, contract: &Contract) -> Result<Vec<contracts::ContractDetails>, Error> {
Ok(contracts::contract_details(self, contract)?)
contracts::contract_details(self, contract)
}

/// Get current [FamilyCode]s for all accessible accounts.
Expand Down
60 changes: 31 additions & 29 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ pub(crate) trait MessageBus: Send + Sync {
}
}

type Response = Result<ResponseMessage, Error>;

// For requests without an identifier, shared channels are created
// to route request/response pairs based on message type.
#[derive(Debug)]
struct SharedChannels {
// Maps an inbound reply to channel used to send responses.
senders: HashMap<IncomingMessages, Vec<Arc<Sender<Result<ResponseMessage, Error>>>>>,
senders: HashMap<IncomingMessages, Vec<Arc<Sender<Response>>>>,
// Maps an outbound request to channel used to receive responses.
receivers: HashMap<OutgoingMessages, Arc<Receiver<Result<ResponseMessage, Error>>>>,
receivers: HashMap<OutgoingMessages, Arc<Receiver<Response>>>,
}

impl SharedChannels {
Expand All @@ -83,7 +85,7 @@ impl SharedChannels {

// Maps an outgoing message to incoming message(s)
fn register(&mut self, outbound: OutgoingMessages, inbounds: &[IncomingMessages]) {
let (sender, receiver) = channel::unbounded::<Result<ResponseMessage, Error>>();
let (sender, receiver) = channel::unbounded::<Response>();

self.receivers.insert(outbound, Arc::new(receiver));

Expand All @@ -98,7 +100,7 @@ impl SharedChannels {
}

// Get receiver for specified message type. Panics if receiver not found.
fn get_receiver(&self, message_type: OutgoingMessages) -> Arc<Receiver<Result<ResponseMessage, Error>>> {
fn get_receiver(&self, message_type: OutgoingMessages) -> Arc<Receiver<Response>> {
let receiver = self
.receivers
.get(&message_type)
Expand All @@ -123,7 +125,7 @@ impl SharedChannels {
}

// Notify all senders with a given message
fn notify_all(&self, message: &Result<ResponseMessage, Error>) {
fn notify_all(&self, message: &Response) {
for senders in self.senders.values() {
for sender in senders {
if let Err(e) = sender.send(message.clone()) {
Expand All @@ -145,9 +147,9 @@ pub enum Signal {
pub struct TcpMessageBus {
connection: Connection,
handles: Mutex<Vec<JoinHandle<()>>>,
requests: SenderHash<i32, Result<ResponseMessage, Error>>,
orders: SenderHash<i32, Result<ResponseMessage, Error>>,
executions: SenderHash<String, Result<ResponseMessage, Error>>,
requests: SenderHash<i32, Response>,
orders: SenderHash<i32, Response>,
executions: SenderHash<String, Response>,
shared_channels: SharedChannels,
signals_send: Sender<Signal>,
signals_recv: Receiver<Signal>,
Expand Down Expand Up @@ -211,7 +213,7 @@ impl TcpMessageBus {
debug!("released order_id {}, orders.len()={}", order_id, self.orders.len());
}

fn read_message(&self) -> Result<ResponseMessage, Error> {
fn read_message(&self) -> Response {
self.connection.read_message()
}

Expand Down Expand Up @@ -612,18 +614,18 @@ impl<K: std::hash::Hash + Eq + std::fmt::Debug, V: std::fmt::Debug + Clone> Send
// Enables routing of response messages from TWS to Client
#[derive(Debug, Default)]
pub(crate) struct InternalSubscription {
receiver: Option<Receiver<Result<ResponseMessage, Error>>>, // requests with request ids receive responses via this channel
sender: Option<Sender<Result<ResponseMessage, Error>>>, // requests with request ids receive responses via this channel
shared_receiver: Option<Arc<Receiver<Result<ResponseMessage, Error>>>>, // this channel is for responses that share channel based on message type
signaler: Option<Sender<Signal>>, // for client to signal termination
pub(crate) request_id: Option<i32>, // initiating request id
pub(crate) order_id: Option<i32>, // initiating order id
pub(crate) message_type: Option<OutgoingMessages>, // initiating message type
receiver: Option<Receiver<Response>>, // requests with request ids receive responses via this channel
sender: Option<Sender<Response>>, // requests with request ids receive responses via this channel
shared_receiver: Option<Arc<Receiver<Response>>>, // this channel is for responses that share channel based on message type
signaler: Option<Sender<Signal>>, // for client to signal termination
pub(crate) request_id: Option<i32>, // initiating request id
pub(crate) order_id: Option<i32>, // initiating order id
pub(crate) message_type: Option<OutgoingMessages>, // initiating message type
}

impl InternalSubscription {
// Blocks until next message become available.
pub(crate) fn next(&self) -> Option<Result<ResponseMessage, Error>> {
pub(crate) fn next(&self) -> Option<Response> {
if let Some(receiver) = &self.receiver {
Self::receive(receiver)
} else if let Some(receiver) = &self.shared_receiver {
Expand All @@ -634,7 +636,7 @@ impl InternalSubscription {
}

// Returns message if available or immediately returns None.
pub(crate) fn try_next(&self) -> Option<Result<ResponseMessage, Error>> {
pub(crate) fn try_next(&self) -> Option<Response> {
if let Some(receiver) = &self.receiver {
Self::try_receive(receiver)
} else if let Some(receiver) = &self.shared_receiver {
Expand All @@ -645,7 +647,7 @@ impl InternalSubscription {
}

// Waits for next message until specified timeout.
pub(crate) fn next_timeout(&self, timeout: Duration) -> Option<Result<ResponseMessage, Error>> {
pub(crate) fn next_timeout(&self, timeout: Duration) -> Option<Response> {
if let Some(receiver) = &self.receiver {
Self::timeout_receive(receiver, timeout)
} else if let Some(receiver) = &self.shared_receiver {
Expand All @@ -664,15 +666,15 @@ impl InternalSubscription {
// TODO - shared sender
}

fn receive(receiver: &Receiver<Result<ResponseMessage, Error>>) -> Option<Result<ResponseMessage, Error>> {
fn receive(receiver: &Receiver<Response>) -> Option<Response> {
receiver.recv().ok()
}

fn try_receive(receiver: &Receiver<Result<ResponseMessage, Error>>) -> Option<Result<ResponseMessage, Error>> {
fn try_receive(receiver: &Receiver<Response>) -> Option<Response> {
receiver.try_recv().ok()
}

fn timeout_receive(receiver: &Receiver<Result<ResponseMessage, Error>>, timeout: Duration) -> Option<Result<ResponseMessage, Error>> {
fn timeout_receive(receiver: &Receiver<Response>, timeout: Duration) -> Option<Response> {
receiver.recv_timeout(timeout).ok()
}
}
Expand All @@ -692,9 +694,9 @@ impl Drop for InternalSubscription {
}

pub(crate) struct SubscriptionBuilder {
receiver: Option<Receiver<Result<ResponseMessage, Error>>>,
sender: Option<Sender<Result<ResponseMessage, Error>>>,
shared_receiver: Option<Arc<Receiver<Result<ResponseMessage, Error>>>>,
receiver: Option<Receiver<Response>>,
sender: Option<Sender<Response>>,
shared_receiver: Option<Arc<Receiver<Response>>>,
signaler: Option<Sender<Signal>>,
order_id: Option<i32>,
request_id: Option<i32>,
Expand All @@ -714,17 +716,17 @@ impl SubscriptionBuilder {
}
}

pub(crate) fn receiver(mut self, receiver: Receiver<Result<ResponseMessage, Error>>) -> Self {
pub(crate) fn receiver(mut self, receiver: Receiver<Response>) -> Self {
self.receiver = Some(receiver);
self
}

pub(crate) fn sender(mut self, sender: Sender<Result<ResponseMessage, Error>>) -> Self {
pub(crate) fn sender(mut self, sender: Sender<Response>) -> Self {
self.sender = Some(sender);
self
}

pub(crate) fn shared_receiver(mut self, shared_receiver: Arc<Receiver<Result<ResponseMessage, Error>>>) -> Self {
pub(crate) fn shared_receiver(mut self, shared_receiver: Arc<Receiver<Response>>) -> Self {
self.shared_receiver = Some(shared_receiver);
self
}
Expand Down Expand Up @@ -892,7 +894,7 @@ impl Connection {
Ok(())
}

fn read_message(&self) -> Result<ResponseMessage, Error> {
fn read_message(&self) -> Response {
let mut reader = self.reader.lock()?;

let message_size = read_header(&reader)?;
Expand Down

0 comments on commit c453d0b

Please sign in to comment.