Commit b8fca6fc authored by Allard de Wit

Grompy INIT, CHECK and LOAD are now working. Some polishing to be done.

parent 453db233
grompy:
  version: 0.9
parcel_info:
  dsn: sqlite:////home/wit015/Data/groenmonitor/parcel_info.db3
  counts_file: /home/wit015/Data/groenmonitor/Optisch/perceelscount.csv
  # shape_file: /home/wit015/Data/groenmonitor/BRP/gewaspercelen_2019.shp
  shape_file: /home/wit015/Data/groenmonitor/BRP/BRP_10rows.shp
  table_name: parcel_info
datasets:
  sentinel2_reflectance_values:
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_values.db3
  sentinel1_backscatter:
    DBcheck: true
    bands:
      NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_2019_ADC.csv
      B02: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B02_2019_ADC.csv
      B03: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B03_2019_ADC.csv
      B04: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B04_2019_ADC.csv
      B05: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B05_2019_ADC.csv
      B06: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B06_2019_ADC.csv
      B07: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B07_2019_ADC.csv
      B08: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B08_2019_ADC.csv
      B11: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B11_2019_ADC.csv
      B12: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B12_2019_ADC.csv
      B8A: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B8A_2019_ADC.csv
      VH: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VH_2019_ADC.csv
      VH_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VH_2019_ADC.csv
      VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VV_2019_ADC.csv
      VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VV_2019_ADC.csv
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_backscatter.db3
    nlines: 803016
  sentinel1_coherence:
    DBcheck: true
    bands:
      S1A_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1A_VV_ALL_2019_ADC.csv
      S1A_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1A_VV_ALL_2019_ADC.csv
      S1B_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1B_VV_ALL_2019_ADC.csv
      S1B_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1B_VV_ALL_2019_ADC.csv
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_coherence.db3
    nlines: 803016
  sentinel2_reflectance_std:
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_std.db3
    DBcheck: true
    bands:
      NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_2019_ADC.csv
      B02: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B02_2019_ADC.csv
      B03: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B03_2019_ADC.csv
      B04: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B04_2019_ADC.csv
@@ -35,17 +30,29 @@ datasets:
      B11: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B11_2019_ADC.csv
      B12: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B12_2019_ADC.csv
      B8A: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B8A_2019_ADC.csv
  sentinel1_backscatter:
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_backscatter.db3
    bands:
      VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VV_2019_ADC.csv
      VH: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VH_2019_ADC.csv
      VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VV_2019_ADC.csv
      VH_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VH_2019_ADC.csv
  sentinel1_coherence:
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_coherence.db3
      NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_2019_ADC.csv
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_std.db3
    nlines: 803016
  sentinel2_reflectance_values:
    DBcheck: true
    bands:
      S1A_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1A_VV_ALL_2019_ADC.csv
      S1A_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1A_VV_ALL_2019_ADC.csv
      S1B_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1B_VV_ALL_2019_ADC.csv
      S1B_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1B_VV_ALL_2019_ADC.csv
      B02: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B02_2019_ADC.csv
      B03: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B03_2019_ADC.csv
      B04: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B04_2019_ADC.csv
      B05: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B05_2019_ADC.csv
      B06: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B06_2019_ADC.csv
      B07: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B07_2019_ADC.csv
      B08: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B08_2019_ADC.csv
      B11: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B11_2019_ADC.csv
      B12: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B12_2019_ADC.csv
      B8A: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B8A_2019_ADC.csv
      NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_2019_ADC.csv
    dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_values.db3
    nlines: 803016
grompy:
  version: 1.0
parcel_info:
  counts_file: /home/wit015/Data/groenmonitor/Optisch/perceelscount.csv
  dsn: sqlite:////home/wit015/Data/groenmonitor/parcel_info.db3
  shape_file: /home/wit015/Data/groenmonitor/BRP/gewaspercelen_2019.shp
  table_name: parcel_info
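The loaders below read this file with yaml.safe_load; a sketch of the resulting structure (key names taken from the config above, the exact layout is an assumption):

import yaml
conf = yaml.safe_load(open("grompy.yaml"))
conf["parcel_info"]  # {"dsn": ..., "counts_file": ..., "shape_file": ..., "table_name": ...}
conf["datasets"]     # {name: {"dsn": ..., "bands": {band: csv_path, ...}, "nlines": ..., "DBcheck": ...}, ...}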
from pathlib import Path
import sys
import click
from .load_data import load_data
from .check_files import check_grompy_inputs
from .init import init
@click.group()
def cli():
    pass


@click.command("init")
@click.argument("input_path", type=click.Path(exists=True))
def init_grompy(input_path):
    init(input_path)


@click.command("check")
def check_grompy():
    grompy_yaml = Path.cwd() / "grompy.yaml"
    if not grompy_yaml.exists():
        click.echo("Cannot find 'grompy.yaml' in the current working directory! Aborting...")
        sys.exit(1)
    check_grompy_inputs(grompy_yaml)


@click.command("load")
def load_grompy():
    grompy_yaml = Path.cwd() / "grompy.yaml"
    if not grompy_yaml.exists():
        click.echo("Cannot find 'grompy.yaml' in the current working directory! Aborting...")
        sys.exit(1)
    load_data(grompy_yaml)


cli.add_command(init_grompy)
cli.add_command(check_grompy)
cli.add_command(load_grompy)


def main():
    cli()
load_data("/home/wit015/Sources/grompy/grompy.yaml")
\ No newline at end of file
if __name__ == "__main__":
    cli()
\ No newline at end of file
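Example invocation of the CLI defined above (a sketch; this assumes the package is installed with a `grompy` console-script entry point, which this commit does not show):

# grompy init /home/wit015/Data/groenmonitor   # write a fresh grompy.yaml
# grompy check                                 # validate inputs, record nlines/DBcheck
# grompy load                                  # load all CSVs into the SQLite databases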
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
from pathlib import Path
import sqlalchemy as sa
import sqlalchemy.exc
import yaml
from .util import count_lines
ALL_CHECKS_OK = True


def check_DB_connection(dsn):
    global ALL_CHECKS_OK
    try:
        engine = sa.create_engine(dsn)
        with engine.connect():
            pass
        print(f"OK: Connection seems fine for: {dsn}")
        return True
    except sa.exc.SQLAlchemyError:
        print(f"ERROR: Failed making DB connection: {dsn}")
        ALL_CHECKS_OK = False
        return False
def check_parcel_info(dsn, counts_file, shape_file, table_name):
    global ALL_CHECKS_OK
    # Check if the input files exist
    fname_pixcounts = Path(counts_file).absolute()
    if fname_pixcounts.exists():
        print("OK: pixel counts file exists!")
    else:
        print(f"ERROR: Missing file: {fname_pixcounts}")
        ALL_CHECKS_OK = False

    fname_shape_file = Path(shape_file).absolute()
    if fname_shape_file.exists():
        print("OK: shape file with parcel info exists!")
    else:
        print(f"ERROR: Missing file: {fname_shape_file}")
        ALL_CHECKS_OK = False

    check_DB_connection(dsn)
def check_CSV_inputs(dsn, bands, **kwargs):
    global ALL_CHECKS_OK
    n_tested = 0
    for key, fname in bands.items():
        fpath = Path(fname).absolute()
        if fpath.exists():
            n_tested += 1
        else:
            print(f"ERROR: Cannot find CSV file: {fpath}")
            ALL_CHECKS_OK = False

    nlines = None
    if n_tested == len(bands):
        # count_lines() raises a RuntimeError when the files differ in length,
        # so catch it here and turn it into a failed check.
        try:
            nlines = count_lines(bands)
        except RuntimeError as e:
            print(e)
    if nlines is None:
        ALL_CHECKS_OK = False

    db_OK = check_DB_connection(dsn)
    return nlines, db_OK
def check_grompy_inputs(yaml_file):
    with open(yaml_file) as fp:
        grompy_conf = yaml.safe_load(fp)
    parcel_info = grompy_conf.pop("parcel_info")
    check_parcel_info(**parcel_info)
    for dataset_name, description in grompy_conf["datasets"].items():
        n, OK = check_CSV_inputs(**description)
        description.update({"nlines": n, "DBcheck": OK})
    if ALL_CHECKS_OK:
        print("OK! All inputs seem fine.")
        grompy_conf["parcel_info"] = parcel_info
        with open(yaml_file, "w") as fp:
            yaml.safe_dump(grompy_conf, fp)
    else:
        print("ERRORS FOUND! Check log messages.")
from pathlib import Path
import click
def init(input_path=None):
    """Initializes a fresh 'grompy.yaml' in the current working directory, filling
    the template with the given input_path (or the current working directory when
    no path is provided).
    """
    yaml_template_fname = Path(__file__).parent / "grompy.yaml.template"
    with open(yaml_template_fname, "rb") as fp:
        yaml_conf = fp.read().decode()
    if input_path is None:
        input_path = Path.cwd()
    yaml_conf = yaml_conf.format(input_path=input_path)

    yaml_output_fname = Path.cwd() / "grompy.yaml"
    if yaml_output_fname.exists():
        click.confirm("A grompy.yaml file exists. Overwrite it?", abort=True)
    with open(yaml_output_fname, "w") as fp:
        fp.write(yaml_conf)
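For the yaml_conf.format(input_path=input_path) call above to work, grompy.yaml.template must contain {input_path} placeholders; a hypothetical template line (the template itself is not part of this diff):

# counts_file: {input_path}/Optisch/perceelscount.csv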
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
import sys
from pathlib import Path
from csv import DictReader
import time
import multiprocessing as mp
import traceback
import math
import sqlalchemy as sa
import sqlalchemy.exc
import pandas as pd
import geopandas as gpd
import numpy as np
@@ -17,6 +22,8 @@ dummy_date = "19000101"
class CSVLoadingError(Exception):
    """Exception for raising errors related to CSV reading.
    """

    def __init__(self, error, traceback):
        self.error = error
@@ -24,13 +31,13 @@ class CSVLoadingError(Exception):
        super().__init__()
def load_parcel_info(dsn, counts_file, shape_file, table_name):
def load_parcel_info(dsn, pixcounts_file, shape_file, table_name):
    """Loads the parcel info from the shapefile and the pixel counts file.

    :param dsn: Data source name where to write to
    counts_file: CSV file from which pixel counts should be read
    shape_file: shapefile whose .DBF file should be used as parcel info.
    table_name: name of the table to write parcel info into
    :param pixcounts_file: CSV file from which pixel counts should be read
    :param shape_file: shapefile whose .DBF file should be used as parcel info.
    :param table_name: name of the table to write parcel info into
    :return:
    """
@@ -39,7 +46,7 @@ def load_parcel_info(dsn, counts_file, shape_file, table_name):
df["area_ha"] = df.geometry.area/1e4
df = df.set_index("fieldid")
fname_counts = Path(counts_file)
fname_counts = Path(pixcounts_file)
df_counts = pd.read_csv(fname_counts)
df_counts.set_index("field_ID", inplace=True)
df["pixcount"] = df_counts["pixcount"]
@@ -64,6 +71,13 @@ def load_parcel_info(dsn, counts_file, shape_file, table_name):
def process_rows(rows):
    """Processes a single row from each of the different CSV files and converts
    them into a DataFrame.

    :param rows: a dict with column names as keys and a CSV row as values.
    :return: a pandas DataFrame
    """
    df = pd.DataFrame()
    fieldIDs = []
    for column_name, row in rows.items():
@@ -99,50 +113,61 @@
    return df
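A sketch of the input that process_rows expects, inferred from the DictReaders built in load_satellite_csv below (the column names are assumptions):

# rows = {
#     "NDVI": {"field_ID": "1", "20190101": "0.42", ...},  # one CSV row per band
#     "B02": {"field_ID": "1", "20190101": "612", ...},
# }
# df = process_rows(rows)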
def write_to_database(engine, table_name, mean_csv_readers, nlines):
    # printProgressBar(0, nlines-1, prefix='Progress:', suffix='Complete', length=50)
def write_to_database(engine, table_name, csv_readers, nlines, child_conn):
    """Writes data from a set of CSV files into the database.

    :param engine: the database engine to be used
    :param table_name: the name of the output table
    :param csv_readers: the set of CSV DictReaders (one per CSV file)
    :param nlines: number of lines to process
    :param child_conn: the pipe used to report progress while loading data from file
    :return:
    """
    this_line = 0
    while True:
        try:
            rows = {column_name: next(reader) for column_name, reader in mean_csv_readers.items()}
            rows = {column_name: next(reader) for column_name, reader in csv_readers.items()}
            df = process_rows(rows)
            df.to_sql(table_name, engine, if_exists="append", index=False)
            try:
                df.to_sql(table_name, engine, if_exists="append", index=False)
            except sa.exc.IntegrityError:
                print(f"Field ID {df.fieldID.unique()} failed to insert in table {table_name}")
            this_line += 1
            # if this_line % 1000 == 0:
            #     printProgressBar(this_line, nlines, prefix='Progress:', suffix='Complete', length=50)
            if this_line % 100 == 0:
                progress = this_line/nlines
                child_conn.send(progress)
        except StopIteration:
            break
def load_satellite_csv(dataset_name, dsn, bands):
def load_satellite_csv(child_conn, dataset_name, dsn, bands, nlines, DBcheck=None):
    # DBcheck is accepted (and ignored) here because `grompy check` writes it
    # into the YAML description that is passed in as keyword arguments.
    nlines = count_lines(bands)  # 803016
    mean_csv_readers = {}
    for column_name, csv_fname in bands.items():
        mean_csv_readers[column_name] = DictReader(open(csv_fname))
    t1 = time.time()
    engine = prepare_db(dsn, table_name=dataset_name, bands=mean_csv_readers.keys())
    write_to_database(engine, dataset_name, mean_csv_readers, nlines)
    write_to_database(engine, dataset_name, mean_csv_readers, nlines, child_conn)
class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()
        self._parent_conn, self._child_conn = mp.Pipe()
        self._exception = None

    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            raise e  # You can still raise this exception if you need to
            self._child_conn.send((e, tb))
            # raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception
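A minimal usage sketch for this exception-forwarding Process subclass, mirroring the pattern in load_data below (some_worker is a placeholder):

# p = Process(target=some_worker)
# p.start()
# while p.is_alive():
#     time.sleep(1)
# if p.exception:                  # (error, traceback) tuple sent by run()
#     error, tb = p.exception
#     print(tb)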
@@ -158,21 +183,33 @@ def load_data(yaml_file):
    load_parcel_info(**parcel_info)

    process_list = []
    parent_conn, child_conn = mp.Pipe()
    for dataset_name, description in grompy_conf["datasets"].items():
        print(f"Starting loading of: {dataset_name}")
        p = Process(target=load_satellite_csv, args=(dataset_name,), kwargs=description)
        p = Process(target=load_satellite_csv, args=(child_conn, dataset_name,), kwargs=description)
        process_list.append(p)
    for p in process_list:
        p.start()

    try:
        printProgressBar(0, 1000, decimals=2, length=50)
        while any([p.is_alive() for p in process_list]):
            print("Data loading running.....")
            time.sleep(10)
            for p in process_list:
                if p.exception:
                    error, traceback = p.exception
                    raise CSVLoadingError(error, traceback)
            time.sleep(3)
            progress = []
            # All children report through the same pipe: drain the pending
            # progress messages and check each child for errors.
            for p in process_list:
                if parent_conn.poll():
                    progress.append(parent_conn.recv())
                if p.exception:
                    error, traceback = p.exception
                    raise CSVLoadingError(error, traceback)
            if progress:
                # Show the slowest reader; avoid reusing the loop variable `p`
                pbar_value = math.floor(min(progress) * 1000)
                # print(f"\rprogress: {min(progress):7.4f}")
                printProgressBar(pbar_value, 1000, decimals=2, length=50)
            else:
                # print("no progress value")
                pass
    except KeyboardInterrupt:
        for p in process_list:
            p.kill()
@@ -12,17 +12,19 @@ def count_lines(files):
    They should all be the same, otherwise an error is raised.
    """
    counts = {}
    print("Checking number of lines:")
    for band, fname in files.items():
        with open(fname) as my_file:
            c = sum(1 for _ in my_file)
        counts[fname] = c
        print(f" - {fname}: {c}")
    if len(set(counts.values())) > 1:
        msg = "CSV files do not have the same number of rows!\n"
        msg = "ERROR: CSV files do not have the same number of rows!\n"
        for fname, c in counts.items():
            msg += f" - {fname}: {c}\n"
        raise RuntimeError(msg)
    else:
        print("OK: CSV files all have same length")

    return take_first(counts.values())
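Usage sketch (the file names are hypothetical): count_lines returns the shared line count, or raises RuntimeError when the CSVs disagree:

# n = count_lines({"VV": "zonal_stats_mean_VV.csv", "VH": "zonal_stats_mean_VH.csv"})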
@@ -43,7 +45,7 @@ def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1,
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filledLength = int(length * iteration // total)
    bar = fill * filledLength + '-' * (length - filledLength)
    print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end = printEnd)
    print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end = printEnd, flush=True)
    # Print New Line on Complete
    if iteration == total:
        print()
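Example call matching how load_data drives the bar on a 0-1000 scale:

# for i in range(0, 1001, 10):
#     printProgressBar(i, 1000, prefix='Progress:', suffix='Complete', decimals=2, length=50)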