diff --git a/capture/writer-simple.c b/capture/writer-simple.c index f5585fa8ea..126be19d4e 100644 --- a/capture/writer-simple.c +++ b/capture/writer-simple.c @@ -203,7 +203,7 @@ LOCAL void writer_simple_free(MolochSimple_t *info) } /******************************************************************************/ -LOCAL void writer_simple_process_buf(int thread, int closing) +LOCAL MolochSimple_t *writer_simple_process_buf(int thread, int closing) { MolochSimple_t *info = currentInfo[thread]; static uint32_t lastError; @@ -282,6 +282,8 @@ LOCAL void writer_simple_process_buf(int thread, int closing) } MOLOCH_COND_SIGNAL(simpleQ); MOLOCH_UNLOCK(simpleQ); + + return currentInfo[thread]; } /******************************************************************************/ LOCAL void writer_simple_encrypt_key(char *kekId, uint8_t *dek, int deklen, char *outkeyhex) @@ -385,8 +387,10 @@ LOCAL char *writer_simple_get_kekId () return g_strndup(okek, j); } /******************************************************************************/ -LOCAL void writer_simple_write_output(int thread, MolochSimple_t *info, const unsigned char *data, int len) +LOCAL void writer_simple_write_output(int thread, const unsigned char *data, int len) { + MolochSimple_t *info = currentInfo[thread]; + switch(compressionMode) { case MOLOCH_COMPRESSION_NONE: memcpy(info->buf + info->bufpos, data, len); @@ -401,7 +405,8 @@ LOCAL void writer_simple_write_output(int thread, MolochSimple_t *info, const un while (info->file->z_strm.avail_in != 0) { // The current zlib buffer is full if (info->file->z_strm.avail_out == 0) { - writer_simple_process_buf(thread, 0); + info->bufpos = (char *)info->file->z_strm.next_out - info->buf; + info = writer_simple_process_buf(info->file->thread, 0); } deflate(&info->file->z_strm, Z_NO_FLUSH); } @@ -417,7 +422,8 @@ LOCAL void writer_simple_write_output(int thread, MolochSimple_t *info, const un while (ZSTD_compressStream2(info->file->zstd_strm, &info->file->zstd_out, &info->file->zstd_in, ZSTD_e_continue) != 0) { // The current zstd buffer is full if (info->file->zstd_out.pos == info->file->zstd_out.size) { - writer_simple_process_buf(thread, 0); + info->bufpos = info->file->zstd_out.pos; + info = writer_simple_process_buf(info->file->thread, 0); } } info->file->posInBlock += len; @@ -431,18 +437,27 @@ LOCAL void writer_simple_write_output(int thread, MolochSimple_t *info, const un info->file->packetBytesWritten += len; } /******************************************************************************/ -LOCAL void writer_simple_gzip_make_new_block(MolochSimple_t *info) +LOCAL void writer_simple_gzip_make_new_block(int thread) { + MolochSimple_t *info = currentInfo[thread]; + deflate(&info->file->z_strm, Z_FULL_FLUSH); info->bufpos = (char *)info->file->z_strm.next_out - info->buf; info->file->blockStart = info->file->z_strm.total_out; info->file->posInBlock = 0; } /******************************************************************************/ -LOCAL void writer_simple_zstd_make_new_block(MolochSimple_t *info) +LOCAL void writer_simple_zstd_make_new_block(int thread) { + MolochSimple_t *info = currentInfo[thread]; #ifdef HAVE_ZSTD - ZSTD_compressStream2(info->file->zstd_strm, &info->file->zstd_out, &info->file->zstd_in, ZSTD_e_end); + while (ZSTD_compressStream2(info->file->zstd_strm, &info->file->zstd_out, &info->file->zstd_in, ZSTD_e_end) != 0) { + // The current zstd buffer is full + if (info->file->zstd_out.pos == info->file->zstd_out.size) { + info->bufpos = info->file->zstd_out.pos; + info = writer_simple_process_buf(info->file->thread, 0); + } + } info->bufpos = info->file->zstd_out.pos; info->file->blockStart = info->file->zstd_completedBlockStart + info->file->zstd_out.pos; info->file->posInBlock = 0; @@ -451,8 +466,6 @@ LOCAL void writer_simple_zstd_make_new_block(MolochSimple_t *info) /******************************************************************************/ LOCAL void writer_simple_write(const MolochSession_t * const session, MolochPacket_t * const packet) { - MolochSimple_t *info; - if (DLL_COUNT(simple_, &simpleQ) > simpleMaxQ) { static uint32_t lastError; static uint32_t notSaved; @@ -485,6 +498,7 @@ LOCAL void writer_simple_write(const MolochSession_t * const session, MolochPack packetPosEncoding = "gap0"; } + MolochSimple_t *info; info = currentInfo[thread] = writer_simple_alloc(thread, NULL); info->file = MOLOCH_TYPE_ALLOC0(MolochSimpleFile_t); info->file->thread = thread; @@ -594,46 +608,44 @@ LOCAL void writer_simple_write(const MolochSession_t * const session, MolochPack memcpy(&pcapFileHeader2, &pcapFileHeader, 24); pcapFileHeader2.magic = 0xa1b2c3d5; pcapFileHeader2.thiszone = firstPacket[thread]; - writer_simple_write_output(thread, info, (unsigned char *)&pcapFileHeader2, 20); + writer_simple_write_output(thread, (unsigned char *)&pcapFileHeader2, 20); } else { - writer_simple_write_output(thread, info, (unsigned char *)&pcapFileHeader, 20); + writer_simple_write_output(thread, (unsigned char *)&pcapFileHeader, 20); } uint32_t linktype = moloch_packet_dlt_to_linktype(pcapFileHeader.dlt); - writer_simple_write_output(thread, info, (unsigned char *)&linktype, 4); + writer_simple_write_output(thread, (unsigned char *)&linktype, 4); if (config.debug) LOG("opened %d %s %d", thread, name, info->file->fd); g_free(name); // Make a new block for start of packets if (compressionMode == MOLOCH_COMPRESSION_GZIP) - writer_simple_gzip_make_new_block(info); + writer_simple_gzip_make_new_block(thread); else if (compressionMode == MOLOCH_COMPRESSION_ZSTD) - writer_simple_zstd_make_new_block(info); + writer_simple_zstd_make_new_block(thread); gettimeofday(&fileAge[thread], NULL); - } else { - info = currentInfo[thread]; } - packet->writerFileNum = info->file->id; + packet->writerFileNum = currentInfo[thread]->file->id; if (compressionMode == MOLOCH_COMPRESSION_GZIP) { - if (info->file->posInBlock >= simpleCompressionBlockSize) { - writer_simple_gzip_make_new_block(info); + if (currentInfo[thread]->file->posInBlock >= simpleCompressionBlockSize) { + writer_simple_gzip_make_new_block(thread); } - packet->writerFilePos = (info->file->blockStart << uncompressedBits) + info->file->posInBlock; + packet->writerFilePos = (currentInfo[thread]->file->blockStart << uncompressedBits) + currentInfo[thread]->file->posInBlock; } else if (compressionMode == MOLOCH_COMPRESSION_ZSTD) { - if (info->file->posInBlock >= simpleCompressionBlockSize) { - writer_simple_zstd_make_new_block(info); + if (currentInfo[thread]->file->posInBlock >= simpleCompressionBlockSize) { + writer_simple_zstd_make_new_block(thread); } - packet->writerFilePos = (info->file->blockStart << uncompressedBits) + info->file->posInBlock; + packet->writerFilePos = (currentInfo[thread]->file->blockStart << uncompressedBits) + currentInfo[thread]->file->posInBlock; } else { - packet->writerFilePos = info->file->pos; + packet->writerFilePos = currentInfo[thread]->file->pos; } - info->file->packets++; + currentInfo[thread]->file->packets++; if (simpleShortHeader) { char header[6]; // LLLL LLLL LLLL LLLL @@ -651,7 +663,7 @@ LOCAL void writer_simple_write(const MolochSession_t * const session, MolochPack memcpy(header+2, &t, 4); - writer_simple_write_output(thread, info, (unsigned char *)&header, 6); + writer_simple_write_output(thread, (unsigned char *)&header, 6); } else { struct moloch_pcap_sf_pkthdr hdr; @@ -659,13 +671,13 @@ LOCAL void writer_simple_write(const MolochSession_t * const session, MolochPack hdr.ts.tv_usec = packet->ts.tv_usec; hdr.caplen = packet->pktlen; hdr.pktlen = packet->pktlen; - writer_simple_write_output(thread, info, (unsigned char *)&hdr, 16); + writer_simple_write_output(thread, (unsigned char *)&hdr, 16); } - writer_simple_write_output(thread, info, packet->pkt, packet->pktlen); + writer_simple_write_output(thread, packet->pkt, packet->pktlen); - if (info->bufpos > config.pcapWriteSize) { + if (currentInfo[thread]->bufpos > config.pcapWriteSize) { writer_simple_process_buf(thread, 0); - } else if (info->file->packetBytesWritten >= config.maxFileSizeB) { + } else if (currentInfo[thread]->file->packetBytesWritten >= config.maxFileSizeB) { writer_simple_process_buf(thread, 1); } }