Commit 4c44bd77 authored by Auke van der Woude

Merge branch 'master' into feature-STILT

parents 969b2682 c06bd5b2
@@ -28,12 +28,17 @@ File created on 28 Jul 2010.
"""
import logging
import logging, sys, os
from numpy import array, ndarray
import datetime as dt
sys.path.append(os.getcwd())
sys.path.append('../../')
identifier = 'Observations baseclass'
version = '0.0'
import da.tools.io4 as io
################### Begin Class Observations ###################
class Observations(object):
@@ -73,11 +78,15 @@ class Observations(object):
def getlength(self):
return len(self.datalist)
def setup(self, cycleparams):
def setup(self, dacycle):
""" Perform all steps needed to start working with observational data, this can include moving data, concatenating files,
selecting datasets, etc.
"""
self.startdate = dacycle['time.sample.start']
self.enddate = dacycle['time.sample.end']
self.datalist = []
def add_observations(self):
"""
Add actual observation data to the Observations object. This is in the form of an
@@ -87,20 +96,179 @@ class Observations(object):
"""
def add_simulations(self):
for n in range(10):
self.datalist.append(MoleFractionSample(n, self.startdate + dt.timedelta(hours=n+1), 'testobs', 400 + n, 0.0, 0.0, 0.0, 0.0, 0, 100.0, 52.0, 6.0, '%04d' % n, 'co2', 1, 0.0, 'none.nc'))
logging.debug("Added observation %d to the Data list" % n)
logging.info("Observations list now holds %d values" % len(self.datalist))
def add_simulations(self, filename, silent=False):
""" Add the simulation data to the Observations object.
"""
def add_model_data_mismatch(self):
if not os.path.exists(filename):
msg = "Sample output filename for observations could not be found : %s" % filename
logging.error(msg)
logging.error("Did the sampling step succeed?")
logging.error("...exiting")
raise IOError(msg)
ncf = io.ct_read(filename, method='read')
ids = ncf.get_variable('obs_num')
simulated = ncf.get_variable('flask')
ncf.close()
logging.info("Successfully read data from model sample file (%s)" % filename)
obs_ids = self.getvalues('id').tolist()
ids = list(map(int, ids))
missing_samples = []
for idx, val in zip(ids, simulated):
if idx in obs_ids:
index = obs_ids.index(idx)
self.datalist[index].simulated = val # in mol/mol
else:
missing_samples.append(idx)
if not silent and missing_samples:
logging.warning('Model samples were found that did not match any ID in the observation list. Skipping them...')
logging.debug('Unmatched sample IDs: %s' % missing_samples)
logging.debug("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)))
def add_model_data_mismatch(self, filename):
"""
Get the model-data mismatch values for this cycle.
"""
self.rejection_threshold = 3.0 # 3-sigma cut-off
self.global_R_scaling = 1.0 # no scaling applied
for obs in self.datalist: # first loop over all available data points to set flags correctly
obs.mdm = 1.0
obs.may_localize = True
obs.may_reject = True
obs.flag = 0
logging.debug("Added Model Data Mismatch to all samples ")
def write_sample_coords(self,obsinputfile):
"""
Write the information needed by the observation operator to a file. Return the filename that was written, for later use.
"""
if len(self.datalist) == 0:
logging.debug("No observations found for this time period, nothing written to obs file")
else:
f = io.CT_CDF(obsinputfile, method='create')
logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
dimid = f.add_dim('obs', len(self.datalist))
dim200char = f.add_dim('string_of200chars', 200)
dim10char = f.add_dim('string_of10chars', 10)
dimcalcomp = f.add_dim('calendar_components', 6)
data = self.getvalues('id')
savedict = io.std_savedict.copy()
savedict['name'] = "obs_num"
savedict['dtype'] = "int"
savedict['long_name'] = "Unique_Dataset_observation_index_number"
savedict['units'] = ""
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
f.add_data(savedict)
data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ]
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
savedict['name'] = "date_components"
savedict['units'] = "integer components of UTC date/time"
savedict['dims'] = dimid + dimcalcomp
savedict['values'] = data
savedict['missing_value'] = -9
savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
savedict['order'] = "year, month, day, hour, minute, second"
f.add_data(savedict)
data = self.getvalues('lat')
savedict = io.std_savedict.copy()
savedict['name'] = "latitude"
savedict['units'] = "degrees_north"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('lon')
savedict = io.std_savedict.copy()
savedict['name'] = "longitude"
savedict['units'] = "degrees_east"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('height')
savedict = io.std_savedict.copy()
savedict['name'] = "altitude"
savedict['units'] = "meters_above_sea_level"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('samplingstrategy')
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
savedict['name'] = "sampling_strategy"
savedict['units'] = "NA"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -9
f.add_data(savedict)
data = self.getvalues('obs')
savedict = io.std_savedict.copy()
savedict['name'] = "observed"
savedict['long_name'] = "observedvalues"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Observations used in optimization'
f.add_data(savedict)
data = self.getvalues('mdm')
savedict = io.std_savedict.copy()
savedict['name'] = "modeldatamismatch"
savedict['long_name'] = "modeldatamismatch"
savedict['units'] = "[mol mol-1]"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
f.add_data(savedict)
f.close()
logging.debug("Successfully wrote data to obs file")
logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
def write_sample_auxiliary(self, auxoutputfile):
"""
Write selected additional information contained in the Observations object to a file for later processing.
@@ -118,4 +286,40 @@ class Observations(object):
################### End Class Observations ###################
################### Begin Class MoleFractionSample ###################
class MoleFractionSample(object):
"""
Holds the data that defines a mole fraction sample in the data assimilation framework. So far, this includes all
attributes listed below in the __init__ method. One can additionally add more types of data, or create new
objects for specific projects.
"""
def __init__(self, idx, xdate, code='XXX', obs=0.0, simulated=0.0, resid=0.0, hphr=0.0, mdm=0.0, flag=0, height=0.0, lat=-999., lon=-999., evn='0000', species='co2', samplingstrategy=1, sdev=0.0, fromfile='none.nc'):
self.code = code.strip() # dataset identifier, e.g., co2_lef_tower_insitu_1_99
self.xdate = xdate # Date of obs
self.obs = obs # Value observed
self.simulated = simulated # Value simulated by model
self.resid = resid # Mole fraction residuals
self.hphr = hphr # Mole fraction prior uncertainty from fluxes (HPH) and model-data mismatch (R)
self.mdm = mdm # Model data mismatch
self.may_localize = True # Whether sample may be localized in optimizer
self.may_reject = True # Whether sample may be rejected if outside threshold
self.flag = flag # Flag
self.height = height # Sample height in masl
self.lat = lat # Sample lat
self.lon = lon # Sample lon
self.id = idx # ObsPack ID within distribution (integer), e.g., 82536
self.evn = evn # ObsPack number within distribution (string), e.g., obspack_co2_1_PROTOTYPE_v0.9.2_2012-07-26_99_82536
self.sdev = sdev # standard deviation of ensemble
self.masl = True # Sample is in Meters Above Sea Level
self.mag = not self.masl # Sample is in Meters Above Ground
self.species = species.strip()
self.samplingstrategy = samplingstrategy
self.fromfile = fromfile # netcdf filename inside ObsPack distribution, to write back later
################### End Class MoleFractionSample ###################
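# Editor's note: a minimal sketch, not part of this commit. Constructing a sample
# by hand, mirroring the add_observations() test loop above; all values are dummies:
#
#     import datetime as dt
#     sample = MoleFractionSample(0, dt.datetime(2012, 1, 1), code='testobs',
#                                 obs=400.0, height=100.0, lat=52.0, lon=6.0)
#     sample.mdm = 1.0  # model-data mismatch, as set by add_model_data_mismatch()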
@@ -94,7 +94,7 @@ class ObservationOperator(object):
import da.tools.io4 as io
import numpy as np
# Create a flask output file in TM5-style (to be updated later?) to hold simulated values for later reading
# Create a flask output file to hold simulated values for later reading
f = io.CT_CDF(self.simulated_file, method='create')
logging.debug('Creating new simulated observation file in ObservationOperator (%s)' % self.simulated_file)
@@ -402,19 +402,46 @@ class Optimizer(object):
logging.info('Minimum Least Squares solution was calculated, returning')
def set_localization(self):
def set_localization(self, loctype='None'):
""" determine which localization to use """
if loctype == 'CT2007':
self.localization = True
self.localizetype = "None"
self.localizetype = 'CT2007'
# Critical t-values for a two-tailed Student's t-test at the 95% confidence level, for selected ensemble sizes (nmembers)
if self.nmembers == 50:
self.tvalue = 2.0086
elif self.nmembers == 100:
self.tvalue = 1.9840
elif self.nmembers == 150:
self.tvalue = 1.97591
elif self.nmembers == 200:
self.tvalue = 1.9719
else: self.tvalue = 0
else:
self.localization = False
self.localizetype = 'None'
logging.info("Current localization option is set to %s" % self.localizetype)
if self.localization:
if self.tvalue == 0:
logging.error("Critical tvalue for localization not set for %i ensemble members" % self.nmembers)
sys.exit(2)
else: logging.info("The critical tvalue of %0.05f is based on 95%% probability and %i ensemble members in a two-tailed student's T-test" % (self.tvalue, self.nmembers))
def localize(self, n):
""" localize the Kalman Gain matrix """
logging.debug('Observation %s, %i was not localized' % (self.sitecode[n], self.obs_ids[n]))
def set_algorithm(self):
def set_algorithm(self, algorithm='Serial'):
""" determine which minimum least squares algorithm to use """
if algorithm == 'Serial':
self.algorithm = 'Serial'
logging.info("Current algorithm is set to %s in baseclass optimizer" % self.algorithm)
else:
self.algorithm = 'Bulk'
logging.info("Current minimum least squares algorithm is set to %s" % self.algorithm)
################### End Class Optimizer ###################
@@ -56,7 +56,7 @@ class CO2GriddedDaSystem(DaSystem):
'regtype']
for k, v in self.iteritems():
for k, v in self.items():
if v == 'True' : self[k] = True
if v == 'False': self[k] = False
@@ -145,7 +145,7 @@ class CO2GriddedStateVector(StateVector):
dof += np.sum(s) ** 2 / sum(s ** 2)
try:
C = np.linalg.cholesky(matrix)
except np.linalg.linalg.LinAlgError, err:
except np.linalg.linalg.LinAlgError as err:
logging.error('Cholesky decomposition has failed ')
logging.error('For a matrix of dimensions: %d' % matrix.shape[0])
logging.debug(err)
@@ -89,7 +89,7 @@ class CO2GriddedStateVector(StateVector):
if 'pco2' in file or 'cov_ocean' in file:
cov_ocn = f.get_variable('CORMAT')
parnr = range(9805,9835)
parnr = list(range(9805,9835))
cov = cov_ocn
elif 'tropic' in file:
cov = f.get_variable('covariance')
......@@ -157,7 +157,7 @@ class CO2GriddedStateVector(StateVector):
dof += np.sum(s) ** 2 / sum(s ** 2)
try:
C = np.linalg.cholesky(matrix)
except np.linalg.linalg.LinAlgError, err:
except np.linalg.linalg.LinAlgError as err:
logging.error('Cholesky decomposition has failed ')
logging.error('For a matrix of dimensions: %d' % matrix.shape[0])
logging.debug(err)
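# Editor's note: a minimal sketch, not part of this commit. A common fallback when
# Cholesky fails on a nearly-positive-definite covariance matrix is to add a small
# diagonal jitter and retry; hypothetical:
#
#     jitter = 1.0e-10 * np.trace(matrix) / matrix.shape[0]
#     C = np.linalg.cholesky(matrix + jitter * np.eye(matrix.shape[0]))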
@@ -108,12 +108,12 @@ class CartesiusPlatform(Platform):
template += """#$ -hold_jid depends \n"""
# First replace from passed dictionary
for k, v in joboptions.items():
for k, v in list(joboptions.items()):
while k in template:
template = template.replace(k, v)
# Fill remaining values with std_options
for k, v in std_joboptions.items():
for k, v in list(std_joboptions.items()):
while k in template:
template = template.replace(k, v)
......@@ -132,9 +132,9 @@ class CartesiusPlatform(Platform):
logging.info("A new task will be started (%s)" % cmd)
output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0]
logging.info(output)
print 'output', output
print('output', output)
jobid = output.split()[-1]
print 'jobid', jobid
print('jobid', jobid)
else:
cmd = ["sbatch", jobfile]
logging.info("A new job will be submitted (%s)" % cmd)
@@ -11,39 +11,15 @@
! You should have received a copy of the GNU General Public License along with this
! program. If not, see <http://www.gnu.org/licenses/>.
!!! Info for the CarbonTracker data assimilation system
datadir : /store/empa/em05/parsenov/
! For ObsPack
obspack.input.id : obspack
obspack.input.dir : ${datadir}
!/obspack !/${obspack.input.id}
obs.sites.rc : ${obspack.input.dir}/summary/sites_weights_ctdas.rc
! Using a second ObsPack (from 1 January 2016)
!obspack.input.id2 : obspack_co2_1_NRT_v3.0_2016-06-06
!obspack.input.dir2 : ${datadir}/obspacks/${obspack.input.id2}
!obs.sites.rc2 : ${obspack.input.dir2}/summary/sites_weights_Dec2016.rc
!ocn.covariance : ${datadir}/oceans/oif/cov_ocean.2000.01.nc
!deltaco2.prefix : oif_p3_era40.dpco2
!bio.cov.dir : ${datadir}/covariances/gridded_NH/
!bio.cov.prefix : cov_ecoregion
!regtype : gridded_oif30
nparameters : 11
!random.seed : 4385
regionsfile : /store/empa/em05/parsenov/CTDAS/ctdas-py3/da/analysis/regions_ok.nc
!extendedregionsfile : /store/empa/em05/parsenov/CTDAS/ctdas-py3/da/analysis/regions_ok.nc
!regionsfile : ${datadir}/covariances/gridded_NH/griddedNHparameters.nc
!random.seed.init: ${datadir}/randomseedinit.pickle
! Include a naming scheme for the variables
#include NamingScheme.wp_Mar2011.rc
! Info on the sites file used
datadir : /Storage/CO2/carbontracker/input/ctdas_2016/
ocn.covariance : ${datadir}/oceans/oif/cov_ocean.2000.01.nc
deltaco2.prefix : oif_p3_era40.dpco2
bio.cov.dir : ${datadir}/covariances/gridded_NH/
bio.cov.prefix : cov_ecoregion
regtype : gridded_oif30
nparameters : 9835
random.seed : 4385
regionsfile : ${datadir}/covariances/gridded_NH/griddedNHparameters.nc
obs.sites.rc : Random
@@ -343,7 +343,9 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
if os.path.exists(sampling_coords_file):
samples.add_simulations(obsoperator.simulated_file)
else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")
logging.debug("Simulated values read, continuing pipeline")
else:
logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")
# Now write a small file that holds for each observation a number of values that we need in postprocessing
@@ -5,10 +5,8 @@
#$ 06:30:00
#$ /bin/sh
echo "All output piped to file template.log"
source /usr/local/Modules/3.2.8/init/sh
source /opt/intel/bin/ifortvars.sh intel64
export HOST='daint'
module load python
export icycle_in_job=999
echo "All output piped to file ct_base_test.log"
export HOST='capegrim'
export icycle_in_job=1
python --version
python template.py rc=template.rc -v $1 >& template.log &
@@ -29,12 +29,11 @@ from da.tools.initexit import start_logger, validate_opts_args, parse_options, C
from da.tools.pipeline import ensemble_smoother_pipeline, header, footer
from da.platform.capegrim import CapeGrimPlatform
from da.baseclasses.dasystem import DaSystem
from da.baseclasses.statevector import StateVector
from da.carbondioxide.obspack_globalviewplus2 import ObsPackObservations
from da.carbondioxide.optimizer import CO2Optimizer
from da.baseclasses.observationoperator import ObservationOperator
#from da.analysis.expand_fluxes import save_weekly_avg_1x1_data, save_weekly_avg_state_data, save_weekly_avg_tc_data, save_weekly_avg_ext_tc_data
#from da.analysis.expand_molefractions import write_mole_fractions
from da.co2gridded.statevector import CO2GriddedStateVector
from da.baseclasses.obs import Observations
from da.baseclasses.optimizer import Optimizer
from da.baseclasses.observationoperator import RandomizerObservationOperator
#################################################################################################
@@ -51,13 +50,12 @@ opts, args = validate_opts_args(opts, args)
dacycle = CycleControl(opts, args)
platform = MaunaloaPlatform()
platform = CapeGrimPlatform()
dasystem = DaSystem(dacycle['da.system.rc'])
obsoperator = ObservationOperator(dacycle['da.obsoperator.rc'])
samples = ObsPackObservations()
statevector = StateVector()
optimizer = CO2Optimizer()
obsoperator = RandomizerObservationOperator(dacycle['da.obsoperator.rc'])
samples = Observations()
statevector = CO2GriddedStateVector()
optimizer = Optimizer()
##########################################################################################
################### ENTER THE PIPELINE WITH THE OBJECTS PASSED BY THE USER ###############
@@ -28,8 +28,8 @@
!
! The time for which to start and end the data assimilation experiment in format YYYY-MM-DD HH:MM:SS
time.start : 2016-01-01 00:00:00
time.finish : 2016-02-01 00:00:00
time.start : 2012-01-01 00:00:00
time.finish : 2012-02-01 00:00:00
! Whether to restart the CTDAS system from a previous cycle, or to start the sequence fresh. Valid entries are T/F/True/False/TRUE/FALSE
@@ -37,16 +37,16 @@ time.restart : False
! The length of a cycle is given in days, such that the integer 7 denotes the typically used weekly cycle. Valid entries are integers > 1
time.cycle : 7
time.cycle : 1
! The number of cycles of lag to use for a smoother version of CTDAS. CarbonTracker CO2 typically uses 5 weeks of lag. Valid entries are integers > 0
time.nlag : 3
time.nlag : 2
! The directory under which the code, input, and output will be stored. This is the base directory for a run. The word
! '/' will be replaced through the start_ctdas.sh script by a user-specified folder name. DO NOT REPLACE
dir.da_run : /scratch/snx3000/parsenov/projname
dir.da_run : template
! The resources used to complete the data assimilation experiment. This depends on your computing platform.
! The number of cycles per job denotes how many cycles should be completed before starting a new process or job, this