Skip to content

Commit

Permalink
Add synthetic write and update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Dobson committed Jan 23, 2024
1 parent 6bf2638 commit e285d40
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 33 deletions.
118 changes: 118 additions & 0 deletions swmmanywhere/swmm_text_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,128 @@
import os
import re

import geopandas as gpd
import numpy as np
import pandas as pd
import yaml


def synthetic_write(model_dir: str):
"""Load synthetic data and write to SWMM input file.
Loads nodes, edges and subcatchments from synthetic data, assumes that
these are all located in `model_dir`. Fills in appropriate default values
for many SWMM parameters. More parameters are available to edit (see
defs/swmm_conversion.yml). Identifies outfalls and automatically ensures
that they have only one link to them (as is required by SWMM). Formats
(with format_to_swmm_dict) and writes (with data_dict_to_inp) the data to
a SWMM input (.inp) file.
Args:
model_dir (str): model directory address. Assumes a format along the
lines of 'model_2', where the number is the model number.
"""
# TODO these node/edge names are probably not good or extendible defulats
# revisit once overall software architecture is more clear.
nodes = gpd.read_file(os.path.join(model_dir,
'pipe_by_pipe_nodes.geojson'))
edges = gpd.read_file(os.path.join(model_dir,
'pipe_by_pipe_edges.geojson'))
subs = gpd.read_file(os.path.join(model_dir,
'subcatchments.geojson'))

# Extract SWMM relevant data
edges = edges[['u','v','diameter','length']]
nodes = nodes[['id',
'x',
'y',
'chamber_floor_elevation',
'surface_elevation']]
subs = subs[['id',
'geometry',
'area',
'slope',
'width',
'rc']]

# Nodes
nodes['id'] = nodes['id'].astype(str)
nodes['max_depth'] = nodes.surface_elevation - nodes.chamber_floor_elevation
nodes['surcharge_depth'] = 0
nodes['flooded_area'] = 100 # TODO arbitrary... not sure how to calc this
nodes['manhole_area'] = 0.5

# Subs
subs['id'] = subs['id'].astype(str)
subs['subcatchment'] = subs['id'] + '-sub'
subs['rain_gage'] = 1 # TODO revise when revising storms

# Edges
UNBOUNDED_CAPACITY = 1E10 # capacity in swmm is a hard limit
edges['u'] = edges['u'].astype(str)
edges['v'] = edges['v'].astype(str)
edges['roughness'] = 0.01
edges['capacity'] = UNBOUNDED_CAPACITY

# Outfalls (create new nodes that link to the stores connected tothem
outfalls = nodes.loc[~nodes.id.isin(edges.u)].copy()
outfalls['id'] = outfalls['id'] + '_outfall'

# Reduce elevation to ensure flow
outfalls['chamber_floor_elevation'] -= 1
outfalls['x'] -= 1
outfalls['y'] -= 1

# Link stores to outfalls
new_edges = edges.iloc[0:outfalls.shape[0]].copy()
new_edges['u'] = outfalls['id'].str.replace('_outfall','').values
new_edges['v'] = outfalls['id'].values
new_edges['diameter'] = 15 # TODO .. big pipe to enable all outfall...
new_edges['length'] = 1

# Append new edges
edges = pd.concat([edges, new_edges], ignore_index = True)

# Name all edges
edges['id'] = edges.u.astype(str) + '-' + edges.v.astype(str)

# Create event
# TODO will need some updating if multiple rain gages
# TODO automatically match units to storm.csv?
event = {'name' : '1',
'unit' : 'mm',
'interval' : '01:00',
'fid' : 'storm.dat' # overwritten at runtime
}

# Locate raingage(s) on the map
symbol = {'x' : nodes.x.min(),
'y' : nodes.y.min(),
'name' : '1' # matches event name(s)
}

# Template SWMM input file
existing_input_file = os.path.join(os.path.dirname(__file__),
'defs',
'basic_drainage_all_bits.inp')

# New input file
model_number = model_dir.split('_')[-1]
new_input_file = os.path.join(model_dir,
'model_{0}.inp'.format(model_number))

# Format to dict
data_dict = format_to_swmm_dict(nodes,
outfalls,
edges,
subs,
event,
symbol)

# Write new input file
data_dict_to_inp(data_dict, existing_input_file, new_input_file)


def overwrite_section(data: np.ndarray,
section: str,
fid: str):
Expand Down Expand Up @@ -306,6 +423,7 @@ def format_to_swmm_dict(nodes,
'ORIFICES' : None,
'WEIRS' : None,
'OUTLETS' : None,
'JUNCTIONS' : None,
'RAINGAGES' : event,
'SYMBOLS' : symbol
}
Expand Down
138 changes: 105 additions & 33 deletions tests/test_swmm_text_tools.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import difflib
import filecmp
import os
import shutil
import tempfile

import geopandas as gpd
import pandas as pd
Expand Down Expand Up @@ -91,33 +94,31 @@ def test_explode_polygon():
assert result.shape[0] == 5
assert result.loc[3,'y'] == 1

def test_format_format_to_swmm_dict():
"""Test the format_format_to_swmm_dict function.
Writes a formatted dict to a model and checks that it runs without
error.
"""
def generate_data_dict():
"""Generate a data dict for testing."""
nodes = gpd.GeoDataFrame({'id' : ['node1', 'node2'],
'x' : [0, 1],
'y' : [0, 1],
'max_depth' : [1, 1],
'chamber_floor_elevation' : [1, 1],
'manhole_area' : [1,1]
'surface_elevation' : [2, 2],
'manhole_area' : [0.5,0.5]
})
outfalls = gpd.GeoDataFrame({'id' : ['outfall3'],
'chamber_floor_elevation' : [1],
outfalls = gpd.GeoDataFrame({'id' : ['node2_outfall'],
'chamber_floor_elevation' : [0],
'surface_elevation' : [0],
'x' : [0],
'y' : [0]})
conduits = gpd.GeoDataFrame({'id' : ['link1','link2'],
conduits = gpd.GeoDataFrame({'id' : ['node1-node2','node2-node2_outfall'],
'u' : ['node1','node2'],
'v' : ['node2','outfall3'],
'v' : ['node2','node2_outfall'],
'length' : [1,1],
'roughness' : [1,1],
'roughness' : [0.01,0.01],
'shape_swmm' : ['CIRCULAR','CIRCULAR'],
'diameter' : [1,1],
'capacity' : [0.1,0.1]
'diameter' : [1,15],
'capacity' : [1E10,1E10]
})
subs = gpd.GeoDataFrame({'subcatchment' : ['sub1'],
subs = gpd.GeoDataFrame({'subcatchment' : ['node1-sub'],
'rain_gage' : ['1'],
'id' : ['node1'],
'area' : [1],
Expand All @@ -126,32 +127,104 @@ def test_format_format_to_swmm_dict():
'slope' : [0.001],
'geometry' : [sgeom.Polygon([(0,0), (1,0),
(1,1), (0,1)])]})

rain_fid = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'..',
'swmmanywhere',
'defs',
'storm.dat')
rain_fid = 'storm.dat'
event = {'name' : '1',
'unit' : 'mm',
'interval' : 1,
'interval' : '01:00',
'fid' : rain_fid}
symbol = {'x' : 0,
'y' : 0,
'name' : 'name'}
data_dict = stt.format_to_swmm_dict(nodes,
outfalls,
conduits,
subs,
event,
symbol)
'name' : '1'}

return {'nodes' : nodes,
'outfalls' : outfalls,
'conduits' : conduits,
'subs' : subs,
'event' : event,
'symbol' : symbol}

def test_synthetic_write():
"""Test the synthetic_write function."""
data_dict = generate_data_dict()
model_dir = tempfile.mkdtemp('_1')
try:
# Write the model with synthetic_write
nodes = gpd.GeoDataFrame(data_dict['nodes'])
nodes.geometry = gpd.points_from_xy(nodes.x, nodes.y)
nodes.to_file(
os.path.join(
model_dir,
'pipe_by_pipe_nodes.geojson'))
nodes = nodes.set_index('id')
edges = gpd.GeoDataFrame(pd.DataFrame(data_dict['conduits']).iloc[[0]])
edges.geometry = [sgeom.LineString([nodes.loc[u,'geometry'],
nodes.loc[v,'geometry']])
for u,v in zip(edges.u, edges.v)]
edges.to_file(
os.path.join(
model_dir,
'pipe_by_pipe_edges.geojson'))
subs = data_dict['subs'].copy()
subs['subcatchment'] = ['node1']
subs.to_file(os.path.join(model_dir,
'subcatchments.geojson'))
stt.synthetic_write(model_dir)

# Write the model with data_dict_to_inp
comparison_file = os.path.join(model_dir, "model_base.inp")
template_fid = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'..',
'swmmanywhere',
'defs',
'basic_drainage_all_bits.inp')
stt.data_dict_to_inp(stt.format_to_swmm_dict(**data_dict),
template_fid,
comparison_file)

# Compare
new_input_file = os.path.join(model_dir, "model_1.inp")
are_files_identical = filecmp.cmp(new_input_file,
comparison_file,
shallow=False)
if not are_files_identical:
with open(new_input_file,
'r') as file1, open(comparison_file,
'r') as file2:
diff = difflib.unified_diff(
file1.readlines(),
file2.readlines(),
fromfile=new_input_file,
tofile=comparison_file,
)
print(''.join(diff))
assert are_files_identical, "The files are not identical"
finally:
pass
# shutil.rmtree(model_dir)

def test_format_to_swmm_dict():
"""Test the format_format_to_swmm_dict function.
Writes a formatted dict to a model and checks that it runs without
error.
"""
data_dict = generate_data_dict()
data_dict = stt.format_to_swmm_dict(**data_dict)


fid = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'..',
'swmmanywhere',
'defs',
'basic_drainage_all_bits.inp')
temp_fid = 'temp.inp'
shutil.copy(fid, temp_fid)
rain_fid = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'..',
'swmmanywhere',
'defs',
'storm.dat')
temp_dir = tempfile.mkdtemp()
shutil.copy(rain_fid, os.path.join(temp_dir,'storm.dat'))
temp_fid = os.path.join(temp_dir,'temp.inp')
try:
stt.data_dict_to_inp(data_dict,
fid,
Expand All @@ -160,6 +233,5 @@ def test_format_format_to_swmm_dict():
for ind, step in enumerate(sim):
pass
finally:
os.remove(temp_fid.replace('.inp','.rpt'))
os.remove(temp_fid)
shutil.rmtree(temp_dir)

0 comments on commit e285d40

Please sign in to comment.