From bf5b21b95c9a38ea699a3bad3dbdd6447bd2d9d6 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 3 Jul 2017 16:37:35 +0200 Subject: [PATCH] Added noaa benchmark --- noaa/README.txt | 201 +++++++++++++++++++++++++++++++++++ noaa/challenges/default.json | 88 +++++++++++++++ noaa/mappings.json | 21 ++++ noaa/operations/default.json | 184 ++++++++++++++++++++++++++++++++ noaa/track.json | 28 +++++ 5 files changed, 522 insertions(+) create mode 100644 noaa/README.txt create mode 100644 noaa/challenges/default.json create mode 100644 noaa/mappings.json create mode 100644 noaa/operations/default.json create mode 100644 noaa/track.json diff --git a/noaa/README.txt b/noaa/README.txt new file mode 100644 index 00000000..5aa9cb1f --- /dev/null +++ b/noaa/README.txt @@ -0,0 +1,201 @@ +Dataset containing daily weather measurement from NOAA: +ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/ + +Instructions on how to recreate the json documents: +1) Download the following files: +* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2016.csv.gz +* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt +* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt +* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-states.txt +2) Execute: gunzip 2016.csv.gz +3) Sort the file by station: sort --field-separator=',' --key=1,2 -o 2016-sorted.csv 2016.csv +4) Execute a script like is specified at the end of the file to create json doccuments. +5) Compress the documents json file: + bzip2 -9 -c documents.json > documents.json.bz2 + + + +Python script that process the csv file into Elasticsearch json documents: + +---------------------------------------------------------------------------------------------------- + +import os +import csv +import json +from datetime import datetime + +stationsFile = 'ghcnd-stations.txt' +countriesFile = 'ghcnd-countries.txt' +statesFile = 'ghcnd-states.txt' + +weatherDataFile = '2016-sorted.csv' +indexPrefix = 'weather-data' +docType = 'summary' + +def loadStatesFile(statesFile): + statesMap = {} + with open(statesFile, 'r') as file: + csvreader = csv.reader(file, delimiter=' ', quotechar='"') + for row in csvreader: + statesMap[row[0].strip()] = row[1].strip() + return statesMap + +def loadCountriesFile(countriesFile): + countriesMap = {} + with open(countriesFile, 'r') as file: + csvreader = csv.reader(file, delimiter=' ', quotechar='"') + for row in csvreader: + countriesMap[row[0].strip()] = row[1].strip() + return countriesMap + +def loadStationsFile(stationsFile, statesFile, countriesFile): + statesMap = loadStatesFile(statesFile) + countriesMap = loadCountriesFile(countriesFile) + stationsMap = {} + with open(stationsFile, 'r') as file: + for row in file: + try: + station = {} + station['id'] = row[0:11].strip() + countryCode = row[0:2].strip() + if len(countryCode) > 0: + station['country_code'] = countryCode + station['country'] = countriesMap[countryCode] + station['location'] = { + 'lat': float(row[12:20].strip()), + 'lon': float(row[21:30].strip()) + } + station['elevation'] = float(row[31:37].strip()) + if countryCode == 'US': + stateCode = row[38:40].strip() + if len(stateCode) > 0: + station['state_code'] = stateCode + station['state'] = statesMap[stateCode] + station['name'] = row[41:71].strip() + gsn_flag = row[72:75].strip() + if len(gsn_flag) > 0: + station['gsn_flag'] = gsn_flag + hcn_crn_flag = row[76:78].strip() + if len(hcn_crn_flag) > 0: + station['hcn_crn_flag'] = hcn_crn_flag + wmo_id = row[80:85].strip() + if len(wmo_id) > 0: + station['wmo_id'] = wmo_id + stationsMap[station['id']] = station + except: + print(row) + raise e + return stationsMap + +def processWeatherDoc(currentStationDoc): + if 'TMAX' in currentStationDoc: + currentStationDoc['TMAX'] = float(currentStationDoc['TMAX']) / 10.0 + if 'TMIN' in currentStationDoc: + currentStationDoc['TMIN'] = float(currentStationDoc['TMIN']) / 10.0 + if 'PRCP' in currentStationDoc: + currentStationDoc['PRCP'] = float(currentStationDoc['PRCP']) / 10.0 + if 'AWND' in currentStationDoc: + currentStationDoc['AWND'] = float(currentStationDoc['AWND']) / 10.0 + if 'EVAP' in currentStationDoc: + currentStationDoc['EVAP'] = float(currentStationDoc['EVAP']) / 10.0 + if 'MDEV' in currentStationDoc: + currentStationDoc['MDEV'] = float(currentStationDoc['MDEV']) / 10.0 + if 'MDPR' in currentStationDoc: + currentStationDoc['MDPR'] = float(currentStationDoc['MDPR']) / 10.0 + if 'MDTN' in currentStationDoc: + currentStationDoc['MDTN'] = float(currentStationDoc['MDTN']) / 10.0 + if 'MDTX' in currentStationDoc: + currentStationDoc['MDTX'] = float(currentStationDoc['MDTX']) / 10.0 + if 'MNPN' in currentStationDoc: + currentStationDoc['MNPN'] = float(currentStationDoc['MNPN']) / 10.0 + if 'MXPN' in currentStationDoc: + currentStationDoc['MXPN'] = float(currentStationDoc['MXPN']) / 10.0 + if 'TAVG' in currentStationDoc: + currentStationDoc['TAVG'] = float(currentStationDoc['TAVG']) / 10.0 + if 'THIC' in currentStationDoc: + currentStationDoc['THIC'] = float(currentStationDoc['THIC']) / 10.0 + if 'TOBS' in currentStationDoc: + currentStationDoc['TOBS'] = float(currentStationDoc['TOBS']) / 10.0 + if 'WESD' in currentStationDoc: + currentStationDoc['WESD'] = float(currentStationDoc['WESD']) / 10.0 + if 'WESF' in currentStationDoc: + currentStationDoc['WESF'] = float(currentStationDoc['WESF']) / 10.0 + if 'WSF1' in currentStationDoc: + currentStationDoc['WSF1'] = float(currentStationDoc['WSF1']) / 10.0 + if 'WSF2' in currentStationDoc: + currentStationDoc['WSF2'] = float(currentStationDoc['WSF2']) / 10.0 + if 'WSF5' in currentStationDoc: + currentStationDoc['WSF5'] = float(currentStationDoc['WSF5']) / 10.0 + if 'WSFG' in currentStationDoc: + currentStationDoc['WSFG'] = float(currentStationDoc['WSFG']) / 10.0 + if 'WSFI' in currentStationDoc: + currentStationDoc['WSFI'] = float(currentStationDoc['WSFI']) / 10.0 + if 'WSFM' in currentStationDoc: + currentStationDoc['WSFM'] = float(currentStationDoc['WSFM']) / 10.0 + + if 'TMIN' in currentStationDoc and 'TMAX' in currentStationDoc: + if currentStationDoc['TMIN'] > currentStationDoc['TMAX']: + tmp = currentStationDoc['TMIN'] + currentStationDoc['TMIN'] = currentStationDoc['TMAX'] + currentStationDoc['TMAX'] = tmp + currentStationDoc['TRANGE'] = { + "gte" : currentStationDoc['TMIN'], + "lte" : currentStationDoc['TMAX'] + } + if 'MDTN' in currentStationDoc and 'MDTX' in currentStationDoc: + if currentStationDoc['MDTN'] > currentStationDoc['MDTX']: + tmp = currentStationDoc['MDTN'] + currentStationDoc['MDTN'] = currentStationDoc['MDTX'] + currentStationDoc['MDTX'] = tmp + currentStationDoc['MDTRANGE'] = { + "gte" : currentStationDoc['MDTN'], + "lte" : currentStationDoc['MDTX'] + } + + indexDoc = { + '_op_type': 'create', + '_index': indexPrefix + '-' + str(currentStationDoc['date'].year), + '_type': docType, + '_id': currentStationDoc['date'].strftime('%Y-%m-%d') + '-' + currentStationDoc['station']['id'], + '_source': currentStationDoc + } + return indexDoc + +def processWeatherFile(weatherDataFile, stationsMap): + with open(weatherDataFile, 'r') as file: + csvreader = csv.reader(file, delimiter=',', quotechar='"') + currentStationDoc = None + stationDocsProcessed = 0 + for row in csvreader: + station = stationsMap[row[0]] + date = datetime.strptime(row[1], '%Y%m%d') + elementType = row[2] + elementValue = row[3] + if currentStationDoc == None: + currentStationDoc = { + 'station': station, + 'date': date, + elementType: elementValue + } + elif currentStationDoc['station'] != station or currentStationDoc['date'] != date: + yield processWeatherDoc(currentStationDoc) + stationDocsProcessed = stationDocsProcessed + 1 + currentStationDoc = { + 'station': station, + 'date': date, + elementType: elementValue + } + else: + currentStationDoc[elementType] = elementValue + +stationsMap = loadStationsFile(stationsFile, statesFile, countriesFile) +outFile = 'documents.json' +with open(outFile, 'w+') as file: + count = 0 + for doc in processWeatherFile(weatherDataFile, stationsMap): + doc['_source']['date'] = doc['_source']['date'].isoformat() + file.write(json.dumps(doc['_source'])) + file.write('\n') + count = count + 1 +print('Wrote ' + str(count) + ' entries') +---------------------------------------------------------------------------------------------------- diff --git a/noaa/challenges/default.json b/noaa/challenges/default.json new file mode 100644 index 00000000..f6127bff --- /dev/null +++ b/noaa/challenges/default.json @@ -0,0 +1,88 @@ + { + "name": "append-no-conflicts", + "description": "Indexes the whole document corpus using Elasticsearch default settings. We only adjust the number of replicas as we benchmark a single node cluster and Rally will only start the benchmark if the cluster turns green and we want to ensure that we don't use the query cache. Document ids are unique so all index operations are append only. After that a couple of queries are run.", + "default": true, + "index-settings": { + "index.number_of_shards": 1, + "index.number_of_replicas": 0, + "index.queries.cache.enabled": false + }, + "schedule": [ + { + "operation": "index", + "warmup-time-period": 240, + "clients": 8 + }, + { + "operation": "range_field_big_range", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_small_range", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_conjunction_big_range_small_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_conjunction_small_range_small_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_conjunction_small_range_big_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_conjunction_big_range_big_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_disjunction_small_range_small_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_disjunction_big_range_small_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + } + ] + }, + { + "name": "append-no-conflicts-index-only", + "description": "Indexes the whole document corpus using Elasticsearch default settings.", + "index-settings": { + "index.number_of_replicas": 0 + }, + "schedule": [ + { + "operation": "index", + "warmup-time-period": 240 , + "clients": 8 + } + ] + } + \ No newline at end of file diff --git a/noaa/mappings.json b/noaa/mappings.json new file mode 100644 index 00000000..1d126797 --- /dev/null +++ b/noaa/mappings.json @@ -0,0 +1,21 @@ +{ + "summary": { + "properties": { + "TRANGE": { + "type": "double_range" + }, + "MDTRANGE": { + "type": "double_range" + }, + "station": { + "type": "object", + "properties": { + "location" : { + "type": "geo_point" + } + } + } + } + } +} + diff --git a/noaa/operations/default.json b/noaa/operations/default.json new file mode 100644 index 00000000..37c8c3f4 --- /dev/null +++ b/noaa/operations/default.json @@ -0,0 +1,184 @@ + { + "name": "index", + "operation-type": "index", + "bulk-size": 5000 + }, + { + "name": "range_field_big_range", + "operation-type": "search", + "body": { + "query": { + "range": { + "TRANGE": { + "gte": 0, + "lte": 30 + } + } + } + } + }, + { + "name": "range_field_small_range", + "operation-type": "search", + "body": { + "query": { + "range": { + "TRANGE": { + "gte": -20, + "lte": -10 + } + } + } + } + }, + { + "name": "range_field_conjunction_big_range_small_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "must": [ + { + "term": { + "station.country_code": "JA" + } + }, + { + "range": { + "TRANGE": { + "gte": 0, + "lte": 30 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_conjunction_small_range_small_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "must": [ + { + "term": { + "station.country_code": "JA" + } + }, + { + "range": { + "TRANGE": { + "gte": -20, + "lte": -10 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_conjunction_small_range_big_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "must": [ + { + "term": { + "station.country_code": "US" + } + }, + { + "range": { + "TRANGE": { + "gte": -20, + "lte": -10 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_conjunction_big_range_big_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "must": [ + { + "term": { + "station.country_code": "US" + } + }, + { + "range": { + "TRANGE": { + "gte": 0, + "lte": 30 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_disjunction_small_range_small_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "should": [ + { + "term": { + "station.country_code": "JA" + } + }, + { + "range": { + "TRANGE": { + "gte": -20, + "lte": -10 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_disjunction_big_range_small_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "should": [ + { + "term": { + "station.country_code": "JA" + } + }, + { + "range": { + "TRANGE": { + "gte": 0, + "lte": 30 + } + } + } + ] + } + } + } + } + \ No newline at end of file diff --git a/noaa/track.json b/noaa/track.json new file mode 100644 index 00000000..2c723d9d --- /dev/null +++ b/noaa/track.json @@ -0,0 +1,28 @@ +{% import "rally.helpers" as rally with context %} + +{ + "short-description": "Daily weather measurement summaries from around the globe.", + "description": "Indexes 10M+ weather measurement summaries from NOAA.", + "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/noaa", + "indices": [ + { + "name": "weather-data-2016", + "types": [ + { + "name": "summary", + "mapping": "mappings.json", + "documents": "documents.json.bz2", + "document-count": 10914068, + "compressed-bytes": 57359944, + "uncompressed-bytes": 3150540222 + } + ] + } + ], + "operations": [ + {{ rally.collect(parts="operations/*.json") }} + ], + "challenges": [ + {{ rally.collect(parts="challenges/*.json") }} + ] +}