Commit f0075661 authored by Woude, Auke van der's avatar Woude, Auke van der
Browse files

Merge branch 'feature-STILT' of https://git.wur.nl/woude033/CTDAS into feature-STILT

parents 39acbed3 4d1ecc23
......@@ -28,12 +28,17 @@ File created on 28 Jul 2010.
"""
import logging
import logging, sys, os
from numpy import array, ndarray
import datetime as dt
sys.path.append(os.getcwd())
sys.path.append('../../')
identifier = 'Observations baseclass'
version = '0.0'
import da.tools.io4 as io
################### Begin Class Observations ###################
class Observations(object):
......@@ -73,11 +78,15 @@ class Observations(object):
def getlength(self):
return len(self.datalist)
def setup(self, cycleparams):
def setup(self, dacycle):
""" Perform all steps needed to start working with observational data, this can include moving data, concatenating files,
selecting datasets, etc.
"""
self.startdate = dacycle['time.sample.start']
self.enddate = dacycle['time.sample.end']
self.datalist = []
def add_observations(self):
"""
Add actual observation data to the Observations object. This is in a form of an
......@@ -87,20 +96,179 @@ class Observations(object):
"""
def add_simulations(self):
for n in range(10):
self.datalist.append(MoleFractionSample(n, self.startdate+dt.timedelta(hours=n+1),'testobs' , 400+n, 0.0, 0.0, 0.0, 0.0, 0 , 100.0 , 52.0, 6.0 , '%04d'%n , 'co2', 1, 0.0, 'none.nc'))
logging.debug("Added %d observations to the Data list" % (1))
logging.info("Observations list now holds %d values" % len(self.datalist))
def add_simulations(self, filename, silent=False):
""" Add the simulation data to the Observations object.
"""
def add_model_data_mismatch(self):
if not os.path.exists(filename):
msg = "Sample output filename for observations could not be found : %s" % filename
logging.error(msg)
logging.error("Did the sampling step succeed?")
logging.error("...exiting")
raise IOError(msg)
ncf = io.ct_read(filename, method='read')
ids = ncf.get_variable('obs_num')
simulated = ncf.get_variable('flask')
ncf.close()
logging.info("Successfully read data from model sample file (%s)" % filename)
obs_ids = self.getvalues('id').tolist()
ids = list(map(int, ids))
missing_samples = []
for idx, val in zip(ids, simulated):
if idx in obs_ids:
index = obs_ids.index(idx)
self.datalist[index].simulated = val # in mol/mol
else:
missing_samples.append(idx)
if not silent and missing_samples != []:
logging.warning('Model samples were found that did not match any ID in the observation list. Skipping them...')
#msg = '%s'%missing_samples ; logging.warning(msg)
logging.debug("Added %d simulated values to the Data list" % (len(ids) - len(missing_samples)))
def add_model_data_mismatch(self, filename):
"""
Get the model-data mismatch values for this cycle.
"""
self.rejection_threshold = 3.0 # 3-sigma cut-off
self.global_R_scaling = 1.0 # no scaling applied
for obs in self.datalist: # first loop over all available data points to set flags correctly
obs.mdm = 1.0
obs.may_localize = True
obs.may_reject = True
obs.flag = 0
logging.debug("Added Model Data Mismatch to all samples ")
def write_sample_coords(self,obsinputfile):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
"""
if len(self.datalist) == 0:
logging.debug("No observations found for this time period, nothing written to obs file")
else:
f = io.CT_CDF(obsinputfile, method='create')
logging.debug('Creating new observations file for ObservationOperator (%s)' % obsinputfile)
dimid = f.add_dim('obs', len(self.datalist))
dim200char = f.add_dim('string_of200chars', 200)
dim10char = f.add_dim('string_of10chars', 10)
dimcalcomp = f.add_dim('calendar_components', 6)
data = self.getvalues('id')
savedict = io.std_savedict.copy()
savedict['name'] = "obs_num"
savedict['dtype'] = "int"
savedict['long_name'] = "Unique_Dataset_observation_index_number"
savedict['units'] = ""
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = "Unique index number within this dataset ranging from 0 to UNLIMITED."
f.add_data(savedict)
data = [[d.year, d.month, d.day, d.hour, d.minute, d.second] for d in self.getvalues('xdate') ]
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
savedict['name'] = "date_components"
savedict['units'] = "integer components of UTC date/time"
savedict['dims'] = dimid + dimcalcomp
savedict['values'] = data
savedict['missing_value'] = -9
savedict['comment'] = "Calendar date components as integers. Times and dates are UTC."
savedict['order'] = "year, month, day, hour, minute, second"
f.add_data(savedict)
data = self.getvalues('lat')
savedict = io.std_savedict.copy()
savedict['name'] = "latitude"
savedict['units'] = "degrees_north"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('lon')
savedict = io.std_savedict.copy()
savedict['name'] = "longitude"
savedict['units'] = "degrees_east"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('height')
savedict = io.std_savedict.copy()
savedict['name'] = "altitude"
savedict['units'] = "meters_above_sea_level"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = self.getvalues('samplingstrategy')
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
savedict['name'] = "sampling_strategy"
savedict['units'] = "NA"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['missing_value'] = -9
f.add_data(savedict)
data = self.getvalues('obs')
savedict = io.std_savedict.copy()
savedict['name'] = "observed"
savedict['long_name'] = "observedvalues"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Observations used in optimization'
f.add_data(savedict)
data = self.getvalues('mdm')
savedict = io.std_savedict.copy()
savedict['name'] = "modeldatamismatch"
savedict['long_name'] = "modeldatamismatch"
savedict['units'] = "[mol mol-1]"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
f.add_data(savedict)
f.close()
logging.debug("Successfully wrote data to obs file")
logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
def write_sample_auxiliary(self, auxoutputfile):
"""
Write selected additional information contained in the Observations object to a file for later processing.
......@@ -118,4 +286,40 @@ class Observations(object):
################### End Class Observations ###################
################### Begin Class MoleFractionSample ###################
class MoleFractionSample(object):
"""
Holds the data that defines a mole fraction Sample in the data assimilation framework. Sor far, this includes all
attributes listed below in the __init__ method. One can additionally make more types of data, or make new
objects for specific projects.
"""
def __init__(self, idx, xdate, code='XXX', obs=0.0, simulated=0.0, resid=0.0, hphr=0.0, mdm=0.0, flag=0, height=0.0, lat= -999., lon= -999., evn='0000', species='co2', samplingstrategy=1, sdev=0.0, fromfile='none.nc'):
self.code = code.strip() # dataset identifier, i.e., co2_lef_tower_insitu_1_99
self.xdate = xdate # Date of obs
self.obs = obs # Value observed
self.simulated = simulated # Value simulated by model
self.resid = resid # Mole fraction residuals
self.hphr = hphr # Mole fraction prior uncertainty from fluxes and (HPH) and model data mismatch (R)
self.mdm = mdm # Model data mismatch
self.may_localize = True # Whether sample may be localized in optimizer
self.may_reject = True # Whether sample may be rejected if outside threshold
self.flag = flag # Flag
self.height = height # Sample height in masl
self.lat = lat # Sample lat
self.lon = lon # Sample lon
self.id = idx # Obspack ID within distrution (integer), e.g., 82536
self.evn = evn # Obspack Number within distrution (string), e.g., obspack_co2_1_PROTOTYPE_v0.9.2_2012-07-26_99_82536
self.sdev = sdev # standard deviation of ensemble
self.masl = True # Sample is in Meters Above Sea Level
self.mag = not self.masl # Sample is in Meters Above Ground
self.species = species.strip()
self.samplingstrategy = samplingstrategy
self.fromfile = fromfile # netcdf filename inside ObsPack distribution, to write back later
################### End Class MoleFractionSample ###################
......@@ -94,7 +94,7 @@ class ObservationOperator(object):
import da.tools.io4 as io
import numpy as np
# Create a flask output file in TM5-style (to be updated later?) to hold simulated values for later reading
# Create a flask output file to hold simulated values for later reading
f = io.CT_CDF(self.simulated_file, method='create')
logging.debug('Creating new simulated observation file in ObservationOperator (%s)' % self.simulated_file)
......
......@@ -402,19 +402,46 @@ class Optimizer(object):
logging.info('Minimum Least Squares solution was calculated, returning')
def set_localization(self):
def set_localization(self, loctype='None'):
""" determine which localization to use """
self.localization = True
self.localizetype = "None"
if loctype == 'CT2007':
self.localization = True
self.localizetype = 'CT2007'
#T-test values for two-tailed student's T-test using 95% confidence interval for some options of nmembers
if self.nmembers == 50:
self.tvalue = 2.0086
elif self.nmembers == 100:
self.tvalue = 1.9840
elif self.nmembers == 150:
self.tvalue = 1.97591
elif self.nmembers == 200:
self.tvalue = 1.9719
else: self.tvalue = 0
else:
self.localization = False
self.localizetype = 'None'
logging.info("Current localization option is set to %s" % self.localizetype)
if self.localization == True:
if self.tvalue == 0:
logging.error("Critical tvalue for localization not set for %i ensemble members"%(self.nmembers))
sys.exit(2)
else: logging.info("Used critical tvalue %0.05f is based on 95%% probability and %i ensemble members in a two-tailed student's T-test"%(self.tvalue,self.nmembers))
def localize(self, n):
""" localize the Kalman Gain matrix """
logging.debug('Not localized observation %s, %i' % (self.sitecode[n], self.obs_ids[n]))
def set_algorithm(self):
self.algorithm = 'Serial'
logging.info("Current algorithm is set to %s in baseclass optimizer" % self.algorithm)
def set_algorithm(self, algorithm='Serial'):
""" determine which minimum least squares algorithm to use """
if algorithm == 'Serial':
self.algorithm = 'Serial'
else:
self.algorithm = 'Bulk'
logging.info("Current minimum least squares algorithm is set to %s" % self.algorithm)
################### End Class Optimizer ###################
......
......@@ -254,27 +254,27 @@ class ObsPackObservations(Observations):
savedict['missing_value'] = '!'
f.add_data(savedict)
data = self.getvalues('obs')
savedict = io.std_savedict.copy()
savedict['name'] = "observed"
savedict['long_name'] = "observedvalues"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Observations used in optimization'
f.add_data(savedict)
data = self.getvalues('obs')
savedict = io.std_savedict.copy()
savedict['name'] = "observed"
savedict['long_name'] = "observedvalues"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Observations used in optimization'
f.add_data(savedict)
data = self.getvalues('mdm')
data = self.getvalues('mdm')
savedict = io.std_savedict.copy()
savedict['name'] = "modeldatamismatch"
savedict['long_name'] = "modeldatamismatch"
savedict['units'] = "[mol mol-1]"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
f.add_data(savedict)
savedict = io.std_savedict.copy()
savedict['name'] = "modeldatamismatch"
savedict['long_name'] = "modeldatamismatch"
savedict['units'] = "[mol mol-1]"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
f.add_data(savedict)
f.close()
......
......@@ -39,7 +39,7 @@ class CartesiusPlatform(Platform):
"""
Returns a blocking flag, which is important if tm5 is submitted in a queue system. The python ctdas code is forced to wait before tm5 run is finished
-on Huygens: return "-s"
-on Huygens: return "-s"
-on Maunaloa: return "" (no queue available)
-on Jet/Zeus: return
"""
......@@ -102,7 +102,7 @@ class CartesiusPlatform(Platform):
"""#SBATCH -o joblog \n""" + \
"""module load python\n""" + \
"""module load nco\n""" + \
"""\n"""
"""\n"""
if 'depends' in joboptions:
template += """#$ -hold_jid depends \n"""
......@@ -151,16 +151,16 @@ class CartesiusPlatform(Platform):
# jobid = output.split()[2]
# retcode = output.split()[-1]
#
#
# #for huygens
# print 'output', output
# test = output.split()[3]
# print 'output', output
# test = output.split()[3]
# dummy, jobid =test.split('nl.')
# jobid='%s%s' %('"',jobid)
# submitmsg ='%s%s%s'%(output.split()[4],output.split()[5],output.split()[6])
# jobid='%s%s' %('"',jobid)
# submitmsg ='%s%s%s'%(output.split()[4],output.split()[5],output.split()[6])
# if submitmsg=='hasbeensubmitted.':
# retcode=2
# print 'retcode',submitmsg,retcode
# retcode=2
# print 'retcode',submitmsg,retcode
# return retcode
#
# def KillJob(self,jobid):
......
......@@ -11,23 +11,9 @@
! You should have received a copy of the GNU General Public License along with this
! program. If not, see <http://www.gnu.org/licenses/>.
!!! Info for the CarbonTracker data assimilation system
datadir : /Storage/CO2/carbontracker/input/ctdas_2016/
! For ObsPack
obspack.input.id : obspack_co2_1_GLOBALVIEWplus_v2.1_2016-09-02
obspack.input.dir : ${datadir}/obspacks/${obspack.input.id}
obs.sites.rc : ${obspack.input.dir}/summary/sites_weights_Dec2016.rc
! Using a second ObsPack (from 1 January 2016)
!obspack.input.id2 : obspack_co2_1_NRT_v3.0_2016-06-06
!obspack.input.dir2 : ${datadir}/obspacks/${obspack.input.id2}
!obs.sites.rc2 : ${obspack.input.dir2}/summary/sites_weights_Dec2016.rc
ocn.covariance : ${datadir}/oceans/oif/cov_ocean.2000.01.nc
deltaco2.prefix : oif_p3_era40.dpco2
bio.cov.dir : ${datadir}/covariances/gridded_NH/
bio.cov.prefix : cov_ecoregion
......@@ -35,12 +21,5 @@ regtype : gridded_oif30
nparameters : 9835
random.seed : 4385
regionsfile : ${datadir}/covariances/gridded_NH/griddedNHparameters.nc
!random.seed.init: ${datadir}/randomseedinit.pickle
! Include a naming scheme for the variables
#include NamingScheme.wp_Mar2011.rc
! Info on the sites file used
obs.sites.rc : Random
......@@ -67,7 +67,7 @@ class ObsPackObservations(Observations):
We will loop over all site files in the ObsPackage, and subset each to our needs
"""
import string
import string
# Step 1: Read list of available site files in package
......@@ -98,11 +98,11 @@ class ObsPackObservations(Observations):
subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0]
if len(subselect) == 0:
if len(subselect) == 0:
ncf.close()
continue
continue
logging.debug("Trying to add %d observations from file (%s) to the Data list" % (len(subselect), ncfile))
logging.debug("Trying to add %d observations from file (%s) to the Data list" % (len(subselect), ncfile))
dates = dates.take(subselect, axis=0)
......@@ -321,27 +321,27 @@ class ObsPackObservations(Observations):
savedict['missing_value'] = '!'
f.add_data(savedict)
data = data_obs
savedict = io.std_savedict.copy()
savedict['name'] = "observed"
savedict['long_name'] = "observedvalues"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Observations used in optimization'
f.add_data(savedict)
data = data_mdm
savedict = io.std_savedict.copy()
savedict['name'] = "modeldatamismatch"
savedict['long_name'] = "modeldatamismatch"
savedict['units'] = "[mol mol-1]"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
f.add_data(savedict)
data = data_obs
savedict = io.std_savedict.copy()
savedict['name'] = "observed"
savedict['long_name'] = "observedvalues"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Observations used in optimization'
f.add_data(savedict)
data = data_mdm
savedict = io.std_savedict.copy()
savedict['name'] = "modeldatamismatch"
savedict['long_name'] = "modeldatamismatch"
savedict['units'] = "[mol mol-1]"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
f.add_data(savedict)
f.close()
......
......@@ -325,7 +325,7 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
logging.info(" ... No simulations needed for this cycle of lag")
logging.info(" ... No residuals needed for this cycle of lag")
logging.info(" ... Proceeding to next cycle of lag")
return
return
samples.setup(dacycle)
samples.add_observations()
......
......@@ -343,7 +343,9 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
if os.path.exists(sampling_coords_file):
samples.add_simulations(obsoperator.simulated_file)
else: logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")
logging.debug("Simulated values read, continuing pipeline")
else:
logging.warning("No simulations added, because input file does not exist (no samples found in obspack)")
# Now write a small file that holds for each observation a number of values that we need in postprocessing
......
......@@ -30,7 +30,7 @@ from da.tools.pipeline import ensemble_smoother_pipeline, header, footer
from da.platform.capegrim import CapeGrimPlatform
from da.baseclasses.dasystem import DaSystem
from da.co2gridded.statevector import CO2GriddedStateVector
from da.carbondioxide.obspack_globalviewplus2 import ObsPackObservations
from da.baseclasses.obs import Observations
from da.baseclasses.optimizer import Optimizer
from da.baseclasses.observationoperator import RandomizerObservationOperator
......@@ -53,7 +53,7 @@ dacycle = CycleControl(opts, args)
platform = CapeGrimPlatform()
dasystem = DaSystem(dacycle['da.system.rc'])
obsoperator = RandomizerObservationOperator(dacycle['da.obsoperator.rc'])
samples = ObsPackObservations()
samples = Observations()
statevector = CO2GriddedStateVector()
optimizer = Optimizer()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment