Skip to content

Commit

Permalink
instrumented parquet builder with traces
Browse files Browse the repository at this point in the history
  • Loading branch information
jpswinski committed Jul 31, 2023
1 parent df54c08 commit 65ca86e
Show file tree
Hide file tree
Showing 14 changed files with 52 additions and 27 deletions.
6 changes: 3 additions & 3 deletions clients/python/utils/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ def display_trace(trace, depth):
thread_id = trace["start"]['tid']
start_time = trace["start"]['time']
stop_time = trace["stop"]['time']
sec_from_origin = start_time / 1e3
sec_duration = (stop_time - start_time) / 1e3
dt = sec_from_origin # sliderule.gps2utc(sec_from_origin)
sec_from_origin = start_time / 1e6
sec_duration = (stop_time - start_time) / 1e6
dt = sec_from_origin
name = trace["start"]['name']
attributes = trace["start"]['attr']
# Print trace
Expand Down
4 changes: 2 additions & 2 deletions packages/arrow/ParquetBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -704,13 +704,13 @@ void ParquetBuilder::processRecordBatch (int num_rows)

/* Start Trace */
uint32_t parent_trace_id = EventLib::grabId();
uint32_t trace_id = start_trace(INFO, parent_trace_id, "process_batch", "%s", "{}");
uint32_t trace_id = start_trace(INFO, parent_trace_id, "process_batch", "{\"num_rows\": %d}", num_rows);

/* Loop Through Fields in Schema */
vector<shared_ptr<arrow::Array>> columns;
for(int i = 0; i < fieldIterator->length; i++)
{
uint32_t field_trace_id = start_trace(INFO, trace_id, "append_field", "{\"field\": \"%d\", \"rows\": %d}", i, batch.rows);
uint32_t field_trace_id = start_trace(INFO, trace_id, "append_field", "{\"field\": %d}", i);
RecordObject::field_t field = (*fieldIterator)[i];
shared_ptr<arrow::Array> column;

Expand Down
2 changes: 1 addition & 1 deletion packages/arrow/ParquetBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ParquetBuilder: public LuaObject

static const int LIST_BLOCK_SIZE = 32;
static const int FILE_NAME_MAX_LEN = 128;
static const int FILE_BUFFER_RSPS_SIZE = 0x1000000; // 16MB
static const int FILE_BUFFER_RSPS_SIZE = 0x2000000; // 32MB
static const int ROW_GROUP_SIZE = 0x4000000; // 64MB
static const int QUEUE_BUFFER_FACTOR = 3;

Expand Down
1 change: 1 addition & 0 deletions packages/core/EndpointObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ EndpointObject::Request::Request (const char* _id):
headers(EXPECTED_MAX_HEADER_FIELDS)
{
id = StringLib::duplicate(_id);
trace_id = ORIGIN;
path = NULL;
resource = NULL;
verb = UNRECOGNIZED;
Expand Down
1 change: 1 addition & 0 deletions packages/core/EndpointObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class EndpointObject: public LuaObject
Dictionary<const char*> headers;
uint8_t* body;
long length; // of body
uint32_t trace_id;
const char* id; // must be unique

Request (const char* _id);
Expand Down
5 changes: 2 additions & 3 deletions packages/core/EventLib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ uint32_t EventLib::startTrace(uint32_t parent, const char* name, event_level_t l
if(lvl < trace_level) return parent;

/* Initialize Trace */
event.systime = TimeLib::gpstime();
event.systime = TimeLib::latchtime() * 1000000; // us
event.tid = Thread::getId();
event.id = trace_id++;;
event.parent = parent;
Expand Down Expand Up @@ -280,7 +280,7 @@ void EventLib::stopTrace(uint32_t id, event_level_t lvl)
if(lvl < trace_level) return;

/* Initialize Trace */
event.systime = TimeLib::gpstime();
event.systime = TimeLib::latchtime() * 1000000; // us
event.tid = 0;
event.id = id;
event.parent = ORIGIN;
Expand All @@ -293,7 +293,6 @@ void EventLib::stopTrace(uint32_t id, event_level_t lvl)
/* Copy IP Address */
StringLib::copy(event.ipv4, SockLib::sockipv4(), SockLib::IPV4_STR_LEN);


/* Send Event */
sendEvent(&event, 1);
}
Expand Down
28 changes: 24 additions & 4 deletions packages/core/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ HttpClient::~HttpClient(void)
*----------------------------------------------------------------------------*/
HttpClient::rsps_t HttpClient::request (EndpointObject::verb_t verb, const char* resource, const char* data, bool keep_alive, Publisher* outq, int timeout)
{
if(sock->isConnected() && makeRequest(verb, resource, data, keep_alive))
uint32_t trace_id = start_trace(INFO, traceId, "http_client", "{\"verb\": \"%s\", \"resource\": \"%s\"}", EndpointObject::verb2str(verb), resource);
if(sock->isConnected() && makeRequest(verb, resource, data, keep_alive, trace_id))
{
rsps_t rsps = parseResponse(outq, timeout);
rsps_t rsps = parseResponse(outq, timeout, trace_id);
stop_trace(INFO, trace_id);
return rsps;
}
else
Expand All @@ -175,6 +177,7 @@ HttpClient::rsps_t HttpClient::request (EndpointObject::verb_t verb, const char*
.response = NULL,
.size = 0
};
stop_trace(INFO, trace_id);
return rsps;
}
}
Expand Down Expand Up @@ -208,8 +211,11 @@ TcpSocket* HttpClient::initializeSocket(const char* _ip_addr, int _port)
/*----------------------------------------------------------------------------
* makeRequest
*----------------------------------------------------------------------------*/
bool HttpClient::makeRequest (EndpointObject::verb_t verb, const char* resource, const char* data, bool keep_alive)
bool HttpClient::makeRequest (EndpointObject::verb_t verb, const char* resource, const char* data, bool keep_alive, int32_t parent_trace_id)
{
/* Start Trace */
uint32_t trace_id = start_trace(INFO, parent_trace_id, "make_request", "%s", "{}");

bool status = true;
int rqst_len = 0;

Expand Down Expand Up @@ -289,14 +295,21 @@ bool HttpClient::makeRequest (EndpointObject::verb_t verb, const char* resource,
status = false;
}

/* Stop Trace */
stop_trace(INFO, trace_id);

/* Return Status */
return status;
}
/*----------------------------------------------------------------------------
* parseResponse
*----------------------------------------------------------------------------*/
HttpClient::rsps_t HttpClient::parseResponse (Publisher* outq, int timeout)
HttpClient::rsps_t HttpClient::parseResponse (Publisher* outq, int timeout, int32_t parent_trace_id)
{
/* Start Trace */
uint32_t trace_id = start_trace(INFO, parent_trace_id, "parse_response", "%s", "{}");

/* Initialize Response */
rsps_t rsps = {
.code = EndpointObject::OK,
.response = NULL,
Expand All @@ -322,6 +335,7 @@ HttpClient::rsps_t HttpClient::parseResponse (Publisher* outq, int timeout)
while(active && !response_complete)
{
int bytes_read = sock->readBuffer(&rspsBuf[rsps_buf_index], MAX_RSPS_BUF_LEN-rsps_buf_index, timeout);
uint32_t sock_trace_id = start_trace(DEBUG, trace_id, "sock_read_buffer", "{\"bytes_read\": %d", bytes_read);
if(bytes_read > 0)
{
int line_start = 0;
Expand Down Expand Up @@ -560,6 +574,9 @@ HttpClient::rsps_t HttpClient::parseResponse (Publisher* outq, int timeout)
{
throw RunTimeException(CRITICAL, RTE_ERROR, "Failed to read socket: %d", bytes_read);
}

/* Stop Trace */
stop_trace(DEBUG, sock_trace_id);
}
}
catch(const RunTimeException& e)
Expand All @@ -568,6 +585,9 @@ HttpClient::rsps_t HttpClient::parseResponse (Publisher* outq, int timeout)
rsps.code = EndpointObject::Internal_Server_Error;
}

/* Stop Trace */
stop_trace(DEBUG, trace_id);

/* Return Response */
return rsps;
}
Expand Down
4 changes: 2 additions & 2 deletions packages/core/HttpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ class HttpClient: public LuaObject
*--------------------------------------------------------------------*/

TcpSocket* initializeSocket (const char* _ip_addr, int _port);
bool makeRequest (EndpointObject::verb_t verb, const char* resource, const char* data, bool keep_alive);
rsps_t parseResponse (Publisher* outq, int timeout);
bool makeRequest (EndpointObject::verb_t verb, const char* resource, const char* data, bool keep_alive, int32_t parent_trace_id);
rsps_t parseResponse (Publisher* outq, int timeout, int32_t parent_trace_id);
long parseLine (int start, int end);
status_line_t parseStatusLine (int start, int term);
hdr_kv_t parseHeaderLine (int start, int term);
Expand Down
14 changes: 13 additions & 1 deletion packages/core/HttpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ void HttpServer::initConnection (connection_t* connection)
connection->keep_alive = false;
connection->id = new char [REQUEST_ID_LEN];
StringLib::format(connection->id, REQUEST_ID_LEN, "%s.%ld", getName(), cnt);
connection->trace_id = start_trace(DEBUG, ORIGIN, "http_server", "{\"rqst_id\":\"%s\"}", connection->id);
connection->rsps_state.rspq = new Subscriber(connection->id);
connection->request = new EndpointObject::Request(connection->id);
connection->request->trace_id = connection->trace_id;
}

/*----------------------------------------------------------------------------
Expand All @@ -185,6 +187,9 @@ void HttpServer::deinitConnection (connection_t* connection)

/* Request freed only if present, o/w memory owned by EndpointObject */
if(connection->request) delete connection->request;

/* Stop Trace */
stop_trace(DEBUG, connection->trace_id);
}

/*----------------------------------------------------------------------------
Expand Down Expand Up @@ -374,7 +379,7 @@ int HttpServer::onRead(int fd)
int status = 0;
connection_t* connection = connections[fd];
rqst_state_t* state = &connection->rqst_state;

uint32_t trace_id = start_trace(DEBUG, connection->trace_id, "on_read", "%s", "{}");

/* Determine Buffer to Read Into */
uint8_t* buf; // pointer to buffer to read into
Expand Down Expand Up @@ -499,6 +504,9 @@ int HttpServer::onRead(int fd)
status = INVALID_RC; // will close socket
}

/* Stop Trace */
stop_trace(DEBUG, trace_id);

return status;
}

Expand All @@ -512,6 +520,7 @@ int HttpServer::onWrite(int fd)
int status = 0;
connection_t* connection = connections[fd];
rsps_state_t* state = &connection->rsps_state;
uint32_t trace_id = start_trace(DEBUG, connection->trace_id, "on_write", "%s", "{}");
bool ref_complete = false;

uint8_t* buffer;
Expand Down Expand Up @@ -634,6 +643,9 @@ int HttpServer::onWrite(int fd)
}
}

/* Stop Trace */
stop_trace(DEBUG, trace_id);

return status;
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/HttpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class HttpServer: public LuaObject

typedef struct {
char* id;
uint32_t trace_id;
rqst_state_t rqst_state;
rsps_state_t rsps_state;
double start_time;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/LuaEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void* LuaEndpoint::requestThread (void* parm)
const char* script_pathname = LuaEngine::sanitize(request->resource);

/* Start Trace */
uint32_t trace_id = start_trace(INFO, lua_endpoint->getTraceId(), "lua_endpoint", "{\"rqst_id\":\"%s\", \"verb\":\"%s\", \"resource\":\"%s\"}", request->id, verb2str(request->verb), request->resource);
uint32_t trace_id = start_trace(INFO, request->trace_id, "lua_endpoint", "{\"verb\":\"%s\", \"resource\":\"%s\"}", verb2str(request->verb), request->resource);

/* Log Request */
mlog(lua_endpoint->logLevel, "%s %s: %s", verb2str(request->verb), request->resource, request->body);
Expand Down
8 changes: 0 additions & 8 deletions packages/core/LuaObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,6 @@ const char* LuaObject::getName (void)
else return "<unnamed>";
}

/*----------------------------------------------------------------------------
* getTraceId
*----------------------------------------------------------------------------*/
uint32_t LuaObject::getTraceId (void)
{
return traceId;
}

/*----------------------------------------------------------------------------
* getLuaNumParms
*----------------------------------------------------------------------------*/
Expand Down
1 change: 0 additions & 1 deletion packages/core/LuaObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ class LuaObject

const char* getType (void);
const char* getName (void);
uint32_t getTraceId (void);

static int luaGetByName (lua_State* L);
static int getLuaNumParms (lua_State* L);
Expand Down
2 changes: 1 addition & 1 deletion packages/core/LuaScript.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ LuaScript::LuaScript(lua_State* L, const char* script, const char* arg):
SafeString safe_filename("%s", script);
safe_filename.replace("..", "_");
SafeString safe_pathname("%s%c%s.lua", CONFDIR, PATH_DELIMETER, safe_filename.str());
engine = new LuaEngine(safe_pathname.str(), arg, ORIGIN, LuaEngine::abortHook, false);
engine = new LuaEngine(safe_pathname.str(), arg, traceId, LuaEngine::abortHook, false);
}
else
{
Expand Down

0 comments on commit 65ca86e

Please sign in to comment.