Merge pull request #187 from IQuOD/postgres

Postgres

bkatiemills authored Aug 24, 2016
2 parents 707340b + 30d47e1 commit b1711a9
Showing 103 changed files with 1,542 additions and 16,648 deletions.

111 changes: 49 additions & 62 deletions AutoQC.py
@@ -1,19 +1,22 @@
from wodpy import wod
import glob, time
import numpy as np
import sys, os, json, data.ds
import sys, os, data.ds
import util.main as main
import pandas
import psycopg2
from multiprocessing import Pool
import tempfile

def run(test, profiles):
def run(test, profiles, parameters):
'''
run <test> on a list of <profiles>, return an array summarizing when exceptions were raised
'''
qcResults = []
verbose = []
exec('from qctests import ' + test)
for profile in profiles:
exec('result = ' + test + '.test(profile)')
exec('result = ' + test + '.test(profile, parameters)')

#demand tests returned bools:
for i in result:
@@ -23,50 +26,6 @@ def run(test, profiles):
verbose.append(result)
return [qcResults, verbose]
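
The `exec` calls above amount to a dynamic import and dispatch by test name; an equivalent `importlib` formulation (a sketch, not what this changeset uses) would be:

```python
import importlib

def run_one(test, profile, parameters):
    '''dispatch a single named qc test, mirroring the exec calls in run()'''
    module = importlib.import_module('qctests.' + test)  # from qctests import <test>
    return module.test(profile, parameters)              # <test>.test(profile, parameters)
```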

def processFile(fName):
# run each test on each profile, and record its summary & verbose performance
testResults = []
testVerbose = []
trueResults = []
trueVerbose = []
profileIDs = []
firstProfile = True
currentFile = ''
f = None

# keep a list of only the profiles in this thread
data.ds.threadProfiles = main.extractProfiles([fName])
data.ds.threadFile = fName

for iprofile, pinfo in enumerate(data.ds.threadProfiles):
# Load the profile data.
p, currentFile, f = main.profileData(pinfo, currentFile, f)
# Check that there are temperature data in the profile, otherwise skip.
if p.var_index() is None:
continue
main.catchFlags(p)
if np.sum(p.t().mask == False) == 0:
continue
# Run each test.
for itest, test in enumerate(testNames):
result = run(test, [p])
if firstProfile:
testResults.append(result[0])
testVerbose.append(result[1])
else:
testResults[itest].append(result[0][0])
testVerbose[itest].append(result[1][0])
firstProfile = False
# Read the reference result.
truth = main.referenceResults([p])
trueResults.append(truth[0][0])
trueVerbose.append(truth[1][0])
profileIDs.append(p.uid())
# testResults[i][j] now contains a flag indicating the exception raised by test i on profile j

return trueResults, testResults, profileIDs


########################################
# main
########################################
@@ -81,24 +40,52 @@ def processFile(fName):
for testName in testNames:
print(' {}'.format(testName))

# Identify data files and create a profile list.
filenames = main.readInput('datafiles.json')
profiles = main.extractProfiles(filenames)
data.ds.profiles = profiles
print('\n{} file(s) will be read containing {} profiles'.format(len(filenames), len(profiles)))

# Parallel processing.
print('\nPlease wait while QC is performed\n')
processFile.parallel = main.parallel_function(processFile, sys.argv[2])
parallel_result = processFile.parallel(filenames)

# Recombine results
truth, results, profileIDs = main.combineArrays(parallel_result)
def process_row(uid):
'''run all tests on the indicated database row'''

# extract profile
profile = main.get_profile_from_db(cur, uid)

# Check that there are temperature data in the profile, otherwise skip.
if profile.var_index() is None:
return
main.catchFlags(profile)
if np.sum(profile.t().mask == False) == 0:
return

# run tests
for itest, test in enumerate(testNames):
result = run(test, [profile], parameterStore)
query = "UPDATE " + sys.argv[1] + " SET " + test.lower() + " = " + str(result[0][0]) + " WHERE uid = " + str(profile.uid()) + ";"
cur.execute(query)

# Print summary statistics and write output file.
main.printSummary(truth, results, testNames)
main.generateCSV(truth, results, testNames, profileIDs, sys.argv[1])
# set up global parameter store
parameterStore = {}
for test in testNames:
exec('from qctests import ' + test)
try:
exec(test + '.loadParameters(parameterStore)')
except:
print 'No parameters to load for', test

# connect to database & fetch list of all uids
conn = psycopg2.connect("dbname='root' user='root'")
conn.autocommit = True
cur = conn.cursor()
cur.execute('SELECT uid FROM ' + sys.argv[1])
uids = cur.fetchall()

# launch async processes
pool = Pool(processes=int(sys.argv[2]))
for i in range(len(uids)):
pool.apply_async(process_row, (uids[i][0],))
pool.close()
pool.join()

else:
print 'Please add command line arguments to name your output file and set parallelization:'
print 'python AutoQC myFile 4'
print 'will result in output written to results-myFile.csv, and will run the calculation parallelized across 4 cores.'
print 'python AutoQC <database table> <number of threads>'
print 'will write qc results to <database table> in the database, and run the calculation parallelized across <number of threads> cores.'
72 changes: 54 additions & 18 deletions README.md
@@ -22,10 +22,10 @@ docker pull iquod/autoqc
Start the image via

```
docker run -i -t iquod/autoqc /bin/bash
docker run --sysctl "kernel.shmmax=18446744073692774399" -v $PWD:/rawdata -i -t iquod/autoqc /bin/bash
```

And you'll find AutoQC all set up and ready to use in the directory `/AutoQC`. Note that the version of AutoQC that ships with the docker image may be behind master on GitHub; you can always do `git pull origin master` from the `/AutoQC` directory inside the image, if you need an update.
And you'll find AutoQC all set up and ready to use in the directory `/AutoQC`. Note that the version of AutoQC that ships with the docker image may be behind master on GitHub; you can always do `git pull origin master` from the `/AutoQC` directory inside the container, if you need an update. Also, whatever directory you launched this command from will be mounted on `/rawdata` inside your Docker container; use this to bring data into the container, or copy logs and files from within the container to this location to access them after Docker exits.

If you want to run AutoQC without Docker, have a look at the setup steps in `docker/Dockerfile`; these correspond to the same setup steps you'll need to do on a similar machine (i.e. on Debian with miniconda already installed).

@@ -65,26 +65,64 @@ cd data
Finally, launch your docker image with the `data` directory mounted inside it at `/rawdata`:

```
sudo docker run -v $PWD:/rawdata -i -t iquod/autoqc /bin/bash
sudo docker run --sysctl "kernel.shmmax=18446744073692774399" -v $PWD:/rawdata -i -t iquod/autoqc /bin/bash
```

And once again, AutoQC will be all set up in `/AutoQC`. Remember to `git pull` if necessary, and add any external data or parameter files to the correct places.

##Usage
To execute the quality control checks,
`python AutoQC.py name nProcessors`

where `name` sets the output csv name to `result-name.csv`, and `nProcessors` is the number of cores to parallelize over.
AutoQC runs in three steps: database construction, qc running, and result summarization.

##Structure
`AutoQC.py` performs the following:
- automatically detects all quality control tests found in `/qctests`
- takes the list of raw data files from `datafiles.json`, and decodes their contents into an array of profile objects
- runs all the automatically detected tests over each of these profiles
- return an array for each test indicating which profiles exceptions were raised for, and an array indicating the expected result for each profile
### Database Construction

```
python build-db.py filename tablename
```

Where `filename` is the name of a WOD-ascii file to read profiles from, and `tablename` is the name of a postgres table to write the results to; `tablename` will be created if it doesn't
exist, or appended to if it does. `tablename` will have the following columns:

column name | description
------------|-----------
`raw` | the raw WOD-ASCII text originally found in the input file
`truth` | whether any temperature qc levels were flagged at 3 or greater
`uid` | unique profile serial number
`year` | timestamp year
`month` | timestamp month, integers [1,12]
`day` | timestamp day, integers [1,31]
`time` | timestamp walltime, real [0,24)
`lat` | profile latitude
`long` | profile longitude
`cruise` | cruise id
`probe` | probe index, per WOD specifications

Additionally, there is a column in the table for the qc results of every test found in the `/qctests` directory; these columns are filled in in the next step.
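
To sanity-check the table after construction, query it directly; a minimal sketch, assuming a table named `demo` and the same local connection settings used elsewhere in this changeset:

```python
import psycopg2

# connect with the same credentials the Dockerfile sets up
conn = psycopg2.connect("dbname='root' user='root'")
cur = conn.cursor()
cur.execute('SELECT uid, year, month, day, lat, long, truth FROM demo LIMIT 5')
for row in cur.fetchall():
    print(row)  # one tuple per profile
conn.close()
```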

### QC Execution

```
python AutoQC.py tablename nProcessors
```

where `tablename` is the postgres table to pull profiles from (probably the same as `tablename` in the last step), and `nProcessors` is how many processors you'd like to parallelize over.

### Result Summary

```
python summarize-results.py tablename
```

where `tablename` is the postgres table used in the previous steps. A summary of true flags, true passes, false positives and false negatives is generated for each test.
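
`summarize-results.py` itself isn't shown in this excerpt, but the counts it describes can be recovered straight from the table; a sketch for a single test, assuming table `demo` and a hypothetical test column name:

```python
import psycopg2

conn = psycopg2.connect("dbname='root' user='root'")
cur = conn.cursor()
test = 'example_test'  # hypothetical; use any lowercased test filename from /qctests
cur.execute('SELECT truth, ' + test + ' FROM demo WHERE ' + test + ' IS NOT NULL')
tp = fp = fn = tn = 0
for truth, flag in cur.fetchall():
    if flag and truth:
        tp += 1  # true flag
    elif flag and not truth:
        fp += 1  # false positive
    elif not flag and truth:
        fn += 1  # false negative
    else:
        tn += 1  # true pass
print('true flags: %i, false positives: %i, false negatives: %i, true passes: %i' % (tp, fp, fn, tn))
conn.close()
```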


##Testing

###Testing Data
Each quality control test must be written as its own file in `/qctests`, of the form `def test(p)`, where `p` is a profile object; each test returns a bool, where `True` indicates the test has *failed*.
Each quality control test must be written as its own file in `/qctests`, of the form `def test(p, parameters)`, where `p` is a profile object; each test returns a bool, where `True` indicates the test has *failed*.
`parameters` is a dictionary for conveniently persisting *static* parameters and sharing them between threads. If your test has a great deal of parameters to load before it runs, include alongside its definition a `loadParameters(dict)` method, which writes those parameters to keys of your choosing on the dictionary passed in as its argument. That dictionary will subsequently be passed into every qc test as the `parameters` argument. The qc framework calls `loadParameters` automatically;
it is enough for you to just write it, and the parameters you want will be available in your qc test on the keys you defined on the `parameters` object.
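
A minimal sketch of a conforming test module; the module name and threshold are hypothetical, and since `run` in `AutoQC.py` iterates over the returned value, a per-level array of bools is returned:

```python
# qctests/example_test.py -- hypothetical module, for illustration only
def loadParameters(parameterStore):
    # called once by the framework before any profiles are processed;
    # stash anything expensive to load (lookup tables, climatologies, ...)
    parameterStore['example_t_max'] = 40.0

def test(p, parameters):
    # flag True (i.e. fail) at every level whose temperature
    # exceeds the threshold stashed by loadParameters
    return p.t() > parameters['example_t_max']
```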

###Testing Code
To run the code tests:
@@ -94,11 +132,9 @@ pip install nose
nosetests tests/*.py
```

###Data
Each data file listed in `datafiles.json` is in the World Ocean Database (WOD; http://www.nodc.noaa.gov/OC5/WOD/pr_wod.html) ASCII format.

###Profile Objects Specification
See [the docs](https://github.com/IQuOD/AutoQC/blob/master/dataio/README.md) for the WodProfile class, a decoding helper for the WOD ASCII format.
##Profile Objects Specification
See the [wodpy package](https://github.com/IQuOD/wodpy) for more information on the WodProfile class, a decoding helper for the WOD ASCII format.

##Contributing
Quality control checks waiting to be implemented are listed in the Issues. If you would like to work on coding up a check, please assign yourself to the issue to avoid others duplicating the effort.
If you have an idea for a new QC check, please open an issue and let us know, so we can help get you started on the right track.
91 changes: 91 additions & 0 deletions build-db.py
@@ -0,0 +1,91 @@
# usage: python build-db.py <wod ascii file name> <table name to append to>

from wodpy import wod
import sys, psycopg2
import util.main as main

if len(sys.argv) == 3:

# connect to database and create a cursor by which to interact with it.
try:
conn = psycopg2.connect("dbname='root' user='root'")
except:
print "I am unable to connect to the database"

cur = conn.cursor()

# Identify tests
testNames = main.importQC('qctests')
testNames.sort()

# set up our table
query = "CREATE TABLE IF NOT EXISTS " + sys.argv[2] + """(
raw text,
truth boolean,
uid integer,
year integer,
month integer,
day integer,
time real,
lat real,
long real,
cruise integer,
probe integer,
"""
for i in range(len(testNames)):
query += testNames[i].lower() + ' boolean'
if i<len(testNames)-1:
query += ','
else:
query += ');'

cur.execute(query)

# populate table from wod-ascii data
fid = open(sys.argv[1])

while True:
# extract profile as wodpy object and raw text
start = fid.tell()
profile = wod.WodProfile(fid)
end = fid.tell()
fid.seek(start)
raw = fid.read(end-start)
fid.seek(end)

# set up dictionary for populating query string
wodDict = profile.npdict()
wodDict['raw'] = "'" + raw + "'"
# Below avoids failures if all profile data are missing.
# We have no use for this profile in that case so skip it.
try:
wodDict['truth'] = sum(profile.t_level_qc(originator=True) >= 3) >= 1
except:
if profile.is_last_profile_in_file(fid) == True:
break
continue

query = "INSERT INTO " + sys.argv[2] + " (raw, truth, uid, year, month, day, time, lat, long, cruise, probe) " + """ VALUES(
{p[raw]},
{p[truth]},
{p[uid]},
{p[year]},
{p[month]},
{p[day]},
{p[time]},
{p[latitude]},
{p[longitude]},
{p[cruise]},
{p[probe_type]}
)""".format(p=wodDict)
query = query.replace('--', 'NULL')
query = query.replace('None', 'NULL')
cur.execute(query)
if profile.is_last_profile_in_file(fid) == True:
break

conn.commit()

else:

print 'Usage: python build-db.py inputdatafile databasetable'
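
For example, to load the sample WOD-ASCII file mentioned in the Dockerfile below into a table named `demo` (adjust the filename to wherever your data lives):

```
python build-db.py quota_subset.dat demo
```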
3 changes: 3 additions & 0 deletions cotede_qc/cotede_test.py
@@ -38,6 +38,9 @@ def get_qc(p, config, test):
test not in cotede_results[2] or
p.uid() is None):
inputs = Wod4CoTeDe(p)
dt = inputs.attributes['datetime']
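# pre-1900 dates can break downstream date handling (e.g. strftime in Python 2), so clamp the year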
if dt.year < 1900:
inputs.attributes['datetime'] = dt.replace(year=1900)

# If config is a dictionary, use it.
if type(config) is not dict:
3 changes: 0 additions & 3 deletions datafiles.json

This file was deleted.

9 changes: 6 additions & 3 deletions docker/Dockerfile
@@ -5,11 +5,11 @@ RUN conda config --set always_yes yes --set changeps1 no
RUN conda update -q conda
RUN apt-get update

# dependencies!
RUN apt-get -y install libhdf5-serial-dev libnetcdf-dev unzip
# dependencies
RUN apt-get -y install libhdf5-serial-dev libnetcdf-dev unzip postgresql libpq-dev python-dev nano
RUN conda install --yes python=2.7 pip nose Shapely netCDF4 matplotlib numpy scipy pyproj pandas

RUN pip install seabird>=0.6.3 gsw scikit-fuzzy
RUN pip install seabird>=0.6.3 gsw scikit-fuzzy psycopg2
# Pypi is not working well. Temporary solution
RUN pip install git+https://github.com/castelao/oceansdb.git@master#egg=oceansdb
RUN pip install git+https://github.com/castelao/CoTeDe.git@master#egg=CoTeDe
@@ -30,4 +30,7 @@ ADD woa13_decav_s16_5dv2.nc /AutoQC/data/.
ADD etopo5.nc /AutoQC/data/.
ADD climatological_t_median_and_amd_for_aqc.nc /AutoQC/data/.

# set up database; load quota_subset.dat into a table 'demo'.
RUN /etc/init.d/postgresql start && su postgres -c 'createuser -s root' && su postgres -c 'createdb root'

ADD bashrc /.bashrc
1 change: 1 addition & 0 deletions docker/bashrc
@@ -1 +1,2 @@
export OCEANSDB_DIR=/AutoQC/data/
/etc/init.d/postgresql start