Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NOAA benchmark #30

Merged
merged 1 commit into from
Jul 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 201 additions & 0 deletions noaa/README.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
Dataset containing daily weather measurements from NOAA:
ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/

Instructions on how to recreate the json documents:
1) Download the following files:
* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2016.csv.gz
* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt
* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt
* ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-states.txt
2) Execute: gunzip 2016.csv.gz
3) Sort the file by station: sort --field-separator=',' --key=1,2 -o 2016-sorted.csv 2016.csv
4) Execute a script like the one specified at the end of this file to create json documents.
5) Compress the documents json file:
bzip2 -9 -c documents.json > documents.json.bz2



Python script that processes the csv file into Elasticsearch json documents:

----------------------------------------------------------------------------------------------------

import os
import csv
import json
from datetime import datetime

# Companion metadata files from the GHCN-Daily FTP directory (see the
# download instructions above).
stationsFile = 'ghcnd-stations.txt'
countriesFile = 'ghcnd-countries.txt'
statesFile = 'ghcnd-states.txt'

# One year of daily observations, pre-sorted by station id (README step 3).
weatherDataFile = '2016-sorted.csv'
# Documents go into one index per year: weather-data-<year>.
indexPrefix = 'weather-data'
docType = 'summary'

def loadStatesFile(statesFile):
    """Parse ghcnd-states.txt into a {state_code: state_name} dict.

    Each line holds a two-letter state/province code followed by the full
    name, e.g. "NY NEW YORK".  Splitting on the first whitespace run keeps
    multi-word names intact; the previous csv.reader(delimiter=' ') approach
    truncated them to their first word (row[1]).

    :param statesFile: path to the ghcnd-states.txt file
    :return: dict mapping state code -> full state name
    """
    statesMap = {}
    with open(statesFile, 'r') as file:
        for line in file:
            parts = line.strip().split(None, 1)
            # Skip blank/malformed lines instead of raising IndexError.
            if len(parts) == 2:
                statesMap[parts[0]] = parts[1].strip()
    return statesMap

def loadCountriesFile(countriesFile):
    """Parse ghcnd-countries.txt into a {country_code: country_name} dict.

    Each line holds a two-letter FIPS country code followed by the full
    name, e.g. "US United States".  Splitting on the first whitespace run
    keeps multi-word names intact; the previous csv.reader(delimiter=' ')
    approach truncated them to their first word (row[1]).

    :param countriesFile: path to the ghcnd-countries.txt file
    :return: dict mapping country code -> full country name
    """
    countriesMap = {}
    with open(countriesFile, 'r') as file:
        for line in file:
            parts = line.strip().split(None, 1)
            # Skip blank/malformed lines instead of raising IndexError.
            if len(parts) == 2:
                countriesMap[parts[0]] = parts[1].strip()
    return countriesMap

def loadStationsFile(stationsFile, statesFile, countriesFile):
    """Parse the fixed-width ghcnd-stations.txt into a dict keyed by station id.

    Column slices follow the GHCN-Daily readme's 1-based column spec
    converted to 0-based Python slices.  State and country codes are
    resolved to full names via the companion lookup files.

    :param stationsFile: path to ghcnd-stations.txt (fixed-width)
    :param statesFile: path to ghcnd-states.txt
    :param countriesFile: path to ghcnd-countries.txt
    :return: dict mapping station id -> station metadata dict
    :raises KeyError: if a station references an unknown state/country code
    """
    statesMap = loadStatesFile(statesFile)
    countriesMap = loadCountriesFile(countriesFile)
    stationsMap = {}
    with open(stationsFile, 'r') as file:
        for row in file:
            try:
                station = {}
                station['id'] = row[0:11].strip()
                # The first two characters of the station id are the FIPS
                # country code.
                countryCode = row[0:2].strip()
                if len(countryCode) > 0:
                    station['country_code'] = countryCode
                    station['country'] = countriesMap[countryCode]
                station['location'] = {
                    'lat': float(row[12:20].strip()),
                    'lon': float(row[21:30].strip())
                }
                station['elevation'] = float(row[31:37].strip())
                # The state column is only populated for US stations.
                if countryCode == 'US':
                    stateCode = row[38:40].strip()
                    if len(stateCode) > 0:
                        station['state_code'] = stateCode
                        station['state'] = statesMap[stateCode]
                station['name'] = row[41:71].strip()
                gsn_flag = row[72:75].strip()
                if len(gsn_flag) > 0:
                    station['gsn_flag'] = gsn_flag
                # Readme columns 77-79 -> slice [76:79]; the original [76:78]
                # truncated the 3-character "HCN"/"CRN" flag to two characters.
                hcn_crn_flag = row[76:79].strip()
                if len(hcn_crn_flag) > 0:
                    station['hcn_crn_flag'] = hcn_crn_flag
                wmo_id = row[80:85].strip()
                if len(wmo_id) > 0:
                    station['wmo_id'] = wmo_id
                stationsMap[station['id']] = station
            except Exception:
                # The original did `except:` then `raise e` with `e` never
                # bound, which raised NameError and masked the real error.
                # Log the offending row and re-raise the actual exception.
                print(row)
                raise
    return stationsMap

# GHCN-Daily reports these elements in tenths of their unit (e.g. tenths of
# degrees C, tenths of mm); they are converted to the base unit below.
TENTHS_SCALED_ELEMENTS = (
    'TMAX', 'TMIN', 'PRCP', 'AWND', 'EVAP', 'MDEV', 'MDPR', 'MDTN', 'MDTX',
    'MNPN', 'MXPN', 'TAVG', 'THIC', 'TOBS', 'WESD', 'WESF', 'WSF1', 'WSF2',
    'WSF5', 'WSFG', 'WSFI', 'WSFM',
)

def processWeatherDoc(currentStationDoc):
    """Normalise one station/day doc and wrap it in a bulk-index action.

    Scales tenth-unit elements to their base unit, adds the TRANGE and
    MDTRANGE range fields (swapping min/max when the source data has them
    inverted), and returns an Elasticsearch bulk action dict with the doc
    as its _source.  Mutates currentStationDoc in place.

    :param currentStationDoc: dict with 'station', 'date' (datetime) and
        raw string element values as produced by processWeatherFile
    :return: bulk action dict ('_op_type', '_index', '_type', '_id', '_source')
    """
    # Data-driven replacement for 22 copy-pasted `if key in doc:` blocks.
    for element in TENTHS_SCALED_ELEMENTS:
        if element in currentStationDoc:
            currentStationDoc[element] = float(currentStationDoc[element]) / 10.0

    # Build the min/max range fields; same logic for both pairs.
    for lowKey, highKey, rangeKey in (('TMIN', 'TMAX', 'TRANGE'),
                                      ('MDTN', 'MDTX', 'MDTRANGE')):
        if lowKey in currentStationDoc and highKey in currentStationDoc:
            if currentStationDoc[lowKey] > currentStationDoc[highKey]:
                # Defensive swap so the range is always well-ordered.
                currentStationDoc[lowKey], currentStationDoc[highKey] = \
                    currentStationDoc[highKey], currentStationDoc[lowKey]
            currentStationDoc[rangeKey] = {
                "gte": currentStationDoc[lowKey],
                "lte": currentStationDoc[highKey]
            }

    indexDoc = {
        '_op_type': 'create',
        # One index per year, e.g. weather-data-2016.
        '_index': indexPrefix + '-' + str(currentStationDoc['date'].year),
        '_type': docType,
        # Date + station id is unique in the sorted input, so 'create' is
        # append-only.
        '_id': currentStationDoc['date'].strftime('%Y-%m-%d') + '-' +
               currentStationDoc['station']['id'],
        '_source': currentStationDoc
    }
    return indexDoc

def processWeatherFile(weatherDataFile, stationsMap):
    """Stream the station-sorted CSV and yield one index action per station/day.

    The input must be sorted by station (and date within station) so that
    all element rows for one station/day are contiguous; consecutive rows
    are folded into a single doc, which is emitted whenever the station or
    date changes.

    :param weatherDataFile: path to the sorted GHCN-Daily CSV
        (station_id, yyyymmdd, element, value, ...)
    :param stationsMap: dict from loadStationsFile keyed by station id
    :yields: bulk action dicts from processWeatherDoc
    """
    with open(weatherDataFile, 'r') as file:
        csvreader = csv.reader(file, delimiter=',', quotechar='"')
        currentStationDoc = None
        stationDocsProcessed = 0
        for row in csvreader:
            station = stationsMap[row[0]]
            date = datetime.strptime(row[1], '%Y%m%d')
            elementType = row[2]
            elementValue = row[3]
            if currentStationDoc is None:
                # First row: start accumulating.
                currentStationDoc = {
                    'station': station,
                    'date': date,
                    elementType: elementValue
                }
            elif currentStationDoc['station'] != station or currentStationDoc['date'] != date:
                # Station/date boundary: flush the finished doc, start a new one.
                yield processWeatherDoc(currentStationDoc)
                stationDocsProcessed += 1
                currentStationDoc = {
                    'station': station,
                    'date': date,
                    elementType: elementValue
                }
            else:
                currentStationDoc[elementType] = elementValue
        # Bug fix: the original never flushed the last accumulated doc, so
        # the final station/day document was silently dropped.
        if currentStationDoc is not None:
            yield processWeatherDoc(currentStationDoc)

# Build the station metadata lookup, then stream every weather document out
# as newline-delimited JSON suitable for bulk loading.
stationsMap = loadStationsFile(stationsFile, statesFile, countriesFile)
outFile = 'documents.json'
with open(outFile, 'w+') as file:
    count = 0
    for doc in processWeatherFile(weatherDataFile, stationsMap):
        source = doc['_source']
        # Serialise the datetime as ISO-8601 so json.dumps can handle it.
        source['date'] = source['date'].isoformat()
        file.write(json.dumps(source) + '\n')
        count += 1
print('Wrote ' + str(count) + ' entries')
----------------------------------------------------------------------------------------------------
88 changes: 88 additions & 0 deletions noaa/challenges/default.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
{
"name": "append-no-conflicts",
"description": "Indexes the whole document corpus using Elasticsearch default settings. We only adjust the number of replicas as we benchmark a single node cluster and Rally will only start the benchmark if the cluster turns green and we want to ensure that we don't use the query cache. Document ids are unique so all index operations are append only. After that a couple of queries are run.",
"default": true,
"index-settings": {
"index.number_of_shards": 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"index.number_of_replicas": 0,
"index.queries.cache.enabled": false
},
"schedule": [
{
"operation": "index",
"warmup-time-period": 240,
"clients": 8
},
{
"operation": "range_field_big_range",
"clients": 1,
"warmup-iterations": 100,
"iterations": 100,
"target-throughput": 50
},
{
"operation": "range_field_small_range",
"clients": 1,
"warmup-iterations": 100,
"iterations": 100,
"target-throughput": 50
},
{
"operation": "range_field_conjunction_big_range_small_term_query",
"clients": 1,
"warmup-iterations": 100,
"iterations": 100,
"target-throughput": 50
},
{
"operation": "range_field_conjunction_small_range_small_term_query",
"clients": 1,
"warmup-iterations": 100,
"iterations": 100,
"target-throughput": 50
},
{
"operation": "range_field_conjunction_small_range_big_term_query",
"clients": 1,
"warmup-iterations": 100,
"iterations": 100,
"target-throughput": 50
},
{
"operation": "range_field_conjunction_big_range_big_term_query",
"clients": 1,
"warmup-iterations": 100,
"iterations": 100,
"target-throughput": 50
},
{
"operation": "range_field_disjunction_small_range_small_term_query",
"clients": 1,
"warmup-iterations": 100,
"iterations": 100,
"target-throughput": 50
},
{
"operation": "range_field_disjunction_big_range_small_term_query",
"clients": 1,
"warmup-iterations": 100,
"iterations": 100,
"target-throughput": 50
}
]
},
{
"name": "append-no-conflicts-index-only",
"description": "Indexes the whole document corpus using Elasticsearch default settings.",
"index-settings": {
"index.number_of_replicas": 0
},
"schedule": [
{
"operation": "index",
"warmup-time-period": 240 ,
"clients": 8
}
]
}

21 changes: 21 additions & 0 deletions noaa/mappings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"summary": {
"properties": {
"TRANGE": {
"type": "double_range"
},
"MDTRANGE": {
"type": "double_range"
},
"station": {
"type": "object",
"properties": {
"location" : {
"type": "geo_point"
}
}
}
}
}
}

Loading