From 43d0f3c63fd28f484c7f58ff2082993ebc953a06 Mon Sep 17 00:00:00 2001
From: Sébastien Arnaud
Date: Tue, 31 May 2016 19:20:31 -0500
Subject: [PATCH] Add option --settings-file to pass a JSON settings file when
 creating a new index
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Allows setting up custom analyzers and other settings that are only
  available at index creation time (not during mapping)
* Some PEP8 cleanup
* Incremented version to 1.0.2
* Updated HISTORY.rst
* Updated AUTHORS.rst
---
 .gitignore  |  1 +
 AUTHORS.rst |  1 +
 HISTORY.rst |  4 ++++
 csv2es.py   | 45 +++++++++++++++++++++++++++++++--------------
 setup.py    |  2 +-
 5 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/.gitignore b/.gitignore
index 3010f49..2e34cff 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@
 dist
 *.pyc
 *.egg-info
+build
diff --git a/AUTHORS.rst b/AUTHORS.rst
index f4ade95..e3b2545 100644
--- a/AUTHORS.rst
+++ b/AUTHORS.rst
@@ -10,3 +10,4 @@ Patches and Suggestions
 ```````````````````````
 
 - Christine Doig
+- Sébastien Arnaud (https://about.me/arnaudsj)
diff --git a/HISTORY.rst b/HISTORY.rst
index b0ba94b..fbe7a48 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,10 @@ History
 -------
 
+1.0.2 (2016-05-31)
+++++++++++++++++++
+- Add option --settings-file to pass settings when creating a new index (e.g. custom analyzers)
+
 1.0.1 (2015-06-02)
 ++++++++++++++++++
 - Add option to stream from stdin
 
diff --git a/csv2es.py b/csv2es.py
index 642e3ba..529316d 100644
--- a/csv2es.py
+++ b/csv2es.py
@@ -26,7 +26,7 @@
 from retrying import retry
 
 
-__version__ = '1.0.1'
+__version__ = '1.0.2'
 
 
 thread_local = local()
@@ -52,14 +52,16 @@ def documents_from_file(es, filename, delimiter, quiet):
 
     :return: generator returning document-indexing operations
     """
     def all_docs():
-        with open(filename, 'rb') if filename != '-' else sys.stdin as doc_file:
+        with open(filename, 'rb') if filename != '-' else sys.stdin as doc:
             # delimited file should include the field names as the first row
-            fieldnames = doc_file.next().strip().split(delimiter)
-            echo('Using the following ' + str(len(fieldnames)) + ' fields:', quiet)
+            fieldnames = doc.next().strip().split(delimiter)
+            echo('Using the following ' + str(len(fieldnames)) + ' fields:',
+                 quiet)
             for fieldname in fieldnames:
                 echo(fieldname, quiet)
-            reader = csv.DictReader(doc_file, delimiter=delimiter, fieldnames=fieldnames)
+            reader = csv.DictReader(doc, delimiter=delimiter,
+                                    fieldnames=fieldnames)
             count = 0
             for row in reader:
                 count += 1
@@ -70,7 +72,8 @@ def all_docs():
     return all_docs
 
 
-@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, stop_max_attempt_number=10)
+@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
+       stop_max_attempt_number=10)
 def local_bulk(host, index_name, doc_type, chunk):
     """
     Bulk upload the given chunk, creating a thread local ElasticSearch instance
@@ -88,7 +91,8 @@ def local_bulk(host, index_name, doc_type, chunk):
     thread_local.es.bulk(chunk, index=index_name, doc_type=doc_type)
 
 
-def perform_bulk_index(host, index_name, doc_type, doc_fetch, docs_per_chunk, bytes_per_chunk, parallel):
+def perform_bulk_index(host, index_name, doc_type, doc_fetch, docs_per_chunk,
+                       bytes_per_chunk, parallel):
     """
     Chunk up documents and send them to Elasticsearch in bulk.
 
@@ -148,6 +152,8 @@ def sanitize_delimiter(delimiter, is_tab):
               help='File to import (or \'-\' for stdin) ')
 @click.option('--mapping-file', required=False,
               help='JSON mapping file for index')
+@click.option('--settings-file', required=False,
+              help='JSON settings file for index')
 @click.option('--delimiter', required=False,
               help='The field delimiter to use, defaults to CSV')
 @click.option('--tab', is_flag=True, required=False,
@@ -165,8 +171,9 @@ def sanitize_delimiter(delimiter, is_tab):
 @click.option('--quiet', is_flag=True, required=False,
               help='Minimize console output')
 @click.version_option(version=__version__, )
-def cli(index_name, delete_index, mapping_file, doc_type, import_file,
-        delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, quiet):
+def cli(index_name, delete_index, mapping_file, settings_file, doc_type,
+        import_file, delimiter, tab, host, docs_per_chunk, bytes_per_chunk,
+        parallel, quiet):
     """
     Bulk import a delimited file into a target Elasticsearch instance. Common
     delimited files include things like CSV and TSV.
@@ -176,10 +183,12 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file,
       csv2es --index-name potatoes --doc-type potato --import-file potatoes.csv
 
     \b
     For a TSV file, note the tab delimiter option
-      csv2es --index-name tomatoes --doc-type tomato --import-file tomatoes.tsv --tab
+      csv2es --index-name tomatoes --doc-type tomato \
+          --import-file tomatoes.tsv --tab
 
     \b
     For a nifty pipe-delimited file (delimiters must be one character):
-      csv2es --index-name pipes --doc-type pipe --import-file pipes.psv --delimiter '|'
+      csv2es --index-name pipes --doc-type pipe --import-file pipes.psv \
+          --delimiter '|'
     """
@@ -191,10 +200,17 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file,
             es.delete_index(index_name)
             echo('Deleted: ' + index_name, quiet)
         except ElasticHttpNotFoundError:
-            echo('Index ' + index_name + ' not found, nothing to delete', quiet)
+            echo('Index ' + index_name + ' not found, nothing to delete',
+                 quiet)
 
     try:
-        es.create_index(index_name)
+        if settings_file:
+            echo('Applying settings from: ' + settings_file, quiet)
+            with open(settings_file) as f:
+                settings = json.loads(f.read())
+            es.create_index(index_name, settings)
+        else:
+            es.create_index(index_name)
         echo('Created new index: ' + index_name, quiet)
     except IndexAlreadyExistsError:
         echo('Index ' + index_name + ' already exists', quiet)
@@ -208,7 +224,8 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file,
 
     target_delimiter = sanitize_delimiter(delimiter, tab)
     documents = documents_from_file(es, import_file, target_delimiter, quiet)
-    perform_bulk_index(host, index_name, doc_type, documents, docs_per_chunk, bytes_per_chunk, parallel)
+    perform_bulk_index(host, index_name, doc_type, documents, docs_per_chunk,
+                       bytes_per_chunk, parallel)
 
 
 if __name__ == "__main__":
diff --git a/setup.py b/setup.py
index 7f1ea1e..4b528ef 100644
--- a/setup.py
+++ b/setup.py
@@ -58,7 +58,7 @@ def get_version():
     url='https://github.com/rholder/csv2es',
     classifiers=classifiers,
     keywords='elasticsearch es pyelasticsearch csv tsv bulk import kibana',
-    py_modules= ['csv2es'],
+    py_modules=['csv2es'],
     entry_points='''
         [console_scripts]
         csv2es=csv2es:cli
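
For reference, a sketch of the kind of settings file this option accepts.
The file and index names below are hypothetical, and the analyzer is just
one example of a setting that can only be applied at index creation;
pyelasticsearch's create_index() sends the parsed dict as the body of the
index-creation request, so the top-level "settings" key follows the usual
Elasticsearch create-index body format.

    # settings.json (illustrative): a custom analyzer that treats the
    # whole field as a single lowercased token
    {
        "settings": {
            "analysis": {
                "analyzer": {
                    "lowercase_keyword": {
                        "type": "custom",
                        "tokenizer": "keyword",
                        "filter": ["lowercase"]
                    }
                }
            }
        }
    }

    # create the index with the settings above, then bulk import
    csv2es --index-name products --doc-type product \
           --settings-file settings.json --import-file products.csv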