Commit ed56c8e7 authored by Woude, Auke van der's avatar Woude, Auke van der
Browse files

Cleanup in observationoperator

parent e6f5955c
......@@ -87,34 +87,34 @@ alpha_14CO2_gpp = 1 + epsilon_14CO2_gpp/1000.
################### Begin Class STILT ###################
model_settings = rc.read('/projects/0/ctdas/awoude/develop/exec/da/rc/stilt/stilt.rc')
recalc_factors = [1, 1000]
recalc_factors = {'CO2': 1, 'CO': 1000}
spname = ['CO2', 'CO']
def run_STILT(dacycle, footprint, datepoint, i_species, path, i_member):
def run_STILT(dacycle, footprint, datepoint, species, path, i_member):
"""This function reads in STILT footprints and returns hourly concentration data
Input:
footprint: np.array: the footprint of the station
datepoint: datepoint: datetime: the datepoint for which the concentrations should be calculated
site : str: name of the location
i_species : int: index of the species
species : str: the species
i_member : int: index of the ensemblemember
Returns:
float: the concentration increase at the respective location due to the emissions from the respective species"""
# get the emission data:
spatial_emissions = get_spatial_emissions(dacycle, i_member, i_species, path, datepoint)
spatial_emissions = get_spatial_emissions(dacycle, i_member, species, path, datepoint)
# multiply footprint with emissions for each hour in the back trajectory. Indices: Time, Category, lat, lon
foot_flux = (footprint[None, :, :, :] * spatial_emissions[:, :, :, :]).sum()
concentration_increase = float(recalc_factors[i_species]) * foot_flux
concentration_increase = float(recalc_factors[species]) * foot_flux
return concentration_increase
@cached
def get_spatial_emissions(dacycle, i_member, i_species, path, datepoint):
def get_spatial_emissions(dacycle, i_member, species, path, datepoint):
""" Function that gets the spatial emissions
Input:
i_member: int: the index of the member for which the simulation is run
i_species: int: the index of the species for which the simulation is run
species: str: the species for which the simulation is run
Returns:
np.ndarray (3d): the spatial emissions per category, lat and lon"""
#read spatially distributed emissions calculated with the emission model
......@@ -125,7 +125,7 @@ def get_spatial_emissions(dacycle, i_member, i_species, path, datepoint):
indices = get_time_indices(datepoint, start_time)
emisfile = path +'prior_spatial_{0:03d}.nc'.format(i_member) #format: species[SNAP, time, lon, lat]
f = io.ct_read(emisfile,method='read')
emis=f.get_variable(spname[i_species])
emis=f.get_variable(species)
emis = emis[:, indices, :, :].astype(np.float32) # index the interesting times
f.close()
return emis
......@@ -154,28 +154,6 @@ def get_time_indices(datepoint, startdate=None, backtimes=24):
time_index = get_time_index_nc(datepoint, startdate=startdate)
return slice(time_index - backtimes, time_index)
def get_biosphere_concentration(foot, gpp_mem, ter_mem):
"""Function that calculates the atmospheric increase due to the exchange of fluxes over the footprint
Input:
foot: np.array(times, lat, lon):
the footprint of the station
gpp_mem: np.array(nmmebers, time, lat, lon)
map of the gpp for each member
ter_mem: np.array(nmembers, time, lat, lon)
map of the ter for each member
Returns:
tuple of 2 floats: GPP and TER in umol/s """
# First, recalculate to good units
gpp = gpp_mem[:, :, :, :] * (1./MASS_C) * MOL2UMOL * PERHR2PERS
ter = ter_mem[:, :, :, :] * (1./MASS_C) * MOL2UMOL * PERHR2PERS
# Multiply with the footprint
gpp_increase = - (gpp * foot[None, :, :, :]).sum()
ter_increase = (ter * foot[None, :, :, :]).sum()
return gpp_increase, ter_increase
class STILTObservationOperator(ObservationOperator):
def __init__(self, filename, dacycle=None): #only the filename used to specify the location of the stavector file for wrf-stilt runs
""" The instance of an STILTObservationOperator is application dependent """
......@@ -256,66 +234,17 @@ class STILTObservationOperator(ObservationOperator):
self.forecast_nmembers = int(self.dacycle['da.optimizer.nmembers'])
# Set the inputdir
self.param_inputdir = self.dacycle.dasystem['inputdir']
#list all observation locations and species
self.lat = []
self.lon = []
self.hgt = []
self.obsnc = []
self.mmass = []
self.sitenames=[]
self.recalc_factors = []
infile = os.path.join(self.obsdir, self.obsfile)
f = open(infile, 'r')
lines = f.readlines()
f.close()
self.spname = []
# Parse input file
for line in lines:
if line[0] == '#': continue
else:
ct_, filename, lat, lon, height, species_name, species_mass, recalc_factor, *_ = line.split(',')
two_names = any(x in filename for x in ['UW', 'DW'])
sitename = ('_'.join(filename.split('_')[1:2 + two_names]))
ct = int(ct_) - 1 # Set the counter
self.obsnc.append(filename)
self.sitenames.append(sitename)
if species_name in self.spname:
if species_name == self.spname[0]:
self.lat.append(lat)
self.lon.append(lon)
self.hgt.append(height)
else:
self.spname.append(species_name)
self.mmass.append(species_mass)
self.recalc_factors.append(recalc_factor)
if ct == 0:
self.lat.append(lat)
self.lon.append(lon)
self.hgt.append(height)
self.temporal_var = []
for k, v in self.categories.items():
self.temporal_var.append(v['temporal'])
self.temporal_var = [v['temporal'] for k, v in self.categories.items()]
#set time control variables for this cycle
if do_pseudo==0:
self.timestartkey = dtm.datetime(self.dacycle['time.sample.start'].year,
self.dacycle['time.sample.start'].month,
self.dacycle['time.sample.start'].day,
self.dacycle['time.sample.start'].hour, 0)
self.timefinishkey = dtm.datetime(self.dacycle['time.sample.end'].year,
self.dacycle['time.sample.end'].month,
self.dacycle['time.sample.end'].day,
self.dacycle['time.sample.end'].hour, 0)
self.timestartkey = self.dacycle['time.sample.start']
self.timefinishkey = self.dacycle['time.sample.end']
elif do_pseudo==1:
self.timestartkey = dtm.datetime(self.dacycle['time.fxstart'].year,
self.dacycle['time.fxstart'].month,
self.dacycle['time.fxstart'].day,
self.dacycle['time.fxstart'].hour, 0)
self.timefinishkey = dtm.datetime(self.dacycle['time.finish'].year,
self.dacycle['time.finish'].month,
self.dacycle['time.finish'].day,
self.dacycle['time.finish'].hour, 0)
self.timestartkey = self.dacycle['time.fxstart']
self.timefinishkey = self.dacycle['time.finish']
self.prepare_c14()
self.run_dynamic_emis_model()
......@@ -385,13 +314,10 @@ class STILTObservationOperator(ObservationOperator):
file = self.model_settings['biosphere_fluxdir']
dis_eq_flux = self.get_nc_variable(file, 'TER_dC14')
self.dis_eq_flux = dis_eq_flux
self.dis_eq_flux = self.get_nc_variable(file, 'TER_dC14')
file = self.model_settings['nuclear_fluxdir']
nuc_flux = self.get_nc_variable(file, 'E_14CO2_nuc')
self.nuc_flux = nuc_flux
self.nuc_flux = self.get_nc_variable(file, 'E_14CO2_nuc')
### !!!! Only cache if made dependent on datepoint/datetime !!!!
def get_time_index_nc(self, time=None, startdate=None):
......@@ -421,7 +347,6 @@ class STILTObservationOperator(ObservationOperator):
time_index = self.get_time_index_nc(datepoint, startdate=startdate)
return slice(time_index - int(self.model_settings['num_backtimes']), time_index)
# @cached
def get_foot(self, site, datepoint):
"""Function that gets the footprint for the current time and site.
Returns a 3D np.array with dims (time, lat, lon)
......@@ -445,11 +370,11 @@ class STILTObservationOperator(ObservationOperator):
return np.flipud(footprint).astype(np.float32)
#@cached
def get_background(self, i_species, site, datepoint):
def get_background(self, species, site, datepoint):
"""Function that finds the center of mass of the first footprint and the time corresponding to it.
and finds the concentration in the center of mass. This is used as the background.
Input:
i_species: int: the index of the species for which the background should be found
species: str: the species for which the background should be found
i_loc : int: the index of the locatin for which the background should be found
datepoint: datetime.datetime: the datetime of the background concentration
Returns:
......@@ -469,7 +394,7 @@ class STILTObservationOperator(ObservationOperator):
center_of_mass = ndimage.measurements.center_of_mass(fp[start_influence])
center_of_mass = np.array(np.rint(center_of_mass), dtype=int)
species_name = self.spname[i_species]
species_name = self.spname[species]
index = self.get_time_index_nc() - (int(self.model_settings['num_backtimes']) - start_influence)
......@@ -483,9 +408,7 @@ class STILTObservationOperator(ObservationOperator):
def get_background_orig(self, species):
"""Function that finds the background concentration, non-time dependent and hard-coded.
Input:
i_species: int: the index of the species for which the background should be found
i_loc : int: the index of the locatin for which the background should be found
datepoint: datetime for which the background concentration should be found.
species: the species for which the background should be found
Returns:
float: background concentration
"""
......@@ -512,12 +435,9 @@ class STILTObservationOperator(ObservationOperator):
gpp_increase = - (gpp * foot[None, :, :, :]).sum(axis=(1,2,3))
ter_increase = (ter * foot[None, :, :, :]).sum(axis=(1,2,3))
# logging.debug('GGP flux = {0:.3}; TER flux = {1:.3}'.format(gpp_increase, ter_increase))
return gpp_increase, ter_increase
# @cached
def get_c14_concentration(self, datepoint, gpp, ter, ff_flux, background):
"""Function that gets the c14 concentration based on the emissions by the biosphere,
fossil fuels and nuclear power. The constants are defined at the top of this script.
......@@ -565,7 +485,6 @@ class STILTObservationOperator(ObservationOperator):
delta_14CO2 = (Asn * np.exp(lambda_14CO2 * dt_obs) / Aabs -1) * 1000.# per mille
return delta_14CO2
# @cached
def get_nc_variable(self, file, fluxname):
"""Helper function that gets the values from an nc file
Input:
......@@ -636,12 +555,13 @@ class STILTObservationOperator(ObservationOperator):
previously_done_datepoint = datepoint
previous_day = datepoint.day
previously_done_site = site
logging.debug('Cache info: {}'.format(get_spatial_emissions.cache_info()))
# This is how ingrid did it, so I will too...
self.mod = np.array(calculated_concentrations)
# add the calculated concentrations to the object
self.calculated_concentrations = calculated_concentrations
logging.debug('Cache info: {}'.format(get_spatial_emissions.cache_info()))
def calc_concentrations(self, sample):
"""Function that calculates the concentration for a sample and all members.
......@@ -657,7 +577,6 @@ class STILTObservationOperator(ObservationOperator):
# The species, the sitename, the datepoint
# Note that these could also be passed from the function that calls this function
species = sample.species[:3]
i_species = self.spname.index(species.upper())
datepoint = sample.xdate
# logging.debug('{} {}'.format(datepoint, sample.species))
two_names = any(x in sample.code for x in ['UW', 'DW'])
......@@ -675,7 +594,7 @@ class STILTObservationOperator(ObservationOperator):
if not 'C14' in species.upper():
pool = Pool(self.forecast_nmembers)
# Create the function that calculates the conentration
func = partial(run_STILT, self.dacycle, self.foot, datepoint, i_species, self.inputdir)
func = partial(run_STILT, self.dacycle, self.foot, datepoint, species, self.inputdir)
# We need to run over all members
memberlist = list(range(0, self.forecast_nmembers))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment