Skip to content

Commit

Permalink
afmongodb: Add support for acting on type hints
Browse files Browse the repository at this point in the history
Since we have type hints, we should use them. This patch adds support
for all the type hints known so far (except for default, which is
special), along with support for the on-error() setting.

This fixes one part of elastic#6.

Signed-off-by: Gergely Nagy <[email protected]>
  • Loading branch information
algernon authored and bazsi committed Sep 19, 2013
1 parent 7efc13b commit 304912d
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 16 deletions.
133 changes: 117 additions & 16 deletions modules/afmongodb/afmongodb.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,13 @@ afmongodb_vp_obj_end(const gchar *name,
const gchar *prev, gpointer *prev_data,
gpointer user_data)
{
MongoDBDestDriver *self = (MongoDBDestDriver *)user_data;
bson *root;

if (prev_data)
root = (bson *)*prev_data;
else
root = (bson *)user_data;
root = self->bson;

if (prefix_data)
{
Expand All @@ -334,21 +335,105 @@ afmongodb_vp_process_value(const gchar *name, const gchar *prefix,
gpointer *prefix_data, gpointer user_data)
{
bson *o;
MongoDBDestDriver *self = (MongoDBDestDriver *)user_data;
gboolean fallback = self->template_options.on_error & ON_ERROR_FALLBACK_TO_STRING;

if (prefix_data)
o = (bson *)*prefix_data;
else
o = (bson *)user_data;
o = self->bson;

bson_append_string (o, name, value, -1);
switch (type)
{
case TYPE_HINT_BOOLEAN:
{
gboolean b;

if (type_cast_to_boolean (value, &b, NULL))
bson_append_boolean (o, name, b);
else
{
gboolean r = type_cast_drop_helper(self->template_options.on_error,
value, "boolean");

if (fallback)
bson_append_string (o, name, value, -1);
else
return r;
}
break;
}
case TYPE_HINT_INT32:
{
gint32 i;

if (type_cast_to_int32 (value, &i, NULL))
bson_append_int32 (o, name, i);
else
{
gboolean r = type_cast_drop_helper(self->template_options.on_error,
value, "int32");

if (fallback)
bson_append_string (o, name, value, -1);
else
return r;
}
break;
}
case TYPE_HINT_INT64:
{
gint64 i;

if (type_cast_to_int64 (value, &i, NULL))
bson_append_int64 (o, name, i);
else
{
gboolean r = type_cast_drop_helper(self->template_options.on_error,
value, "int64");

if (fallback)
bson_append_string(o, name, value, -1);
else
return r;
}

break;
}
case TYPE_HINT_DATETIME:
{
guint64 i;

if (type_cast_to_datetime_int (value, &i, NULL))
bson_append_utc_datetime (o, name, (gint64)i);
else
{
gboolean r = type_cast_drop_helper(self->template_options.on_error,
value, "datetime");

if (fallback)
bson_append_string(o, name, value, -1);
else
return r;
}

break;
}
case TYPE_HINT_STRING:
case TYPE_HINT_LITERAL:
bson_append_string (o, name, value, -1);
break;
default:
return TRUE;
}

return FALSE;
}

static gboolean
afmongodb_worker_insert (MongoDBDestDriver *self)
{
gboolean success;
gboolean success, need_drop = self->template_options.on_error & ON_ERROR_DROP_MESSAGE;
guint8 *oid;
LogMessage *msg;
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
Expand All @@ -367,20 +452,26 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
bson_append_oid (self->bson, "_id", oid);
g_free (oid);

value_pairs_walk(self->vp,
afmongodb_vp_obj_start,
afmongodb_vp_process_value,
afmongodb_vp_obj_end,
msg, self->seq_num, self->bson);
success = value_pairs_walk(self->vp,
afmongodb_vp_obj_start,
afmongodb_vp_process_value,
afmongodb_vp_obj_end,
msg, self->seq_num, self);
bson_finish (self->bson);

if (!mongo_sync_cmd_insert_n(self->conn, self->ns, 1,
(const bson **)&self->bson))
if (!success && !need_drop)
success = TRUE;

if (success)
{
msg_error("Network error while inserting into MongoDB",
evt_tag_int("time_reopen", self->time_reopen),
NULL);
success = FALSE;
if (!mongo_sync_cmd_insert_n(self->conn, self->ns, 1,
(const bson **)&self->bson))
{
msg_error("Network error while inserting into MongoDB",
evt_tag_int("time_reopen", self->time_reopen),
NULL);
success = FALSE;
}
}

msg_set_context(NULL);
Expand All @@ -394,7 +485,15 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
}
else
{
log_queue_push_head(self->queue, msg, &path_options);
if (need_drop)
{
stats_counter_inc(self->dropped_messages);
step_sequence_number(&self->seq_num);
log_msg_ack(msg, &path_options);
log_msg_unref(msg);
}
else
log_queue_push_head(self->queue, msg, &path_options);
}

return success;
Expand Down Expand Up @@ -584,6 +683,8 @@ afmongodb_dd_deinit(LogPipe *s)
afmongodb_dd_format_stats_instance(self),
SC_TYPE_DROPPED, &self->dropped_messages);
stats_unlock();


if (!log_dest_driver_deinit_method(s))
return FALSE;

Expand Down
2 changes: 2 additions & 0 deletions modules/afmongodb/afmongodb.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ void afmongodb_dd_set_path(LogDriver *d, const gchar *path);

gboolean afmongodb_dd_check_address(LogDriver *d, gboolean local);

LogTemplateOptions *afmongodb_dd_get_template_options(LogDriver *s);

#endif

0 comments on commit 304912d

Please sign in to comment.