Added noaa benchmark
martijnvg committed Jul 4, 2017
1 parent 33cb08c commit bf5b21b
Showing 5 changed files with 522 additions and 0 deletions.
201 changes: 201 additions & 0 deletions noaa/README.txt
@@ -0,0 +1,201 @@
Dataset containing daily weather measurements from NOAA:
ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/

Instructions on how to recreate the json documents (a Python sketch that automates steps 1 through 3 follows the list):
1) Download the following files:
   * ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2016.csv.gz
   * ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt
   * ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt
   * ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-states.txt
2) Execute: gunzip 2016.csv.gz
3) Sort the file by station and date: sort --field-separator=',' --key=1,2 -o 2016-sorted.csv 2016.csv
4) Run a script like the one at the end of this file to create the json documents.
5) Compress the documents json file:
   bzip2 -9 -c documents.json > documents.json.bz2
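
The following is a minimal sketch of steps 1 through 3, assuming Python 3
with network access and the standard sort utility on the PATH:

----------------------------------------------------------------------------------------------------

import gzip
import shutil
import subprocess
import urllib.request

# Step 1: download the yearly measurements and the station metadata files.
base = 'ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/'
for path in ['by_year/2016.csv.gz', 'ghcnd-stations.txt',
             'ghcnd-countries.txt', 'ghcnd-states.txt']:
    urllib.request.urlretrieve(base + path, path.rsplit('/', 1)[-1])

# Step 2: gunzip 2016.csv.gz
with gzip.open('2016.csv.gz', 'rb') as src, open('2016.csv', 'wb') as dst:
    shutil.copyfileobj(src, dst)

# Step 3: sort by station id and date.
subprocess.run(['sort', '--field-separator=,', '--key=1,2',
                '-o', '2016-sorted.csv', '2016.csv'], check=True)

----------------------------------------------------------------------------------------------------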



Python script that processes the CSV file into Elasticsearch JSON documents:

----------------------------------------------------------------------------------------------------

import csv
import json
from datetime import datetime

stationsFile = 'ghcnd-stations.txt'
countriesFile = 'ghcnd-countries.txt'
statesFile = 'ghcnd-states.txt'

weatherDataFile = '2016-sorted.csv'
indexPrefix = 'weather-data'
docType = 'summary'

def loadStatesFile(statesFile):
    # Each line holds a two-letter state code (columns 1-2) followed by the
    # state name (column 4 onwards), which may contain spaces.
    statesMap = {}
    with open(statesFile, 'r') as file:
        for row in file:
            statesMap[row[0:2].strip()] = row[3:].strip()
    return statesMap

def loadCountriesFile(countriesFile):
    # Same fixed-width layout as the states file: code, space, name.
    countriesMap = {}
    with open(countriesFile, 'r') as file:
        for row in file:
            countriesMap[row[0:2].strip()] = row[3:].strip()
    return countriesMap

def loadStationsFile(stationsFile, statesFile, countriesFile):
    # ghcnd-stations.txt is a fixed-width file; the slices below follow the
    # column layout documented in the GHCN-Daily readme.
    statesMap = loadStatesFile(statesFile)
    countriesMap = loadCountriesFile(countriesFile)
    stationsMap = {}
    with open(stationsFile, 'r') as file:
        for row in file:
            try:
                station = {}
                station['id'] = row[0:11].strip()
                # The first two characters of the station id are the country code.
                countryCode = row[0:2].strip()
                if len(countryCode) > 0:
                    station['country_code'] = countryCode
                    station['country'] = countriesMap[countryCode]
                station['location'] = {
                    'lat': float(row[12:20].strip()),
                    'lon': float(row[21:30].strip())
                }
                station['elevation'] = float(row[31:37].strip())
                if countryCode == 'US':
                    stateCode = row[38:40].strip()
                    if len(stateCode) > 0:
                        station['state_code'] = stateCode
                        station['state'] = statesMap[stateCode]
                station['name'] = row[41:71].strip()
                gsn_flag = row[72:75].strip()
                if len(gsn_flag) > 0:
                    station['gsn_flag'] = gsn_flag
                hcn_crn_flag = row[76:79].strip()
                if len(hcn_crn_flag) > 0:
                    station['hcn_crn_flag'] = hcn_crn_flag
                wmo_id = row[80:85].strip()
                if len(wmo_id) > 0:
                    station['wmo_id'] = wmo_id
                stationsMap[station['id']] = station
            except Exception:
                # Print the offending line before re-raising to ease debugging.
                print(row)
                raise
    return stationsMap

# GHCN encodes these elements in tenths of their unit (for example tenths of
# degrees Celsius or tenths of millimeters); convert them to whole units.
tenthsElements = ['TMAX', 'TMIN', 'PRCP', 'AWND', 'EVAP', 'MDEV', 'MDPR',
                  'MDTN', 'MDTX', 'MNPN', 'MXPN', 'TAVG', 'THIC', 'TOBS',
                  'WESD', 'WESF', 'WSF1', 'WSF2', 'WSF5', 'WSFG', 'WSFI',
                  'WSFM']

def processWeatherDoc(currentStationDoc):
    for element in tenthsElements:
        if element in currentStationDoc:
            currentStationDoc[element] = float(currentStationDoc[element]) / 10.0

    # Derive the range fields that back the double_range mappings, swapping
    # min and max first if the source data has them inverted.
    if 'TMIN' in currentStationDoc and 'TMAX' in currentStationDoc:
        if currentStationDoc['TMIN'] > currentStationDoc['TMAX']:
            currentStationDoc['TMIN'], currentStationDoc['TMAX'] = \
                currentStationDoc['TMAX'], currentStationDoc['TMIN']
        currentStationDoc['TRANGE'] = {
            "gte": currentStationDoc['TMIN'],
            "lte": currentStationDoc['TMAX']
        }
    if 'MDTN' in currentStationDoc and 'MDTX' in currentStationDoc:
        if currentStationDoc['MDTN'] > currentStationDoc['MDTX']:
            currentStationDoc['MDTN'], currentStationDoc['MDTX'] = \
                currentStationDoc['MDTX'], currentStationDoc['MDTN']
        currentStationDoc['MDTRANGE'] = {
            "gte": currentStationDoc['MDTN'],
            "lte": currentStationDoc['MDTX']
        }

    indexDoc = {
        '_op_type': 'create',
        '_index': indexPrefix + '-' + str(currentStationDoc['date'].year),
        '_type': docType,
        '_id': currentStationDoc['date'].strftime('%Y-%m-%d') + '-' + currentStationDoc['station']['id'],
        '_source': currentStationDoc
    }
    return indexDoc

def processWeatherFile(weatherDataFile, stationsMap):
    # The input is sorted by station and date, so all measurements for one
    # station/date pair are adjacent and can be merged into a single document.
    with open(weatherDataFile, 'r') as file:
        csvreader = csv.reader(file, delimiter=',', quotechar='"')
        currentStationDoc = None
        for row in csvreader:
            station = stationsMap[row[0]]
            date = datetime.strptime(row[1], '%Y%m%d')
            elementType = row[2]
            elementValue = row[3]
            if currentStationDoc is None:
                currentStationDoc = {
                    'station': station,
                    'date': date,
                    elementType: elementValue
                }
            elif currentStationDoc['station'] != station or currentStationDoc['date'] != date:
                yield processWeatherDoc(currentStationDoc)
                currentStationDoc = {
                    'station': station,
                    'date': date,
                    elementType: elementValue
                }
            else:
                currentStationDoc[elementType] = elementValue
        # The loop only yields when the next station/date pair starts, so emit
        # the final document explicitly.
        if currentStationDoc is not None:
            yield processWeatherDoc(currentStationDoc)

stationsMap = loadStationsFile(stationsFile, statesFile, countriesFile)
outFile = 'documents.json'
with open(outFile, 'w+') as file:
    count = 0
    for doc in processWeatherFile(weatherDataFile, stationsMap):
        doc['_source']['date'] = doc['_source']['date'].isoformat()
        file.write(json.dumps(doc['_source']))
        file.write('\n')
        count = count + 1
print('Wrote ' + str(count) + ' entries')
----------------------------------------------------------------------------------------------------
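
The index action dicts built by processWeatherDoc() already use the action
format understood by the elasticsearch-py bulk helpers, so instead of writing
documents.json the generator can feed a cluster directly. A minimal sketch,
assuming the elasticsearch-py client is installed, a node is listening on
localhost:9200, and the functions above are in scope:

----------------------------------------------------------------------------------------------------

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

es = Elasticsearch(['localhost:9200'])
stationsMap = loadStationsFile(stationsFile, statesFile, countriesFile)

def actions():
    for doc in processWeatherFile(weatherDataFile, stationsMap):
        # Elasticsearch expects an ISO-8601 string rather than a datetime.
        doc['_source']['date'] = doc['_source']['date'].isoformat()
        yield doc

# streaming_bulk yields an (ok, result) tuple per indexed document.
for ok, result in streaming_bulk(es, actions(), chunk_size=500):
    if not ok:
        print(result)

----------------------------------------------------------------------------------------------------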
88 changes: 88 additions & 0 deletions noaa/challenges/default.json
@@ -0,0 +1,88 @@
{
  "name": "append-no-conflicts",
  "description": "Indexes the whole document corpus using Elasticsearch default settings. We only adjust the number of replicas, because we benchmark a single-node cluster and Rally only starts the benchmark once the cluster has turned green, and we disable the query cache so that queries are actually executed rather than served from the cache. Document ids are unique, so all index operations are append only. After that a couple of queries are run.",
  "default": true,
  "index-settings": {
    "index.number_of_shards": 1,
    "index.number_of_replicas": 0,
    "index.queries.cache.enabled": false
  },
  "schedule": [
    {
      "operation": "index",
      "warmup-time-period": 240,
      "clients": 8
    },
    {
      "operation": "range_field_big_range",
      "clients": 1,
      "warmup-iterations": 100,
      "iterations": 100,
      "target-throughput": 50
    },
    {
      "operation": "range_field_small_range",
      "clients": 1,
      "warmup-iterations": 100,
      "iterations": 100,
      "target-throughput": 50
    },
    {
      "operation": "range_field_conjunction_big_range_small_term_query",
      "clients": 1,
      "warmup-iterations": 100,
      "iterations": 100,
      "target-throughput": 50
    },
    {
      "operation": "range_field_conjunction_small_range_small_term_query",
      "clients": 1,
      "warmup-iterations": 100,
      "iterations": 100,
      "target-throughput": 50
    },
    {
      "operation": "range_field_conjunction_small_range_big_term_query",
      "clients": 1,
      "warmup-iterations": 100,
      "iterations": 100,
      "target-throughput": 50
    },
    {
      "operation": "range_field_conjunction_big_range_big_term_query",
      "clients": 1,
      "warmup-iterations": 100,
      "iterations": 100,
      "target-throughput": 50
    },
    {
      "operation": "range_field_disjunction_small_range_small_term_query",
      "clients": 1,
      "warmup-iterations": 100,
      "iterations": 100,
      "target-throughput": 50
    },
    {
      "operation": "range_field_disjunction_big_range_small_term_query",
      "clients": 1,
      "warmup-iterations": 100,
      "iterations": 100,
      "target-throughput": 50
    }
  ]
},
{
  "name": "append-no-conflicts-index-only",
  "description": "Indexes the whole document corpus using Elasticsearch default settings.",
  "index-settings": {
    "index.number_of_replicas": 0
  },
  "schedule": [
    {
      "operation": "index",
      "warmup-time-period": 240,
      "clients": 8
    }
  ]
}
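
The range_field_* operations scheduled above are defined in the track's
operations file, which is not part of this commit view, so the exact query
bodies are not shown here. As a hypothetical illustration only, a query
against the TRANGE double_range field would look roughly like this with the
elasticsearch-py client:

----------------------------------------------------------------------------------------------------

from elasticsearch import Elasticsearch

es = Elasticsearch(['localhost:9200'])

# Hypothetical query body; "relation" controls how the queried interval must
# relate to the indexed TRANGE interval (intersects, contains or within).
body = {
    "query": {
        "range": {
            "TRANGE": {
                "gte": -10.0,
                "lte": 10.0,
                "relation": "intersects"
            }
        }
    }
}
response = es.search(index='weather-data-*', body=body)
print(response['hits']['total'])

----------------------------------------------------------------------------------------------------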

21 changes: 21 additions & 0 deletions noaa/mappings.json
@@ -0,0 +1,21 @@
{
  "summary": {
    "properties": {
      "TRANGE": {
        "type": "double_range"
      },
      "MDTRANGE": {
        "type": "double_range"
      },
      "station": {
        "type": "object",
        "properties": {
          "location": {
            "type": "geo_point"
          }
        }
      }
    }
  }
}
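
These mappings apply to every yearly index the indexing script targets
(weather-data-2016 and so on). When the track runs, Rally creates the indices
itself; for experimenting outside the benchmark, a minimal sketch of creating
such an index by hand, assuming an Elasticsearch 5.x node on localhost:9200
and this file saved as mappings.json:

----------------------------------------------------------------------------------------------------

import json
from elasticsearch import Elasticsearch

es = Elasticsearch(['localhost:9200'])

with open('mappings.json') as f:
    mappings = json.load(f)  # {"summary": {...}} as shown above

es.indices.create(index='weather-data-2016', body={
    'settings': {
        'index.number_of_shards': 1,
        'index.number_of_replicas': 0
    },
    'mappings': mappings
})

----------------------------------------------------------------------------------------------------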
