Commit 9ff3413f authored by Woude, Auke van der

Merge branch 'master' into develop

parents 188dd6be faa0f782
......@@ -2,5 +2,11 @@
*.py
*.rc
*.jb
/da/*/__pycache__
/da/*/*.pyc
joblog
# Exclude the logfiles
*.log
# Exclude files created by the slurm system
slurm*.out
# Exclude compiled files
da/**/__pycache__/
da/**/*.pyc
......@@ -40,15 +40,15 @@ from da.tools.general import create_dirs, to_datetime
from da.ccffdas.emissionmodel import EmisModel
from da.baseclasses.observationoperator import ObservationOperator
try: # Import memoization. This speeds up the functions a lot.
from memoization import cached
except: # The memoization package is not always included. Import the memoization from functools
from functools import lru_cache as cached
cached = cached()
#try: # Import memoization. This speeds up the functions a lot.
# from memoization import cached
#except: # The memoization package is not always included. Import the memoization from #functools
# from functools import lru_cache as cached
# cached = cached()
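The block commented out above preserved the original import fallback: use the third-party memoization package when available, otherwise fall back to functools.lru_cache. A minimal sketch of that pattern (the decorated function below is a hypothetical example, not part of this module):

```python
try:
    # Third-party package that provides a ready-to-use @cached decorator
    from memoization import cached
except ImportError:
    # Standard-library fallback: lru_cache is a decorator factory,
    # so call it once to obtain a bare decorator equivalent to @cached
    from functools import lru_cache as cached
    cached = cached()

@cached
def slow_lookup(x):
    # Hypothetical example: the result is cached after the first call
    return x ** 2
```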
import xlrd
from da.ffdas import category_info
from da.ccffdas import category_info
categories = category_info.categories
# global constants, which will be used in the following classes
identifier = 'WRF-STILT'
......@@ -86,10 +86,12 @@ epsilon_14CO2_gpp = -36.
alpha_14CO2_gpp = 1 + epsilon_14CO2_gpp/1000. # = 0.964
################### Begin Class STILT ###################
model_settings = rc.read('/projects/0/ctdas/RINGO/inversions/ffdas/exec/da/rc/ffdas/stilt.rc')
model_settings = rc.read('/projects/0/ctdas/awoude/develop/exec/da/rc/stilt/stilt.rc')
recalc_factors = [1, 1000]
spname = ['CO2', 'CO']
def run_STILT(footprint, datepoint, site, i_species, path, i_member):
def run_STILT(dacycle, footprint, datepoint, i_species, path, i_member):
"""This function reads in STILT footprints and returns hourly concentration data
Input:
footprint: np.array: the footprint of the station
......@@ -100,39 +102,37 @@ def run_STILT(footprint, datepoint, site, i_species, path, i_member):
Returns:
float: the concentration increase at the respective location due to the emissions from the respective species"""
# get the date:
temp_profiles = get_temporal_profiles(datepoint, site, path, i_member)
spatial_emissions = get_spatial_emissions(i_member, i_species, path)
spatial_emissions = get_spatial_emissions(dacycle, i_member, i_species, path, datepoint)
# multiply footprint with emissions for each hour in the back trajectory. Indices: category, time, lat, lon
foot_flux = (footprint[:, None, :, :] * spatial_emissions[None, :, :, :] * temp_profiles[:, :, None, None]).sum()
foot_flux = (footprint[None, :, :, :] * spatial_emissions[:, :, :, :]).sum()
concentration_increase = float(recalc_factors[i_species]) * foot_flux
return concentration_increase
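In the new version of run_STILT the spatial emissions already carry the backtime dimension, so the separate temporal profiles drop out and the concentration increase becomes a single broadcast-and-sum. A shape sketch with dummy arrays (all sizes are illustrative only):

```python
import numpy as np

n_cat, n_back, n_lat, n_lon = 14, 24, 10, 12              # illustrative sizes only
footprint = np.random.rand(n_back, n_lat, n_lon)          # (backtime, lat, lon)
emissions = np.random.rand(n_cat, n_back, n_lat, n_lon)   # (category, backtime, lat, lon)

# Broadcast the footprint over the category axis and sum over all axes,
# mirroring foot_flux = (footprint[None, :, :, :] * spatial_emissions).sum()
foot_flux = (footprint[None, :, :, :] * emissions).sum()

recalc_factor = 1                      # 1 for CO2, 1000 for CO in recalc_factors above
concentration_increase = float(recalc_factor) * foot_flux
```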
@cached
def get_temporal_profiles(datepoint, station_name, path, i_member):
""" Function that reads in the temporal profiles for the current timestep
Input:
datepoint: datepoint: datetime for which the temporal profile should be found
i_station: int: the index of the location for which the c14 concentration should be calculated
i_member: int: the index of the member for which the simulation is run
Returns:
np.array (2): the time profiles for all categories. Indices: time, category"""
#read temporal profiles for the times within the footprint
time_indices = get_time_indices(datepoint)
temporal_prior = path + 'prior_temporal_{0}_{1:03}.nc'.format(station_name, i_member)
temporal_prior = io.ct_read(temporal_prior, 'read')
temp_profiles = []
for category, values in categories.items():
temporal_var_name = values['temporal']
temporal_variation = temporal_prior.get_variable(temporal_var_name)[time_indices]
temp_profiles.append(temporal_variation)
#temporal_prior.close()
temp_profiles = np.array(temp_profiles)
return temp_profiles.T # Transpose to be similar to spatial data in dimensions
@cached
def get_spatial_emissions(i_member, i_species, path):
#@cached
#def get_temporal_profiles(datepoint, station_name, path, i_member):
# """ Function that reads in the temporal profiles for the current timestep
# Input:
# datepoint: datepoint: datetime for which the temporal profile should be found
# i_station: int: the index of the location for which the c14 concentration should be calculated
# i_member: int: the index of the member for which the simulation is run
# Returns:
# np.array (2): the time profiles for all categories. Indices: time, category"""
# #read temporal profiles for the times within the footprint
# time_indices = get_time_indices(datepoint)
# temporal_prior = path + 'prior_temporal_{0}_{1:03}.nc'.format(station_name, i_member)
# temporal_prior = io.ct_read(temporal_prior, 'read')
# temp_profiles = []
# for category, values in categories.items():
# temporal_var_name = values['temporal']
# temporal_variation = temporal_prior.get_variable(temporal_var_name)[time_indices]
# temp_profiles.append(temporal_variation)
# #temporal_prior.close()
# temp_profiles = np.array(temp_profiles)
# return temp_profiles.T # Transpose to be similar to spatial data in dimensions
def get_spatial_emissions(dacycle, i_member, i_species, path, datepoint):
""" Function that gets the spatial emissions
Input:
i_member: int: the index of the member for which the simulation is run
......@@ -140,9 +140,15 @@ def get_spatial_emissions(i_member, i_species, path):
Returns:
np.ndarray (3d): the spatial emissions per category, lat and lon"""
#read spatially distributed emissions calculated with the emission model
backtimes = 24 + int(dacycle['time.cycle']) * 24 # extra hours in backtime
backtimes = dtm.timedelta(hours=backtimes)
start_time = (dacycle['time.sample.end'] - backtimes)
indices = get_time_indices(datepoint, start_time)
emisfile = path +'prior_spatial_{0:03d}.nc'.format(i_member) #format: species[SNAP,lon,lat]
f = io.ct_read(emisfile,method='read')
emis=f.get_variable(spname[i_species])
emis = emis[:, indices, :, :] # index the interesting times
f.close()
return emis
......@@ -160,16 +166,17 @@ def get_time_index_nc(time=None, startdate=None):
# Note that this is in hours, and thus assumes that the flux files are hourly as well
timediff = time - startdate
timediff_hours = int(timediff.total_seconds()/3600) # the 1-hour step could also be soft-coded from time.cycle
time_index = int(timediff_hours) + DO_RINGO
time_index = int(timediff_hours)
return time_index
@cached(ttl=5)
def get_time_indices(datepoint, startdate=None):
#@cached(ttl=5)
def get_time_indices(datepoint, startdate=None, backtimes=24):
"""Function that gets the time indices in the flux files
Because the footprint extends backtimes hours back, we also need the fluxes for those hours"""
time_index = get_time_index_nc(datepoint, startdate=startdate)
return slice(time_index - int(model_settings['num_backtimes']), time_index)
return slice(time_index - backtimes, time_index)
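get_time_index_nc converts a timestamp into an hourly offset from the start date of the flux files, and get_time_indices turns that offset into a slice covering the preceding backtimes hours. A worked sketch, assuming hourly flux files and the files_startdate and ndayscycle values from the stilt.rc below:

```python
import datetime as dtm

files_startdate = dtm.datetime(2016, 1, 1, 0)      # files_startdate in stilt.rc
datepoint = dtm.datetime(2016, 1, 20, 12)          # example sample time

# Hourly index of the sample time in the flux files
time_index = int((datepoint - files_startdate).total_seconds() / 3600)   # 468

# Footprint window: the preceding 24 hours of fluxes
window = slice(time_index - 24, time_index)        # slice(444, 468)

# get_spatial_emissions widens the window to 24 h plus the cycle length,
# e.g. a 10-day cycle gives 24 + 10 * 24 = 264 hours of fluxes
backtimes = 24 + 10 * 24
wide_window = slice(time_index - backtimes, time_index)   # slice(204, 468)
```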
def get_biosphere_concentration(foot, gpp_mem, ter_mem):
"""Function that calculates the atmospheric increase due to the exchange of fluxes over the footprint
......@@ -228,7 +235,7 @@ class STILTObservationOperator(ObservationOperator):
self.btime = int(dacycle.dasystem['run.backtime'])
self.categories = categories
self.inputdir = dacycle.dasystem['inputdir']
self.paramdict = rc.read('/projects/0/ctdas/RINGO/inversions/Data/paramdict.rc')
self.paramdict = rc.read(dacycle.dasystem['paramdict'])
biosphere_fluxes = nc.Dataset(dacycle.dasystem['biosphere_fluxdir'])
self.gpp = biosphere_fluxes['GPP'][:]#[time_indices]
......@@ -349,10 +356,14 @@ class STILTObservationOperator(ObservationOperator):
def run_dynamic_emis_model(self):
"""Function that runs the dynamic emission model and writes the spatial emissions and time profiles to nc files
based on emismodel.py"""
logging.info('Calculating the emissions; entering Emismodel.py')
emismodel = EmisModel()
emismodel.setup(self.dacycle)
emismodel.get_emis(self.dacycle, self.allsamples, do_pseudo=0)
self.emismodel = emismodel
backtimes = 24 + int(self.dacycle['time.cycle']) * 24 # extra hours in backtime
indices = get_time_indices(self.dacycle['time.sample.end'], backtimes=backtimes)
emismodel.get_emis(self.dacycle, self.allsamples, indices=indices, do_pseudo=0)
logging.info('Emissions calculated')
def prepare_biosphere(self, datepoint):
"""Function that prepares the biosphere fluxes on a map
......@@ -427,7 +438,7 @@ class STILTObservationOperator(ObservationOperator):
# Note that this is in hours, and thus assumes that the flux files are hourly as well
timediff = time - startdate
timediff_hours = int(timediff.total_seconds()/3600) # the 1-hour step could also be soft-coded from time.cycle
time_index = int(timediff_hours) + DO_RINGO
time_index = int(timediff_hours)
return time_index
def get_time_indices(self, datepoint, startdate=None):
......@@ -514,7 +525,6 @@ class STILTObservationOperator(ObservationOperator):
Input:
i_member: int: member number for which the biosphere concentration should be calculated
site: str: location for which the concentration should be calculated
datepoint: datetime.datetime: the datepoint for which the concentration should be calculated
total: bool, optional: Whether to return summed values or gpp and ter separately as a tuple
Returns:
if total == True:
......@@ -594,7 +604,7 @@ class STILTObservationOperator(ObservationOperator):
fluxmap = data[fluxname][:]
data.close()
return fluxmap
def run_forecast_model(self,dacycle,do_pseudo,adv):
self.prepare_run(do_pseudo,adv)
self.run(dacycle,do_pseudo)
......@@ -622,7 +632,7 @@ class STILTObservationOperator(ObservationOperator):
# Check if it is a new date
self.sample = sample
datepoint = sample.xdate
indices = get_time_indices(datepoint)
if datepoint != previously_done_datepoint:
# If it is a new date, we need a new biosphere prior
......@@ -660,12 +670,6 @@ class STILTObservationOperator(ObservationOperator):
# add the calculated concentrations to the object
self.calculated_concentrations = calculated_concentrations
# Clear the cache from the functions, so that a new cycle does
# (with different parameters) does not find the same results.
get_spatial_emissions.cache_clear()
get_temporal_profiles.cache_clear()
def calc_concentrations(self, sample):
"""Function that calculates the concentration for a sample and all members.
Gets the relevant information from the sample and then calculates the concentration
......@@ -698,7 +702,7 @@ class STILTObservationOperator(ObservationOperator):
if not 'C14' in species.upper():
pool = Pool(self.forecast_nmembers)
# Create the function that calculates the concentration
func = partial(run_STILT, self.foot, datepoint, site, i_species, self.inputdir)
func = partial(run_STILT, self.dacycle, self.foot, datepoint, i_species, self.inputdir)
# We need to run over all members
memberlist = list(range(0, self.forecast_nmembers))
......
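In calc_concentrations the member loop is parallelised: functools.partial fixes every argument of run_STILT except the member index, and a multiprocessing Pool maps the resulting function over the member list. A minimal sketch of the same pattern, with a stand-in worker instead of run_STILT:

```python
from functools import partial
from multiprocessing import Pool

def run_member(dacycle, footprint, datepoint, i_species, path, i_member):
    """Stand-in for run_STILT: only the last argument varies per member."""
    return i_member                      # a real worker returns the concentration increase

if __name__ == '__main__':
    nmembers = 4                         # illustrative; CTDAS uses forecast_nmembers
    func = partial(run_member, {}, None, None, 0, '/tmp')
    with Pool(nmembers) as pool:
        results = pool.map(func, range(nmembers))   # one call per member index
    print(results)                       # [0, 1, 2, 3]
```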
......@@ -32,7 +32,7 @@ def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector
invert(dacycle, statevector, optimizer, obsoperator)
advance(dacycle, samples, statevector, obsoperator)
#advance(dacycle, samples, statevector, obsoperator)
save_and_submit(dacycle, statevector)
logging.info("Cycle finished...exiting pipeline")
......
!!! Info for the CarbonTracker data assimilation system
basepath : /projects/0/ctdas/RINGO/inversions/
basepath : /projects/0/ctdas/awoude/develop/
name :
strategy : CO2C14
datadir : ${basepath}/Data
inputdir : ${basepath}/${name}${strategy}/input/
outputdir : ${basepath}/${name}${strategy}/output/
restartdir : ${basepath}/${name}${strategy}/restart/
strategy : CO2
datadir : /projects/0/ctdas/RINGO/inversions/Data
inputdir : ${basepath}/input/
outputdir : ${basepath}/output/
restartdir : ${basepath}/restart/
! list of all observation sites
obs.input.id : obsfiles.csv
! number of observation sites included; number of species included and to be used in inversion
......@@ -15,16 +15,17 @@ obs.dir : obsfiles${strategy}
do.co : 0
do.c14integrated: 0
do.c14targeted: 0
obs.spec.name : CO2,CO
obs.spec.name : CO2
! number of emission categories defined in the emission model
obs.cat.nr : 14
! For Rdam obs
obs.sites.rc : ${datadir}/sites_weights.rc
obs.sites.rc : ${datadir}/sites_weights2.rc
! number of parameters
! In the covmatrix and statevector, the ff parameters are first, then the bio parameters!
nffparameters : 16
nbioparameters : 4
nparameters : 20
nffparameters : 24
nbioparameters : 6
nparameters : 30
paramdict : /projects/0/ctdas/RINGO/inversions/Data/paramdict.rc
! set fixed seed for random number generator, or use 0 if you want to use any random seed
random.seed : 4385
!file with prior estimate of scaling factors (statevector) and covariances
......
......@@ -100,7 +100,7 @@ class CartesiusPlatform(Platform):
"""#SBATCH -n jobnodes \n""" + \
"""#SBATCH -t jobtime \n""" + \
"""#SBATCH -o joblog \n""" + \
"""module load python\n""" + \
"""module load Python/3.6.6-intel-2018b\n""" + \
"""module load nco\n""" + \
"""\n"""
......
......@@ -11,17 +11,15 @@
! You should have received a copy of the GNU General Public License along with this
! program. If not, see <http://www.gnu.org/licenses/>.
homedir : /projects/0/ctdas/RINGO
sibdir : /projects/0/ctdas/RINGO/EmissionInventories/
biosphere_fluxdir : /projects/0/ctdas/RINGO/EmissionInventories/True/RINGO_ORCHIDEE_GPP_TER_dC14_old.nc
nuclear_timeprofiledir : /projects/0/ctdas/RINGO/EmissionInventories/True/Power_TempDistributionFrance.xlsx
nuclear_fluxdir : /projects/0/ctdas/RINGO/EmissionInventories/True/RINGO_NuclearEmissions_14CO2.nc
bgfdir : /projects/0/ctdas/RINGO/EmissionInventories/
footdir : /projects/0/ctdas/RINGO/STILT_Output/
bounddir : /projects/0/ctdas/RINGO/EmissionInventories/
inputdir : /home/awoude/ctdas_test/ctdas_stilt/input/
outdir : /home/awoude/ctdas_test/ctdas_stilt/output/
inputdir: /projects/0/ctdas/RINGO/inversions/ffdas/input
nsam : 100
ndayscycle : 10
sysstadate : 2016-01-15 00:00:00
cycstadate : 2016-01-20 00:00:00
files_startdate : 2016-01-01 00:00:00
tracerdir : /home/awoude/ctdas_test/ctdas_stilt/exec/da/rc/stilt/tracers
! Now list the tracers separated by a comma, without a space!
tracers : co2,222Rn
num_backtimes: 24
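Settings files like this stilt.rc are read with the project's rc module (used as rc.read in the observation operator above). A hedged sketch of typical access, assuming the usual `import da.tools.rc as rc` and that rc.read returns the key/value pairs as plain strings, so numeric entries are cast where needed:

```python
import da.tools.rc as rc

# Path from this commit; values come back as strings
model_settings = rc.read('/projects/0/ctdas/awoude/develop/exec/da/rc/stilt/stilt.rc')

num_backtimes = int(model_settings['num_backtimes'])    # 24
nsam = int(model_settings['nsam'])                      # 100
tracers = model_settings['tracers'].split(',')          # ['co2', '222Rn']
```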
......@@ -157,7 +157,7 @@ class CycleControl(dict):
self[k] = False
if 'date' in k :
self[k] = to_datetime(v)
if k in ['time.start', 'time.end', 'time.finish', 'da.restart.tstamp']:
if k in ['time.start', 'time.end', 'time.finish', 'da.restart.tstamp', 'time.fxstart']:
self[k] = to_datetime(v)
for key in needed_da_items:
if key not in self:
......@@ -333,7 +333,7 @@ class CycleControl(dict):
strippedname = os.path.split(self['jobrcfilename'])[-1]
self['jobrcfilename'] = os.path.join(self['dir.exec'], strippedname)
# shutil.copy(os.path.join(self.dasystem['regionsfile']),os.path.join(self['dir.exec'],'da','analysis','copied_regions.nc'))
logging.info('Copied regions file to the analysis directory: %s'%os.path.join(self.dasystem['regionsfile']))
#logging.info('Copied regions file to the analysis directory: %s'%os.path.join(self.dasystem['regionsfile']))
if 'extendedregionsfile' in self.dasystem:
# shutil.copy(os.path.join(self.dasystem['extendedregionsfile']),os.path.join(self['dir.exec'],'da','analysis','copied_regions_extended.nc'))
logging.info('Copied extended regions file to the analysis directory: %s'%os.path.join(self.dasystem['extendedregionsfile']))
......
......@@ -262,7 +262,7 @@ class RcFile(object) :
Class to store settings read from a rcfile.
"""
def __init__(self, filename, silent=False, marks=('${', '}')) :
def __init__(self, filename, silent=False, marks=('${', '}'), do_eval=True):
"""
......@@ -836,7 +836,6 @@ class RcFile(object) :
#endif
else :
if do_eval:
print(val)
if isinstance(val, str):
if any(operator in val for operator in '+-*/'):
try: val = eval(val)
......
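The do_eval flag added to RcFile controls the branch shown in the last hunk: values containing an arithmetic operator are passed through eval, so an rc entry can hold a small expression instead of a literal. A minimal sketch of that evaluation step in isolation (not the full RcFile class):

```python
def maybe_eval(val, do_eval=True):
    """Evaluate rc values that look like arithmetic, as RcFile does when do_eval is set."""
    if do_eval and isinstance(val, str) and any(op in val for op in '+-*/'):
        try:
            val = eval(val)            # e.g. '24 + 10*24' -> 264
        except Exception:
            pass                       # paths like '/projects/0/ctdas' stay untouched
    return val

print(maybe_eval('24 + 10*24'))        # 264
print(maybe_eval('/projects/0/ctdas')) # eval fails on the path, the string is kept
```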