diff --git a/nginx_module/src/unix_socket.rs b/nginx_module/src/unix_socket.rs index 270d583..d8d499d 100644 --- a/nginx_module/src/unix_socket.rs +++ b/nginx_module/src/unix_socket.rs @@ -2,7 +2,6 @@ use std::{ borrow::Cow, cell::RefCell, collections::VecDeque, marker::PhantomPinned, mem::MaybeUninit, os::unix::ffi::OsStrExt, path::Path, pin::Pin, }; - use libc::{c_void, sockaddr_un}; use crate::{ @@ -28,6 +27,7 @@ enum State { }, Disconnected { event: Box, + reconnect_timeout: usize }, } @@ -57,7 +57,8 @@ struct WriteBuffer { end: u32, } -const TIMEOUT_MS: usize = 500; +const MIN_TIMEOUT_MS: usize = 1; +const TIMEOUT_MS: usize = 255; impl UnixSocket { pub fn connect( @@ -82,8 +83,8 @@ impl UnixSocket { Box::new(MaybeUninit::zeroed().assume_init()); ev.handler = Some(on_reconnect_timeout); ev.log = (*ngx_cycle).log; - ngx_event_add_timer(&mut *ev, TIMEOUT_MS); - State::Disconnected { event: ev } + ngx_event_add_timer(&mut *ev, MIN_TIMEOUT_MS); + State::Disconnected { event: ev, reconnect_timeout: MIN_TIMEOUT_MS } }, } } @@ -91,8 +92,8 @@ impl UnixSocket { let mut ev: Box = Box::new(MaybeUninit::zeroed().assume_init()); ev.handler = Some(on_reconnect_timeout); ev.log = (*ngx_cycle).log; - ngx_event_add_timer(&mut *ev, TIMEOUT_MS); - State::Disconnected { event: ev } + ngx_event_add_timer(&mut *ev, MIN_TIMEOUT_MS); + State::Disconnected { event: ev, reconnect_timeout: MIN_TIMEOUT_MS } }, }; @@ -120,7 +121,7 @@ impl UnixSocket { (*(**conn).write).handler = Some(on_write); (*(**conn).read).handler = Some(on_read); }, - State::Disconnected { event } => { + State::Disconnected { event, .. } => { event.data = (&*inner as *const Inner as *mut Inner).cast(); (inner.after_handshake.borrow_mut())(); // discard send data, it is disconnected } @@ -139,7 +140,7 @@ impl UnixSocket { let mut dummy_ev: Box = Box::new(MaybeUninit::zeroed().assume_init()); dummy_ev.handler = None; dummy_ev.log = (*ngx_cycle).log; - *self.0.state.borrow_mut() = State::Disconnected { event: dummy_ev } + *self.0.state.borrow_mut() = State::Disconnected { event: dummy_ev, reconnect_timeout: MIN_TIMEOUT_MS} } } } @@ -221,7 +222,7 @@ impl Drop for State { ngx_close_connection(*conn); } }, - Self::Disconnected { event } => unsafe { + Self::Disconnected { event, .. } => unsafe { if event.active() != 0 { ngx_event_del_timer(event.as_mut()); } @@ -278,6 +279,7 @@ impl Inner { if let Err(Disconnected) = unsafe { buffers.send(*conn) } { *state = State::Disconnected { event: self.create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS }; } } @@ -290,7 +292,7 @@ impl Inner { ev.log = (*ngx_cycle).log; ev.data = (self as *const Self as *mut Self).cast(); - ngx_event_add_timer(&mut *ev, TIMEOUT_MS); + ngx_event_add_timer(&mut *ev, 0); ev } @@ -472,6 +474,7 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { Err(_) => { *state = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, } } } @@ -488,6 +491,7 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { if !back_data.is_empty() && (*data).write(&back_data).is_err() { *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; } } @@ -497,6 +501,7 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { if !back_data.is_empty() && (*data).write(&back_data).is_err() { *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; } } @@ -504,12 +509,14 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { // error, signal connection close *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; break; } else if result == NGX_AGAIN as isize { if ngx_handle_read_event(conn.read, 0) != NGX_OK as isize { *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; } break; @@ -517,6 +524,7 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { // error, retry reconnect *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; break; } @@ -530,7 +538,10 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) { if !data.is_null() { let data = &*data; let mut state = data.state.borrow_mut(); - if let State::Disconnected { .. } = &*state { + if let State::Disconnected { reconnect_timeout, .. } = &mut *state { + if *reconnect_timeout < TIMEOUT_MS { + *reconnect_timeout = (*reconnect_timeout) * 2 + }; if let Some(conn) = State::try_connect( &data.path, &data.name, @@ -554,9 +565,10 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) { && ngx_exiting == 0 && ngx_terminate == 0 { - ngx_event_add_timer(&mut *ev, TIMEOUT_MS); + ngx_event_add_timer(&mut *ev, *reconnect_timeout); } - State::Disconnected { event: ev } + + State::Disconnected { event: ev, reconnect_timeout: *reconnect_timeout } }, }; } else if (*ev).timer_set() == 0 @@ -564,7 +576,7 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) { && ngx_exiting == 0 && ngx_terminate == 0 { - ngx_event_add_timer(ev, TIMEOUT_MS); + ngx_event_add_timer(ev, *reconnect_timeout); } } }