Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Woude, Auke van der
CTDAS
Commits
dbb92848
Commit
dbb92848
authored
Jul 17, 2020
by
Woude, Auke van der
Browse files
Improve simple arithmatics in .rc files
parent
faa0f782
Changes
7
Expand all
Hide whitespace changes
Inline
Side-by-side
da/ccffdas/emissionmodel.py
View file @
dbb92848
...
...
@@ -24,6 +24,10 @@ import math
import
da.tools.rc
as
rc
from
da.tools.general
import
create_dirs
,
to_datetime
import
netCDF4
as
nc
from
multiprocessing
import
Pool
from
functools
import
partial
identifier
=
'EmissionModel ensemble '
version
=
'1.0'
...
...
@@ -58,7 +62,6 @@ class EmisModel(object):
self
.
nrspc
=
int
(
dacycle
.
dasystem
[
'obs.spec.nr'
])
self
.
species
=
dacycle
.
dasystem
[
'obs.spec.name'
].
split
(
','
)
self
.
nrcat
=
int
(
dacycle
.
dasystem
[
'obs.cat.nr'
])
self
.
nparams
=
int
(
dacycle
.
dasystem
[
'nparameters'
])
self
.
nmembers
=
int
(
dacycle
[
'da.optimizer.nmembers'
])
self
.
pparam
=
dacycle
.
dasystem
[
'emis.pparam'
]
self
.
paramfile
=
dacycle
.
dasystem
[
'emis.paramfiles'
]
...
...
@@ -88,7 +91,7 @@ class EmisModel(object):
self
.
Rinyr_av
=
infile
.
variables
[
'Rsin_avg'
][:
ndays
]
def
find_in_state
(
self
,
station
,
cat
,
name
=
None
,
return_index
=
False
):
def
find_in_state
(
self
,
params
,
station
,
cat
,
name
=
None
,
return_index
=
False
):
"""Function that finds the index in the state vector"""
if
not
name
:
key
=
station
+
'.'
+
cat
...
...
@@ -99,7 +102,7 @@ class EmisModel(object):
i_in_state
=
int
(
self
.
paramdict
[
key
])
if
return_index
:
return
i_in_state
else
:
value
=
self
.
prm
[
i_in_state
]
value
=
params
[
i_in_state
]
return
value
elif
return_index
:
return
False
return
1
...
...
@@ -107,63 +110,52 @@ class EmisModel(object):
def
get_emis
(
self
,
dacycle
,
samples
,
indices
,
do_pseudo
):
"""set up emission information for pseudo-obs (do_pseudo=1) and ensemble runs (do_pseudo=0)"""
if
do_pseudo
==
1
:
priorparam
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
pparam
)
f
=
io
.
ct_read
(
priorparam
,
'read'
)
self
.
prm
=
f
.
get_variable
(
'prior_values'
)[:
self
.
nparams
]
f
.
close
()
self
.
get_spatial
(
dacycle
,
999
,
samples
,
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
paramfile
))
self
.
get_temporal
(
dacycle
,
999
,
samples
,
do_pseudo
=
1
)
elif
do_pseudo
==
0
:
self
.
timestartkey
=
self
.
dacycle
[
'time.sample.start'
]
self
.
timefinishkey
=
self
.
dacycle
[
'time.sample.end'
]
time_profiles
=
self
.
make_time_profiles
(
indices
=
indices
)
for
member
in
range
(
self
.
nmembers
):
#first remove old files, then create new emission files per ensemble member
if
self
.
startdate
==
self
.
timestartkey
:
file
=
os
.
path
.
join
(
dacycle
.
dasystem
[
'datadir'
],
'temporal_data_%03d.nc'
%
member
)
try
:
os
.
remove
(
file
)
except
OSError
:
pass
prmfile
=
os
.
path
.
join
(
dacycle
[
'dir.input'
],
'parameters.%03d.nc'
%
member
)
f
=
io
.
ct_read
(
prmfile
,
'read'
)
self
.
prm
=
f
.
get_variable
(
'parametervalues'
)
f
.
close
()
self
.
get_yearly_emissions
()
self
.
get_emissions
(
dacycle
,
member
,
time_profiles
,
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
paramfile
))
logging
.
debug
(
'Succesfully wrote prior spatial and temporal emission files'
)
self
.
timestartkey
=
self
.
dacycle
[
'time.sample.start'
]
self
.
timefinishkey
=
self
.
dacycle
[
'time.sample.end'
]
time_profiles
=
self
.
make_time_profiles
(
indices
=
indices
)
pool
=
Pool
(
self
.
nmembers
)
# Create the function that calculates the conentration
func
=
partial
(
self
.
get_emissions
,
dacycle
,
time_profiles
)
# We need to run over all members
memberlist
=
list
(
range
(
0
,
self
.
nmembers
))
_
=
pool
.
map
(
func
,
memberlist
)
pool
.
close
()
pool
.
join
()
logging
.
debug
(
'Succesfully wrote prior emission files'
)
def
get_yearly_emissions
(
self
):
def
get_yearly_emissions
(
self
,
params
):
yremis
=
np
.
zeros
((
len
(
self
.
categories
),
len
(
self
.
countries
),
len
(
self
.
species
)))
for
i_country
,
country
in
enumerate
(
self
.
countries
):
for
i_cat
,
(
cat
,
values
)
in
enumerate
(
self
.
categories
.
items
()):
emission_factor
=
values
[
'emission_factors'
]
emission_factor
*=
self
.
find_in_state
(
country
,
cat
,
'emission_factors'
)
emission_factor
*=
self
.
find_in_state
(
params
,
country
,
cat
,
'emission_factors'
)
fraction_of_total
=
values
[
'fraction_of_total'
]
fraction_of_total
*=
self
.
find_in_state
(
country
,
cat
,
'fraction_of_total'
)
fraction_of_total
*=
self
.
find_in_state
(
params
,
country
,
cat
,
'fraction_of_total'
)
e_use
=
self
.
energy_use_per_country
[
country
][
values
[
'spatial'
]]
for
i_species
,
specie
in
enumerate
(
self
.
species
):
emission_ratio
=
values
[
specie
]
emission_ratio
*=
self
.
find_in_state
(
params
,
country
,
cat
,
specie
)
uncertainty
=
values
[
specie
+
'.uncertainty'
]
if
uncertainty
==
'l'
:
emission_ratio
=
np
.
exp
(
emission_ratio
)
if
emission_ratio
>
1
:
logging
.
debug
(
'{} {} {} {}'
.
format
(
country
,
cat
,
specie
,
emission_ratio
))
emission_ratio
*=
self
.
find_in_state
(
country
,
cat
,
specie
)
emission
=
e_use
*
fraction_of_total
*
emission_factor
*
emission_ratio
yremis
[
i_cat
,
i_country
,
i_species
]
=
emission
self
.
yremis
=
yremis
return
yremis
def
get_emissions
(
self
,
dacycle
,
member
,
time_profiles
,
infile
=
None
):
def
get_emissions
(
self
,
dacycle
,
time_profiles
,
member
):
"""read in proxy data used for spatial distribution of the gridded emissions, disaggregate yearly totals for the area"""
prmfile
=
os
.
path
.
join
(
dacycle
[
'dir.input'
],
'parameters.%03d.nc'
%
member
)
f
=
io
.
ct_read
(
prmfile
,
'read'
)
params
=
f
.
get_variable
(
'parametervalues'
)
f
.
close
()
yremis
=
self
.
get_yearly_emissions
(
params
)
# Create a recalculation factor from kg/km2/yr to umol/m2/sec
M_mass
=
[
44e-9
,
28e-9
]
sec_year
=
24
*
36
6
*
3600.
#seconds in a year (leapyear)
M_mass
=
np
.
array
(
[
44e-9
,
28e-9
]
[:
self
.
nrspc
])
sec_year
=
24
*
36
00.
*
self
.
ndays
#seconds in a year (leapyear)
kgperkmperyr2umolperm2pers
=
np
.
array
(
M_mass
)[:,
None
,
None
]
*
sec_year
*
self
.
area
[
None
,
:,
:]
self
.
kgperkmperyr2umolperm2pers
=
kgperkmperyr2umolperm2pers
#read in proxy data for spatial disaggregation
infile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
proxyfile
)
...
...
@@ -185,7 +177,7 @@ class EmisModel(object):
spatial_distributions
[
i
,
country_index
,
:,
:]
=
category_distribution_country
# Multiply spatial distributions with the yearly emissions in the country to get spatially distributed emissions
spatial_emissions
=
spatial_distributions
[:,
:,
None
,
:,
:]
*
self
.
yremis
[:,
:,
:,
None
,
None
]
# cat, country, species, lat, lon
spatial_emissions
=
spatial_distributions
[:,
:,
None
,
:,
:]
*
yremis
[:,
:,
:,
None
,
None
]
# cat, country, species, lat, lon
# Sum over the countries to overlay them.
spatial_emissions
=
spatial_emissions
.
sum
(
axis
=
1
)
# Indices: category, species, lat, lon
self
.
spatial_emissions
=
spatial_emissions
...
...
@@ -196,8 +188,6 @@ class EmisModel(object):
cat_index
=
proxy_category_names
.
index
(
spatial_name
)
temporal_name
=
self
.
categories
[
category
][
'temporal'
]
temporal_profile
=
time_profiles
[
temporal_name
]
self
.
temporal_profile
=
temporal_profile
self
.
emissions
=
emissions
emissions
.
append
(
spatial_emissions
[
cat_index
,
:,
:,
:]
*
temporal_profile
[
None
,
:,
:,
:])
emissions
=
np
.
asarray
(
emissions
)
...
...
@@ -221,7 +211,8 @@ class EmisModel(object):
savedict
[
'long_name'
]
=
"Spatially distributed emissions"
savedict
[
'units'
]
=
"micromole/m2/s"
savedict
[
'dims'
]
=
dimid
+
dimtime
+
dimlat
+
dimlon
savedict
[
'values'
]
=
emissions
[
i
,:,:,:]
savedict
[
'values'
]
=
emissions
[
i
,
:,
:,
:]
savedict
[
'dtype'
]
=
'float'
f
.
add_data
(
savedict
)
f
.
close
()
...
...
@@ -284,7 +275,7 @@ class EmisModel(object):
engy_hrm
=
np
.
tile
(
engy_hr
,
(
nlat
,
nlon
,
1
)).
transpose
(
2
,
0
,
1
)
#Repeat the daily time profile for all days
cons_hrm
=
np
.
tile
(
cons_hr
,
(
nlat
,
nlon
,
1
)).
transpose
(
2
,
0
,
1
)
nt
=
len
(
self
.
times
)
nt
=
len
(
self
.
times
)
t_gas
=
np
.
empty
((
nt
,
nlat
,
nlon
))
t_coal
=
np
.
empty
((
nt
,
nlat
,
nlon
))
t_cons
=
np
.
empty
((
nt
,
nlat
,
nlon
))
...
...
@@ -358,7 +349,9 @@ class EmisModel(object):
t_ship
=
np
.
empty
((
nt
,
nlat
,
nlon
))
# Multiply the hourly, daily and monthly time profiles to create new profile
for
i
,
t
in
enumerate
(
self
.
times
):
# Only for the times in which we are interested
enum_times
=
list
(
enumerate
(
self
.
times
))[
indices
]
for
i
,
t
in
enum_times
:
month_idx
=
t
.
month
-
1
day_idx
=
t
.
weekday
()
hour_idx
=
t
.
hour
...
...
@@ -393,128 +386,24 @@ class EmisModel(object):
datepoint
=
self
.
startdate
enddate
=
self
.
enddate
#assert False
#time_indices = get_time_indices(datepoint, startdate=None, backtimes=24)
t_ind
=
np
.
ones
((
nt
,
nlat
,
nlon
))
# Make container (dict) containing all time profiles
time_profiles
=
{
't_gas'
:
t_gas
[
indices
],
't_coal'
:
t_coal
[
indices
],
't_ind'
:
t_ind
[
indices
],
't_cons'
:
t_cons
[
indices
],
't_gls'
:
t_gls
[
indices
],
't_carhw'
:
t_carhw
[
indices
],
't_carmr'
:
t_carmr
[
indices
],
't_carur'
:
t_carur
[
indices
],
't_hdvhw'
:
t_hdvhw
[
indices
],
't_hdvmr'
:
t_hdvmr
[
indices
],
't_hdvur'
:
t_hdvur
[
indices
],
't_ship'
:
t_ship
[
indices
]}
time_profiles
=
{
't_gas'
:
t_gas
[
indices
]
.
astype
(
np
.
float32
)
,
't_coal'
:
t_coal
[
indices
]
.
astype
(
np
.
float32
)
,
't_ind'
:
t_ind
[
indices
]
.
astype
(
np
.
float32
)
,
't_cons'
:
t_cons
[
indices
]
.
astype
(
np
.
float32
)
,
't_gls'
:
t_gls
[
indices
]
.
astype
(
np
.
float32
)
,
't_carhw'
:
t_carhw
[
indices
]
.
astype
(
np
.
float32
)
,
't_carmr'
:
t_carmr
[
indices
]
.
astype
(
np
.
float32
)
,
't_carur'
:
t_carur
[
indices
]
.
astype
(
np
.
float32
)
,
't_hdvhw'
:
t_hdvhw
[
indices
]
.
astype
(
np
.
float32
)
,
't_hdvmr'
:
t_hdvmr
[
indices
]
.
astype
(
np
.
float32
)
,
't_hdvur'
:
t_hdvur
[
indices
]
.
astype
(
np
.
float32
)
,
't_ship'
:
t_ship
[
indices
]
.
astype
(
np
.
float32
)
}
logging
.
debug
(
'Time profiles created'
)
return
time_profiles
def
get_temporal
(
self
,
dacycle
,
member
,
samples
,
do_pseudo
):
"""read in time profiles used for temporal distribution of the emissions"""
# First, get the station names from the smaples. For these stations, the time profiles will be written.
codes
=
samples
.
getvalues
(
'code'
)
self
.
codes
=
codes
## SI: Get the country names
stationnames
=
[]
for
code
in
codes
:
two_names
=
any
(
x
in
code
for
x
in
[
'UW'
,
'DW'
])
stationnames
.
append
(
'_'
.
join
(
code
.
split
(
'_'
)[
1
:
2
+
two_names
]))
stationnames
=
list
(
set
(
stationnames
))
# For pseudo-observation (do_pseudo==1) or when no time profiles need to be optimized the profiles are simply read from the
# input file and copied to another file. Otherwise create a new file per ensemble member at t=0 and update the profiles for each time step
# Check if the ensemble file exists. Otherwise create.
## SI: tmeporal data should include only countries.
ensfile
=
os
.
path
.
join
(
self
.
emisdir
,
'temporal_data_%03d.nc'
%
member
)
if
not
os
.
path
.
exists
(
ensfile
):
dumfile
=
os
.
path
.
join
(
self
.
emisdir
,
self
.
tempfilep
)
shutil
.
copy2
(
dumfile
,
ensfile
)
time_profiles_ds
=
nc
.
Dataset
(
ensfile
)
times
=
time_profiles_ds
[
'Times'
][:]
times
=
np
.
array
([
dtm
.
datetime
.
strptime
(
time
,
'%Y-%m-%d %H:%M:%S'
)
for
time
in
np
.
array
(
times
)])
self
.
times
=
times
subselect
=
logical_and
(
times
>=
self
.
timestartkey
,
times
<
self
.
timefinishkey
).
nonzero
()[
0
]
date_selection
=
times
.
take
(
subselect
,
axis
=
0
)
# The time profiles should always cover at least one full year
start_date
=
dtm
.
datetime
(
self
.
timestartkey
.
year
,
1
,
1
,
0
,
0
)
#first time included
end_date
=
dtm
.
datetime
(
self
.
timestartkey
.
year
,
12
,
31
,
23
,
0
)
#last time included
dt
=
dtm
.
timedelta
(
0
,
3600
)
starttime_index
=
np
.
where
(
times
==
self
.
timestartkey
)[
0
][
0
]
startdate_index
=
np
.
where
(
times
==
self
.
startdate
)[
0
][
0
]
end_index
=
np
.
where
(
times
==
self
.
timefinishkey
)[
0
][
0
]
""" Time profiles should, for a full year, always have an average value of 1.0. Therefore, a new method has been developed
to optimize time profiles such that we comply with this and the time profiles do not affect the total emissions.
For this purpose we apply the scaling factor (statevector) to the period covered in this cycle. The time profile for all dates
outside this period are scaled equally such that the time profile remains its average value of 1.0. Except previously updated
dates (from previous cycles) are maintained (they are already optimized!)."""
unselected_day
=
np
.
where
((
times
<
self
.
startdate
)
|
(
times
>
self
.
timefinishkey
))[
0
]
category_names
=
list
(
time_profiles_ds
[
'category_name'
][:])
self
.
category_names
=
category_names
station_names_ds
=
list
(
time_profiles_ds
[
'station_names'
][:])
profiles
=
np
.
zeros
(
time_profiles_ds
[
'time_profile'
][:].
shape
)
for
category
,
values
in
self
.
categories
.
items
():
cat_index
=
category_names
.
index
(
values
[
'temporal'
])
for
station
in
stationnames
:
## SI: for country in countries:
paramvalue
=
self
.
find_in_state
(
station
,
category
,
values
[
'temporal'
])
if
paramvalue
!=
1
:
station_index
=
station_names_ds
.
index
(
station
)
original_profile
=
time_profiles_ds
[
'time_profile'
][
station_index
,
cat_index
,
:]
selected_profile
=
time_profiles_ds
[
'time_profile'
][
station_index
,
cat_index
,
:].
take
(
subselect
)
new_profile
=
selected_profile
[:]
*
paramvalue
daily_sum
=
np
.
array
(
original_profile
[
unselected_day
]).
sum
()
original_profile
[:
startdate_index
]
=
original_profile
[:
startdate_index
]
-
(
original_profile
[:
startdate_index
]
/
daily_sum
)
*
(
new_profile
.
sum
()
-
selected_profile
.
sum
())
original_profile
[
end_index
:]
=
original_profile
[
end_index
:]
-
(
original_profile
[
end_index
:]
/
daily_sum
)
*
(
new_profile
.
sum
()
-
selected_profile
.
sum
())
original_profile
[
starttime_index
:
end_index
]
=
new_profile
profiles
[
station_index
,
cat_index
,
:]
=
original_profile
time_profiles_ds
.
close
()
# Now, write the output
tofile
=
nc
.
Dataset
(
ensfile
,
'r+'
)
for
category
,
values
in
self
.
categories
.
items
():
cat_index
=
category_names
.
index
(
values
[
'temporal'
])
for
station
in
stationnames
:
## SI: for country in countries:
if
self
.
find_in_state
(
station
,
category
,
'time_profile'
)
!=
1
:
station_index
=
station_names_ds
.
index
(
station
)
tofile
[
'time_profile'
][
station_index
,
cat_index
,
:]
=
profiles
[
station_index
,
cat_index
,
:]
tofile
.
close
()
# Now read in the correct profiles, select the correct time period and write the profiles into one file per ensemble member
time_profiles_ds
=
nc
.
Dataset
(
ensfile
)
subselect
=
logical_and
(
times
>=
times
[
0
]
,
times
<=
times
[
-
1
]).
nonzero
()[
0
]
date_selection
=
times
.
take
(
subselect
,
axis
=
0
)
for
station
in
stationnames
:
## SI: for country in countries:
station_index
=
station_names_ds
.
index
(
station
)
prior_file
=
os
.
path
.
join
(
self
.
inputdir
,
'prior_temporal_{0}_{1:03}.nc'
.
format
(
station
,
member
))
f
=
io
.
CT_CDF
(
prior_file
,
method
=
'create'
)
dimtime
=
f
.
add_dim
(
'Times'
,
len
(
date_selection
))
cat_names_done
=
[]
for
category
,
values
in
self
.
categories
.
items
():
cat_name
=
values
[
'temporal'
]
cat_index
=
category_names
.
index
(
cat_name
)
if
not
cat_name
in
cat_names_done
:
profile
=
np
.
array
(
time_profiles_ds
[
'time_profile'
][
station_index
,
cat_index
,
:].
take
(
subselect
))
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
cat_name
savedict
[
'long_name'
]
=
"Temporal distribution"
savedict
[
'units'
]
=
""
savedict
[
'dims'
]
=
dimtime
savedict
[
'values'
]
=
profile
f
.
add_data
(
savedict
)
cat_names_done
.
append
(
cat_name
)
f
.
close
()
time_profiles_ds
.
close
()
################### End Class Emission model ###################
...
...
da/ccffdas/emissionmodel.py.ori
0 → 100755
View file @
dbb92848
This diff is collapsed.
Click to expand it.
da/ccffdas/observationoperator.py
View file @
dbb92848
...
...
@@ -40,11 +40,11 @@ from da.tools.general import create_dirs, to_datetime
from
da.ccffdas.emissionmodel
import
EmisModel
from
da.baseclasses.observationoperator
import
ObservationOperator
#
try: # Import memoization. This speeds up the functions a lot.
#
from memoization import cached
#
except: # The memoization package is not always included. Import the memoization from #functools
#
from functools import lru_cache as cached
#
cached = cached()
try
:
# Import memoization. This speeds up the functions a lot.
from
memoization
import
cached
except
:
# The memoization package is not always included. Import the memoization from #functools
from
functools
import
lru_cache
as
cached
cached
=
cached
()
import
xlrd
...
...
@@ -107,31 +107,9 @@ def run_STILT(dacycle, footprint, datepoint, i_species, path, i_member):
# multiply footprint with emissions for each hour in the back trajectory. Indices: Time, Category, lat, lon
foot_flux
=
(
footprint
[
None
,
:,
:,
:]
*
spatial_emissions
[:,
:,
:,
:]).
sum
()
concentration_increase
=
float
(
recalc_factors
[
i_species
])
*
foot_flux
return
concentration_increase
#@cached
#def get_temporal_profiles(datepoint, station_name, path, i_member):
# """ Function that reads in the temporal profiles for the current timestep
# Input:
# datepoint: datepoint: datetime for which the temporal profile should be found
# i_station: int: the index of the location for which the c14 concentration should be calculated
# i_member: int: the index of the member for which the simulation is run
# Returns:
# np.array (2): the time profiles for all categories. Indices: time, category"""
# #read temporal profiles for the times within the footprint
# time_indices = get_time_indices(datepoint)
# temporal_prior = path + 'prior_temporal_{0}_{1:03}.nc'.format(station_name, i_member)
# temporal_prior = io.ct_read(temporal_prior, 'read')
# temp_profiles = []
# for category, values in categories.items():
# temporal_var_name = values['temporal']
# temporal_variation = temporal_prior.get_variable(temporal_var_name)[time_indices]
# temp_profiles.append(temporal_variation)
# #temporal_prior.close()
# temp_profiles = np.array(temp_profiles)
# return temp_profiles.T # Transpose to be similar to spatial data in dimensions
@
cached
def
get_spatial_emissions
(
dacycle
,
i_member
,
i_species
,
path
,
datepoint
):
""" Function that gets the spatial emissions
Input:
...
...
@@ -145,10 +123,10 @@ def get_spatial_emissions(dacycle, i_member, i_species, path, datepoint):
start_time
=
(
dacycle
[
'time.sample.end'
]
-
backtimes
)
indices
=
get_time_indices
(
datepoint
,
start_time
)
emisfile
=
path
+
'prior_spatial_{0:03d}.nc'
.
format
(
i_member
)
#format: species[SNAP,lon,lat]
emisfile
=
path
+
'prior_spatial_{0:03d}.nc'
.
format
(
i_member
)
#format: species[SNAP,
time,
lon,
lat]
f
=
io
.
ct_read
(
emisfile
,
method
=
'read'
)
emis
=
f
.
get_variable
(
spname
[
i_species
])
emis
=
emis
[:,
indices
,
:,
:]
# index the interesting times
emis
=
emis
[:,
indices
,
:,
:]
.
astype
(
np
.
float32
)
# index the interesting times
f
.
close
()
return
emis
...
...
@@ -169,8 +147,6 @@ def get_time_index_nc(time=None, startdate=None):
time_index
=
int
(
timediff_hours
)
return
time_index
#@cached(ttl=5)
def
get_time_indices
(
datepoint
,
startdate
=
None
,
backtimes
=
24
):
"""Function that gets the time indices in the flux files
Because if the footprint is for 24 hours back, we need the fluxes 24 hours back"""
...
...
@@ -233,22 +209,19 @@ class STILTObservationOperator(ObservationOperator):
self
.
bgswitch
=
int
(
dacycle
.
dasystem
[
'obs.bgswitch'
])
self
.
bgfile
=
dacycle
.
dasystem
[
'obs.background'
]
self
.
btime
=
int
(
dacycle
.
dasystem
[
'run.backtime'
])
self
.
categories
=
categories
self
.
categories
=
categories
# Imported from file at top
self
.
inputdir
=
dacycle
.
dasystem
[
'inputdir'
]
self
.
paramdict
=
rc
.
read
(
dacycle
.
dasystem
[
'paramdict'
])
biosphere_fluxes
=
nc
.
Dataset
(
dacycle
.
dasystem
[
'biosphere_fluxdir'
])
self
.
gpp
=
biosphere_fluxes
[
'GPP'
][:]
#[time_indices]
self
.
ter
=
biosphere_fluxes
[
'TER'
][:]
#[time_indices]
biosphere_fluxes
.
close
()
with
nc
.
Dataset
(
dacycle
.
dasystem
[
'biosphere_fluxdir'
])
as
biosphere_fluxes
:
self
.
gpp
=
biosphere_fluxes
[
'GPP'
][:].
astype
(
np
.
float32
)
self
.
ter
=
biosphere_fluxes
[
'TER'
][:].
astype
(
np
.
float32
)
with
nc
.
Dataset
(
self
.
dacycle
.
dasystem
[
'country.mask'
])
as
countries
:
self
.
masks
=
countries
[
'country_mask'
][:]
self
.
country_names
=
countries
[
'country_names'
][:]
self
.
nffparams
=
int
(
self
.
dacycle
.
dasystem
[
'nffparameters'
])
self
.
nbioparams
=
int
(
self
.
dacycle
.
dasystem
[
'nbioparameters'
])
self
.
noise
=
{
'CO2'
:
2.2
,
'CO'
:
8
,
'C14'
:
2
,
'C14_PSEUDO'
:
0
,
'C14_INTEGRATED'
:
2
,
'C14targeted'
:
2
}
logging
.
info
(
'Noise is hardcoded to be: {}'
.
format
(
self
.
noise
))
def
get_initial_data
(
self
,
samples
):
"""Function that loads the initial data to the observation operator"""
...
...
@@ -391,7 +364,7 @@ class STILTObservationOperator(ObservationOperator):
country
=
b
''
.
join
(
np
.
array
(
country
)).
decode
()
country_mask
=
self
.
masks
[
i
]
index_in_state
=
self
.
emismodel
.
find_in_state
(
country
.
upper
(),
'bio'
,
return_index
=
True
)
index_in_state
=
self
.
emismodel
.
find_in_state
(
param_values
,
country
.
upper
(),
'bio'
,
return_index
=
True
)
if
index_in_state
:
param_value
=
param_values
[
index_in_state
]
else
:
param_value
=
1
gpp_country
=
country_mask
[
None
,
:,
:]
*
gpp
[:,
:,
:]
*
param_value
...
...
@@ -469,7 +442,7 @@ class STILTObservationOperator(ObservationOperator):
footprint
=
nc
.
Dataset
(
f
)[
'foot'
]
# Flip, because the times are negative
return
np
.
flipud
(
footprint
)
return
np
.
flipud
(
footprint
)
.
astype
(
np
.
float32
)
#@cached
def
get_background
(
self
,
i_species
,
site
,
datepoint
):
...
...
@@ -619,7 +592,6 @@ class STILTObservationOperator(ObservationOperator):
# Initialise a list with the calculated concentrations. To be filled in this function.
calculated_concentrations
=
np
.
zeros
((
len
(
self
.
samples
),
self
.
forecast_nmembers
))
calculated_concentrations
[:,
:]
=
np
.
nan
self
.
calculated_concentrations
=
calculated_concentrations
# Initialise a datepoint that has been done. For now, this is None
previously_done_datepoint
=
None
;
previous_day
=
None
previously_done_site
=
None
...
...
@@ -662,13 +634,15 @@ class STILTObservationOperator(ObservationOperator):
except
:
pass
#calculated_concentrations = np.delete(calculated_concentrations, i, axis=0)
# Set the time and site
previously_done_datepoint
=
datepoint
;
previous_day
=
datepoint
.
day
previously_done_site
=
site
previously_done_datepoint
=
datepoint
previous_day
=
datepoint
.
day
previously_done_site
=
site
# This is how ingrid did it, so I will too...
self
.
mod
=
np
.
array
(
calculated_concentrations
)
# add the calculated concentrations to the object
self
.
calculated_concentrations
=
calculated_concentrations
logging
.
debug
(
'Cache info: {}'
.
format
(
get_spatial_emissions
.
cache_info
()))
def
calc_concentrations
(
self
,
sample
):
"""Function that calculates the concentration for a sample and all members.
...
...
@@ -694,7 +668,7 @@ class STILTObservationOperator(ObservationOperator):
background
=
self
.
get_background_orig
(
species
.
upper
())
# First, add noise (this represents errors in the transport model
noise
=
np
.
random
.
normal
(
0
,
s
elf
.
noise
[
sample
.
species
.
upper
()]
)
noise
=
np
.
random
.
normal
(
0
,
s
ample
.
mdm
)
# Some different cases for different species.
# First case: Species is not c14:
...
...
@@ -795,12 +769,11 @@ class STILTObservationOperator(ObservationOperator):
def
save_data
(
self
):
""" Write the data that is needed for a restart or recovery of the Observation Operator to the save directory """
import
da.tools.io4
as
io
# Create a new file
f
=
io
.
CT_CDF
(
self
.
simulated_file
,
method
=
'create'
)
logging
.
debug
(
'Creating new simulated observation file in ObservationOperator (%s)'
%
self
.
simulated_file
)
# Save the id of the observation
dimid
=
f
.
createDimension
(
'obs_num'
,
size
=
None
)
dimid
=
(
'obs_num'
,)
savedict
=
io
.
std_savedict
.
copy
()
...
...
@@ -812,6 +785,7 @@ class STILTObservationOperator(ObservationOperator):
savedict
[
'comment'
]
=
"Unique index number within this dataset ranging from 0 to UNLIMITED."
f
.
add_data
(
savedict
,
nsets
=
0
)
# Save the simulated mole fraction
dimmember
=
f
.
createDimension
(
'nmembers'
,
size
=
self
.
forecast_nmembers
)
dimmember
=
(
'nmembers'
,)
savedict
=
io
.
std_savedict
.
copy
()
...
...
@@ -829,91 +803,9 @@ class STILTObservationOperator(ObservationOperator):
for
i
,
data
in
enumerate
(
zip
(
ids
,
self
.
mod
)):
f
.
variables
[
'obs_num'
][
i
]
=
data
[
0
]
f
.
variables
[
'model'
][
i
,:]
=
data
[
1
]
dum
=
f
.
variables
[
'model'
][:]
f
.
close
()
f_in
.
close
()
def
save_obs
(
self
):
"""Write pseudo-observations to file"""
ct1
=
0
for
k
in
range
(
self
.
nrloc
*
self
.
nrspc
):
newfile
=
os
.
path
.
join
(
self
.
obsdir
,
self
.
obsnc
[
k
]
+
'.nc'
)
f
=
io
.
CT_CDF
(
newfile
,
method
=
'create'
)
logging
.
debug
(
'Creating new pseudo observation file in ObservationOperator (%s)'
%
newfile
)
dimid
=
f
.
add_dim
(
'Time'
,
len
(
self
.
datelist
))
ln
=
len
(
self
.
datelist
)
str19
=
f
.
add_dim
(
'strlen'
,
19
)
str3
=
f
.
add_dim
(
'strlen2'
,
3
)
data
=
[]
for
it
,
t
in
enumerate
(
self
.
datelist
):
data
.
append
(
list
(
t
.
isoformat
().
replace
(
'T'
,
'_'
)))
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'dtype'
]
=
"char"
savedict
[
'name'
]
=
"Times"
savedict
[
'units'
]
=
""
savedict
[
'dims'
]
=
dimid
+
str19
savedict
[
'values'
]
=
data
f
.
add_data
(
savedict
)
data
=
ln
*
[
self
.
lat
[
int
(
k
/
self
.
nrspc
)]]
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
"lat"
savedict
[
'units'
]
=
"degrees_north"
savedict
[
'dims'
]
=
dimid
savedict
[
'values'
]
=
data
f
.
add_data
(
savedict
)
data
=
ln
*
[
self
.
lon
[
int
(
k
/
self
.
nrspc
)]]
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
"lon"
savedict
[
'units'
]
=
"degrees_east"
savedict
[
'dims'
]
=
dimid
savedict
[
'values'
]
=
data
f
.
add_data
(
savedict
)
data
=
ln
*
[
self
.
hgt
[
int
(
k
/
self
.
nrspc
)]]
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
"alt"
savedict
[
'units'
]
=
"m above ground"
savedict
[
'dims'
]
=
dimid
savedict
[
'values'
]
=
data
f
.
add_data
(
savedict
)
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'name'
]
=
"obs"
savedict
[
'units'
]
=
"ppm or ppb"
savedict
[
'dims'
]
=
dimid
savedict
[
'values'
]
=
self
.
mod_prior
[
ct1
:
ct1
+
ln
]
f
.
add_data
(
savedict
)
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'dtype'
]
=
"char"
savedict
[
'name'
]
=
"species"
savedict
[
'units'
]
=
"observed species"
savedict
[
'dims'
]
=
dimid
+
str3
savedict
[
'values'
]
=
self
.
sps
[
ct1
:
ct1
+
ln
]
f
.
add_data
(
savedict
)
savedict
=
io
.
std_savedict
.
copy
()
savedict
[
'dtype'
]
=
"int"
savedict
[
'name'
]
=
"ids"
savedict
[
'units'
]
=
"unique observation identification number"
savedict
[
'dims'
]
=
dimid
savedict
[
'values'
]
=
self
.
ids
[
ct1
:
ct1
+
ln
]
f
.
add_data
(
savedict
)
f
.
close
()
ct1
+=
ln
logging
.
debug
(
"Successfully wrote data to obs file %s"
%
newfile
)
################### End Class STILT ###################
...
...
da/ccffdas/statevector.py
View file @
dbb92848
...
...
@@ -74,11 +74,6 @@ class CO2StateVector(StateVector):
# of lists of EnsembleMember objects, we define member 0 as the mean of the distribution and n=1,...,nmembers as the spread.
self
.
ensemble_members
=
[[]
for
n
in
range
(
self
.
nlag
)]
# self.ensemble_members = list(range(self.nlag))
#
# for n in range(self.nlag):
# self.ensemble_members[n] = []
# This specifies the file to read with the gridded mask at 1x1 degrees. Each gridbox holds a number that specifies the parametermember
# that maps onto it. From this map, a dictionary is created that allows a reverse look-up so that we can map parameters to a grid.
...
...
@@ -88,13 +83,13 @@ class CO2StateVector(StateVector):