Reformatted with black
akkashk committed Feb 3, 2022 · commit 0c4c202 · 1 parent 37c750a
Showing 130 changed files with 8,757 additions and 7,357 deletions.
23 changes: 12 additions & 11 deletions .circleci/config.yml
@@ -64,17 +64,18 @@ defaults: &defaults
# command: |
# . ci/bin/activate
# flake8 notebooker tests
#- run:
# name: Run black
# command: |
# . ci/bin/activate
# black --check -l 120 notebooker tests
# if [ $? -eq 0 ]
# then
# echo "Black worked fine."
# else
# echo "Black found differences!"; exit $?
# fi
- run:
name: Run black
command: |
pip install --upgrade pip
pip install black
black --check -l 120 .
if [ $? -eq 0 ]
then
echo "Black worked fine."
else
echo "Black found differences!"; exit $?
fi
# Test
- run:
name: Run all tests
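For context (not part of the diff): a minimal sketch of what the `black --check -l 120` step above enforces, using black's Python API. The snippet and its input are illustrative and assume the `black` package is installed.

```python
# Illustrative sketch: what "black -l 120" does to a single snippet.
import black

SRC = "x = {'a': 1,   'b': 2}\n"

# Mode(line_length=120) mirrors the `-l 120` flag used in the CI step.
formatted = black.format_str(SRC, mode=black.Mode(line_length=120))

print(formatted)         # x = {"a": 1, "b": 2}
print(formatted == SRC)  # False -> `black --check` would report a difference
```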
5 changes: 3 additions & 2 deletions arctic/__init__.py
@@ -8,9 +8,10 @@

try:
from pkg_resources import get_distribution

str_version = get_distribution(__name__).version.strip()
int_parts = tuple(int(x) for x in str_version.split('.'))
num_version = sum([1000 ** i * v for i, v in enumerate(reversed(int_parts))])
int_parts = tuple(int(x) for x in str_version.split("."))
num_version = sum([1000**i * v for i, v in enumerate(reversed(int_parts))])
register_version(str_version, num_version)
except Exception:
__version__ = None
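As an aside, a worked example of the version arithmetic above (hypothetical version string; arctic takes str_version from pkg_resources at import time). Each dotted component occupies three decimal digits of num_version:

```python
# Worked example of the num_version computation (hypothetical version string).
str_version = "1.79.4"
int_parts = tuple(int(x) for x in str_version.split("."))  # (1, 79, 4)
num_version = sum(1000**i * v for i, v in enumerate(reversed(int_parts)))
print(num_version)  # 1079004 = 1 * 1000**2 + 79 * 1000 + 4
```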
53 changes: 25 additions & 28 deletions arctic/_cache.py
@@ -5,10 +5,10 @@

logger = logging.getLogger(__name__)

CACHE_COLL = 'cache'
CACHE_DB = 'meta_db'
CACHE_SETTINGS = 'settings'
CACHE_SETTINGS_KEY = 'cache'
CACHE_COLL = "cache"
CACHE_DB = "meta_db"
CACHE_SETTINGS = "settings"
CACHE_SETTINGS_KEY = "cache"
"""
Sample cache_settings collection entry:
meta_db.cache_settings.insertOne({"type": "cache", "enabled": true, "cache_expiry": 600})
@@ -32,7 +32,7 @@ def __init__(self, client, cache_expiry=DEFAULT_CACHE_EXPIRY, cache_db=CACHE_DB,

def _get_cache_settings(self):
try:
return self._cachedb[CACHE_SETTINGS].find_one({'type': CACHE_SETTINGS_KEY})
return self._cachedb[CACHE_SETTINGS].find_one({"type": CACHE_SETTINGS_KEY})
except OperationFailure as op:
logging.debug("Cannot access %s in db: %s. Error: %s" % (CACHE_SETTINGS, CACHE_DB, op))
return None
@@ -48,13 +48,11 @@ def set_caching_state(self, enabled):

if CACHE_SETTINGS not in self._cachedb.list_collection_names():
logging.info("Creating %s collection for cache settings" % CACHE_SETTINGS)
self._cachedb[CACHE_SETTINGS].insert_one({
'type': CACHE_SETTINGS_KEY,
'enabled': enabled,
'cache_expiry': DEFAULT_CACHE_EXPIRY
})
self._cachedb[CACHE_SETTINGS].insert_one(
{"type": CACHE_SETTINGS_KEY, "enabled": enabled, "cache_expiry": DEFAULT_CACHE_EXPIRY}
)
else:
self._cachedb[CACHE_SETTINGS].update_one({'type': CACHE_SETTINGS_KEY}, {'$set': {'enabled': enabled}})
self._cachedb[CACHE_SETTINGS].update_one({"type": CACHE_SETTINGS_KEY}, {"$set": {"enabled": enabled}})
logging.info("Caching set to: %s" % enabled)

def _is_not_expired(self, cached_data, newer_than_secs):
@@ -63,9 +61,9 @@ def _is_not_expired(self, cached_data, newer_than_secs):
expiry_period = newer_than_secs
else:
cache_settings = self._get_cache_settings()
expiry_period = cache_settings['cache_expiry'] if cache_settings else DEFAULT_CACHE_EXPIRY
expiry_period = cache_settings["cache_expiry"] if cache_settings else DEFAULT_CACHE_EXPIRY

return datetime.utcnow() < cached_data['date'] + timedelta(seconds=expiry_period)
return datetime.utcnow() < cached_data["date"] + timedelta(seconds=expiry_period)

def get(self, key, newer_than_secs=None):
"""
@@ -82,44 +80,43 @@ def get(self, key, newer_than_secs=None):
cached_data = self._cachecol.find_one({"type": key})
# Check that there is data in cache and it's not stale.
if cached_data and self._is_not_expired(cached_data, newer_than_secs):
return cached_data['data']
return cached_data["data"]
except OperationFailure as op:
# Fallback to uncached version without spamming.
logging.debug("Could not read from cache due to: %s. Ask your admin to give read permissions on %s:%s",
op, CACHE_DB, CACHE_COLL)
logging.debug(
"Could not read from cache due to: %s. Ask your admin to give read permissions on %s:%s",
op,
CACHE_DB,
CACHE_COLL,
)

return None

def set(self, key, data):
try:
self._cachecol.update_one(
{"type": key},
{"$set": {"type": key, "date": datetime.utcnow(), "data": data}},
upsert=True
{"type": key}, {"$set": {"type": key, "date": datetime.utcnow(), "data": data}}, upsert=True
)
except OperationFailure as op:
logging.debug("This operation is to be run with admin permissions. Should be fine: %s", op)

def append(self, key, append_data):
try:
self._cachecol.update_one(
{'type': key},
{"type": key},
{
# Add to set will not add the same library again to the list unlike set.
'$addToSet': {'data': append_data},
'$setOnInsert': {'type': key, 'date': datetime.utcnow()}
"$addToSet": {"data": append_data},
"$setOnInsert": {"type": key, "date": datetime.utcnow()},
},
upsert=True
upsert=True,
)
except OperationFailure as op:
logging.debug("Admin is required to append to the cache: %s", op)

def delete_item_from_key(self, key, item):
try:
self._cachecol.update(
{'type': key},
{"$pull": {"data": item}}
)
self._cachecol.update({"type": key}, {"$pull": {"data": item}})
except OperationFailure as op:
logging.debug("Admin is required to remove from cache: %s", op)

@@ -131,7 +128,7 @@ def update_item_for_key(self, key, old, new):
def is_caching_enabled(self, cache_enabled_in_env):
cache_settings = self._get_cache_settings()
# Caching is enabled unless explicitly disabled. Can be disabled either by an env variable or config in mongo.
if cache_settings and not cache_settings['enabled']:
if cache_settings and not cache_settings["enabled"]:
return False
# Disabling from Mongo Setting take precedence over this env variable
if not cache_enabled_in_env:
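For reference, a self-contained sketch of the expiry rule that `_is_not_expired` applies, restated with stdlib objects only. The function, constant value, and sample document below are illustrative, not arctic's API:

```python
from datetime import datetime, timedelta

DEFAULT_CACHE_EXPIRY = 3600  # assumed default for this sketch; the real value lives in _cache.py

def is_not_expired(cached_data, newer_than_secs=None, cache_settings=None):
    # An explicit newer_than_secs wins, then the Mongo-stored setting, then the default.
    if newer_than_secs:
        expiry_period = newer_than_secs
    else:
        expiry_period = cache_settings["cache_expiry"] if cache_settings else DEFAULT_CACHE_EXPIRY
    return datetime.utcnow() < cached_data["date"] + timedelta(seconds=expiry_period)

entry = {"type": "list_libraries", "date": datetime.utcnow() - timedelta(seconds=700), "data": ["lib1"]}
print(is_not_expired(entry, newer_than_secs=600))                    # False: entry is older than 10 minutes
print(is_not_expired(entry, cache_settings={"cache_expiry": 3600}))  # True: still inside the 1-hour window
```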
13 changes: 10 additions & 3 deletions arctic/_compression.py
@@ -3,13 +3,20 @@

try:
from lz4.block import compress as lz4_compress, decompress as lz4_decompress
lz4_compressHC = lambda _str: lz4_compress(_str, mode='high_compression')

lz4_compressHC = lambda _str: lz4_compress(_str, mode="high_compression")
except ImportError as e:
from lz4 import compress as lz4_compress, compressHC as lz4_compressHC, decompress as lz4_decompress

# ENABLE_PARALLEL mutated in global_scope. Do not remove.
from ._config import ENABLE_PARALLEL, LZ4_HIGH_COMPRESSION, LZ4_WORKERS, LZ4_N_PARALLEL, LZ4_MINSZ_PARALLEL, \
BENCHMARK_MODE # noqa # pylint: disable=unused-import
from ._config import (
ENABLE_PARALLEL,
LZ4_HIGH_COMPRESSION,
LZ4_WORKERS,
LZ4_N_PARALLEL,
LZ4_MINSZ_PARALLEL,
BENCHMARK_MODE,
) # noqa # pylint: disable=unused-import

logger = logging.getLogger(__name__)

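For reference, a round-trip sketch of the `lz4.block` calls wrapped above (assumes the modern `lz4` package; the `except ImportError` branch in the diff covers the legacy flat API):

```python
from lz4.block import compress as lz4_compress, decompress as lz4_decompress

payload = b"arctic segment data " * 512

default_blob = lz4_compress(payload)
hc_blob = lz4_compress(payload, mode="high_compression")  # what lz4_compressHC maps to

# High-compression trades CPU time for a (usually) smaller blob; both round-trip losslessly.
print(len(payload), len(default_blob), len(hc_blob))
assert lz4_decompress(default_blob) == payload
assert lz4_decompress(hc_blob) == payload
```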
36 changes: 18 additions & 18 deletions arctic/_config.py
@@ -11,44 +11,44 @@
# VersionStore configuration
# -----------------------------
# Controls is the write handler can only match handlers for the specific data type. No fallback to pickling if True.
STRICT_WRITE_HANDLER_MATCH = bool(os.environ.get('STRICT_WRITE_HANDLER_MATCH'))
STRICT_WRITE_HANDLER_MATCH = bool(os.environ.get("STRICT_WRITE_HANDLER_MATCH"))


# -----------------------------
# NdArrayStore configuration
# -----------------------------
# Extra sanity checks for corruption during appends. Introduces a 5-7% performance hit (off by default)
CHECK_CORRUPTION_ON_APPEND = bool(os.environ.get('CHECK_CORRUPTION_ON_APPEND'))
CHECK_CORRUPTION_ON_APPEND = bool(os.environ.get("CHECK_CORRUPTION_ON_APPEND"))


# -----------------------------
# Serialization configuration
# -----------------------------
# If a row is too large, then auto-expand the data chunk size from the default _CHUNK_SIZE (it is 2MB)
ARCTIC_AUTO_EXPAND_CHUNK_SIZE = bool(os.environ.get('ARCTIC_AUTO_EXPAND_CHUNK_SIZE'))
ARCTIC_AUTO_EXPAND_CHUNK_SIZE = bool(os.environ.get("ARCTIC_AUTO_EXPAND_CHUNK_SIZE"))

# This is the maximum size the auto-expanding can reach in an effort trying to reach the max
MAX_DOCUMENT_SIZE = int(pymongo.common.MAX_BSON_SIZE * 0.8)

# Enables the fast check for 'can_write' of the Pandas stores (significant speed-ups for large dataframes with objects)
FAST_CHECK_DF_SERIALIZABLE = bool(os.environ.get('FAST_CHECK_DF_SERIALIZABLE'))
FAST_CHECK_DF_SERIALIZABLE = bool(os.environ.get("FAST_CHECK_DF_SERIALIZABLE"))


# -------------------------------
# Forward pointers configuration
# -------------------------------
# This enum provides all the available modes of operation for Forward pointers
class FwPointersCfg(Enum):
ENABLED = 0 # use only forward pointers, don't update segment parent references
ENABLED = 0 # use only forward pointers, don't update segment parent references
DISABLED = 1 # operate in legacy mode, update segment parent references, don't add forward pointers
HYBRID = 2 # maintain both forward pointers and parent references in segments; for reads prefer fw pointers
HYBRID = 2 # maintain both forward pointers and parent references in segments; for reads prefer fw pointers


# The version document key used to store the ObjectIDs of segments
FW_POINTERS_REFS_KEY = 'SEGMENT_SHAS'
FW_POINTERS_REFS_KEY = "SEGMENT_SHAS"

# The version document key holding the FW pointers configuration used to create this version (enabled/disabled/hybrid)
FW_POINTERS_CONFIG_KEY = 'FW_POINTERS_CONFIG'
FW_POINTERS_CONFIG_KEY = "FW_POINTERS_CONFIG"

# This variable has effect only in Hybrid mode, and controls whether forward and legacy pointers are cross-verified
ARCTIC_FORWARD_POINTERS_RECONCILE = False
@@ -58,22 +58,22 @@ class FwPointersCfg(Enum):
# Compression configuration
# ---------------------------
# Use the parallel LZ4 compress (default is True)
ENABLE_PARALLEL = not os.environ.get('DISABLE_PARALLEL')
ENABLE_PARALLEL = not os.environ.get("DISABLE_PARALLEL")

# Use the high-compression configuration for LZ4 (trade runtime speed for better compression ratio)
LZ4_HIGH_COMPRESSION = bool(os.environ.get('LZ4_HIGH_COMPRESSION'))
LZ4_HIGH_COMPRESSION = bool(os.environ.get("LZ4_HIGH_COMPRESSION"))

# For a guide on how to tune the following parameters, read:
# arctic/benchmarks/lz4_tuning/README.txt
# The size of the compression thread pool.
# Rule of thumb: use 2 for non HC (VersionStore/NDarrayStore/PandasStore, and 8 for HC (TickStore).
LZ4_WORKERS = os.environ.get('LZ4_WORKERS', 2)
LZ4_WORKERS = os.environ.get("LZ4_WORKERS", 2)

# The minimum required number of chunks to use parallel compression
LZ4_N_PARALLEL = os.environ.get('LZ4_N_PARALLEL', 16)
LZ4_N_PARALLEL = os.environ.get("LZ4_N_PARALLEL", 16)

# Minimum data size to use parallel compression
LZ4_MINSZ_PARALLEL = os.environ.get('LZ4_MINSZ_PARALLEL', 0.5 * 1024 ** 2) # 0.5 MB
LZ4_MINSZ_PARALLEL = os.environ.get("LZ4_MINSZ_PARALLEL", 0.5 * 1024**2) # 0.5 MB

# Enable this when you run the benchmark_lz4.py
BENCHMARK_MODE = False
@@ -83,28 +83,28 @@ class FwPointersCfg(Enum):
# Async arctic
# ---------------------------
# Configures the size of the workers pools used for async arctic requests
ARCTIC_ASYNC_NWORKERS = os.environ.get('ARCTIC_ASYNC_NWORKERS', 4)
ARCTIC_ASYNC_NWORKERS = os.environ.get("ARCTIC_ASYNC_NWORKERS", 4)


# -------------------------------
# Flag used to convert byte column/index/column names to unicode when read back.
# -------------------------------
FORCE_BYTES_TO_UNICODE = bool(os.environ.get('FORCE_BYTES_TO_UNICODE'))
FORCE_BYTES_TO_UNICODE = bool(os.environ.get("FORCE_BYTES_TO_UNICODE"))

# -------------------------------
# Flag used for indicating caching levels. For now just for list_libraries.
# -------------------------------
ENABLE_CACHE = not bool(os.environ.get('ARCTIC_DISABLE_CACHE'))
ENABLE_CACHE = not bool(os.environ.get("ARCTIC_DISABLE_CACHE"))

# -------------------------------
# Currently we try to bson encode if the data is less than a given size and store it in
# the version collection, but pickling might be preferable if we have characters that don't
# play well with the bson encoder or if you always want your data in the data collection.
# -------------------------------
SKIP_BSON_ENCODE_PICKLE_STORE = bool(os.environ.get('SKIP_BSON_ENCODE_PICKLE_STORE'))
SKIP_BSON_ENCODE_PICKLE_STORE = bool(os.environ.get("SKIP_BSON_ENCODE_PICKLE_STORE"))

# -------------------------------
# Maximum size up to which the input will be bson encoded and stored in the version doc instead of being pickled in
# the version store. For very large input (> 10 MB) we ignore this option and fall back to using pickle.
# -------------------------------
MAX_BSON_ENCODE = os.environ.get('MAX_BSON_ENCODE', 256 * 1024) # 256 KB
MAX_BSON_ENCODE = os.environ.get("MAX_BSON_ENCODE", 256 * 1024) # 256 KB
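A note on the flag pattern used throughout this module, with an illustrative snippet (not part of the diff): `bool(os.environ.get(...))` treats any non-empty value as truthy, and numeric settings read from the environment come back as strings.

```python
import os

# Any non-empty value enables a bool(...) flag, including "0" or "false".
os.environ["LZ4_HIGH_COMPRESSION"] = "0"
print(bool(os.environ.get("LZ4_HIGH_COMPRESSION")))  # True -- non-empty string

del os.environ["LZ4_HIGH_COMPRESSION"]
print(bool(os.environ.get("LZ4_HIGH_COMPRESSION")))  # False -- unset

# Numeric settings default to ints but are strings when taken from the environment.
os.environ["LZ4_WORKERS"] = "8"
print(repr(os.environ.get("LZ4_WORKERS", 2)))  # '8' (a str, not an int)
```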
25 changes: 13 additions & 12 deletions arctic/_util.py
@@ -3,6 +3,7 @@
import numpy as np
import pymongo
from pandas import DataFrame

try:
from pandas.testing import assert_frame_equal
except ImportError:
@@ -12,7 +13,7 @@

logger = logging.getLogger(__name__)

NP_OBJECT_DTYPE = np.dtype('O')
NP_OBJECT_DTYPE = np.dtype("O")

# Avoid import-time extra logic
_use_new_count_api = None
@@ -24,16 +25,16 @@ def get_fwptr_config(version):

def _detect_new_count_api():
try:
mongo_v = [int(v) for v in pymongo.version.split('.')]
mongo_v = [int(v) for v in pymongo.version.split(".")]
return mongo_v[0] >= 3 and mongo_v[1] >= 7
except:
return False


def indent(s, num_spaces):
s = s.split('\n')
s = [(num_spaces * ' ') + line for line in s]
s = '\n'.join(s)
s = s.split("\n")
s = [(num_spaces * " ") + line for line in s]
s = "\n".join(s)
return s


@@ -47,7 +48,7 @@ def are_equals(o1, o2, **kwargs):
return False


def enable_sharding(arctic, library_name, hashed=True, key='symbol'):
def enable_sharding(arctic, library_name, hashed=True, key="symbol"):
"""
Enable sharding on a library
@@ -70,16 +71,16 @@ def enable_sharding(arctic, library_name, hashed=True, key='symbol'):
dbname = lib._db.name
library_name = lib.get_top_level_collection().name
try:
c.admin.command('enablesharding', dbname)
c.admin.command("enablesharding", dbname)
except pymongo.errors.OperationFailure as e:
if 'already enabled' not in str(e):
if "already enabled" not in str(e):
raise
if not hashed:
logger.info("Range sharding '" + key + "' on: " + dbname + '.' + library_name)
c.admin.command('shardCollection', dbname + '.' + library_name, key={key: 1})
logger.info("Range sharding '" + key + "' on: " + dbname + "." + library_name)
c.admin.command("shardCollection", dbname + "." + library_name, key={key: 1})
else:
logger.info("Hash sharding '" + key + "' on: " + dbname + '.' + library_name)
c.admin.command('shardCollection', dbname + '.' + library_name, key={key: 'hashed'})
logger.info("Hash sharding '" + key + "' on: " + dbname + "." + library_name)
c.admin.command("shardCollection", dbname + "." + library_name, key={key: "hashed"})


def mongo_count(collection, filter=None, **kwargs):
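A hypothetical usage sketch for `enable_sharding` (requires admin rights on a MongoDB cluster with sharding enabled; the host and library names below are placeholders):

```python
from arctic import Arctic
from arctic._util import enable_sharding

store = Arctic("mongos-host")                # must point at a mongos router
store.initialize_library("jdoe.eod_prices")  # placeholder library name

# Hash-shard the library's top-level collection on the default "symbol" key.
enable_sharding(store, "jdoe.eod_prices", hashed=True)
```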
(Diff for the remaining 124 changed files not shown.)
