Add smif csv2parquet helper for fast store conversion
tomalrussell committed Jul 31, 2019
1 parent c43b991 commit f951de5
Showing 1 changed file with 49 additions and 5 deletions.
54 changes: 49 additions & 5 deletions src/smif/cli/__init__.py
@@ -65,17 +65,19 @@
"""
from __future__ import print_function

+import glob
import logging
import os
import sys
from argparse import ArgumentParser

import pkg_resources

+import pandas
import smif
import smif.cli.log
-from smif.controller import (copy_project_folder, execute_model_run)
-from smif.controller.run import *
+from smif.controller import copy_project_folder, execute_model_run
+from smif.controller.run import DAFNIRunScheduler, SubProcessRunScheduler
from smif.data_layer import Store
from smif.data_layer.file import (CSVDataStore, FileMetadataStore,
                                  ParquetDataStore, YamlConfigStore)
@@ -239,6 +241,34 @@ def prepare_convert(args):
        src_store.convert_initial_conditions_data(sector_model_name, tgt_store, args.noclobber)


+def csv2parquet(args):
+    """Convert CSV to Parquet, assuming each CSV can be parsed as a dataframe
+    """
+    path = args.path
+    if path.endswith(".csv"):
+        files = [path]
+    else:
+        files = glob.glob(os.path.join(path, '**', '*.csv'), recursive=True)
+
+    for csv_path in files:
+        parquet_path = os.path.splitext(csv_path)[0] + ".parquet"
+        if args.noclobber and os.path.exists(parquet_path):
+            print("Skipping", csv_path)
+        else:
+            print("Converting", csv_path, flush=True)
+            try:
+                dataframe = pandas.read_csv(csv_path)
+                dataframe.to_parquet(parquet_path, engine='pyarrow', compression='gzip')
+            except UnicodeDecodeError:
+                # guess that cp1252 is the next most common encoding we'll come across
+                dataframe = pandas.read_csv(csv_path, encoding='cp1252')
+                dataframe.to_parquet(parquet_path, engine='pyarrow', compression='gzip')
+            except pandas.errors.ParserError as ex:
+                # nothing we can do with a ParserError - usually a data problem
+                print(ex)
+                continue


def prepare_scenario(args):
"""Update scenario configuration file to include multiple scenario variants.
@@ -335,7 +365,8 @@ def _get_store(args):
def _run_server(args):
    app_folder = pkg_resources.resource_filename('smif', 'app/dist')
    if args.scheduler == 'dafni' and args.interface != 'local_csv':
-        raise ValueError("Scheduler implementation {0}, is not valid when combined with {1}.".format(args.scheduler, args.interface))
+        msg = "Scheduler implementation {0}, is not valid when combined with {1}."
+        raise ValueError(msg.format(args.scheduler, args.interface))

    if args.scheduler == 'default':
        model_scheduler = SubProcessRunScheduler()
@@ -487,6 +518,17 @@ def parse_arguments():
    parser_prepare_model_runs.add_argument(
        '-e', '--end', type=int, help='Upper bound of the range of variants')

+    # CONVERT
+    parser_convert_format = subparsers.add_parser(
+        'csv2parquet', help='Convert CSV to Parquet. Pass a filename or a directory to ' +
+        'search recursively', parents=[parent_parser])
+    parser_convert_format.set_defaults(func=csv2parquet)
+    parser_convert_format.add_argument(
+        'path', help='Path to a file or directory')
+    parser_convert_format.add_argument(
+        '-nc', '--noclobber',
+        help='Skip converting data files which already exist as parquet', action='store_true')

    # APP
    parser_app = subparsers.add_parser(
        'app', help='Open smif app', parents=[parent_parser])
@@ -500,9 +542,11 @@ def parse_arguments():
                            choices=['default', 'dafni'],
                            help="The module scheduling implementation to use")
    parser_app.add_argument('-u', '--username',
-                            help="The username for logging in to the dafni JobSubmissionAPI, only needed with the dafni job scheduler")
+                            help="The username for logging in to the dafni JobSubmissionAPI, \
+                            only needed with the dafni job scheduler")
    parser_app.add_argument('-pw', '--password',
-                            help="The password for logging in to the dafni JobSubmissionAPI, only needed with the dafni job scheduler")
+                            help="The password for logging in to the dafni JobSubmissionAPI, \
+                            only needed with the dafni job scheduler")

    # RUN
    parser_run = subparsers.add_parser(
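The conversion in the new `csv2parquet` helper is a plain pandas round-trip. As a quick sanity check of the same pattern (a minimal sketch, not part of the commit; `data/input.csv` is a hypothetical path, and pyarrow must be installed):

    import pandas

    # Same call pattern as the helper: parse CSV, write gzip-compressed Parquet
    df = pandas.read_csv("data/input.csv")
    df.to_parquet("data/input.parquet", engine='pyarrow', compression='gzip')

    # Parquet stores dtypes, so reading back should reproduce the parsed frame
    print(pandas.read_parquet("data/input.parquet").equals(df))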

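With the parser wiring above registered, the helper can be driven from the shell or called directly with an equivalent Namespace (a sketch, assuming a hypothetical `data` directory; the `path` and `noclobber` attributes match the arguments defined in the diff):

    from argparse import Namespace
    from smif.cli import csv2parquet

    # Equivalent to running `smif csv2parquet data --noclobber` from the shell
    csv2parquet(Namespace(path='data', noclobber=True))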