-
Notifications
You must be signed in to change notification settings - Fork 7
/
filter_map
executable file
·90 lines (81 loc) · 2.87 KB
/
filter_map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python
from __future__ import division, print_function
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("imap")
parser.add_argument("hitmap", nargs="?", default=None)
parser.add_argument("omap")
parser.add_argument("-F", "--filter", action="append", help="Filter specification. Can be specified multiple times. Formats: g:lcut = gaussian filter, lowpass for positive lcut, highpass for negative. b:lcut:alpha = butterworth filter. Lowpass for positive alpha, highpass for negative.")
parser.add_argument("-s", "--slice", type=str, default=None)
parser.add_argument("-L", "--limit", type=float, default=0.10)
parser.add_argument( "--lmax", type=float, default=None, help="Don't filter anything above this l")
parser.add_argument("-e", "--extent-model", type=str, default=None)
parser.add_argument("-v", action="count")
args = parser.parse_args()
import numpy as np, sys
from scipy import ndimage
from enlib import enmap
np.seterr(all="ignore")
filter_specs = args.filter or ["b:200:-5"]
if args.extent_model is not None:
enmap.extent_model.append(args.extent_model)
m = enmap.read_map(args.imap)
if args.slice:
m = eval("m"+args.slice)
ishape = m.shape
m = m.preflat
if args.hitmap != None:
w = enmap.read_map(args.hitmap).preflat[0]
else:
w = enmap.zeros(m.shape[-2:], m.wcs, dtype=m.dtype)+1
samps = w.reshape(-1)[::100]
samps = samps[samps!=0]
typ = np.median(samps)
mask = np.any(np.isnan(m),0)|np.isnan(w)|(w<typ*1e-3)
m[:,mask] = 0
w[mask] = 0
# The function of the mask is to apodize edges, so smooth out the middle
w0 = typ*0.10
w = (1/(1+w0/w)).astype(m.dtype)
wm = m*w
del samps
print(np.std(m), np.std(w), np.std(wm))
# Pad to fft-friendly size
def filter_gauss(l, lsigma):
f = np.exp(-0.5*(l/lsigma)**2)
return f if lsigma > 0 else 1-f
def filter_butter(l, lknee, alpha):
return 1.0/(1+(l/lknee)**alpha)
# Perform the filtering
fmap = enmap.map2harm(wm)
lmap = fmap.lmap()
for i, fspec in enumerate(filter_specs):
toks = fspec.split(":")
desc, fargs = toks[0], toks[1:]
dirs = [c for c in "vha" if c in desc] or ["a"]
comps = [c for c in range(10) if str(c) in desc] or list(range(fmap.shape[-3]))
for dir in dirs:
if dir == "v": l = np.abs(lmap[0])
elif dir == "h": l = np.abs(lmap[1])
elif dir == "a": l = np.sum(lmap**2,0)**0.5
else: raise ValueError("Invalid direction '%s'" % dir)
if "g" in desc:
lsigma = float(fargs[0])
filter = filter_gauss(l, lsigma)
elif "b" in desc:
lknee = float(fargs[0])
alpha = float(fargs[1])
filter = filter_butter(l, lknee, alpha)
else: raise ValueError("No known filter specified in " + desc)
if args.lmax is not None:
lmask = np.sum(lmap**2,0)**0.5 > args.lmax
filter[lmask] = 1
for comp in comps:
fmap[comp] *= filter
wm = enmap.harm2map(fmap)
# And recover filtered map
m = wm/w
del wm, w
m[:,mask] = np.nan
m = np.reshape(m, ishape)
enmap.write_map(args.omap, m)