Commit c0706f73 authored by Woude, Auke van der's avatar Woude, Auke van der
Browse files

write to py3

parent b4e4a62a
......@@ -44,14 +44,14 @@ class CO2DaSystem(DaSystem):
'run.obsflag']
for k, v in self.iteritems():
for k, v in self.items():
if v == 'True' :
self[k] = True
if v == 'False':
self[k] = False
for key in needed_rc_items:
if not self.has_key(key):
if key not in self:
logging.warning('Missing a required value in rc-file : %s' % key)
logging.debug('DA System Info settings have been validated succesfully')
......
......@@ -314,7 +314,8 @@ class EmisModel(object):
infile = os.path.join(self.emisdir, 'temporal_data_%03d.nc'%n)
tpr = io.ct_read(infile, method='read')
itimes = tpr.get_variable('Times')
times = array([dtm.datetime(int(''.join(d[:4])),int(''.join(d[5:7])),int(''.join(d[8:10])),int(''.join(d[11:13])),int(''.join(d[14:16]))) for d in itimes])
itimes = [b''.join(d) for d in itimes]
times = array([dtm.datetime.strptime(d.decode(), '%Y-%m-%d_%H:%M:%S') for d in itimes])
if psdo == 1:
startdum=dtm.datetime(self.startdate.year,self.startdate.month,self.startdate.day-1,1,0)
subselect = logical_and(times >= startdum, times <= self.enddate).nonzero()[0]
......
......@@ -44,7 +44,7 @@ class RdamObservations(Observations):
if not os.path.exists(op_dir):
msg = 'Could not find the required ObsPack distribution (%s) ' % op_dir
logging.error(msg)
raise IOError, msg
raise IOError(msg)
else:
self.obspack_dir = op_dir
self.obspack_id = op_id
......@@ -80,10 +80,10 @@ class RdamObservations(Observations):
infile = os.path.join(self.obspack_dir, ncfile + '.nc')
ncf = io.ct_read(infile, 'read')
idates = ncf.get_variable('Times')
dates = array([dtm.datetime(int(''.join(d[:4])),int(''.join(d[5:7])),int(''.join(d[8:10])),int(''.join(d[11:13])),0) for d in idates])
idates = [b''.join(d) for d in idates]
dates = array([dtm.datetime.strptime(d.decode(), '%Y-%m-%d_%H:%M:%S') for d in idates])
subselect = logical_and(dates >= self.startdate , dates <= self.enddate).nonzero()[0]
dates = dates.take(subselect, axis=0)
datasetname = ncfile # use full name of dataset to propagate for clarity
......@@ -111,7 +111,7 @@ class RdamObservations(Observations):
logging.error(msg)
logging.error("Did the sampling step succeed?")
logging.error("...exiting")
raise IOError, msg
raise IOError(msg)
ncf = io.ct_read(filename, method='read')
ids = ncf.get_variable('obs_num')
......@@ -120,7 +120,7 @@ class RdamObservations(Observations):
logging.info("Successfully read data from model sample file (%s)" % filename)
obs_ids = self.getvalues('id').tolist()
ids = map(int, ids)
ids = list(map(int, ids))
missing_samples = []
......@@ -209,13 +209,12 @@ class RdamObservations(Observations):
savedict['missing_value'] = -999.9
f.add_data(savedict)
data = np.array(self.tracer_list)
data = np.array(self.getvalues('species'))#tracer_list)
savedict = io.std_savedict.copy()
savedict['dtype'] = "int"
savedict['dtype'] = "char"
savedict['name'] = "source_type"
savedict['units'] = "NA"
savedict['dims'] = dimid
savedict['dims'] = dimid + dim10char
savedict['values'] = data.tolist()
savedict['missing_value'] = -9
f.add_data(savedict)
......@@ -231,29 +230,29 @@ class RdamObservations(Observations):
savedict['missing_value'] = '!'
f.add_data(savedict)
data = self.getvalues('obs')
savedict = io.std_savedict.copy()
savedict['name'] = "observed"
savedict['long_name'] = "observedvalues"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Observations used in optimization'
f.add_data(savedict)
data = self.getvalues('mdm')
data = self.getvalues('obs')
savedict = io.std_savedict.copy()
savedict['name'] = "modeldatamismatch"
savedict['long_name'] = "modeldatamismatch"
savedict['units'] = "[mol mol-1]"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
f.add_data(savedict)
f.close()
savedict = io.std_savedict.copy()
savedict['name'] = "observed"
savedict['long_name'] = "observedvalues"
savedict['units'] = "mol mol-1"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Observations used in optimization'
f.add_data(savedict)
data = self.getvalues('mdm')
savedict = io.std_savedict.copy()
savedict['name'] = "modeldatamismatch"
savedict['long_name'] = "modeldatamismatch"
savedict['units'] = "[mol mol-1]"
savedict['dims'] = dimid
savedict['values'] = data.tolist()
savedict['comment'] = 'Standard deviation of mole fractions resulting from model-data mismatch'
f.add_data(savedict)
f.close()
logging.debug("Successfully wrote data to obs file")
logging.info("Sample input file for obs operator now in place [%s]" % obsinputfile)
......@@ -271,7 +270,7 @@ class RdamObservations(Observations):
if not os.path.exists(filename):
msg = 'Could not find the required sites.rc input file (%s) ' % filename
logging.error(msg)
raise IOError, msg
raise IOError(msg)
else:
self.sites_file = filename
......@@ -285,7 +284,7 @@ class RdamObservations(Observations):
logging.warning('Model-data mismatch scaling factor : %f ' % self.global_R_scaling)
logging.debug('Model-data mismatch site categories : %d ' % self.n_site_categories)
cats = [k for k in sites_weights.keys() if 'site.category' in k]
cats = [k for k in list(sites_weights.keys()) if 'site.category' in k]
site_categories = {}
for key in cats:
......@@ -300,7 +299,7 @@ class RdamObservations(Observations):
site_move = {}
site_hourly = {} # option added to include only certain hours of the day (for e.g. PAL) IvdL
site_incalt = {} # option to increase sampling altitude for sites specified in sites and weights file
for key, value in sites_weights.iteritems():
for key, value in sites_weights.items():
if 'obsfile' in key: # to be fixed later, do not yet know how to parse valid keys from rc-files yet.... WP
sitename, sitecategory = key, value
sitename = sitename.strip()
......@@ -325,8 +324,8 @@ class RdamObservations(Observations):
identifier = obs.code
# species, site, method, lab, datasetnr = identifier.split('_')
if site_info.has_key(identifier):
if site_hourly.has_key(identifier):
if identifier in site_info:
if identifier in site_hourly:
obs.samplingstrategy = 2
hourf, hourt = site_hourly[identifier]
if int(obs.xdate.hour) >= hourf and int(obs.xdate.hour) <= hourt:
......@@ -351,7 +350,7 @@ class RdamObservations(Observations):
else:
logging.warning("Observation NOT found (%s, %d), please check sites.rc file (%s) !!!" % (identifier, obs.id, self.sites_file))
if site_move.has_key(identifier):
if identifier in site_move:
movelat, movelon = site_move[identifier]
obs.lat = obs.lat + movelat
......@@ -359,7 +358,7 @@ class RdamObservations(Observations):
logging.warning("Observation location for (%s, %d), is moved by %3.2f degrees latitude and %3.2f degrees longitude" % (identifier, obs.id, movelat, movelon))
if site_incalt.has_key(identifier):
if identifier in site_incalt:
incalt = site_incalt[identifier]
obs.height = obs.height + incalt
......@@ -387,7 +386,7 @@ class RdamObservations(Observations):
logging.error(msg)
logging.error("Did the sampling step succeed?")
logging.error("...exiting")
raise IOError, msg
raise IOError(msg)
ncf = io.ct_read(filename, method='read')
ids = ncf.get_variable('obs_num')
......@@ -407,7 +406,7 @@ class RdamObservations(Observations):
f.close()
#return outfile
for key, value in self.site_move.iteritems():
for key, value in self.site_move.items():
msg = "Site is moved by %3.2f degrees latitude and %3.2f degrees longitude" % value
f.add_attribute(key, msg)
......
......@@ -427,7 +427,7 @@ class STILTObservationOperator(object):
if not os.path.exists(datadir):
msg = "The specified input directory for the OPS model to read from does not exist (%s), exiting..." % datadir
logging.error(msg)
raise IOError, msg
raise IOError(msg)
datafiles = os.listdir(datadir)
......@@ -436,15 +436,15 @@ class STILTObservationOperator(object):
if not os.path.exists(obsfile):
msg = "The specified obs input file for the OPS model to read from does not exist (%s), exiting..." % obsfile
logging.error(msg)
if not self.dacycle.has_key('forward.savestate.dir'):
raise IOError, msg
if 'forward.savestate.dir' not in self.dacycle:
raise IOError(msg)
for n in range(int(self.dacycle['da.optimizer.nmembers'])):
paramfile = 'parameters.%03d.nc' % n
if paramfile not in datafiles:
msg = "The specified parameter input file for the OPS model to read from does not exist (%s), exiting..." % paramfile
logging.error(msg)
raise IOError, msg
raise IOError(msg)
self.ops_exec = self.model_settings['run.opsexec']
if not os.path.exists(self.ops_exec):
......@@ -513,7 +513,7 @@ class STILTObservationOperator(object):
elif psdo == 0:
#varname = '%sbg_true'%(self.spname[s])
varname = '%sbg_prior'%(self.spname[s])
if varname in bf.variables.keys():
if varname in list(bf.variables.keys()):
bgc = bf.get_variable(varname)[idx1[0]:idx2[0]+1]
else:
bgc = np.zeros(idx2-idx1+1)
......@@ -604,8 +604,8 @@ class STILTObservationOperator(object):
import da.tools.io4 as io
f = io.CT_CDF(self.simulated_file, method='create')
logging.debug('Creating new simulated observation file in ObservationOperator (%s)' % self.simulated_file)
logging.debug('Creating new simulated observation file in ObservationOperator (%s)' % self.simulated_file)
dimid = f.createDimension('obs_num', size=None)
dimid = ('obs_num',)
savedict = io.std_savedict.copy()
......@@ -632,7 +632,7 @@ class STILTObservationOperator(object):
ids = f_in.get_variable('obs_num')
for i,data in enumerate(zip(ids,self.mod)):
f.variables['obs_num'][i] = data[0]
f.variables['obs_num'][i] = data[0]
f.variables['model'][i,:] = data[1]
dum=f.variables['model'][:]
......
......@@ -44,18 +44,18 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera
logging.info(header + "Initializing current cycle" + footer)
start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
if dacycle.has_key('forward.savestate.exceptsam'):
if 'forward.savestate.exceptsam' in dacycle:
sam = (dacycle['forward.savestate.exceptsam'].upper() in ["TRUE","T","YES","Y"])
else:
sam = False
if dacycle.has_key('forward.savestate.dir'):
if 'forward.savestate.dir' in dacycle:
fwddir = dacycle['forward.savestate.dir']
else:
logging.debug("No forward.savestate.dir key found in rc-file, proceeding with self-constructed prior parameters")
fwddir = False
if dacycle.has_key('forward.savestate.legacy'):
if 'forward.savestate.legacy' in dacycle:
legacy = (dacycle['forward.savestate.legacy'].upper() in ["TRUE","T","YES","Y"])
else:
legacy = False
......@@ -172,7 +172,7 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
def archive_pipeline(dacycle, platform, dasystem):
""" Main entry point for archiving of output from one disk/system to another """
if not dacycle.has_key('task.rsync'):
if 'task.rsync' not in dacycle:
logging.info('rsync task not found, not starting automatic backup...')
return
else:
......@@ -374,7 +374,7 @@ def invert(dacycle, statevector, optimizer, obsoperator):
int(dacycle.dasystem['obs.spec.nr']),
dacycle.dasystem['datadir'])
if not dacycle.dasystem.has_key('opt.algorithm'):
if 'opt.algorithm' not in dacycle.dasystem:
logging.info("There was no minimum least squares algorithm specified in the DA System rc file (key : opt.algorithm)")
logging.info("...using serial algorithm as default...")
optimizer.set_algorithm()
......
!!! Info for the CarbonTracker data assimilation system
datadir : /Storage/CO2/super004/STILT_model/Data
datadir : /home/awoude/ffdas/test/Data
! list of all observation sites
obs.input.id : obsfiles.csv
......
......@@ -103,11 +103,9 @@ class STILTObservationOperator(ObservationOperator):
obs_ids = [b''.join(obs_id).decode() for obs_id in obs_ids]
self.tracer, self.site, self.obs_id = [], [], []
for obs_id in obs_ids:
tracer, site, *_ = obs_id.split('~')[1].split('_')
tracer, site, *_ = obs_id.split('-')[0].split('_')
self.tracer.append(tracer)
self.site.append(site)
self.obs_id.append(obs_id.split('~')[-1])
# Times and latitude and longitudes are static; only need once
self.times = infile['date_components'][:]
self.lat = infile['latitude'][:]
......@@ -357,7 +355,7 @@ class STILTObservationOperator(ObservationOperator):
# Add the simulated concentrations
dimmember = f.createDimension('nmembers', size=self.dacycle['da.optimizer.nmembers'])
savedict = io.std_savedict.copy()
savedict['name'] = "simulated"
savedict['name'] = "flask"
savedict['dtype'] = "float"
savedict['long_name'] = "Simulated_concentration"
savedict['units'] = "mol mol-1"
......
#!/usr/bin/env python
# control.py
"""
Author : peters
Revision History:
File created on 26 Aug 2010.
Adapted by super004 on 26 Jan 2017.
"""
import logging
################### Begin Class CO2DaSystem ###################
from da.baseclasses.dasystem import DaSystem
class CO2DaSystem(DaSystem):
    """ Information on the data assimilation system used. This is normally an rc-file with settings.
    """
    def validate(self):
        """
        validate the contents of the rc-file given a dictionary of required keys

        Converts the string values 'True'/'False' read from the rc-file into
        real booleans, then warns (without raising) about any required key
        that is missing from the settings.
        """
        # Keys that must be present in the rc-file for this DA system to run.
        needed_rc_items = ['obs.input.id',
                           'obs.input.nr',
                           'obs.spec.nr',
                           'obs.cat.nr',
                           'nparameters',
                           'random.seed',
                           'emis.pparam',
                           'ff.covariance',
                           'obs.bgswitch',
                           'obs.background',
                           'emis.input.spatial',
                           'emis.input.tempobs',
                           'emis.input.tempprior',
                           'emis.paramfile',
                           'emis.paramfile2',
                           'run.emisflag',
                           'run.emisflagens',
                           'run.obsflag']
        # py3 fix: dict.iteritems() no longer exists; items() is the py3 equivalent.
        # Mutating values (not keys) while iterating a dict is safe in py3.
        for k, v in self.items():
            if v == 'True':
                self[k] = True
            if v == 'False':
                self[k] = False
        # py3 fix: dict.has_key() was removed; use the `in` operator instead.
        for key in needed_rc_items:
            if key not in self:
                logging.warning('Missing a required value in rc-file : %s' % key)
        logging.debug('DA System Info settings have been validated succesfully')
################### End Class CO2DaSystem ###################
# Module is import-only: nothing happens when the file is executed directly.
if __name__ == "__main__":
    pass
#!/usr/bin/env python
# stilt_tools.py
"""
Author : I. Super
Revision History:
Newly developed code, September 2017
This module holds an emission model that prepares emission files used by the observation operator and
to create pseudo-data
"""
import shutil
import os
import logging
import datetime as dtm
import numpy as np
from numpy import array, logical_and
import da.tools.io4 as io
import math
import da.tools.rc as rc
from da.tools.general import create_dirs, to_datetime
identifier = 'EmissionModel ensemble '
version = '1.0'
################### Begin Class Emission model ###################
class EmisModel(object):
def __init__(self, dacycle=None):
if dacycle != None:
self.dacycle = dacycle
else:
self.dacycle = {}
def setup(self, dacycle):
self.dacycle = dacycle
self.startdate = self.dacycle['time.fxstart']
self.enddate = self.dacycle['time.finish']
self.emisdir = dacycle.dasystem['datadir']
self.proxyfile = dacycle.dasystem['emis.input.spatial']
self.tempfileo = dacycle.dasystem['emis.input.tempobs']
self.tempfilep = dacycle.dasystem['emis.input.tempprior']
self.btime = int(dacycle.dasystem['run.backtime'])
self.obsfile = dacycle.dasystem['obs.input.id']
self.nrspc = int(dacycle.dasystem['obs.spec.nr'])
self.nrcat = int(dacycle.dasystem['obs.cat.nr'])
self.nparams = int(dacycle.dasystem['nparameters'])
self.nmembers = int(dacycle['da.optimizer.nmembers'])
self.pparam = dacycle.dasystem['emis.pparam']
self.paramfile = dacycle.dasystem['emis.paramfile']
#self.paramfile2 = dacycle.dasystem['emis.paramfile2']
def get_emis(self, dacycle, psdo):
    """set up emission information for pseudo-obs (psdo=1) and ensemble runs (psdo=0)"""
    if psdo == 1:
        # Pseudo-data run: take the "true" parameter values from the prior file
        # and build a single emission realisation labelled 999.
        priorparam = os.path.join(self.emisdir, self.pparam)
        reader = io.ct_read(priorparam, 'read')
        self.prm = reader.get_variable('true_values')[:self.nparams]
        reader.close()
        self.get_spatial(dacycle, n=999, infile=os.path.join(self.emisdir, self.paramfile))
        self.get_temporal(dacycle, n=999, psdo=1)
    elif psdo == 0:
        # Ensemble run: one emission file per optimizer member.
        self.timestartkey = self.dacycle['time.sample.start']
        self.timefinishkey = self.dacycle['time.sample.end']
        for member in range(self.nmembers):
            if self.startdate == self.timestartkey:
                # First cycle: discard any stale temporal data for this member.
                stale = os.path.join(dacycle.dasystem['datadir'], 'temporal_data_%03d.nc' % member)
                try:
                    os.remove(stale)
                except OSError:
                    pass
            prmfile = os.path.join(dacycle['dir.input'], 'parameters.%03d.nc' % member)
            reader = io.ct_read(prmfile, 'read')
            self.prm = reader.get_variable('parametervalues')
            reader.close()
            self.get_spatial(dacycle, n=member, infile=os.path.join(self.emisdir, self.paramfile))
            self.get_temporal(dacycle, n=member, psdo=0)
def get_totals(self, infile=None):
"""gather all required data and calculate total emissions per sector in kg/yr"""
yremis = np.array(self.nrcat*[self.nrspc*[0.]])
self.spatial_var = []
self.spatial_inc = []
self.temporal_var = []
self.temporal_inc = []
self.ops_sector = []
### Read in parameter values for the emission functions and ratios to calculate total emissions
f = open(infile, 'r')
lines = f.readlines()
f.close()
ct = 0
for line in lines:
dum=line.split(",")
if dum[0]=='#':
continue
else:
id = int(dum[0])
### If id == 0 this (sub)sector is treated only by WRF-STILT; if id takes another value the local sources are treated by OPS
if id != 0:
self.ops_sector.append(ct)
inc = int(dum[2])
### If inc == 999 this parameter is not in the state vector and will not be optimized; otherwise inc refers to the
### position of this parameter in the state vector
if inc!=999:
EC = float(dum[1])*self.prm[inc]
else:
EC = float(dum[1])
inc = int(dum[4])
if inc!=999:
EF = float(dum[3])*self.prm[inc]
else:
EF = float(dum[3])
inc = int(dum[6])
if inc!=999:
AD = float(dum[5])*self.prm[inc]
else:
AD = float(dum[5])
inc = int(dum[8])
if inc!=999:
fr = float(dum[7])*self.prm[inc]
else:
fr = float(dum[7])
### emission = energy consumption per activity x emission factor x activity
### fr can be used to divide sectoral emissions over several subsectors (e.g. to split road traffic into cars and HDV)
ems = EC*EF*AD*fr
### Now we calculated emissions of CO2. To get emissions of other trace gases we multiply with an emission ratio
for s in range(self.nrspc):
inc = int(dum[15+s*3])
if inc!=999:
rat = float(dum[13+s*3])*self.prm[inc]
else:
rat = float(dum[13+s*3])
### Some emission ratios have lognormal uncertainty distributions (label 'l')
if dum[14+s*3]=='n':
yremis[ct,s] = ems*rat
elif dum[14+s*3]=='l':
yremis[ct,s] = ems*np.exp(rat)
ct = ct + 1
### Here we list the spatial and temporal variables that are part of the state vector for use in the get_spatial and get_temporal functions
self.spatial_var.append(dum[9])
self.spatial_inc.append(int(dum[10]))
self.temporal_var.append(dum[11])
self.temporal_inc.append(int(dum[12]))
logging.debug("Successfully calculated total emissions")
return yremis
def get_spatial(self, dacycle, n, infile=None):
"""read in proxy data used for spatial distribution of the gridded emissions, disaggregate yearly totals for the area"""
yremis=self.get_totals(infile)
# read in species information
infile = os.path.join(self.emisdir, self.obsfile)
f = open(infile, 'r')
lines = f.readlines()
f.close()
M_mass = []
spname = []
for line in lines:
dum=line.split(",")
if dum[0]=='#':
continue
else:
M_mass.append(float(dum[6])*1e-9) #kg/micromole
spname.append(dum[5]) #name of the species
sec_year = 8760.*3600. #seconds in a year
arcor = 1e6 #km2 -> m2
conv = np.array(M_mass)*sec_year*arcor # to convert emissions in kg/km2/yr to micromole/m2/s
#read in proxy data for spatial disaggregation
infile = os.path.join(self.emisdir, self.proxyfile)
prx = io.ct_read(infile, method='read')
sp_distr = []
for c in range(self.nrcat):
sp_distr.append(prx.get_variable(self.spatial_var[c]))
sp_distr=np.array(sp_distr)
prx.close()
### create output file
prior_file = os.path.join(self.emisdir, 'prior_spatial_%03d.nc'%n)
f = io.CT_CDF(prior_file, method='create')
dimid = f.add_dim('ncat', self.nrcat)
dimid2 = f.add_dim('ops', 3)
dimlon = f.add_dim('lon', sp_distr.shape[1])
dimlat = f.add_dim('lat', sp_distr.shape[2])
#loop over all tracers
for s in range(self.nrspc):
# determine which fraction of the emissions are local and are treated by OPS
datalistOPS = []