Skip to content

Commit

Permalink
Merge pull request #6 from G-Core/variable_unix_socket_reconnect_timeout
Browse files Browse the repository at this point in the history
setting variable reconnect timeout
  • Loading branch information
ruslanti authored Jul 3, 2024
2 parents e808984 + 9104e8a commit b43f646
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions nginx_module/src/unix_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{
borrow::Cow, cell::RefCell, collections::VecDeque, marker::PhantomPinned, mem::MaybeUninit,
os::unix::ffi::OsStrExt, path::Path, pin::Pin,
};

use libc::{c_void, sockaddr_un};

use crate::{
Expand All @@ -28,6 +27,7 @@ enum State {
},
Disconnected {
event: Box<ngx_event_t>,
reconnect_timeout: usize
},
}

Expand Down Expand Up @@ -57,7 +57,8 @@ struct WriteBuffer {
end: u32,
}

const TIMEOUT_MS: usize = 500;
const MIN_TIMEOUT_MS: usize = 1;
const TIMEOUT_MS: usize = 255;

impl UnixSocket {
pub fn connect(
Expand All @@ -82,17 +83,17 @@ impl UnixSocket {
Box::new(MaybeUninit::zeroed().assume_init());
ev.handler = Some(on_reconnect_timeout);
ev.log = (*ngx_cycle).log;
ngx_event_add_timer(&mut *ev, TIMEOUT_MS);
State::Disconnected { event: ev }
ngx_event_add_timer(&mut *ev, MIN_TIMEOUT_MS);
State::Disconnected { event: ev, reconnect_timeout: MIN_TIMEOUT_MS }
},
}
}
None => unsafe {
let mut ev: Box<ngx_event_t> = Box::new(MaybeUninit::zeroed().assume_init());
ev.handler = Some(on_reconnect_timeout);
ev.log = (*ngx_cycle).log;
ngx_event_add_timer(&mut *ev, TIMEOUT_MS);
State::Disconnected { event: ev }
ngx_event_add_timer(&mut *ev, MIN_TIMEOUT_MS);
State::Disconnected { event: ev, reconnect_timeout: MIN_TIMEOUT_MS }
},
};

Expand Down Expand Up @@ -120,7 +121,7 @@ impl UnixSocket {
(*(**conn).write).handler = Some(on_write);
(*(**conn).read).handler = Some(on_read);
},
State::Disconnected { event } => {
State::Disconnected { event, .. } => {
event.data = (&*inner as *const Inner as *mut Inner).cast();
(inner.after_handshake.borrow_mut())(); // discard send data, it is disconnected
}
Expand All @@ -139,7 +140,7 @@ impl UnixSocket {
let mut dummy_ev: Box<ngx_event_t> = Box::new(MaybeUninit::zeroed().assume_init());
dummy_ev.handler = None;
dummy_ev.log = (*ngx_cycle).log;
*self.0.state.borrow_mut() = State::Disconnected { event: dummy_ev }
*self.0.state.borrow_mut() = State::Disconnected { event: dummy_ev, reconnect_timeout: MIN_TIMEOUT_MS}
}
}
}
Expand Down Expand Up @@ -221,7 +222,7 @@ impl Drop for State {
ngx_close_connection(*conn);
}
},
Self::Disconnected { event } => unsafe {
Self::Disconnected { event, .. } => unsafe {
if event.active() != 0 {
ngx_event_del_timer(event.as_mut());
}
Expand Down Expand Up @@ -278,6 +279,7 @@ impl Inner {
if let Err(Disconnected) = unsafe { buffers.send(*conn) } {
*state = State::Disconnected {
event: self.create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS
};
}
}
Expand All @@ -290,7 +292,7 @@ impl Inner {
ev.log = (*ngx_cycle).log;
ev.data = (self as *const Self as *mut Self).cast();

ngx_event_add_timer(&mut *ev, TIMEOUT_MS);
ngx_event_add_timer(&mut *ev, 0);

ev
}
Expand Down Expand Up @@ -472,6 +474,7 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) {
Err(_) => {
*state = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
}
}
}
Expand All @@ -488,6 +491,7 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) {
if !back_data.is_empty() && (*data).write(&back_data).is_err() {
*(*data).state.borrow_mut() = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
}
Expand All @@ -497,26 +501,30 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) {
if !back_data.is_empty() && (*data).write(&back_data).is_err() {
*(*data).state.borrow_mut() = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
}
} else if result == 0 {
// error, signal connection close
*(*data).state.borrow_mut() = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
break;
} else if result == NGX_AGAIN as isize {
if ngx_handle_read_event(conn.read, 0) != NGX_OK as isize {
*(*data).state.borrow_mut() = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
break;
} else {
// error, retry reconnect
*(*data).state.borrow_mut() = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
break;
}
Expand All @@ -530,7 +538,10 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) {
if !data.is_null() {
let data = &*data;
let mut state = data.state.borrow_mut();
if let State::Disconnected { .. } = &*state {
if let State::Disconnected { reconnect_timeout, .. } = &mut *state {
if *reconnect_timeout < TIMEOUT_MS {
*reconnect_timeout = (*reconnect_timeout) * 2
};
if let Some(conn) = State::try_connect(
&data.path,
&data.name,
Expand All @@ -554,17 +565,18 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) {
&& ngx_exiting == 0
&& ngx_terminate == 0
{
ngx_event_add_timer(&mut *ev, TIMEOUT_MS);
ngx_event_add_timer(&mut *ev, *reconnect_timeout);
}
State::Disconnected { event: ev }

State::Disconnected { event: ev, reconnect_timeout: *reconnect_timeout }
},
};
} else if (*ev).timer_set() == 0
&& ngx_quit == 0
&& ngx_exiting == 0
&& ngx_terminate == 0
{
ngx_event_add_timer(ev, TIMEOUT_MS);
ngx_event_add_timer(ev, *reconnect_timeout);
}
}
}
Expand Down

0 comments on commit b43f646

Please sign in to comment.