Skip to content

Commit

Permalink
Compression improve (arkime#2443)
Browse files Browse the repository at this point in the history
* switching buffers mid write hosed info

Now that we can switch buffers mid write info pointed to old version.
Just go to the source

* need to update bufpos
  • Loading branch information
awick authored Oct 12, 2023
1 parent 7cd00b3 commit 7b2bd38
Showing 1 changed file with 42 additions and 30 deletions.
72 changes: 42 additions & 30 deletions capture/writer-simple.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ LOCAL void writer_simple_free(MolochSimple_t *info)
}

/******************************************************************************/
LOCAL void writer_simple_process_buf(int thread, int closing)
LOCAL MolochSimple_t *writer_simple_process_buf(int thread, int closing)
{
MolochSimple_t *info = currentInfo[thread];
static uint32_t lastError;
Expand Down Expand Up @@ -282,6 +282,8 @@ LOCAL void writer_simple_process_buf(int thread, int closing)
}
MOLOCH_COND_SIGNAL(simpleQ);
MOLOCH_UNLOCK(simpleQ);

return currentInfo[thread];
}
/******************************************************************************/
LOCAL void writer_simple_encrypt_key(char *kekId, uint8_t *dek, int deklen, char *outkeyhex)
Expand Down Expand Up @@ -385,8 +387,10 @@ LOCAL char *writer_simple_get_kekId ()
return g_strndup(okek, j);
}
/******************************************************************************/
LOCAL void writer_simple_write_output(int thread, MolochSimple_t *info, const unsigned char *data, int len)
LOCAL void writer_simple_write_output(int thread, const unsigned char *data, int len)
{
MolochSimple_t *info = currentInfo[thread];

switch(compressionMode) {
case MOLOCH_COMPRESSION_NONE:
memcpy(info->buf + info->bufpos, data, len);
Expand All @@ -401,7 +405,8 @@ LOCAL void writer_simple_write_output(int thread, MolochSimple_t *info, const un
while (info->file->z_strm.avail_in != 0) {
// The current zlib buffer is full
if (info->file->z_strm.avail_out == 0) {
writer_simple_process_buf(thread, 0);
info->bufpos = (char *)info->file->z_strm.next_out - info->buf;
info = writer_simple_process_buf(info->file->thread, 0);
}
deflate(&info->file->z_strm, Z_NO_FLUSH);
}
Expand All @@ -417,7 +422,8 @@ LOCAL void writer_simple_write_output(int thread, MolochSimple_t *info, const un
while (ZSTD_compressStream2(info->file->zstd_strm, &info->file->zstd_out, &info->file->zstd_in, ZSTD_e_continue) != 0) {
// The current zstd buffer is full
if (info->file->zstd_out.pos == info->file->zstd_out.size) {
writer_simple_process_buf(thread, 0);
info->bufpos = info->file->zstd_out.pos;
info = writer_simple_process_buf(info->file->thread, 0);
}
}
info->file->posInBlock += len;
Expand All @@ -431,18 +437,27 @@ LOCAL void writer_simple_write_output(int thread, MolochSimple_t *info, const un
info->file->packetBytesWritten += len;
}
/******************************************************************************/
LOCAL void writer_simple_gzip_make_new_block(MolochSimple_t *info)
LOCAL void writer_simple_gzip_make_new_block(int thread)
{
MolochSimple_t *info = currentInfo[thread];

deflate(&info->file->z_strm, Z_FULL_FLUSH);
info->bufpos = (char *)info->file->z_strm.next_out - info->buf;
info->file->blockStart = info->file->z_strm.total_out;
info->file->posInBlock = 0;
}
/******************************************************************************/
LOCAL void writer_simple_zstd_make_new_block(MolochSimple_t *info)
LOCAL void writer_simple_zstd_make_new_block(int thread)
{
MolochSimple_t *info = currentInfo[thread];
#ifdef HAVE_ZSTD
ZSTD_compressStream2(info->file->zstd_strm, &info->file->zstd_out, &info->file->zstd_in, ZSTD_e_end);
while (ZSTD_compressStream2(info->file->zstd_strm, &info->file->zstd_out, &info->file->zstd_in, ZSTD_e_end) != 0) {
// The current zstd buffer is full
if (info->file->zstd_out.pos == info->file->zstd_out.size) {
info->bufpos = info->file->zstd_out.pos;
info = writer_simple_process_buf(info->file->thread, 0);
}
}
info->bufpos = info->file->zstd_out.pos;
info->file->blockStart = info->file->zstd_completedBlockStart + info->file->zstd_out.pos;
info->file->posInBlock = 0;
Expand All @@ -451,8 +466,6 @@ LOCAL void writer_simple_zstd_make_new_block(MolochSimple_t *info)
/******************************************************************************/
LOCAL void writer_simple_write(const MolochSession_t * const session, MolochPacket_t * const packet)
{
MolochSimple_t *info;

if (DLL_COUNT(simple_, &simpleQ) > simpleMaxQ) {
static uint32_t lastError;
static uint32_t notSaved;
Expand Down Expand Up @@ -485,6 +498,7 @@ LOCAL void writer_simple_write(const MolochSession_t * const session, MolochPack
packetPosEncoding = "gap0";
}

MolochSimple_t *info;
info = currentInfo[thread] = writer_simple_alloc(thread, NULL);
info->file = MOLOCH_TYPE_ALLOC0(MolochSimpleFile_t);
info->file->thread = thread;
Expand Down Expand Up @@ -594,46 +608,44 @@ LOCAL void writer_simple_write(const MolochSession_t * const session, MolochPack
memcpy(&pcapFileHeader2, &pcapFileHeader, 24);
pcapFileHeader2.magic = 0xa1b2c3d5;
pcapFileHeader2.thiszone = firstPacket[thread];
writer_simple_write_output(thread, info, (unsigned char *)&pcapFileHeader2, 20);
writer_simple_write_output(thread, (unsigned char *)&pcapFileHeader2, 20);
} else {
writer_simple_write_output(thread, info, (unsigned char *)&pcapFileHeader, 20);
writer_simple_write_output(thread, (unsigned char *)&pcapFileHeader, 20);
}

uint32_t linktype = moloch_packet_dlt_to_linktype(pcapFileHeader.dlt);
writer_simple_write_output(thread, info, (unsigned char *)&linktype, 4);
writer_simple_write_output(thread, (unsigned char *)&linktype, 4);
if (config.debug)
LOG("opened %d %s %d", thread, name, info->file->fd);
g_free(name);

// Make a new block for start of packets
if (compressionMode == MOLOCH_COMPRESSION_GZIP)
writer_simple_gzip_make_new_block(info);
writer_simple_gzip_make_new_block(thread);
else if (compressionMode == MOLOCH_COMPRESSION_ZSTD)
writer_simple_zstd_make_new_block(info);
writer_simple_zstd_make_new_block(thread);
gettimeofday(&fileAge[thread], NULL);
} else {
info = currentInfo[thread];
}

packet->writerFileNum = info->file->id;
packet->writerFileNum = currentInfo[thread]->file->id;

if (compressionMode == MOLOCH_COMPRESSION_GZIP) {
if (info->file->posInBlock >= simpleCompressionBlockSize) {
writer_simple_gzip_make_new_block(info);
if (currentInfo[thread]->file->posInBlock >= simpleCompressionBlockSize) {
writer_simple_gzip_make_new_block(thread);
}

packet->writerFilePos = (info->file->blockStart << uncompressedBits) + info->file->posInBlock;
packet->writerFilePos = (currentInfo[thread]->file->blockStart << uncompressedBits) + currentInfo[thread]->file->posInBlock;
} else if (compressionMode == MOLOCH_COMPRESSION_ZSTD) {
if (info->file->posInBlock >= simpleCompressionBlockSize) {
writer_simple_zstd_make_new_block(info);
if (currentInfo[thread]->file->posInBlock >= simpleCompressionBlockSize) {
writer_simple_zstd_make_new_block(thread);
}

packet->writerFilePos = (info->file->blockStart << uncompressedBits) + info->file->posInBlock;
packet->writerFilePos = (currentInfo[thread]->file->blockStart << uncompressedBits) + currentInfo[thread]->file->posInBlock;
} else {
packet->writerFilePos = info->file->pos;
packet->writerFilePos = currentInfo[thread]->file->pos;
}

info->file->packets++;
currentInfo[thread]->file->packets++;
if (simpleShortHeader) {
char header[6];
// LLLL LLLL LLLL LLLL
Expand All @@ -651,21 +663,21 @@ LOCAL void writer_simple_write(const MolochSession_t * const session, MolochPack

memcpy(header+2, &t, 4);

writer_simple_write_output(thread, info, (unsigned char *)&header, 6);
writer_simple_write_output(thread, (unsigned char *)&header, 6);
} else {
struct moloch_pcap_sf_pkthdr hdr;

hdr.ts.tv_sec = packet->ts.tv_sec;
hdr.ts.tv_usec = packet->ts.tv_usec;
hdr.caplen = packet->pktlen;
hdr.pktlen = packet->pktlen;
writer_simple_write_output(thread, info, (unsigned char *)&hdr, 16);
writer_simple_write_output(thread, (unsigned char *)&hdr, 16);
}
writer_simple_write_output(thread, info, packet->pkt, packet->pktlen);
writer_simple_write_output(thread, packet->pkt, packet->pktlen);

if (info->bufpos > config.pcapWriteSize) {
if (currentInfo[thread]->bufpos > config.pcapWriteSize) {
writer_simple_process_buf(thread, 0);
} else if (info->file->packetBytesWritten >= config.maxFileSizeB) {
} else if (currentInfo[thread]->file->packetBytesWritten >= config.maxFileSizeB) {
writer_simple_process_buf(thread, 1);
}
}
Expand Down

0 comments on commit 7b2bd38

Please sign in to comment.