Skip to content

Commit

Permalink
Merge pull request #355 from bazsi/add-idle-timeout-support-for-logre…
Browse files Browse the repository at this point in the history
…ader-based-sources

Add idle timeout support for logreader based sources
  • Loading branch information
MrAnno authored Oct 22, 2024
2 parents 3d33f33 + c4abbdc commit a982fb1
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 19 deletions.
3 changes: 3 additions & 0 deletions lib/cfg-grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ main_location_print (FILE *yyo, YYLTYPE const * const yylocp)
%token KW_BATCH_LINES 10087
%token KW_BATCH_TIMEOUT 10088
%token KW_TRIM_LARGE_MESSAGES 10089

%token KW_STATS 10400
%token KW_FREQ 10401
%token KW_LEVEL 10402
Expand All @@ -273,6 +274,7 @@ main_location_print (FILE *yyo, YYLTYPE const * const yylocp)
%token KW_CHECK_HOSTNAME 10093
%token KW_BAD_HOSTNAME 10094
%token KW_LOG_LEVEL 10095
%token KW_IDLE_TIMEOUT 10096

%token KW_KEEP_TIMESTAMP 10100

Expand Down Expand Up @@ -1507,6 +1509,7 @@ source_proto_option
free($3);
}
| KW_LOG_MSG_SIZE '(' positive_integer ')' { last_proto_server_options->max_msg_size = $3; }
| KW_IDLE_TIMEOUT '(' positive_integer ')' { last_proto_server_options->idle_timeout = $3; }
| KW_TRIM_LARGE_MESSAGES '(' yesno ')' { last_proto_server_options->trim_large_messages = $3; }
;

Expand Down
1 change: 1 addition & 0 deletions lib/cfg-parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ static CfgLexerKeyword main_keywords[] =
{ "log_iw_size", KW_LOG_IW_SIZE },
{ "log_msg_size", KW_LOG_MSG_SIZE },
{ "trim_large_messages", KW_TRIM_LARGE_MESSAGES },
{ "idle_timeout", KW_IDLE_TIMEOUT },
{ "log_prefix", KW_LOG_PREFIX, KWS_OBSOLETE, "program_override" },
{ "program_override", KW_PROGRAM_OVERRIDE },
{ "host_override", KW_HOST_OVERRIDE },
Expand Down
6 changes: 3 additions & 3 deletions lib/logproto/logproto-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,20 @@ log_proto_client_options_set_drop_input(LogProtoClientOptions *options, gboolean
void
log_proto_client_options_set_timeout(LogProtoClientOptions *options, gint timeout)
{
options->timeout = timeout;
options->idle_timeout = timeout;
}

gint
log_proto_client_options_get_timeout(LogProtoClientOptions *options)
{
return options->timeout;
return options->idle_timeout;
}

void
log_proto_client_options_defaults(LogProtoClientOptions *options)
{
options->drop_input = FALSE;
options->timeout = 0;
options->idle_timeout = 0;
}

void
Expand Down
8 changes: 6 additions & 2 deletions lib/logproto/logproto-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ typedef struct _LogProtoClient LogProtoClient;
typedef struct _LogProtoClientOptions
{
gboolean drop_input;
gint timeout;
gint idle_timeout;
} LogProtoClientOptions;

typedef union _LogProtoClientOptionsStorage
Expand Down Expand Up @@ -126,7 +126,11 @@ log_proto_client_handshake(LogProtoClient *s, gboolean *handshake_finished)
static inline gboolean
log_proto_client_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout)
{
return s->prepare(s, fd, cond, timeout);
gboolean result = s->prepare(s, fd, cond, timeout);

if (!result && *timeout < 0)
*timeout = s->options->idle_timeout;
return result;
}

static inline LogProtoStatus
Expand Down
1 change: 1 addition & 0 deletions lib/logproto/logproto-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ log_proto_server_options_defaults(LogProtoServerOptions *options)
options->trim_large_messages = -1;
options->init_buffer_size = -1;
options->max_buffer_size = -1;
options->idle_timeout = -1;
options->ack_tracker_factory = instant_ack_tracker_bookmarkless_factory_new();
}

Expand Down
9 changes: 7 additions & 2 deletions lib/logproto/logproto-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct _LogProtoServerOptions
gboolean trim_large_messages;
gint max_buffer_size;
gint init_buffer_size;
gint idle_timeout;
AckTrackerFactory *ack_tracker_factory;
};

Expand Down Expand Up @@ -115,10 +116,14 @@ log_proto_server_set_options(LogProtoServer *self, const LogProtoServerOptions *
self->options = options;
}

static inline gboolean
static inline LogProtoPrepareAction
log_proto_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout)
{
return s->prepare(s, cond, timeout);
LogProtoPrepareAction result = s->prepare(s, cond, timeout);

if (result == LPPA_POLL_IO && *timeout < 0)
*timeout = s->options->idle_timeout;
return result;
}

static inline gboolean
Expand Down
7 changes: 1 addition & 6 deletions lib/logproto/logproto-text-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ log_proto_text_client_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, g
if (*cond == 0)
*cond = G_IO_OUT;

const gboolean pending_write = self->partial != NULL;

if (!pending_write && s->options->timeout > 0)
*timeout = s->options->timeout;

return pending_write;
return self->partial != NULL;
}

static LogProtoStatus
Expand Down
7 changes: 1 addition & 6 deletions modules/affile/logproto-file-writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,7 @@ log_proto_file_writer_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, g
/* if there's no pending I/O in the transport layer, then we want to do a write */
if (*cond == 0)
*cond = G_IO_OUT;
const gboolean pending_write = self->buf_count > 0 || self->partial;

if (!pending_write && s->options->timeout > 0)
*timeout = s->options->timeout;

return pending_write;
return self->buf_count > 0 || self->partial;
}

LogProtoClient *
Expand Down

0 comments on commit a982fb1

Please sign in to comment.