Commit 5e1f40c1 authored by Wit, Allard de

- Main change is that grompy can now handle relative paths much better:
  relative paths in the configuration are now resolved against the location
  of grompy.yaml rather than the current working directory.
- The shapefile is now checked for the existence of the required columns.
parent f7d41077
@@ -7,4 +7,4 @@ from .dap import DataAccessProvider
 from .cmd import cli

-__version__ = "1.0.0"
\ No newline at end of file
+__version__ = "1.1.0"
\ No newline at end of file
@@ -3,16 +3,22 @@
 # Allard de Wit (allard.dewit@wur.nl), April 2021
 from pathlib import Path
+import geopandas as gpd
 import sqlalchemy as sa
 import sqlalchemy.exc
 import yaml

-from .util import count_lines
+from .util import count_lines, make_path_absolute

 ALL_CHECKS_OK = True


-def check_DB_connection(dsn):
+def check_DB_connection(grompy_yaml, dsn):
     global ALL_CHECKS_OK
+    if dsn.startswith("sqlite"):
+        sqlite_path = dsn.replace("sqlite:///", "")
+        sqlite_path = make_path_absolute(grompy_yaml, Path(sqlite_path))
+        dsn = f"sqlite:///{sqlite_path}"
     try:
         e = sa.create_engine(dsn)
         e.connect()
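The sqlite branch above rewrites a relative database path against the location of grompy.yaml instead of the current working directory. A minimal sketch of the effect, with hypothetical paths:

```python
# Sketch of the DSN rewrite performed above; paths are hypothetical.
from pathlib import Path

grompy_yaml = Path("/data/project/grompy.yaml")
dsn = "sqlite:///parcel_info.db3"                  # relative path inside the DSN
sqlite_path = Path(dsn.replace("sqlite:///", ""))
if not sqlite_path.is_absolute():                  # what make_path_absolute() does
    sqlite_path = grompy_yaml.parent / sqlite_path
dsn = f"sqlite:///{sqlite_path}"                   # sqlite:////data/project/parcel_info.db3
```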
@@ -24,32 +30,42 @@ def check_DB_connection(dsn):
         return False


-def check_parcel_info(dsn, counts_file, shape_file, table_name):
+def check_parcel_info(grompy_yaml, dsn, counts_file, shape_file, **kwargs):
     global ALL_CHECKS_OK

     # Check if files exist
-    fname_pixcounts = Path(counts_file).absolute()
+    fname_pixcounts = make_path_absolute(grompy_yaml, Path(counts_file))
     if fname_pixcounts.exists():
         print(f"OK: pixel counts file exists!")
     else:
         print(f"ERROR: Missing file: {fname_pixcounts}")
         ALL_CHECKS_OK = False

-    fname_shape_file = Path(shape_file).absolute()
+    fname_shape_file = make_path_absolute(grompy_yaml, Path(shape_file))
     if fname_shape_file.exists():
         print(f"OK: shape file with parcel info exists!")
     else:
         print(f"ERROR: Missing file: {fname_shape_file}")
         ALL_CHECKS_OK = False

-    check_DB_connection(dsn)
+    if fname_shape_file.exists():
+        gdf = gpd.read_file(fname_shape_file, rows=1)
+        for column in ["fieldid", "year", "cat_gewasc", "gws_gewasc", "gws_gewas", "provincie",
+                       "gemeente", "regio", "PC4", "woonplaats", "waterschap"]:
+            if column not in gdf.columns:
+                print(f"ERROR: Attribute {column} missing in BRP shapefile")
+                ALL_CHECKS_OK = False
+
+    check_DB_connection(grompy_yaml, dsn)


-def check_CSV_inputs(dsn, bands, **kwargs):
+def check_CSV_inputs(grompy_yaml, dsn, bands, **kwargs):
     global ALL_CHECKS_OK
     n_tested = 0
+    fnames = []
     for key, fname in bands.items():
-        fpath = Path(fname).absolute()
+        fpath = make_path_absolute(grompy_yaml, Path(fname))
+        fnames.append(fpath)
         if fpath.exists():
             n_tested += 1
         else:
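The new column check reads a single record (`rows=1`), so validating the schema stays cheap even for a nation-wide BRP shapefile. The same idea in isolation, with a hypothetical path and an abridged column list:

```python
# Cheap schema check: read only the first record of the shapefile.
import geopandas as gpd

required = {"fieldid", "year", "gws_gewasc", "gws_gewas"}   # abridged list
gdf = gpd.read_file("gewaspercelen_2019.shp", rows=1)       # hypothetical path
missing = required - set(gdf.columns)
if missing:
    print(f"Missing BRP attributes: {sorted(missing)}")
```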
@@ -58,26 +74,26 @@ def check_CSV_inputs(dsn, bands, **kwargs):
     nlines = None
     if n_tested == len(bands):
-        nlines = count_lines(bands)
+        nlines = count_lines(fnames)
     if nlines is None:
         ALL_CHECKS_OK = False

-    db_OK = check_DB_connection(dsn)
+    db_OK = check_DB_connection(grompy_yaml, dsn)

     return nlines, db_OK


-def check_grompy_inputs(yaml_file):
-    grompy_conf = yaml.safe_load(open(yaml_file))
+def check_grompy_inputs(grompy_yaml):
+    grompy_conf = yaml.safe_load(open(grompy_yaml))
     parcel_info = grompy_conf.pop("parcel_info")
-    check_parcel_info(**parcel_info)
+    check_parcel_info(grompy_yaml, **parcel_info)
     for dataset_name, description in grompy_conf["datasets"].items():
-        n, OK = check_CSV_inputs(**description)
+        n, OK = check_CSV_inputs(grompy_yaml, **description)
         description.update({"nlines": n, "DBcheck": OK})

     if ALL_CHECKS_OK:
         print("OK! All inputs seem fine.")
         grompy_conf["parcel_info"] = parcel_info
-        yaml.safe_dump(grompy_conf, open(yaml_file, "w"))
+        yaml.safe_dump(grompy_conf, open(grompy_yaml, "w"))
     else:
         print("ERRORS FOUND! Check log messages.")
@@ -15,26 +15,28 @@ def cli():
 @click.command("init")
+@click.argument("year", type=click.INT)
 @click.argument("input_path", type=click.Path(exists=True))
-def init_grompy(input_path):
-    init(input_path)
+def init_grompy(year, input_path):
+    input_path = Path(input_path)
+    year = int(year)
+    if year not in list(range(2000, 2030)):
+        print("YEAR must be in range 2000..2029")
+        sys.exit()
+    init(year, input_path)


 @click.command("check")
-def check_grompy():
-    grompy_yaml = Path.cwd() / "grompy.yaml"
-    if not grompy_yaml.exists():
-        click.echo("Cannot find 'grompy.yaml' in current working directory! Abort...")
-        sys.exit()
+@click.argument("grompy_yaml", type=click.Path(exists=True))
+def check_grompy(grompy_yaml):
+    grompy_yaml = Path(grompy_yaml)
     check_grompy_inputs(grompy_yaml)


 @click.command("load")
-def load_grompy():
-    grompy_yaml = Path.cwd() / "grompy.yaml"
-    if not grompy_yaml.exists():
-        click.echo("Cannot find 'grompy.yaml' in current working directory! Abort...")
-        sys.exit()
+@click.argument("grompy_yaml", type=click.Path(exists=True))
+def load_grompy(grompy_yaml):
+    grompy_yaml = Path(grompy_yaml)
     load_data(grompy_yaml)
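The new command signatures can be smoke-tested with click's own test runner. This assumes the subcommands are registered on the `cli` group (not shown in the diff); year and paths are hypothetical:

```python
# Quick exercise of the new CLI signatures via click's test runner.
from click.testing import CliRunner
from grompy.cmd import cli

runner = CliRunner()
runner.invoke(cli, ["init", "2019", "/data/inputs"])  # YEAR is now required
runner.invoke(cli, ["check", "grompy.yaml"])          # config path is an argument
runner.invoke(cli, ["load", "grompy.yaml"])           # no longer assumes the CWD
```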
@@ -7,6 +7,8 @@ import sqlalchemy as sa
 import pandas as pd
 import yaml

+from .util import open_DB_connection, make_path_absolute
+

 class Container:
     pass
@@ -14,31 +16,30 @@ class Container:

 class DataAccessProvider:

-    def __init__(self, grompy_conf, fieldID=None, area_gt=None, pixcount_gt=None, provincie=None, limit=None, gws_gewasc=None):
-        grompy_conf = Path(grompy_conf)
-        if not grompy_conf.exists():
-            msg = f"Cannot find config file: {grompy_conf}"
+    def __init__(self, grompy_yaml, fieldID=None, area_gt=None, pixcount_gt=None, provincie=None, limit=None, gws_gewasc=None):
+        grompy_yaml = Path(grompy_yaml)
+        if not grompy_yaml.exists():
+            msg = f"Cannot find config file: {grompy_yaml}"
             raise RuntimeError(msg)
-        self.grompy_conf = yaml.safe_load(open(grompy_conf))
+        self.grompy_conf = yaml.safe_load(open(grompy_yaml))
         self.parcel_dsn = self.grompy_conf["parcel_info"]["dsn"]
         self.parcel_table = self.grompy_conf["parcel_info"]["table_name"]

         # Build connections to dataset tables
         self.dataset_connections = {}
         for dataset_name, details in self.grompy_conf["datasets"].items():
-            e = sa.create_engine(details["dsn"])
+            e = open_DB_connection(grompy_yaml, details["dsn"])
             meta = sa.MetaData(e)
             tbl = sa.Table(dataset_name, meta, autoload=True)
             self.dataset_connections[dataset_name] = (e, tbl)
+        self.datasets_enabled = set(self.datasets)

         # Build connection to parcel info
         self.engine = sa.create_engine(self.parcel_dsn)
         meta = sa.MetaData(self.engine)
         self.tbl_perc_info = sa.Table(self.parcel_table, meta, autoload=True)

-        if limit is None:
-            limit = int(1e9)
-        self.limit = limit
+        self.limit = int(1e9) if limit is None else limit

         self.perc_stmt = sa.select([self.tbl_perc_info]).order_by("fieldID")
         if fieldID is not None:
@@ -63,6 +64,23 @@ class DataAccessProvider:
     def datasets(self):
         return list(self.dataset_connections.keys())

+    def enable(self, dataset):
+        """Enables a dataset for reading with grompy.
+        """
+        if dataset in self.datasets:
+            self.datasets_enabled.add(dataset)
+        else:
+            print(f"'{dataset}' unknown, should be one of: {self.datasets}")
+
+    def disable(self, dataset):
+        """Disables a dataset for reading with grompy.
+        """
+        if dataset in self.datasets:
+            if dataset in self.datasets_enabled:
+                self.datasets_enabled.remove(dataset)
+        else:
+            print(f"'{dataset}' unknown, should be one of: {self.datasets}")
+
     def __iter__(self):
         r = self.perc_stmt.limit(self.limit).execute()
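The new switches can be used like this, as a hypothetical session (the config path is illustrative):

```python
# Hypothetical session with the new dataset switches:
dap = DataAccessProvider("grompy.yaml")
dap.disable("sentinel1_coherence")            # skip this table while iterating
for parcel, data in dap:
    df = data.sentinel2_reflectance_values    # DataFrame; disabled sets are absent
    break
dap.enable("sentinel1_coherence")             # switch it back on later
```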
@@ -71,6 +89,8 @@ class DataAccessProvider:
             for row in rows:
                 c = Container()
                 for dataset_name, (engine, tbl) in self.dataset_connections.items():
+                    if dataset_name not in self.datasets_enabled:
+                        continue
                     s = sa.select([tbl],
                                   sa.and_(tbl.c.fieldID==row.fieldID),
                                   order_by={tbl.c.day})
@@ -80,7 +100,7 @@ class DataAccessProvider:
                     setattr(c, dataset_name, df)
                 yield row, c
-            rows = r.fetchmany()
+            rows = r.fetchmany(100)

     def __len__(self):
        return self.parcel_count
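Passing an explicit chunk size matters because `fetchmany()` without an argument falls back to the DB-API `cursor.arraysize`, which defaults to 1 for sqlite3. The batching pattern behind `__iter__`, sketched with illustrative names:

```python
# Batched fetch: pull rows in fixed chunks to bound memory on large
# parcel tables; `connection`, `stmt` and `handle` are illustrative.
result = connection.execute(stmt)
rows = result.fetchmany(100)
while rows:
    for row in rows:
        handle(row)               # per-parcel work / yield
    rows = result.fetchmany(100)
```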
@@ -3,48 +3,48 @@ grompy:
   parcel_info:
     dsn: sqlite:///{input_path}/parcel_info.db3
     counts_file: {input_path}/Optisch/perceelscount.csv
-    shape_file: {input_path}/BRP/gewaspercelen_2019.shp
+    shape_file: {input_path}/BRP/gewaspercelen_{year}.shp
     table_name: parcel_info
   datasets:
     sentinel2_reflectance_values:
       dsn: sqlite:///{input_path}/sentinel2_reflectance_values.db3
       bands:
-        NDVI: {input_path}/Optisch/zonal_stats_mean_2019_ADC.csv
-        B02: {input_path}/Optisch/zonal_stats_mean_B02_2019_ADC.csv
-        B03: {input_path}/Optisch/zonal_stats_mean_B03_2019_ADC.csv
-        B04: {input_path}/Optisch/zonal_stats_mean_B04_2019_ADC.csv
-        B05: {input_path}/Optisch/zonal_stats_mean_B05_2019_ADC.csv
-        B06: {input_path}/Optisch/zonal_stats_mean_B06_2019_ADC.csv
-        B07: {input_path}/Optisch/zonal_stats_mean_B07_2019_ADC.csv
-        B08: {input_path}/Optisch/zonal_stats_mean_B08_2019_ADC.csv
-        B11: {input_path}/Optisch/zonal_stats_mean_B11_2019_ADC.csv
-        B12: {input_path}/Optisch/zonal_stats_mean_B12_2019_ADC.csv
-        B8A: {input_path}/Optisch/zonal_stats_mean_B8A_2019_ADC.csv
+        NDVI: {input_path}/Optisch/zonal_stats_mean_{year}_ADC.csv
+        B02: {input_path}/Optisch/zonal_stats_mean_B02_{year}_ADC.csv
+        B03: {input_path}/Optisch/zonal_stats_mean_B03_{year}_ADC.csv
+        B04: {input_path}/Optisch/zonal_stats_mean_B04_{year}_ADC.csv
+        B05: {input_path}/Optisch/zonal_stats_mean_B05_{year}_ADC.csv
+        B06: {input_path}/Optisch/zonal_stats_mean_B06_{year}_ADC.csv
+        B07: {input_path}/Optisch/zonal_stats_mean_B07_{year}_ADC.csv
+        B08: {input_path}/Optisch/zonal_stats_mean_B08_{year}_ADC.csv
+        B11: {input_path}/Optisch/zonal_stats_mean_B11_{year}_ADC.csv
+        B12: {input_path}/Optisch/zonal_stats_mean_B12_{year}_ADC.csv
+        B8A: {input_path}/Optisch/zonal_stats_mean_B8A_{year}_ADC.csv
     sentinel2_reflectance_std:
       dsn: sqlite:///{input_path}/sentinel2_reflectance_std.db3
       bands:
-        NDVI: {input_path}/Optisch/zonal_stats_std_2019_ADC.csv
-        B02: {input_path}/Optisch/zonal_stats_std_B02_2019_ADC.csv
-        B03: {input_path}/Optisch/zonal_stats_std_B03_2019_ADC.csv
-        B04: {input_path}/Optisch/zonal_stats_std_B04_2019_ADC.csv
-        B05: {input_path}/Optisch/zonal_stats_std_B05_2019_ADC.csv
-        B06: {input_path}/Optisch/zonal_stats_std_B06_2019_ADC.csv
-        B07: {input_path}/Optisch/zonal_stats_std_B07_2019_ADC.csv
-        B08: {input_path}/Optisch/zonal_stats_std_B08_2019_ADC.csv
-        B11: {input_path}/Optisch/zonal_stats_std_B11_2019_ADC.csv
-        B12: {input_path}/Optisch/zonal_stats_std_B12_2019_ADC.csv
-        B8A: {input_path}/Optisch/zonal_stats_std_B8A_2019_ADC.csv
+        NDVI: {input_path}/Optisch/zonal_stats_std_{year}_ADC.csv
+        B02: {input_path}/Optisch/zonal_stats_std_B02_{year}_ADC.csv
+        B03: {input_path}/Optisch/zonal_stats_std_B03_{year}_ADC.csv
+        B04: {input_path}/Optisch/zonal_stats_std_B04_{year}_ADC.csv
+        B05: {input_path}/Optisch/zonal_stats_std_B05_{year}_ADC.csv
+        B06: {input_path}/Optisch/zonal_stats_std_B06_{year}_ADC.csv
+        B07: {input_path}/Optisch/zonal_stats_std_B07_{year}_ADC.csv
+        B08: {input_path}/Optisch/zonal_stats_std_B08_{year}_ADC.csv
+        B11: {input_path}/Optisch/zonal_stats_std_B11_{year}_ADC.csv
+        B12: {input_path}/Optisch/zonal_stats_std_B12_{year}_ADC.csv
+        B8A: {input_path}/Optisch/zonal_stats_std_B8A_{year}_ADC.csv
     sentinel1_backscatter:
       dsn: sqlite:///{input_path}/sentinel1_backscatter.db3
       bands:
-        VV: {input_path}/Radar/zonal_stats_mean_VV_2019_ADC.csv
-        VH: {input_path}/Radar/zonal_stats_mean_VH_2019_ADC.csv
-        VV_std: {input_path}/Radar/zonal_stats_std_VV_2019_ADC.csv
-        VH_std: {input_path}/Radar/zonal_stats_std_VH_2019_ADC.csv
+        VV: {input_path}/Radar/zonal_stats_mean_VV_{year}_ADC.csv
+        VH: {input_path}/Radar/zonal_stats_mean_VH_{year}_ADC.csv
+        VV_std: {input_path}/Radar/zonal_stats_std_VV_{year}_ADC.csv
+        VH_std: {input_path}/Radar/zonal_stats_std_VH_{year}_ADC.csv
     sentinel1_coherence:
       dsn: sqlite:///{input_path}/sentinel1_coherence.db3
       bands:
-        S1A_VV: {input_path}/Radar/zonal_stats_mean_coh_S1A_VV_ALL_2019_ADC.csv
-        S1A_VV_std: {input_path}/Radar/zonal_stats_std_coh_S1A_VV_ALL_2019_ADC.csv
-        S1B_VV: {input_path}/Radar/zonal_stats_mean_coh_S1B_VV_ALL_2019_ADC.csv
-        S1B_VV_std: {input_path}/Radar/zonal_stats_std_coh_S1B_VV_ALL_2019_ADC.csv
+        S1A_VV: {input_path}/Radar/zonal_stats_mean_coh_S1A_VV_ALL_{year}_ADC.csv
+        S1A_VV_std: {input_path}/Radar/zonal_stats_std_coh_S1A_VV_ALL_{year}_ADC.csv
+        S1B_VV: {input_path}/Radar/zonal_stats_mean_coh_S1B_VV_ALL_{year}_ADC.csv
+        S1B_VV_std: {input_path}/Radar/zonal_stats_std_coh_S1B_VV_ALL_{year}_ADC.csv
@@ -5,7 +5,7 @@ from pathlib import Path
 import click


-def init(input_path=None):
+def init(year, input_path=None):
     """Initializes a fresh 'grompy.yaml' in the current working directory,
     pointing to input files at the input_path location (default: CWD).
     """
@@ -15,11 +15,11 @@ def init(input_path=None):
     yaml_conf = fp.read().decode()

     if input_path is None:
-        input_path = Path.cwd()
+        input_path = Path(".")
     else:
         input_path = Path(input_path)
-    input_path = input_path.absolute()
-    yaml_conf = yaml_conf.format(input_path=input_path)
+    # input_path = input_path.absolute()
+    yaml_conf = yaml_conf.format(input_path=input_path, year=year)

     yaml_output_fname = Path.cwd() / "grompy.yaml"
     if yaml_output_fname.exists():
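The template placeholders are plain `str.format` fields, so the substitution performed by `init()` reduces to the following (the template string is a one-line stand-in for the bundled grompy.yaml template shown above):

```python
# Sketch of the substitution in init().
from pathlib import Path

template = "shape_file: {input_path}/BRP/gewaspercelen_{year}.shp"
print(template.format(input_path=Path("."), year=2019))
# -> shape_file: ./BRP/gewaspercelen_2019.shp
```

Leaving input_path relative (`Path(".")`) keeps the generated grompy.yaml portable: the new `make_path_absolute()` resolves entries against the config file's own location at run time.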
@@ -15,7 +15,7 @@ import geopandas as gpd
 import numpy as np
 import yaml

-from .util import prepare_db, printProgressBar, Process
+from .util import prepare_db, printProgressBar, Process, make_path_absolute, open_DB_connection

 dummy_date = "19000101"
@@ -30,7 +30,7 @@ class CSVLoadingError(Exception):
         super().__init__()


-def load_parcel_info(dsn, counts_file, shape_file, table_name):
+def load_parcel_info(grompy_yaml, dsn, counts_file, shape_file, table_name):
     """Loads the parcel info from the shapefile and the pixel counts file.

     :param dsn: Data source name where to write to
@@ -40,12 +40,12 @@ def load_parcel_info(dsn, counts_file, shape_file, table_name):
     :return:
     """
-    shp_fname = Path(shape_file)
+    shp_fname = make_path_absolute(grompy_yaml, Path(shape_file))
     df = gpd.read_file(shp_fname)
     df["area_ha"] = df.geometry.area / 1e4
     df = df.set_index("fieldid")

-    fname_counts = Path(counts_file)
+    fname_counts = make_path_absolute(grompy_yaml, Path(counts_file))
     df_counts = pd.read_csv(fname_counts)
     df_counts.set_index("field_ID", inplace=True)
     df["pixcount"] = df_counts["pixcount"]
@@ -65,7 +65,7 @@ def load_parcel_info(dsn, counts_file, shape_file, table_name):
         "waterschap": df.waterschap.apply(str),
     })

-    engine = sa.create_engine(dsn)
+    engine = open_DB_connection(grompy_yaml, dsn)
     meta = sa.MetaData(engine)
     tbl = sa.Table(table_name, meta,
                    sa.Column("fieldID", sa.Integer, primary_key=True),
@@ -163,15 +163,16 @@ def write_to_database(engine, dataset_name, csv_readers, child_conn):
             break


-def load_satellite_csv(child_conn, dataset_name, dsn, bands, **kwargs):
+def load_satellite_csv(grompy_yaml, child_conn, dataset_name, dsn, bands, **kwargs):
     mean_csv_readers = {}
     for column_name, csv_fname in bands.items():
-        mean_csv_readers[column_name] = DictReader(open(csv_fname))
+        csv_fpath = make_path_absolute(grompy_yaml, Path(csv_fname))
+        mean_csv_readers[column_name] = DictReader(open(csv_fpath))
     engine = prepare_db(dsn, table_name=dataset_name, bands=mean_csv_readers.keys())
     write_to_database(engine, dataset_name, mean_csv_readers, child_conn)


-def start_parallel_loading(datasets):
+def start_parallel_loading(grompy_yaml, datasets):
     """Start loading CSV files in parallel

     :param datasets:
     :return:
@@ -186,7 +187,7 @@ def start_parallel_loading(datasets):
         print(f"Starting loading of: {dataset_name}")
         lines_per_dataset[dataset_name] = description["nlines"]
-        p = Process(target=load_satellite_csv, args=(child_conn, dataset_name,), kwargs=description)
+        p = Process(target=load_satellite_csv, args=(grompy_yaml, child_conn, dataset_name,), kwargs=description)
         process_list.append(p)

     for p in process_list:
         p.start()
@@ -225,19 +226,19 @@ def monitor_parallel_loading(process_list, parent_conn, lines_per_dataset):
             print(e.traceback)


-def load_data(yaml_file):
+def load_data(grompy_yaml):
     """Loads the data pointed to by the YAML config file.

-    :param yaml_file:
+    :param grompy_yaml:
     :return:
     """
-    grompy_conf = yaml.safe_load(open(yaml_file))
+    grompy_conf = yaml.safe_load(open(grompy_yaml))

     # First load parcel info
     print("Start loading parcel information. This will take some time...")
     parcel_info = grompy_conf.pop("parcel_info")
-    load_parcel_info(**parcel_info)
+    load_parcel_info(grompy_yaml, **parcel_info)

-    process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_conf["datasets"])
+    process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_yaml, grompy_conf["datasets"])
     monitor_parallel_loading(process_list, parent_conn, lines_per_dataset)
@@ -3,6 +3,7 @@
 # Allard de Wit (allard.dewit@wur.nl), April 2021
 import multiprocessing as mp
 import traceback
+from pathlib import Path

 import sqlalchemy.exc
 from sqlalchemy import MetaData, Table, Column, Integer, Date, Float, Text, create_engine
@@ -20,7 +21,7 @@ def count_lines(files):
     """
     counts = {}
     print("Checking number of lines:")
-    for band, fname in files.items():
+    for fname in files:
         with open(fname) as my_file:
             c = sum(1 for _ in my_file)
         counts[fname] = c
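`count_lines()` now takes any iterable of file paths instead of the bands mapping, matching the list of resolved paths that `check_CSV_inputs` builds. A sketch of the call-site change, with `grompy_yaml` and `bands` as in `check_CSV_inputs` and paths hypothetical:

```python
# old: count_lines({"NDVI": "zonal_stats_mean_2019_ADC.csv", ...})
fnames = [make_path_absolute(grompy_yaml, Path(f)) for f in bands.values()]
nlines = count_lines(fnames)   # new: plain iterable of resolved paths
```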
@@ -97,3 +98,31 @@ class Process(mp.Process):
         if self._parent_conn.poll():
             self._exception = self._parent_conn.recv()
         return self._exception
+
+
+def make_path_absolute(grompy_yaml, filepath):
+    """Makes a path absolute; a relative path is assumed to be relative
+    to the location of the grompy.yaml configuration file.
+
+    :param grompy_yaml: the Path() of the grompy.yaml configuration file
+    :param filepath: the (possibly relative) Path of the input file
+    :return: the absolute path to the file
+    """
+    if not filepath.is_absolute():
+        filepath = grompy_yaml.parent / filepath
+    return filepath
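The resolution rule, demonstrated with hypothetical paths:

```python
from pathlib import Path

yaml_path = Path("/data/project/grompy.yaml")
make_path_absolute(yaml_path, Path("Optisch/perceelscount.csv"))
# -> /data/project/Optisch/perceelscount.csv   (resolved against the config)
make_path_absolute(yaml_path, Path("/tmp/counts.csv"))
# -> /tmp/counts.csv                           (absolute paths pass through)
```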
+
+def open_DB_connection(grompy_yaml, dsn):
+    if dsn.startswith("sqlite"):
+        sqlite_path = dsn.replace("sqlite:///", "")
+        sqlite_path = make_path_absolute(grompy_yaml, Path(sqlite_path))
+        dsn = f"sqlite:///{sqlite_path}"
+    engine = create_engine(dsn)
+    engine.connect()
+    return engine