Skip to content

Commit

Permalink
Add MetadataStore module
Browse files Browse the repository at this point in the history
  • Loading branch information
ichorid committed Sep 17, 2018
1 parent 0e41d5c commit 8abb90f
Show file tree
Hide file tree
Showing 41 changed files with 1,392 additions and 49 deletions.
1 change: 1 addition & 0 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ def init(self):
tunnel_community_ports = self.session.config.get_tunnel_community_socks5_listen_ports()
self.session.config.set_anon_proxy_settings(2, ("127.0.0.1", tunnel_community_ports))


if self.session.config.get_channel_search_enabled() and self.session.config.get_dispersy_enabled():
self.session.readable_status = STATE_INITIALIZE_CHANNEL_MGR
from Tribler.Core.Modules.channel.channel_manager import ChannelManager
Expand Down
1 change: 1 addition & 0 deletions Tribler/Core/Config/config.spec
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ ec_keypair_filename = string(default='')
megacache = boolean(default=True)
log_dir = string(default=None)
testnet = boolean(default=False)
chant_channel_edit = boolean(default=False)

[allchannel_community]
enabled = boolean(default=True)
Expand Down
5 changes: 5 additions & 0 deletions Tribler/Core/Config/tribler_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ def _get_random_port(self, path):
return self.selected_ports[path]

# General
def set_chant_channel_edit(self, value):
    """
    Store the 'chant_channel_edit' flag in the 'general' config section.

    :param value: any object; it is coerced to bool before being stored.
    """
    enabled = bool(value)
    self.config['general']['chant_channel_edit'] = enabled

def get_chant_channel_edit(self):
    """
    Read the 'chant_channel_edit' flag from the 'general' config section.

    :return: whatever the section's as_bool() yields for that key.
    """
    general_section = self.config['general']
    return general_section.as_bool('chant_channel_edit')

def set_family_filter_enabled(self, value):
self.config['general']['family_filter'] = bool(value)
Expand Down
3 changes: 3 additions & 0 deletions Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ def __init__(self, session, tdef):
self.deferred_added = Deferred()
self.deferred_removed = Deferred()

self.deferred_finished = Deferred() # for chant notification

self.handle_check_lc = self.register_task("handle_check", LoopingCall(self.check_handle))

def __str__(self):
Expand Down Expand Up @@ -643,6 +645,7 @@ def on_torrent_checked_alert(self, alert):
def on_torrent_finished_alert(self, alert):
self.update_lt_status(self.handle.status())
progress = self.get_state().get_progress()
self.deferred_finished.callback(self)
if self.get_mode() == DLMODE_VOD:
if progress == 1.0:
self.handle.set_sequential_download(False)
Expand Down
4 changes: 4 additions & 0 deletions Tribler/Core/Modules/MetadataStore/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""
Torrent metadata store: ORM-backed storage of torrent metadata, plus signing,
serialization to disk and packaging of that metadata into torrents.
"""
103 changes: 103 additions & 0 deletions Tribler/Core/Modules/MetadataStore/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from datetime import datetime
from pony import orm
from pony.orm import db_session
from Tribler.Core.Modules.MetadataStore.serialization import serialize_metadata_gossip, MetadataTypes
from Tribler.Core.Modules.MetadataStore.torrents import define_torrent_md
from Tribler.Core.Modules.MetadataStore.channels import define_channel_md

# File name of the metadata database.
# NOTE(review): presumably joined onto a per-session directory by the caller - confirm.
METADATA_DB_RELATIVE_PATH = "chant.db"


def define_signed_gossip(db):
    """
    Declare the SignedGossip base entity on the given Pony database object.

    The class is created inside this function so that every database object
    gets its own entity class. Nothing is returned; the entity is afterwards
    reachable as db.SignedGossip.

    :param db: the pony.orm Database object to attach the entity to.
    """
    class SignedGossip(db.Entity):
        """
        Base entity for signed metadata entries.

        Every entry carries a signature, the public key of its signer and two
        timestamps that both default to the current UTC time.
        """
        rowid = orm.PrimaryKey(int, auto=True)
        # Discriminator column: tells the entity subclasses apart.
        type = orm.Discriminator(int)
        _discriminator_ = MetadataTypes.TYPELESS.value
        signature = orm.Optional(buffer)
        timestamp = orm.Optional(datetime, default=datetime.utcnow)
        tc_pointer = orm.Optional(int, size=64, default=0)
        public_key = orm.Optional(buffer)
        addition_timestamp = orm.Optional(datetime, default=datetime.utcnow)

        # TODO: override __init__ to automatically create the signature
        # TODO: use signature as PrimaryKey. Relevant bugfix is in Pony 0.7.6

        def serialized(self):
            """
            Serialize this entry with serialize_metadata_gossip.

            :return: the value returned by serialize_metadata_gossip for the
                     dict form of this entry.
            """
            md = self.to_dict()
            return serialize_metadata_gossip(md)

        def to_file(self, filename):
            """
            Write the serialized form of this entry to a file.

            :param filename: path of the file to (over)write.
            """
            # Binary mode on purpose: serialized gossip is written with 'wb'
            # and read back with 'rb' everywhere else; text mode would
            # translate newline bytes on Windows and corrupt the blob.
            with open(filename, 'wb') as f:
                f.write(self.serialized())

        # TODO: automate me with Pony's entity hooks
        def sign(self, key):
            """
            Sign this entry and remember the signer's public key.

            :param key: key object; must provide pub().key_to_bin().
            """
            md_dict = self.to_dict()
            # Called for its side effect: the "signature" entry of md_dict is
            # read right afterwards, so the serializer is expected to fill it
            # in when a key is passed.
            serialize_metadata_gossip(md_dict, key)
            self.signature = md_dict["signature"]
            self.public_key = buffer(key.pub().key_to_bin())

        @classmethod
        def from_dict(cls, key, md_dict):
            """
            Create an entity from a dict of attribute values and sign it.

            :param key: key used to sign the new entry (see sign()).
            :param md_dict: attribute values passed to the entity constructor.
            :return: the newly created, signed entity.
            """
            md = cls(**md_dict)
            md.sign(key)
            return md


def define_deleted_md(db):
    """
    Declare the DeletedMD entity on the given Pony database object.

    :param db: the pony.orm Database object; db.SignedGossip must already be
               defined on it because DeletedMD derives from it.
    """
    class DeletedMD(db.SignedGossip):
        """Signed entry that marks another entry as deleted."""
        _discriminator_ = MetadataTypes.DELETED.value
        # Signature of the entry that this entry deletes.
        delete_signature = orm.Required(buffer)


def start_orm(db_filename, create_db=False):
    """
    Create a Pony database object with all metadata entities and bind it to SQLite.

    :param db_filename: filename of the SQLite database to bind to.
    :param create_db: when true, the database file and tables are created and
                      the full-text-search table and its triggers are installed.
    :return: the bound pony.orm Database object with its mapping generated.
    """
    # We have to dynamically define/init ORM-managed entities here to be able to support
    # multiple sessions in Tribler. Each session has its own db member object which
    # has its own instances of ORM-managed classes.
    db = orm.Database()

    # The order is significant: derived entities look up their base class on db.
    entity_definers = (define_signed_gossip,
                       define_deleted_md,
                       define_torrent_md,
                       define_channel_md)
    for define_entity in entity_definers:
        define_entity(db)

    db.bind(provider='sqlite', filename=db_filename, create_db=create_db)

    if not create_db:
        # Existing database: only the mapping is needed.
        db.generate_mapping(create_tables=create_db)
        return db

    # Fresh database: the FTS table is created before the mapping is generated,
    # the triggers that keep it up to date afterwards.
    with db_session:
        db.execute(sql_create_fts_table)
    db.generate_mapping(create_tables=create_db)
    with db_session:
        for trigger_sql in (sql_add_fts_trigger_insert,
                            sql_add_fts_trigger_delete,
                            sql_add_fts_trigger_update):
            db.execute(trigger_sql)
    return db


# This table should never be used from ORM directly.
# It is created as a VIRTUAL table by raw SQL and
# maintained by SQL triggers.
# The index covers the title and tags columns and is declared with
# content='SignedGossip', i.e. it points at the SignedGossip table.
# NOTE(review): title/tags are not declared on SignedGossip in this module;
# presumably they come from entity subclasses stored in the same table - confirm.
sql_create_fts_table = """
CREATE VIRTUAL TABLE FtsIndex USING FTS4
(title, tags, content='SignedGossip',
tokenize='porter' 'unicode61');"""

# After every INSERT into SignedGossip: add the new row to the index.
sql_add_fts_trigger_insert = """
CREATE TRIGGER fts_ai AFTER INSERT ON SignedGossip
BEGIN
INSERT INTO FtsIndex(rowid, title, tags) VALUES
(new.rowid, new.title, new.tags);
END;"""

# After every DELETE from SignedGossip: drop the row from the index.
sql_add_fts_trigger_delete = """
CREATE TRIGGER fts_ad AFTER DELETE ON SignedGossip
BEGIN
DELETE FROM FtsIndex WHERE rowid = old.rowid;
END;"""

# After every UPDATE of SignedGossip: replace the indexed row (delete + insert).
sql_add_fts_trigger_update = """
CREATE TRIGGER fts_au AFTER UPDATE ON SignedGossip BEGIN
DELETE FROM FtsIndex WHERE rowid = old.rowid;
INSERT INTO FtsIndex(rowid, title, tags) VALUES (new.rowid, new.title,
new.tags);
END;"""
167 changes: 167 additions & 0 deletions Tribler/Core/Modules/MetadataStore/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import os
from datetime import datetime
from libtorrent import add_files, bencode, create_torrent, file_storage, set_piece_hashes, torrent_info

from pony import orm
from pony.orm import db_session

from Tribler.Core.DownloadConfig import DownloadStartupConfig
from Tribler.Core.Modules.MetadataStore.serialization import serialize_metadata_gossip, deserialize_metadata_gossip, \
MetadataTypes
from Tribler.Core.TorrentDef import TorrentDefNoMetainfo

# Name of the directory holding channel data.
# NOTE(review): presumably relative to a per-session directory - confirm.
CHANNELS_DIR_RELATIVE_PATH = "channels"
# Number of trailing hex characters of the channel's public key that are
# used as the channel directory name (see ChannelMD.get_dirname).
CHANNEL_DIR_NAME_LENGTH = 60 # It's not 40, to be distinct from an infohash


def define_channel_md(db):
    """
    Declare the ChannelMD entity on the given Pony database object.

    :param db: the pony.orm Database object; db.TorrentMD must already be
               defined on it because ChannelMD derives from it.
    """
    class ChannelMD(db.TorrentMD):
        """
        Metadata entry describing a channel.

        The channel's entries are written out as numbered files in a directory
        and that directory is turned into a torrent (see commit_to_torrent).
        """
        _discriminator_ = MetadataTypes.CHANNEL_TORRENT.value
        # Counter of metadata blobs written so far; the last value is used as
        # the starting point for the file names of the next commit.
        version = orm.Optional(int, size=64, default=0)
        subscribed = orm.Optional(bool, default=False)
        votes = orm.Optional(int, size=64, default=0)

        @classmethod
        def from_dict(cls, key, md_dict):
            """
            Create a channel entry via the parent class' from_dict.

            :param key: key passed through to the parent implementation.
            :param md_dict: attribute values passed through to the parent implementation.
            :return: whatever the parent implementation returns.
            """
            return super(ChannelMD, cls).from_dict(key, md_dict)

        def commit_to_torrent(
                self,
                key,
                seeding_dir,
                update_dict=None,
                md_list=None,
                buf_list=None):
            """
            Write new entries to the channel directory, rebuild the channel
            torrent and update this channel entry accordingly.

            :param key: key handed to serialize_metadata_gossip for the updated channel entry.
            :param seeding_dir: directory in which the channel directory and torrent file live.
            :param update_dict: optional extra attribute values to set on this entry.
            :param md_list: optional entities whose serialized() form is added to the channel.
            :param buf_list: optional already-serialized blobs to add to the channel.
                             The passed-in sequence is left untouched.
            """
            # TODO: split this into update_metadata and commit_to_torrent?
            # Work on a copy: extending buf_list in place would silently
            # modify the caller's list.
            blobs = list(buf_list or [])
            blobs.extend([e.serialized() for e in md_list or []])

            (infohash, version) = create_channel_torrent(
                seeding_dir, self.get_dirname(), blobs, self.version)
            # TODO: do not recreate the torrent when its contents don't change
            now = datetime.utcnow()
            channel_dict = self.to_dict()
            channel_dict.update(update_dict or {})
            channel_dict.update({"infohash": infohash,
                                 "size": len(self.list_contents()),
                                 "timestamp": now,
                                 "version": version,
                                 "torrent_date": now})
            # Called with a key before the dict is applied to the entity;
            # presumably this refreshes the signature in channel_dict - confirm.
            serialize_metadata_gossip(channel_dict, key)
            self.set(**channel_dict)
            self.garbage_collect()

        def list_contents(self):
            """
            :return: list of all TorrentMD entries with this channel's public
                     key, excluding the channel entry itself.
            """
            return db.TorrentMD.select(
                lambda g: g.public_key == self.public_key and g != self)[:]

        # TODO: use some timer independent way to determine newer entries?
        # Could try do this based on entry id, since it increases monotonically
        def newer_entries(self):
            """
            :return: list of entries with this channel's public key whose
                     timestamp is later than the channel's timestamp.
            """
            return db.SignedGossip.select(
                lambda g: g.timestamp > self.timestamp and g.public_key == self.public_key)[:]

        def older_entries(self):
            """
            :return: list of entries with this channel's public key whose
                     timestamp is earlier than the channel's timestamp.
            """
            return db.SignedGossip.select(
                lambda g: g.timestamp < self.timestamp and g.public_key == self.public_key)[:]

        def get_dirname(self):
            """
            :return: name of the channel's directory: the trailing
                     CHANNEL_DIR_NAME_LENGTH hex characters of the public key.
            """
            # Have to limit this to support Windows file path length limit
            return str(self.public_key).encode('hex')[-CHANNEL_DIR_NAME_LENGTH:]

        # TODO: Pony ver. 0.7.4 introduces hybrid methods to optimize this
        def garbage_collect(self):
            """Delete all DELETED-type entries that are older than this channel entry."""
            for g in self.older_entries():
                if g.type == MetadataTypes.DELETED.value:
                    g.delete()


def create_torrent_from_dir(directory, torrent_filename):
    """
    Build a torrent from a directory and save it as a .torrent file.

    :param directory: directory whose files go into the torrent.
    :param torrent_filename: path of the torrent file to write.
    :return: the infohash of the generated torrent, as bytes.
    """
    storage = file_storage()
    add_files(storage, directory)
    # FIXME: for a torrent created with flags=19 with 200+ small files
    # libtorrent client_test can't see its files on disk.
    # optimize_alignment + merkle + mutable_torrent_support = 19
    # builder = create_torrent(storage, flags=19) # BUG?
    builder = create_torrent(storage)
    builder.set_priv(False)
    # The parent of the directory is passed as the location of the files.
    set_piece_hashes(builder, os.path.dirname(directory))
    metainfo = builder.generate()

    with open(torrent_filename, 'wb') as torrent_file:
        torrent_file.write(bencode(metainfo))

    return torrent_info(metainfo).info_hash().to_bytes()


def create_channel_torrent(channels_store_dir, title, buf_list, version):
    """
    Write metadata blobs into a channel directory and make a torrent of it.

    Each blob gets its own file, named after a running version number that is
    zero-padded to 9 digits and starts right after the given version.

    :param channels_store_dir: directory holding the channel directory and torrent file.
    :param title: name of the channel directory; also the torrent file's base name.
    :param buf_list: serialized metadata blobs to write, in order.
    :param version: version number of the last blob that was written before.
    :return: tuple (infohash of the new torrent, version number of the last blob written).
    """
    # Create dir for metadata files
    channel_dir = os.path.abspath(os.path.join(channels_store_dir, title))
    if not os.path.isdir(channel_dir):
        os.makedirs(channel_dir)

    # FIXME: maybe add check to avoid writing unsigned entries?
    # TODO: Smash together new metadata entries belonging to a single update into one giant file-blob
    # Write serialized and signed metadata into files
    last_version = version  # stays as passed in when there is nothing to write
    for last_version, blob in enumerate(buf_list, version + 1):
        blob_path = os.path.join(channel_dir, str(last_version).zfill(9))
        with open(blob_path, 'wb') as blob_file:
            blob_file.write(blob)

    # Make torrent out of dir with metadata files
    torrent_filename = os.path.join(channels_store_dir, title + ".torrent")
    infohash = create_torrent_from_dir(channel_dir, torrent_filename)
    return infohash, last_version


@db_session
def process_channel_dir(db, dirname):
    """
    Load every metadata blob file found in a channel directory into the database.

    Files are processed in sorted filename order.

    :param db: the metadata database object handed on to load_blob.
    :param dirname: directory containing the blob files.
    """
    # TODO: add skip on file numbers for efficiency of updates.
    # TODO: add error checking
    blob_names = os.listdir(dirname)
    blob_names.sort()
    for blob_name in blob_names:
        load_blob(db, os.path.join(dirname, blob_name))


# TODO: this should probably be moved to either the torrent manager, or ChannelMD method
# TODO: this should probably be moved to either the torrent manager, or ChannelMD method
def download_channel(session, infohash, title):
    """
    Start downloading a channel torrent and process its contents once finished.

    :param session: session object; provides channels_dir, mds and
                    start_download_from_tdef().
    :param infohash: infohash of the channel torrent.
    :param title: name given to the torrent definition.
    :return: the download's deferred_finished Deferred, with the processing
             callback and the error handler attached.
    """
    dcfg = DownloadStartupConfig()
    dcfg.set_dest_dir(session.channels_dir)
    tdef = TorrentDefNoMetainfo(infohash=str(infohash), name=title)
    download = session.start_download_from_tdef(tdef, dcfg)

    def on_finished(handle):
        # Import the downloaded metadata blobs into the metadata store.
        return process_channel_dir(session.mds, handle.get_content_dest())

    def err(f):
        print ("we got an exception: %s" % (f.getTraceback(),))
        # Only RuntimeError is considered handled; anything else is re-raised
        # by trap() and keeps travelling down the errback chain.
        f.trap(RuntimeError)

    # The errback has to be attached AFTER the callback: an errback only sees
    # failures produced earlier in the chain, so in the reverse order the
    # exceptions raised while processing the channel directory were never
    # reported.
    download.deferred_finished.addCallback(on_finished)
    download.deferred_finished.addErrback(err)
    return download.deferred_finished


@db_session
def load_blob(db, filename):
    """
    Read one serialized gossip blob from a file and apply it to the database.

    :param db: the metadata database object (provides SignedGossip, TorrentMD, ChannelMD).
    :param filename: path of the blob file.
    :return: the already-stored entity when the signature is known, the newly
             created TorrentMD/ChannelMD entity for torrent and channel blobs,
             and None for delete blobs and unknown types.
    """
    with open(filename, 'rb') as blob_file:
        gsp = deserialize_metadata_gossip(blob_file.read())
    # TODO: add gossip blob checks here

    signature = gsp["signature"]
    if db.SignedGossip.exists(signature=signature):
        # We already have this gossip.
        return db.SignedGossip.get(signature=signature)

    md_type = gsp["type"]

    if md_type == MetadataTypes.DELETED.value:
        # We only allow people to delete their own entries, thus PKs must
        # match
        target = db.SignedGossip.get(
            signature=gsp["delete_signature"],
            public_key=gsp["public_key"])
        if target:
            target.delete()
        # FIXME: return some temporary object here?
        return None

    if md_type == MetadataTypes.REGULAR_TORRENT.value:
        return db.TorrentMD(**gsp)

    if md_type == MetadataTypes.CHANNEL_TORRENT.value:
        return db.ChannelMD(**gsp)

    return None
Loading

0 comments on commit 8abb90f

Please sign in to comment.