diff --git a/noaa/README.txt b/noaa/README.txt new file mode 100644 index 00000000..5aa9cb1f --- /dev/null +++ b/noaa/README.txt @@ -0,0 +1,201 @@ +Dataset containing daily weather measurement from NOAA: +ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/ + +Instructions on how to recreate the json documents: +1) Download the following files: +* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2016.csv.gz +* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt +* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt +* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-states.txt +2) Execute: gunzip 2016.csv.gz +3) Sort the file by station: sort --field-separator=',' --key=1,2 -o 2016-sorted.csv 2016.csv +4) Execute a script like is specified at the end of the file to create json doccuments. +5) Compress the documents json file: + bzip2 -9 -c documents.json > documents.json.bz2 + + + +Python script that process the csv file into Elasticsearch json documents: + +---------------------------------------------------------------------------------------------------- + +import os +import csv +import json +from datetime import datetime + +stationsFile = 'ghcnd-stations.txt' +countriesFile = 'ghcnd-countries.txt' +statesFile = 'ghcnd-states.txt' + +weatherDataFile = '2016-sorted.csv' +indexPrefix = 'weather-data' +docType = 'summary' + +def loadStatesFile(statesFile): + statesMap = {} + with open(statesFile, 'r') as file: + csvreader = csv.reader(file, delimiter=' ', quotechar='"') + for row in csvreader: + statesMap[row[0].strip()] = row[1].strip() + return statesMap + +def loadCountriesFile(countriesFile): + countriesMap = {} + with open(countriesFile, 'r') as file: + csvreader = csv.reader(file, delimiter=' ', quotechar='"') + for row in csvreader: + countriesMap[row[0].strip()] = row[1].strip() + return countriesMap + +def loadStationsFile(stationsFile, statesFile, countriesFile): + statesMap = loadStatesFile(statesFile) + countriesMap = loadCountriesFile(countriesFile) + stationsMap = {} + with open(stationsFile, 'r') as file: + for row in file: + try: + station = {} + station['id'] = row[0:11].strip() + countryCode = row[0:2].strip() + if len(countryCode) > 0: + station['country_code'] = countryCode + station['country'] = countriesMap[countryCode] + station['location'] = { + 'lat': float(row[12:20].strip()), + 'lon': float(row[21:30].strip()) + } + station['elevation'] = float(row[31:37].strip()) + if countryCode == 'US': + stateCode = row[38:40].strip() + if len(stateCode) > 0: + station['state_code'] = stateCode + station['state'] = statesMap[stateCode] + station['name'] = row[41:71].strip() + gsn_flag = row[72:75].strip() + if len(gsn_flag) > 0: + station['gsn_flag'] = gsn_flag + hcn_crn_flag = row[76:78].strip() + if len(hcn_crn_flag) > 0: + station['hcn_crn_flag'] = hcn_crn_flag + wmo_id = row[80:85].strip() + if len(wmo_id) > 0: + station['wmo_id'] = wmo_id + stationsMap[station['id']] = station + except: + print(row) + raise e + return stationsMap + +def processWeatherDoc(currentStationDoc): + if 'TMAX' in currentStationDoc: + currentStationDoc['TMAX'] = float(currentStationDoc['TMAX']) / 10.0 + if 'TMIN' in currentStationDoc: + currentStationDoc['TMIN'] = float(currentStationDoc['TMIN']) / 10.0 + if 'PRCP' in currentStationDoc: + currentStationDoc['PRCP'] = float(currentStationDoc['PRCP']) / 10.0 + if 'AWND' in currentStationDoc: + currentStationDoc['AWND'] = float(currentStationDoc['AWND']) / 10.0 + if 'EVAP' in currentStationDoc: + currentStationDoc['EVAP'] = float(currentStationDoc['EVAP']) / 10.0 + if 'MDEV' in currentStationDoc: + currentStationDoc['MDEV'] = float(currentStationDoc['MDEV']) / 10.0 + if 'MDPR' in currentStationDoc: + currentStationDoc['MDPR'] = float(currentStationDoc['MDPR']) / 10.0 + if 'MDTN' in currentStationDoc: + currentStationDoc['MDTN'] = float(currentStationDoc['MDTN']) / 10.0 + if 'MDTX' in currentStationDoc: + currentStationDoc['MDTX'] = float(currentStationDoc['MDTX']) / 10.0 + if 'MNPN' in currentStationDoc: + currentStationDoc['MNPN'] = float(currentStationDoc['MNPN']) / 10.0 + if 'MXPN' in currentStationDoc: + currentStationDoc['MXPN'] = float(currentStationDoc['MXPN']) / 10.0 + if 'TAVG' in currentStationDoc: + currentStationDoc['TAVG'] = float(currentStationDoc['TAVG']) / 10.0 + if 'THIC' in currentStationDoc: + currentStationDoc['THIC'] = float(currentStationDoc['THIC']) / 10.0 + if 'TOBS' in currentStationDoc: + currentStationDoc['TOBS'] = float(currentStationDoc['TOBS']) / 10.0 + if 'WESD' in currentStationDoc: + currentStationDoc['WESD'] = float(currentStationDoc['WESD']) / 10.0 + if 'WESF' in currentStationDoc: + currentStationDoc['WESF'] = float(currentStationDoc['WESF']) / 10.0 + if 'WSF1' in currentStationDoc: + currentStationDoc['WSF1'] = float(currentStationDoc['WSF1']) / 10.0 + if 'WSF2' in currentStationDoc: + currentStationDoc['WSF2'] = float(currentStationDoc['WSF2']) / 10.0 + if 'WSF5' in currentStationDoc: + currentStationDoc['WSF5'] = float(currentStationDoc['WSF5']) / 10.0 + if 'WSFG' in currentStationDoc: + currentStationDoc['WSFG'] = float(currentStationDoc['WSFG']) / 10.0 + if 'WSFI' in currentStationDoc: + currentStationDoc['WSFI'] = float(currentStationDoc['WSFI']) / 10.0 + if 'WSFM' in currentStationDoc: + currentStationDoc['WSFM'] = float(currentStationDoc['WSFM']) / 10.0 + + if 'TMIN' in currentStationDoc and 'TMAX' in currentStationDoc: + if currentStationDoc['TMIN'] > currentStationDoc['TMAX']: + tmp = currentStationDoc['TMIN'] + currentStationDoc['TMIN'] = currentStationDoc['TMAX'] + currentStationDoc['TMAX'] = tmp + currentStationDoc['TRANGE'] = { + "gte" : currentStationDoc['TMIN'], + "lte" : currentStationDoc['TMAX'] + } + if 'MDTN' in currentStationDoc and 'MDTX' in currentStationDoc: + if currentStationDoc['MDTN'] > currentStationDoc['MDTX']: + tmp = currentStationDoc['MDTN'] + currentStationDoc['MDTN'] = currentStationDoc['MDTX'] + currentStationDoc['MDTX'] = tmp + currentStationDoc['MDTRANGE'] = { + "gte" : currentStationDoc['MDTN'], + "lte" : currentStationDoc['MDTX'] + } + + indexDoc = { + '_op_type': 'create', + '_index': indexPrefix + '-' + str(currentStationDoc['date'].year), + '_type': docType, + '_id': currentStationDoc['date'].strftime('%Y-%m-%d') + '-' + currentStationDoc['station']['id'], + '_source': currentStationDoc + } + return indexDoc + +def processWeatherFile(weatherDataFile, stationsMap): + with open(weatherDataFile, 'r') as file: + csvreader = csv.reader(file, delimiter=',', quotechar='"') + currentStationDoc = None + stationDocsProcessed = 0 + for row in csvreader: + station = stationsMap[row[0]] + date = datetime.strptime(row[1], '%Y%m%d') + elementType = row[2] + elementValue = row[3] + if currentStationDoc == None: + currentStationDoc = { + 'station': station, + 'date': date, + elementType: elementValue + } + elif currentStationDoc['station'] != station or currentStationDoc['date'] != date: + yield processWeatherDoc(currentStationDoc) + stationDocsProcessed = stationDocsProcessed + 1 + currentStationDoc = { + 'station': station, + 'date': date, + elementType: elementValue + } + else: + currentStationDoc[elementType] = elementValue + +stationsMap = loadStationsFile(stationsFile, statesFile, countriesFile) +outFile = 'documents.json' +with open(outFile, 'w+') as file: + count = 0 + for doc in processWeatherFile(weatherDataFile, stationsMap): + doc['_source']['date'] = doc['_source']['date'].isoformat() + file.write(json.dumps(doc['_source'])) + file.write('\n') + count = count + 1 +print('Wrote ' + str(count) + ' entries') +---------------------------------------------------------------------------------------------------- diff --git a/noaa/challenges/default.json b/noaa/challenges/default.json new file mode 100644 index 00000000..f6127bff --- /dev/null +++ b/noaa/challenges/default.json @@ -0,0 +1,88 @@ + { + "name": "append-no-conflicts", + "description": "Indexes the whole document corpus using Elasticsearch default settings. We only adjust the number of replicas as we benchmark a single node cluster and Rally will only start the benchmark if the cluster turns green and we want to ensure that we don't use the query cache. Document ids are unique so all index operations are append only. After that a couple of queries are run.", + "default": true, + "index-settings": { + "index.number_of_shards": 1, + "index.number_of_replicas": 0, + "index.queries.cache.enabled": false + }, + "schedule": [ + { + "operation": "index", + "warmup-time-period": 240, + "clients": 8 + }, + { + "operation": "range_field_big_range", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_small_range", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_conjunction_big_range_small_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_conjunction_small_range_small_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_conjunction_small_range_big_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_conjunction_big_range_big_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_disjunction_small_range_small_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + }, + { + "operation": "range_field_disjunction_big_range_small_term_query", + "clients": 1, + "warmup-iterations": 100, + "iterations": 100, + "target-throughput": 50 + } + ] + }, + { + "name": "append-no-conflicts-index-only", + "description": "Indexes the whole document corpus using Elasticsearch default settings.", + "index-settings": { + "index.number_of_replicas": 0 + }, + "schedule": [ + { + "operation": "index", + "warmup-time-period": 240 , + "clients": 8 + } + ] + } + \ No newline at end of file diff --git a/noaa/mappings.json b/noaa/mappings.json new file mode 100644 index 00000000..1d126797 --- /dev/null +++ b/noaa/mappings.json @@ -0,0 +1,21 @@ +{ + "summary": { + "properties": { + "TRANGE": { + "type": "double_range" + }, + "MDTRANGE": { + "type": "double_range" + }, + "station": { + "type": "object", + "properties": { + "location" : { + "type": "geo_point" + } + } + } + } + } +} + diff --git a/noaa/operations/default.json b/noaa/operations/default.json new file mode 100644 index 00000000..37c8c3f4 --- /dev/null +++ b/noaa/operations/default.json @@ -0,0 +1,184 @@ + { + "name": "index", + "operation-type": "index", + "bulk-size": 5000 + }, + { + "name": "range_field_big_range", + "operation-type": "search", + "body": { + "query": { + "range": { + "TRANGE": { + "gte": 0, + "lte": 30 + } + } + } + } + }, + { + "name": "range_field_small_range", + "operation-type": "search", + "body": { + "query": { + "range": { + "TRANGE": { + "gte": -20, + "lte": -10 + } + } + } + } + }, + { + "name": "range_field_conjunction_big_range_small_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "must": [ + { + "term": { + "station.country_code": "JA" + } + }, + { + "range": { + "TRANGE": { + "gte": 0, + "lte": 30 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_conjunction_small_range_small_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "must": [ + { + "term": { + "station.country_code": "JA" + } + }, + { + "range": { + "TRANGE": { + "gte": -20, + "lte": -10 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_conjunction_small_range_big_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "must": [ + { + "term": { + "station.country_code": "US" + } + }, + { + "range": { + "TRANGE": { + "gte": -20, + "lte": -10 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_conjunction_big_range_big_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "must": [ + { + "term": { + "station.country_code": "US" + } + }, + { + "range": { + "TRANGE": { + "gte": 0, + "lte": 30 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_disjunction_small_range_small_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "should": [ + { + "term": { + "station.country_code": "JA" + } + }, + { + "range": { + "TRANGE": { + "gte": -20, + "lte": -10 + } + } + } + ] + } + } + } + }, + { + "name": "range_field_disjunction_big_range_small_term_query", + "operation-type": "search", + "body": { + "query": { + "bool": { + "should": [ + { + "term": { + "station.country_code": "JA" + } + }, + { + "range": { + "TRANGE": { + "gte": 0, + "lte": 30 + } + } + } + ] + } + } + } + } + \ No newline at end of file diff --git a/noaa/track.json b/noaa/track.json new file mode 100644 index 00000000..2c723d9d --- /dev/null +++ b/noaa/track.json @@ -0,0 +1,28 @@ +{% import "rally.helpers" as rally with context %} + +{ + "short-description": "Daily weather measurement summaries from around the globe.", + "description": "Indexes 10M+ weather measurement summaries from NOAA.", + "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/noaa", + "indices": [ + { + "name": "weather-data-2016", + "types": [ + { + "name": "summary", + "mapping": "mappings.json", + "documents": "documents.json.bz2", + "document-count": 10914068, + "compressed-bytes": 57359944, + "uncompressed-bytes": 3150540222 + } + ] + } + ], + "operations": [ + {{ rally.collect(parts="operations/*.json") }} + ], + "challenges": [ + {{ rally.collect(parts="challenges/*.json") }} + ] +}