CTDAS commit e79032dc
authored Sep 15, 2020 by Woude, Auke van der

    write true, prior and optimised fluxes to file

parent 2216a271
Changes: 10 files
.project    deleted 100644 → 0
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
    <name>ctdas_light_refactor_new</name>
    <comment></comment>
    <projects>
    </projects>
    <buildSpec>
        <buildCommand>
            <name>org.python.pydev.PyDevBuilder</name>
            <arguments>
            </arguments>
        </buildCommand>
    </buildSpec>
    <natures>
        <nature>org.python.pydev.pythonNature</nature>
    </natures>
</projectDescription>
.pydevproject    deleted 100644 → 0
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?>
<pydev_project>
    <pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
        <path>/ctdas_light_refactor_new</path>
    </pydev_pathproperty>
    <pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
    <pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
</pydev_project>
da/ccffdas/category_info.py
...
@@ -59,7 +59,7 @@ categories = {
                       'CO2.uncertainty': 'n',
                       'CO': -4.32,
                       'CO.uncertainty': 'l'},
-    'heavy duty': {'name': 'heavy duty highway',
+    'heavy duty': {'name': 'heavy duty',
                   'model': 1,
                   'spatial': 'Road transport',
                   'temporal': 't_road',
...
da/ccffdas/emissionmodel.py
...
@@ -92,7 +92,6 @@ class EmisModel(object):
                key = station + '.' + cat
            else:
                key = station + '.' + cat + '.' + name
-           #print('Looking for {}'.format(key))
            if key in self.paramdict:
                i_in_state = int(self.paramdict[key])
...
@@ -103,7 +102,7 @@ class EmisModel(object):
        elif return_index:
            return False
        else:
            return 1

-   def get_emis(self, dacycle, samples, indices, do_pseudo):
+   def get_emis(self, dacycle, indices):
        """set up emission information for pseudo-obs (do_pseudo=1) and ensemble runs (do_pseudo=0)"""

        self.timestartkey = self.dacycle['time.sample.start']
...
@@ -149,10 +148,19 @@ class EmisModel(object):
    def get_emissions(self, dacycle, time_profiles, member):
        """read in proxy data used for spatial distribution of the gridded emissions, disaggregate yearly totals for the area"""

-       prmfile = os.path.join(dacycle['dir.input'], 'parameters.%03d.nc' % member)
-       f = io.ct_read(prmfile, 'read')
-       params = f.get_variable('parametervalues')
-       f.close()
+       if isinstance(member, int) or member == 'optimised':
+           if isinstance(member, int):
+               prmfile = os.path.join(dacycle['dir.input'], 'parameters.%03d.nc' % member)
+               prmname = 'parametervalues'
+           elif member == 'optimised':
+               prmfile = os.path.join(dacycle['dir.output'], 'optimizer.%s.nc' % dacycle['time.start'].strftime('%Y%m%d'))
+               prmname = 'statevectormean_optimized'
+           f = io.ct_read(prmfile, 'read')
+           params = f.get_variable(prmname)
+           f.close()
+       elif member == 'true':
+           params = np.ones(100)

        yremis = self.get_yearly_emissions(params)
        # Create a recalculation factor from kg/km2/yr to umol/m2/sec
        M_mass = np.array([44e-9, 28e-9][:self.nrspc])
...
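The hunk above builds M_mass and later divides the gridded fluxes by a factor named kgperkmperyr2umolperm2pers, whose construction lies outside the shown context. As a point of reference only, a minimal scalar sketch of such a kg km-2 yr-1 to umol m-2 s-1 factor, built from M_mass as above but ignoring the per-grid-cell weighting implied by the lat/lon axes in the actual division, could look like this; the 365-day year is an assumption:

    import numpy as np

    # Hypothetical scalar version of the conversion factor; not the committed code.
    M_mass = np.array([44e-9, 28e-9])      # kg per umol of CO2 and CO
    sec_per_year = 365 * 24 * 3600         # seconds per (non-leap) year, an assumption
    m2_per_km2 = 1.0e6

    kgperkmperyr2umolperm2pers = M_mass * m2_per_km2 * sec_per_year

    # 1 kg CO2 km-2 yr-1 corresponds to roughly 7e-7 umol m-2 s-1:
    print(1.0 / kgperkmperyr2umolperm2pers[0])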
@@ -188,32 +196,77 @@ class EmisModel(object):
            emissions.append(spatial_emissions[i, :, :, :] * temporal_profile[None, :, :, :])
        self.emissions = emissions
-       emissions = np.array(emissions)
-       emissions = np.swapaxes(emissions, 0, 1)  # Indices: species, category, time, lat, lon
+       emissions = np.array(emissions)  # [cat, species, time, lat, lon]
+       emissions = np.swapaxes(emissions, 0, 2)  # [time, cat, species, lat, lon]
+       emissions = np.swapaxes(emissions, 1, 2)  # [time, species, cat, lat, lon]
        # Recalculate spatial emissions to umol/sec/m2
-       emissions = emissions / kgperkmperyr2umolperm2pers[:, None, None, :, :]
+       emissions = emissions / kgperkmperyr2umolperm2pers[None, :, None, :, :]

        ## create output file
-       prior_file = os.path.join(self.inputdir, 'prior_spatial_{0:03d}.nc'.format(member))
-       f = io.CT_CDF(prior_file, method='create')
-       dimid = f.add_dim('ncat', self.nrcat)
-       dimid2 = f.add_dim('ops', 2)
-       dimtime = f.add_dim('time', emissions.shape[2])
-       dimlat = f.add_dim('lat', self.area.shape[0])
-       dimlon = f.add_dim('lon', self.area.shape[1])
-       #loop over all tracers
-       for i, species in enumerate(self.species):
-           savedict = io.std_savedict.copy()
-           savedict['name'] = species
-           savedict['long_name'] = "Spatially distributed emissions"
-           savedict['units'] = "micromole/m2/s"
-           savedict['dims'] = dimid + dimtime + dimlat + dimlon
-           savedict['values'] = emissions[i, :, :, :]
-           savedict['dtype'] = 'float'
-           f.add_data(savedict)
-       f.close()
+       if not isinstance(member, str):
+           prior_file = os.path.join(self.inputdir, 'prior_spatial_{0:03d}.nc'.format(member))
+           f = io.CT_CDF(prior_file, method='create')
+           if self.dacycle.dasystem['cat.sum_emissions']:
+               emissions = emissions.sum(axis=2)  # [time, species, lat, lon]
+               logging.debug('Summed emissions')
+           else:
+               dimid = f.add_dim('ncat', self.nrcat)
+               dimid2 = f.add_dim('ops', 2)
+           dimtime = f.add_dim('time', emissions.shape[0])
+           dimlat = f.add_dim('lat', self.area.shape[0])
+           dimlon = f.add_dim('lon', self.area.shape[1])
+           #loop over all tracers
+           for i, species in enumerate(self.species):
+               savedict = io.std_savedict.copy()
+               savedict['name'] = species
+               savedict['long_name'] = "Spatially distributed emissions"
+               savedict['units'] = "micromole/m2/s"
+               if self.dacycle.dasystem['cat.sum_emissions']:
+                   dims = dimtime + dimlat + dimlon
+               else:
+                   dims = dimtime + dimid + dimlat + dimlon
+               savedict['dims'] = dims
+               if self.dacycle.dasystem['cat.sum_emissions']:
+                   savedict['values'] = emissions[:, i]
+               else:
+                   savedict['values'] = emissions[:, :, i]
+               savedict['dtype'] = 'float'
+               f.add_data(savedict)
+           f.close()
+
+       if member == 0 or isinstance(member, str):
+           if member == 0:
+               qual = 'prior'
+           elif member == 'optimised':
+               qual = 'optimised'
+           elif member == 'true':
+               qual = 'true'
+           name = 'ff_emissions_{}_{}.nc'.format(qual, self.dacycle['time.sample.start'].strftime('%Y%m%d'))
+           emisfile = os.path.join(dacycle['dir.output'], name)
+           f = io.CT_CDF(emisfile, method='create')
+           if self.dacycle.dasystem['cat.sum_emissions']:
+               emissions = emissions.sum(axis=2)  # [time, species, lat, lon]
+           else:
+               dimid = f.add_dim('ncat', self.nrcat)
+               dimid2 = f.add_dim('ops', 2)
+           backtime = int(dacycle.dasystem['run.backtime'])
+           dimtime = f.add_dim('time', emissions.shape[0] - backtime)
+           dimlat = f.add_dim('lat', self.area.shape[0])
+           dimlon = f.add_dim('lon', self.area.shape[1])
+           #loop over all tracers
+           for i, species in enumerate(self.species):
+               savedict = io.std_savedict.copy()
+               savedict['name'] = species
+               savedict['long_name'] = "Spatially distributed emissions"
+               savedict['units'] = "micromole/m2/s"
+               if self.dacycle.dasystem['cat.sum_emissions']:
+                   dims = dimtime + dimlat + dimlon
+               else:
+                   dims = dimtime + dimid + dimlat + dimlon
+               savedict['dims'] = dims
+               if self.dacycle.dasystem['cat.sum_emissions']:
+                   savedict['values'] = emissions[backtime:, i]
+               else:
+                   savedict['values'] = emissions[backtime:, :, i]
+               savedict['dtype'] = 'float'
+               f.add_data(savedict)
+           f.close()

    def make_time_profiles(self, indices):
        """Function that calculates the time profiles based on pre-specified
        monthly, daily and hourly profiles. Temperature and radiation affect
...
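In the hunk above the member argument now doubles as a mode switch: integer members come from the ensemble, while the strings 'optimised' and 'true' select the optimised and true parameter sets; member 0 and the string members additionally get an ff_emissions_{qual}_{date}.nc file in the output directory. A small, hedged sketch of just that naming logic (the start date is a placeholder):

    import datetime as dtm

    def flux_filename(member, start=dtm.datetime(2016, 1, 1)):  # start date is a placeholder
        if member == 0:
            qual = 'prior'
        elif member in ('optimised', 'true'):
            qual = member
        else:
            return None  # ordinary ensemble members get no extra flux file
        return 'ff_emissions_{}_{}.nc'.format(qual, start.strftime('%Y%m%d'))

    print(flux_filename(0), flux_filename('true'), flux_filename(5))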
@@ -232,6 +285,7 @@ class EmisModel(object):
        times = times[indices]
        self.times = times
        numdays = (times[-1] - times[0]).days + 1
+       day_start = times[0].timetuple().tm_yday
        datapath = '/projects/0/ctdas/RINGO/EmissionInventories/DynEmMod_TimeProfiles'
        infilename = '{}/RINGO_ECMWF_DailyMeteo{}.nc'.format(datapath, year)
        with nc.Dataset(infilename) as infile:
...
@@ -274,10 +328,10 @@ class EmisModel(object):
        HC_coal = HDC_coal.mean(axis=0)
        HC_gas = HDC_gas.mean(axis=0)
-       t_consd = (HDC_cons + fr_cons * HC_cons) / ((1 + fr_cons) * HC_cons)  # daily time profile for consumer/household heating
-       t_glsd = (HDC_gls + fr_gls * HC_gls) / ((1 + fr_gls) * HC_gls)  # glasshouse
-       t_coald = (HDC_coal + fr_coal * HC_coal) / ((1 + fr_coal) * HC_coal)  # coal-fired powerplant
-       t_gasd = (HDC_gas + fr_gas * HC_gas) / ((1 + fr_gas) * HC_gas)  # gas-fired powerplant
+       t_consd = ((HDC_cons + fr_cons * HC_cons) / ((1 + fr_cons) * HC_cons))[day_start - 1:]  # daily time profile for consumer/household heating, starting from the day of the inversion
+       t_glsd = ((HDC_gls + fr_gls * HC_gls) / ((1 + fr_gls) * HC_gls))[day_start - 1:]  # glasshouse
+       t_coald = ((HDC_coal + fr_coal * HC_coal) / ((1 + fr_coal) * HC_coal))[day_start - 1:]  # coal-fired powerplant
+       t_gasd = ((HDC_gas + fr_gas * HC_gas) / ((1 + fr_gas) * HC_gas))[day_start - 1:]  # gas-fired powerplant

        #### Get the time profiles for all sectors:
        with nc.Dataset(self.time_prof_file) as ds:
...
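The daily heating profiles above all share the form (HDC + fr * HC) / ((1 + fr) * HC), which averages to 1 by construction, and the new code slices them from the start day of the inversion. A toy sketch of that behaviour; reading fr as a temperature-independent fraction of the demand is an assumption:

    import numpy as np

    HDC = np.array([2.0, 1.0, 1.0, 0.5, 0.5])  # toy daily heating-demand proxy
    fr = 0.2                                   # assumed temperature-independent fraction
    HC = HDC.mean(axis=0)                      # long-term mean, as in HC_cons = HDC_cons.mean(axis=0)

    t_d = (HDC + fr * HC) / ((1 + fr) * HC)    # daily scaling factor
    print(t_d.mean())                          # 1.0 by construction

    day_start = 3                              # day of year of the inversion start (placeholder)
    print(t_d[day_start - 1:])                 # the slice kept by the new code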
@@ -310,14 +364,16 @@ class EmisModel(object):
        t_road = np.zeros_like(t_public_power_coal)
        t_ship = np.zeros_like(t_public_power_coal)
        for i, t in enumerate(times_add1):
            month = t.month - 1        # Make index
            day = (t.day - 1) % 7      # Make weekly index
            day_ind = t.day - 1        # Make index, starting at time 0
+           day_ind = t.timetuple().tm_yday - day_start
            hour = t.hour              # Hours start at 0
            weekday = t.weekday() < 6
            saturday = t.weekday() == 6
            sunday = t.weekday() == 7
+           self.t = t

            # Monthly time profiles
            public_power_month_mul = public_power_monthly[month, :, :]
...
@@ -330,8 +386,8 @@ class EmisModel(object):
            public_power_day_mul_coal = t_coald[day_ind, :, :]
            public_power_day_mul_gas = t_gasd[day_ind, :, :]
            industry_day_mul = industry_weekly[day]
-           other_stat_day_mul_cons = t_consd[t.day, :, :]  # Index should start at startday, check!
-           other_stat_day_mul_gls = t_glsd[t.day, :, :]
+           other_stat_day_mul_cons = t_consd[day_ind, :, :]  # Index should start at startday, check!
+           other_stat_day_mul_gls = t_glsd[day_ind, :, :]
            road_transport_day_mul = road_transport_weekly[day, :, :]
            shipping_day_mul = shipping_weekly
...
@@ -367,7 +423,8 @@ class EmisModel(object):
                         't_cons': t_other_stat_cons,
                         't_gls': t_other_stat_gls,
                         't_road': t_road,
-                        't_ship': t_ship}
+                        't_ship': t_ship}
+       # Roll the time profiles to be consisten with time zones
        with nc.Dataset(self.country_mask_file) as ds:
...
da/ccffdas/observationoperator.py
...
@@ -30,8 +30,6 @@ from numpy import array, logical_and
import glob
import subprocess
import netCDF4 as nc
-import rasterio
-from rasterio.enums import Resampling
from functools import partial
from multiprocessing import Process, Pool
...
@@ -43,6 +41,8 @@ from da.tools.general import create_dirs, to_datetime
from da.ccffdas.emissionmodel import EmisModel
from da.baseclasses.observationoperator import ObservationOperator
+#import matplotlib.pyplot as plt

try:
    # Import memoization. This speeds up the functions a lot.
    from memoization import cached
except:
    # The memoization package is not always included. Import the memoization from functools
...
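The try/except above falls back to the standard library when the memoization package is missing; the fallback branch itself is outside the shown context. A hedged sketch of the usual pattern, not necessarily the committed fallback, would be:

    try:
        from memoization import cached          # third-party, fast memoization
    except ImportError:
        from functools import lru_cache

        def cached(max_size=None, **kwargs):
            # Accept and ignore memoization-style keyword arguments.
            return lru_cache(maxsize=max_size)

    @cached(max_size=64)
    def square(x):
        return x * x

    print(square(4))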
@@ -101,7 +101,6 @@ def run_STILT(dacycle, footprint, datepoint, species, i_member):
    footprint : np.array: the footprint of the station
    datepoint : datetime: the datepoint for which the concentrations should be calculated
    species   : str: the species
-   path      : path: path to the files containing the emissions
    i_member  : int: index of the ensemblemember
    Returns:
        float: the concentration increase at the respective location due to the emissions from the respective species"""
...
@@ -109,7 +108,10 @@ def run_STILT(dacycle, footprint, datepoint, species, i_member):
    spatial_emissions = get_spatial_emissions(dacycle, i_member, species, datepoint)
    # multiply footprint with emissions for each hour in the back trajectory. Indices: Time, Category, lat, lon
-   foot_flux = (footprint[None, :, :, :] * spatial_emissions[:, :, :, :]).sum()
+   if dacycle.dasystem['cat.sum_emissions']:
+       foot_flux = (footprint * spatial_emissions).sum()
+   else:
+       foot_flux = (footprint[:, None, :, :] * spatial_emissions[:, :, :, :]).sum()
    concentration_increase = float(recalc_factors[species]) * foot_flux
    return concentration_increase
...
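The concentration increase in the hunk above is the footprint-weighted sum of the surface fluxes, scaled by a species-dependent factor. A toy, self-contained sketch of the summed-category branch; the array sizes and the recalc factor are placeholders:

    import numpy as np

    footprint = np.random.rand(10, 4, 5)   # sensitivity to surface fluxes (time, lat, lon)
    emissions = np.random.rand(10, 4, 5)   # fluxes on the same grid, micromole/m2/s
    recalc_factor = 1.0                    # stand-in for recalc_factors[species]

    foot_flux = (footprint * emissions).sum()
    concentration_increase = recalc_factor * foot_flux
    print(concentration_increase)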
@@ -133,15 +135,14 @@ def get_spatial_emissions(dacycle, i_member, species, datepoint):
    emisfile = path + 'prior_spatial_{0:03d}.nc'.format(i_member)  #format: species[SNAP, time, lon, lat]
    f = io.ct_read(emisfile, method='read')
    emis = f.get_variable(species)
-   emis = emis[:, indices, :, :].astype(np.float32)  # index the interesting times
+   emis = emis[indices].astype(np.float32)  # index the interesting times
    f.close()
    #plt.figure()
    #plt.imshow(np.log(emis[0, 0, :, :]), cmap='binary')
    #plt.savefig('ff.png')
    return emis

#def average2d(arr, new_shape):
#    """ Function to average the paint-by-colour NEE to the domain size"""
#    shape = (new_shape[0], arr.shape[0] // new_shape[0],
#             new_shape[1], arr.shape[1] // new_shape[1])
#    return arr.reshape(shape).mean(-1).mean(1)

def get_time_index_nc(time=None, startdate=None):
    """Function that gets the time index from the flux files
...
@@ -205,11 +206,7 @@ class STILTObservationOperator(ObservationOperator):
        self.categories = categories  # Imported from file at top
        self.inputdir = dacycle.dasystem['inputdir']
        self.paramdict = rc.read(dacycle.dasystem['paramdict'])
        self.pftfile = dacycle.dasystem['file.pft']
-       with rasterio.Env(logging.getLogger().setLevel(logging.INFO)):
-           with rasterio.open(self.pftfile) as dataset:
-               self.pft_shape_orig = dataset.read().shape[-2:]
-       logging.getLogger().setLevel(logging.DEBUG)
        self.lon_start = float(dacycle.dasystem['domain.lon.start'])
...
@@ -293,35 +290,16 @@ class STILTObservationOperator(ObservationOperator):
        emismodel = EmisModel()
        emismodel.setup(self.dacycle)
        self.emismodel = emismodel
-       emismodel.get_emis(self.dacycle, self.allsamples, indices=self.indices, do_pseudo=0)
+       emismodel.get_emis(self.dacycle, indices=self.indices)
        logging.info('Emissions calculated')

    def get_biosphere_emissions(self):
        with nc.Dataset(self.dacycle.dasystem['biosphere_fluxdir']) as biosphere_fluxes:
-           lonsib = biosphere_fluxes['lonsib'][:]
-           latsib = biosphere_fluxes['latsib'][:]
-           nee = biosphere_fluxes['nee'][self.indices, :, :]
-           nee = np.array(nee, dtype=np.float32)
-           lu_pref = biosphere_fluxes['lu_pref'][:]
-           lon_ind_start = st.lonindex(self.lon_start)
-           lon_ind_end = st.lonindex(self.lon_end)
-           lat_ind_start = st.latindex(self.lat_start)
-           lat_ind_end = st.latindex(self.lat_end)
-           # The SiB data is read in correctly, whilst the remainder
-           # Of the data is read in upside down. THerefore, we flip
-           # The SiB data also upside down.
-           nee_2d = np.flip(st.convert_2d(nee, latsib, lonsib), axis=2)
-           nee_2d = nee_2d[:, :, lat_ind_end:lat_ind_start, lon_ind_start:lon_ind_end].astype(np.float32)
-           nee_2d = np.flip(nee_2d, axis=2)
-           self.nee = nee_2d
-           lu_pref = np.flip(st.convert_2d(lu_pref, latsib, lonsib), axis=1)
-           self.lu_pref = np.flip(lu_pref[:, lat_ind_end:lat_ind_start, lon_ind_start:lon_ind_end], axis=1)
+           nee = np.flip(biosphere_fluxes['NEE'][self.indices, :, :].astype(np.float32), axis=1)
+           self.nee = nee
+           #plt.figure()
+           #plt.imshow(nee[0, :, :], cmap='binary')
+           #plt.savefig('nee.png')

    def prepare_biosphere(self):
        """Function that prepares the biosphere fluxes on a map
...
@@ -334,54 +312,51 @@ class STILTObservationOperator(ObservationOperator):
        # First, get the time indices
        indices = self.indices

-       # We cannot, because of memory, use all detail of CORINE.
-       # The grids have to be compatible with eachother
-       # Therefore, we will coarsen it, by the following factors:
-       # Coarsen in x and y:
-       sibshape = self.lu_pref.shape[1:]
-       coarsen = [None, None]
-       for i in range(15, 100):
-           COARSEN_x = i
-           COARSEN_y = i
-           new_shape = tuple(int(self.pft_shape_orig[i] / COARSEN_x - (self.pft_shape_orig[i] / COARSEN_y % sibshape[i])) for i in range(2))
-           scaling_factors = np.array(np.array(new_shape) / np.array(self.domain_shape))
-           for j, k in enumerate(scaling_factors):
-               if k % 1 == 0:
-                   if not coarsen[j]:
-                       coarsen[j] = i
-           if not None in coarsen:
-               break
-       new_shape = tuple([int(self.pft_shape_orig[i] / coarsen[i] - (self.pft_shape_orig[i] / coarsen[i] % sibshape[i])) for i in range(2)])
-       logging.debug('Shape for paint-by-number: {}'.format(new_shape))
-       with rasterio.Env(logging.getLogger().setLevel(logging.INFO)):
-           with rasterio.open(self.pftfile) as dataset:
-               pftdata = dataset.read(out_shape=new_shape, resampling=Resampling.mode)
-               pftdata = np.flipud(pftdata.reshape(pftdata.shape[1:]))
-       logging.getLogger().setLevel(logging.DEBUG)
-       scaling_factors = np.array(np.array(new_shape) / np.array(self.lu_pref[0].shape), dtype=int)
+       with nc.Dataset(self.pftfile) as ds:
+           pftdata = np.flipud(ds['pft'][:]).astype(int)
        self.pftdata = pftdata
        #plt.figure()
        #plt.imshow(pftdata[:, :], cmap='binary')
        #plt.savefig('pft.png')

        logging.debug('Starting paint-by-number')
        times = len(self.nee)
-       timeslist = list(range(times))
-       lus_prefs = np.kron(self.lu_pref, np.ones((scaling_factors), dtype=self.lu_pref.dtype))
-       pool = Pool(times)
+       pool = Pool(self.forecast_nmembers)
        # Create the function that calculates the conentration
-       func = partial(self.paint_by_number, self.nee, self.param_values, scaling_factors, lus_prefs,
+       func = partial(self.paint_by_number, self.nee, self.param_values, pftdata,
                       self.emismodel.find_in_state, self.average2d, self.domain_shape)
-       scaled_nees = pool.map(func, timeslist)
+       scaled_nees = pool.map(func, list(range(self.forecast_nmembers)))
        pool.close()
        pool.join()
-       self.nee_mem = np.asarray(scaled_nees).swapaxes(0, 1)  # Indices: mem, time, *domain_shape
+       self.nee_mem = np.asarray(scaled_nees)  # Indices: mem, time, *domain_shape
        logging.debug('Paint-by-number done, biosphere prepared')
+       self.write_biosphere_fluxes(self.nee_mem[0, self.btime:, :, :])

+   def write_biosphere_fluxes(self, values, qual='prior'):
+       # Write the biosphere fluxes to a file
+       bio_emis_file = os.path.join(self.outputdir, 'bio_emissions_{}_{}.nc'.format(qual, self.dacycle['time.sample.start'].strftime('%Y%m%d')))
+       f = io.CT_CDF(bio_emis_file, method='create')
+       dimtime = f.add_dim('time', values.shape[0])
+       dimlat = f.add_dim('lat', values.shape[1])
+       dimlon = f.add_dim('lon', values.shape[2])
+       savedict = io.std_savedict.copy()
+       savedict['name'] = 'CO2'
+       savedict['long_name'] = 'Biosphere CO2 emissions'
+       savedict['units'] = "micromole/m2/s"
+       dims = dimtime + dimlat + dimlon
+       savedict['dims'] = dims
+       savedict['values'] = values
+       savedict['dtype'] = 'float'
+       f.add_data(savedict)
+       f.close()
        #plt.figure()
        #plt.imshow(np.log(self.nee_mem[0, 0, :, :]), cmap='binary')
        #plt.savefig('bio.png')

    @staticmethod
    def average2d(arr, new_shape):
...
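The parallel dispatch above binds the constant arguments of paint_by_number with functools.partial and then maps over the ensemble-member indices. A minimal, self-contained sketch of that Pool plus partial pattern with a stand-in worker (names and sizes are illustrative, not the committed code):

    from functools import partial
    from multiprocessing import Pool
    import numpy as np

    def scale_member(nee, param_values, member):
        # Stand-in for paint_by_number: scale the flux field by one member's parameter.
        return nee * param_values[member]

    if __name__ == '__main__':
        nee = np.ones((3, 4, 5), dtype=np.float32)   # toy flux field (time, lat, lon)
        param_values = np.array([0.9, 1.0, 1.1])     # one scaling parameter per member
        func = partial(scale_member, nee, param_values)
        with Pool(len(param_values)) as pool:
            scaled = pool.map(func, range(len(param_values)))
        print(np.asarray(scaled).shape)              # (members, 3, 4, 5)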
@@ -391,26 +366,23 @@ class STILTObservationOperator(ObservationOperator):
        return arr.reshape(shape).mean(-1).mean(1)

    @staticmethod
-   def paint_by_number(nee, all_param_values, scaling_factors, lus_prefs, pftdata, find_in_state, average2d, domain_shape, time_ind):
-       nmembers = len(all_param_values)
-       scaled_nees_time = np.zeros((nmembers, *domain_shape), dtype=np.float32)
+   def paint_by_number(nee, all_param_values, pftdata, find_in_state, average2d, domain_shape, member):
+       scaled_nees = np.zeros((len(nee), *domain_shape), dtype=np.float32)
        all_lutypes = np.unique(pftdata.reshape(-1))
-       nee_time = nee[time_ind]
-       nee = np.kron(nee_time, np.ones((scaling_factors), dtype=np.float32))
+       param_values = all_param_values[member]
+       newnee = np.zeros_like(nee)
        for lutype_ind, lutype in enumerate(all_lutypes):
-           mask = np.where(pftdata == lutype, 1, 0)
-           nee_lut = np.where(lus_prefs == lutype, nee, 0)
-           for mem, values in enumerate(all_param_values):
-               param_values = all_param_values[mem]
-               index_in_state = find_in_state(param_values, 'BIO', str(lutype), return_index=True)
-               if index_in_state:
-                   param_value = param_values[index_in_state]
-               else:
-                   param_value = 1
-               nee_lut_mem = (nee_lut.sum(axis=0) * mask) * param_value
-               nee_lut_mem_scaled = average2d(nee_lut_mem, domain_shape)
-               scaled_nees_time[mem] += nee_lut_mem_scaled
-       return (scaled_nees_time)
+           index_in_state = find_in_state(param_values, 'BIO', str(lutype), return_index=True)
+           if index_in_state:
+               param_value = param_values[index_in_state]
+           else:
+               param_value = 1
+           mask = np.where(pftdata == lutype, 1, 0)
+           newnee += (nee * mask * param_value)
+       for i, n in enumerate(newnee):
+           scaled_nees[i] = average2d(n, domain_shape)
+       return scaled_nees

    def prepare_c14(self):
        """Function that loads the nuclear power temporal variation"""
...
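The new paint_by_number above scales the flux field per land-use type with the member's parameter and then coarsens it with average2d, which block-averages via a reshape. A compact sketch of those two steps with toy grid sizes; the per-type scaling values are assumptions:

    import numpy as np

    def average2d(arr, new_shape):
        shape = (new_shape[0], arr.shape[0] // new_shape[0],
                 new_shape[1], arr.shape[1] // new_shape[1])
        return arr.reshape(shape).mean(-1).mean(1)

    nee = np.ones((6, 8), dtype=np.float32)                    # fine-grid flux field (toy sizes)
    pftdata = np.zeros((6, 8), dtype=int); pftdata[:, 4:] = 7  # two land-use types: 0 and 7
    scaling = {0: 1.0, 7: 2.0}                                 # assumed per-PFT parameter values

    newnee = np.zeros_like(nee)
    for lutype in np.unique(pftdata):
        mask = np.where(pftdata == lutype, 1, 0)
        newnee += nee * mask * scaling[lutype]

    print(average2d(newnee, (3, 4)))                           # coarse 3x4 field; right half doubled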
@@ -444,6 +416,12 @@ class STILTObservationOperator(ObservationOperator):
        f = glob.glob(fname)[0]
        footprint = nc.Dataset(f)['foot']
+       #plt.figure()
+       #plt.imshow(np.log(footprint[0, :, :]), cmap='binary')
+       #plt.title(site)
+       #plt.savefig('foot.png')
        # Flip, because the times are negative
        return np.flipud(footprint).astype(np.float32)
...
@@ -553,11 +531,31 @@ class STILTObservationOperator(ObservationOperator):
    def run_forecast_model(self, dacycle, do_pseudo, adv):
        logging.info('Preparing run...')
+       #self.make_observations(dacycle)
        self.prepare_run(do_pseudo, adv)
        logging.info('Starting to simulate concentrations...')
        self.run(dacycle, do_pseudo)
        self.save_data()

+   def make_observations(self, dacycle, do_pseudo=0, adv=0):
+       logging.info('Prepare run...')
+       self.prepare_run(do_pseudo, adv)
+       logging.info('Starting to simulate concentrations...')
+       import pandas as pd
+       import copy
+       all_samples = copy.deepcopy(self.samples)
+       sites = set([sample.code.split('/')[1].split('_')[1] for sample in all_samples])
+       print(sites)
+       for site in sites:
+           current_samples = [sample for sample in all_samples if site in sample.code]
+           self.samples = current_samples
+           self.run(dacycle, do_pseudo)
+           times = [sample.xdate for sample in current_samples]
+           df = pd.DataFrame(self.calculated_concentrations, times, columns=['concentration'])
+           df.to_csv(site + dtm.datetime.strftime(dacycle['time.sample.start'], "%Y%m%d") + '.csv')
+       save_and_submit(dacycle, statevector)

    def run(self, dacycle, do_pseudo):
        """Function that calculates the simulated concentrations for all samples
        Loops over all samples, looks for relevant information (e.g. footprint)
...
@@ -604,7 +602,6 @@ class STILTObservationOperator(ObservationOperator):
                previous_day = datepoint.day
                previously_done_site = site
-       logging.debug('Cache info: {}'.format(get_spatial_emissions.cache_info()))

        # This is how ingrid did it, so I will too...
        self.mod = np.array(calculated_concentrations)
...
@@ -635,6 +632,7 @@ class STILTObservationOperator(ObservationOperator):
            # First, add noise (this represents errors in the transport model
            noise = np.random.normal(0, sample.mdm)
+           #noise = 0
            # Some different cases for different species.
            # First case: Species is not c14:
...
da/ccffdas/pipeline.py
...
@@ -16,6 +16,7 @@ import os
import sys
import datetime
import copy
import numpy as np

header = '\n\n    ***************************************    '
footer = '    ***************************************    \n  '
...
@@ -32,7 +33,7 @@ def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector
    invert(dacycle, statevector, optimizer, obsoperator)

-   #advance(dacycle, samples, statevector, obsoperator)
+   advance(dacycle, samples, statevector, obsoperator, optimizer)

    save_and_submit(dacycle, statevector)
    logging.info("Cycle finished...exiting pipeline")
...
@@ -115,7 +116,7 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera
    # Finally, we run forward with these parameters
-   advance(dacycle, samples, statevector, obsoperator)
+   advance(dacycle, samples, statevector, obsoperator, optimizer)

    # In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list.
    # This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis.
...
@@ -400,31 +401,41 @@ def invert(dacycle, statevector, optimizer, obsoperator):

-def advance(dacycle, samples, statevector, obsoperator):
+def advance(dacycle, samples, statevector, obsoperator, optimizer):
    """ Advance the filter state to the next step """

    # This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)

    # Then, restore model state from the start of the filter
    logging.info(header + "starting advance" + footer)
    logging.info("Sampling model will be run over 1 cycle")

    obsoperator.get_initial_data(samples)

    sample_step(dacycle, samples, statevector, obsoperator, 0, True)

    dacycle.restart_filelist.extend(obsoperator.restart_filelist)
    dacycle.output_filelist.extend(obsoperator.output_filelist)
    logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ")