Skip to content

Commit

Permalink
Buffer chunks and write them from old to new to the JSONL file (#74)
Browse files Browse the repository at this point in the history
The dumper base class and JSON dumper are partly rewritten and the old
dumper directory tree removed.

Formatters have yet to be adapted to the new order, so using them with this
commit is pointless.
  • Loading branch information
tvdstaaij committed Mar 2, 2017
1 parent 14b3d67 commit 706776a
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 77 deletions.
20 changes: 0 additions & 20 deletions dumpers/json.rb

This file was deleted.

23 changes: 6 additions & 17 deletions dumpers/lib/dumper_interface.rb → lib/dumper_base.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
require_relative '../../lib/util'
require_relative 'util'

# To implement a dumper:
# * Create a .rb file named after the dumper in 'dumpers'
# * Require this file: require_relative 'lib/dumper_interface'
# * Declare a class SomeDumper, inheriting from DumperInterface
# * Implement one or more of the functions listed below (at least dump_msg)

# Note:
# Dumpers are a low-level construct and as of v2.0.0 they are no longer used
# for implementing custom output formats. Instead, custom formatters have been
# introduced for this purpose (see /formatters/lib/formatter_base.rb).

class DumperInterface
class DumperBase

# Will be called before backing up the first dialog
# Can be used for initialization
Expand All @@ -34,13 +23,13 @@ def msg_fresh?(msg, progress)
!progress.newest_id || MsgId.new(msg['id']) > progress.newest_id
end

# Will be called for each message to dump (from newest to oldest)
# Will be called for each chunk of messages (from newest to oldest)
# See the python binding documentation to get an idea of the msg attributes:
# https://github.com/vysheng/tg/blob/master/README-PY.md#attributes-1
# Returning boolean false causes an early abort (skips to the next dialog)
def dump_msg(dialog, msg)
# dialog, msg: Hash
raise 'dump_msg must be implemented'
def dump_chunk(dialog, messages)
  # Abstract hook: receives the dialog (Hash) and one chunk of messages
  # (Array of Hash). The base implementation only raises, so a concrete
  # dumper has to provide its own version.
  raise RuntimeError, 'dump_chunk must be implemented'
end

# Will be called just after dumping a dialog's messages
Expand Down
44 changes: 27 additions & 17 deletions dumpers/lib/single_file_line_dumper.rb → lib/json_lines_dumper.rb
Original file line number Diff line number Diff line change
@@ -1,34 +1,48 @@
require 'fileutils'
require_relative 'dumper_interface'
require 'json'
require 'tempfile'
require_relative 'dumper_base'

class SingleFileLineDumper < DumperInterface
class JsonLinesDumper < DumperBase
OUTPUT_SUBDIR = 'json'
FILE_EXTENSION = '.jsonl'

def start_dialog(dialog, progress)
@prepender = nil
@rename_to = nil
@output_dir = File.join(get_backup_dir, get_output_type)
@progress = progress
@chunk_buffer = []
@output_dir = File.join(get_backup_dir, OUTPUT_SUBDIR)
@state = progress.dumper_state ? progress.dumper_state.clone : {}
output_basename = $config['friendly_data_filenames'] == false ?
dialog['id'].to_s : get_safe_name(dialog['print_name'])
output_filename = output_basename + get_file_extension
output_filename = output_basename + FILE_EXTENSION
@current_outfile = @state['outfile']
if @current_outfile
current_basename = File.basename(@current_outfile, get_file_extension)
current_basename = File.basename(@current_outfile, FILE_EXTENSION)
@rename_to = output_filename if current_basename != output_basename
@current_outfile = File.join(get_backup_dir, @current_outfile)
@prepender = DumpPrepender.new(@current_outfile)
else
FileUtils.mkdir_p(@output_dir)
@current_outfile = File.join(@output_dir, output_filename)
@state['outfile'] = relativize_output_path(@current_outfile)
end
@stream = File.open(@current_outfile, 'w:UTF-8')
end

def dump_chunk(dialog, messages)
tmpfile_prefix = "telegram-history-dump[chunk#{@chunk_buffer.length}]"
tmpfile = Tempfile.new(tmpfile_prefix, :encoding => 'UTF-8')
@chunk_buffer.push(tmpfile)
messages.each { |msg| tmpfile.puts(JSON.generate(msg)) }
end

def end_dialog(dialog)
@stream.close
@stream = nil
@prepender.merge if @prepender
File.open(@current_outfile, 'a:UTF-8') do |outstream|
@chunk_buffer.reverse_each do |f|
f.rewind
IO.copy_stream(f, outstream)
f.close!
end
end
@chunk_buffer = nil

if @rename_to && $config['update_data_filenames']
new_outfile = File.join(@output_dir, @rename_to)
begin
Expand All @@ -44,8 +58,4 @@ def end_dialog(dialog)
@state
end

def get_file_extension
raise 'get_file_extension must be implemented'
end

end
44 changes: 21 additions & 23 deletions telegram-history-dump.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
require 'socket'
require 'timeout'
require 'yaml'
require_relative 'dumpers/json'
require_relative 'formatters/lib/formatter_base'
require_relative 'lib/cli_parser'
require_relative 'lib/json_lines_dumper'
require_relative 'lib/dump_progress'
require_relative 'lib/util'
require_relative 'lib/tg_def'
Expand Down Expand Up @@ -103,15 +103,15 @@ def dump_dialog(dialog)
end
raise 'Expected array' unless msg_chunk.is_a?(Array)

fresh_messages = []
msg_chunk.reverse_each do |msg|
dump_msg = true
offset += 1

if msg['id'].to_s.empty?
$log.warn('Dropping message without id: %s' % msg)
dump_msg = false
msg_id = nil
else
msg_id = MsgId.new(msg['id'])
next
end
msg_id = MsgId.new(msg['id'])
if msg_id && prev_msg_id && msg_id >= prev_msg_id
$log.warn('Message ids are not sequential (%s[%s] -> %s[%s])' % [
prev_msg_id.raw_hex, prev_msg_id.sequence_hex,
Expand All @@ -120,38 +120,36 @@ def dump_dialog(dialog)
end
unless msg['date']
$log.warn('Dropping message without date: %s' % msg)
dump_msg = false
next
end

prev_msg_id = msg_id
cur_progress.update(msg)

if msg['text'] && filter_regex && filter_regex =~ msg['text']
dump_msg = false
end

unless $dumper.msg_fresh?(msg, old_progress)
if keep_dumping
$log.info('Reached end of new messages since last backup')
end
dump_msg = false
$log.info('Reached end of new messages since last backup')
keep_dumping = false
break
end

if dump_msg
process_media(dialog, msg)
if $dumper.dump_msg(dialog, msg) == false
keep_dumping = false
end
end
next if msg['text'] && filter_regex && filter_regex =~ msg['text']

fresh_messages.unshift(msg)

offset += 1
if $config['backlog_limit'] > 0 && offset >= $config['backlog_limit']
$log.info('Reached backlog_limit')
keep_dumping = false
break
end
end

fresh_messages.each { |msg| process_media(dialog, msg) }
unless fresh_messages.empty?
if $dumper.dump_chunk(dialog, fresh_messages) == false
keep_dumping = false
end
end

keep_dumping = false if offset < cur_offset + $config['chunk_size']
sleep($config['chunk_delay']) if keep_dumping
end
Expand Down Expand Up @@ -277,7 +275,7 @@ def save_progress

FileUtils.mkdir_p(get_backup_dir)

$dumper = JsonDumper.new
$dumper = JsonLinesDumper.new
$progress = {}
$progress_snapshot = {}
if $config['track_progress']
Expand Down

0 comments on commit 706776a

Please sign in to comment.