From 704993a7b37a135cdcc6c1121a51691b69536b11 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Wed, 3 Jul 2024 15:32:03 +0300 Subject: [PATCH 1/2] fix: setting variable reconnect timeout --- nginx_module/src/unix_socket.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/nginx_module/src/unix_socket.rs b/nginx_module/src/unix_socket.rs index 270d583..c07dca3 100644 --- a/nginx_module/src/unix_socket.rs +++ b/nginx_module/src/unix_socket.rs @@ -2,7 +2,7 @@ use std::{ borrow::Cow, cell::RefCell, collections::VecDeque, marker::PhantomPinned, mem::MaybeUninit, os::unix::ffi::OsStrExt, path::Path, pin::Pin, }; - +use std::cell::Cell; use libc::{c_void, sockaddr_un}; use crate::{ @@ -40,6 +40,7 @@ struct Inner { path: String, name: Cow<'static, [u8]>, dummy_log: Box, + reconnect_timeout: Cell, _phantom: PhantomPinned, } @@ -57,7 +58,7 @@ struct WriteBuffer { end: u32, } -const TIMEOUT_MS: usize = 500; +const TIMEOUT_MS: usize = 255; impl UnixSocket { pub fn connect( @@ -105,6 +106,7 @@ impl UnixSocket { path, name, dummy_log, + reconnect_timeout: Cell::new(0), _phantom: PhantomPinned, }; @@ -290,7 +292,13 @@ impl Inner { ev.log = (*ngx_cycle).log; ev.data = (self as *const Self as *mut Self).cast(); - ngx_event_add_timer(&mut *ev, TIMEOUT_MS); + let reconnect_timeout = self.reconnect_timeout.get(); + + ngx_event_add_timer(&mut *ev, reconnect_timeout); + + if reconnect_timeout < TIMEOUT_MS { + self.reconnect_timeout.set((reconnect_timeout + 1) * 2); + } ev } @@ -467,7 +475,8 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { *state = State::Connected { conn: new_conn, buffers: std::mem::take(buffers), - } + }; + (*data).reconnect_timeout.set(0); } Err(_) => { *state = State::Disconnected { From 9104e8a7d4431d9e029361aa712f00feab0f4642 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Wed, 3 Jul 2024 18:26:07 +0300 Subject: [PATCH 2/2] fix: moved reconnect timeout to Disconnect state --- nginx_module/src/unix_socket.rs | 49 +++++++++++++++++---------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/nginx_module/src/unix_socket.rs b/nginx_module/src/unix_socket.rs index c07dca3..d8d499d 100644 --- a/nginx_module/src/unix_socket.rs +++ b/nginx_module/src/unix_socket.rs @@ -2,7 +2,6 @@ use std::{ borrow::Cow, cell::RefCell, collections::VecDeque, marker::PhantomPinned, mem::MaybeUninit, os::unix::ffi::OsStrExt, path::Path, pin::Pin, }; -use std::cell::Cell; use libc::{c_void, sockaddr_un}; use crate::{ @@ -28,6 +27,7 @@ enum State { }, Disconnected { event: Box, + reconnect_timeout: usize }, } @@ -40,7 +40,6 @@ struct Inner { path: String, name: Cow<'static, [u8]>, dummy_log: Box, - reconnect_timeout: Cell, _phantom: PhantomPinned, } @@ -58,6 +57,7 @@ struct WriteBuffer { end: u32, } +const MIN_TIMEOUT_MS: usize = 1; const TIMEOUT_MS: usize = 255; impl UnixSocket { @@ -83,8 +83,8 @@ impl UnixSocket { Box::new(MaybeUninit::zeroed().assume_init()); ev.handler = Some(on_reconnect_timeout); ev.log = (*ngx_cycle).log; - ngx_event_add_timer(&mut *ev, TIMEOUT_MS); - State::Disconnected { event: ev } + ngx_event_add_timer(&mut *ev, MIN_TIMEOUT_MS); + State::Disconnected { event: ev, reconnect_timeout: MIN_TIMEOUT_MS } }, } } @@ -92,8 +92,8 @@ impl UnixSocket { let mut ev: Box = Box::new(MaybeUninit::zeroed().assume_init()); ev.handler = Some(on_reconnect_timeout); ev.log = (*ngx_cycle).log; - ngx_event_add_timer(&mut *ev, TIMEOUT_MS); - State::Disconnected { event: ev } + ngx_event_add_timer(&mut *ev, MIN_TIMEOUT_MS); + State::Disconnected { event: ev, reconnect_timeout: MIN_TIMEOUT_MS } }, }; @@ -106,7 +106,6 @@ impl UnixSocket { path, name, dummy_log, - reconnect_timeout: Cell::new(0), _phantom: PhantomPinned, }; @@ -122,7 +121,7 @@ impl UnixSocket { (*(**conn).write).handler = Some(on_write); (*(**conn).read).handler = Some(on_read); }, - State::Disconnected { event } => { + State::Disconnected { event, .. } => { event.data = (&*inner as *const Inner as *mut Inner).cast(); (inner.after_handshake.borrow_mut())(); // discard send data, it is disconnected } @@ -141,7 +140,7 @@ impl UnixSocket { let mut dummy_ev: Box = Box::new(MaybeUninit::zeroed().assume_init()); dummy_ev.handler = None; dummy_ev.log = (*ngx_cycle).log; - *self.0.state.borrow_mut() = State::Disconnected { event: dummy_ev } + *self.0.state.borrow_mut() = State::Disconnected { event: dummy_ev, reconnect_timeout: MIN_TIMEOUT_MS} } } } @@ -223,7 +222,7 @@ impl Drop for State { ngx_close_connection(*conn); } }, - Self::Disconnected { event } => unsafe { + Self::Disconnected { event, .. } => unsafe { if event.active() != 0 { ngx_event_del_timer(event.as_mut()); } @@ -280,6 +279,7 @@ impl Inner { if let Err(Disconnected) = unsafe { buffers.send(*conn) } { *state = State::Disconnected { event: self.create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS }; } } @@ -292,13 +292,7 @@ impl Inner { ev.log = (*ngx_cycle).log; ev.data = (self as *const Self as *mut Self).cast(); - let reconnect_timeout = self.reconnect_timeout.get(); - - ngx_event_add_timer(&mut *ev, reconnect_timeout); - - if reconnect_timeout < TIMEOUT_MS { - self.reconnect_timeout.set((reconnect_timeout + 1) * 2); - } + ngx_event_add_timer(&mut *ev, 0); ev } @@ -475,12 +469,12 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { *state = State::Connected { conn: new_conn, buffers: std::mem::take(buffers), - }; - (*data).reconnect_timeout.set(0); + } } Err(_) => { *state = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, } } } @@ -497,6 +491,7 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { if !back_data.is_empty() && (*data).write(&back_data).is_err() { *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; } } @@ -506,6 +501,7 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { if !back_data.is_empty() && (*data).write(&back_data).is_err() { *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; } } @@ -513,12 +509,14 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { // error, signal connection close *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; break; } else if result == NGX_AGAIN as isize { if ngx_handle_read_event(conn.read, 0) != NGX_OK as isize { *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; } break; @@ -526,6 +524,7 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { // error, retry reconnect *(*data).state.borrow_mut() = State::Disconnected { event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, }; break; } @@ -539,7 +538,10 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) { if !data.is_null() { let data = &*data; let mut state = data.state.borrow_mut(); - if let State::Disconnected { .. } = &*state { + if let State::Disconnected { reconnect_timeout, .. } = &mut *state { + if *reconnect_timeout < TIMEOUT_MS { + *reconnect_timeout = (*reconnect_timeout) * 2 + }; if let Some(conn) = State::try_connect( &data.path, &data.name, @@ -563,9 +565,10 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) { && ngx_exiting == 0 && ngx_terminate == 0 { - ngx_event_add_timer(&mut *ev, TIMEOUT_MS); + ngx_event_add_timer(&mut *ev, *reconnect_timeout); } - State::Disconnected { event: ev } + + State::Disconnected { event: ev, reconnect_timeout: *reconnect_timeout } }, }; } else if (*ev).timer_set() == 0 @@ -573,7 +576,7 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) { && ngx_exiting == 0 && ngx_terminate == 0 { - ngx_event_add_timer(ev, TIMEOUT_MS); + ngx_event_add_timer(ev, *reconnect_timeout); } } }