Commit e7fbcfda authored by Allard de Wit

Main change: the parcel table is now created explicitly instead of through pandas.to_sql

parent 52485840
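For context, a minimal sketch of the pattern this commit introduces (table and column names here are illustrative, not the exact schema in the diff below): the destination table is declared and created explicitly with SQLAlchemy, and pandas.to_sql is then only used to append rows into it, instead of letting pandas infer and create the schema itself.

import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite:///example.db3")
meta = sa.MetaData()
tbl = sa.Table("parcel_info", meta,
               sa.Column("fieldID", sa.Integer, primary_key=True),
               sa.Column("area_ha", sa.Float))
meta.create_all(engine)  # schema is created explicitly here ...

df = pd.DataFrame({"fieldID": [1, 2], "area_ha": [2.5, 4.1]})
df.to_sql("parcel_info", engine, if_exists="append", index=False)  # ... and to_sql only appends rows

Creating the schema up front keeps column types and the primary key under explicit control rather than relying on pandas' type inference.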
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
from .dap import DataAccessProvider
\ No newline at end of file
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
from pathlib import Path
import sys
import click
......
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
from pathlib import Path
import sqlalchemy as sa
......
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
from pathlib import Path
import click
......
......@@ -6,7 +6,7 @@ from pathlib import Path
from csv import DictReader
import time
import multiprocessing as mp
import traceback
import sqlalchemy as sa
import sqlalchemy.exc
......@@ -15,7 +15,7 @@ import geopandas as gpd
import numpy as np
import yaml
from .util import prepare_db, printProgressBar
from .util import prepare_db, printProgressBar, Process
dummy_date = "19000101"
......@@ -24,9 +24,9 @@ class CSVLoadingError(Exception):
"""Exception for raising errors related to CSV reading
"""
def __init__(self, error, traceback):
def __init__(self, error, trace_back):
self.error = error
self.traceback = traceback
self.traceback = trace_back
super().__init__()
......@@ -42,7 +42,7 @@ def load_parcel_info(dsn, counts_file, shape_file, table_name):
"""
shp_fname = Path(shape_file)
df = gpd.read_file(shp_fname)
df["area_ha"] = df.geometry.area/1e4
df["area_ha"] = df.geometry.area / 1e4
df = df.set_index("fieldid")
fname_counts = Path(counts_file)
......@@ -66,6 +66,33 @@ def load_parcel_info(dsn, counts_file, shape_file, table_name):
})
engine = sa.create_engine(dsn)
meta = sa.MetaData(engine)
tbl = sa.Table(table_name, meta,
sa.Column("fieldID", sa.Integer, primary_key=True),
sa.Column("year", sa.Integer),
sa.Column("pixcount", sa.Integer),
sa.Column("area_ha", sa.Float),
sa.Column("cat_gewasc", sa.Text),
sa.Column("gws_gewasc", sa.Integer),
sa.Column("gws_gewas", sa.Text),
sa.Column("provincie", sa.Text),
sa.Column("gemeente", sa.Text),
sa.Column("regio", sa.Text),
sa.Column("pc4", sa.Text),
sa.Column("woonplaats", sa.Text),
sa.Column("waterschap", sa.Text),
)
# TODO: Find out if adding indexes makes a difference
# sa.Index('ix_gewas_area', tbl.c.gws_gewasc, tbl.c.area_ha)
# sa.Index('ix_gewas_pixcount', tbl.c.gws_gewasc, tbl.c.pixcount)
try:
tbl.drop()
except sqlalchemy.exc.OperationalError:
pass
tbl.create()
df_out.to_sql(table_name, engine, if_exists="append", index=False)  # append into the explicitly created table rather than letting pandas replace it
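# drop references to the (potentially large) frames so they can be garbage-collected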
df_out = df = df_counts = None
......@@ -144,28 +171,6 @@ def load_satellite_csv(child_conn, dataset_name, dsn, bands, **kwargs):
write_to_database(engine, dataset_name, mean_csv_readers, child_conn)
class Process(mp.Process):
def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._parent_conn, self._child_conn = mp.Pipe()
self._exception = None
def run(self):
try:
mp.Process.run(self)
self._child_conn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._child_conn.send((e, tb))
# raise e # You can still raise this exception if you need to
@property
def exception(self):
if self._parent_conn.poll():
self._exception = self._parent_conn.recv()
return self._exception
def start_parallel_loading(datasets):
"""Start loading CSV files in parallel
:param datasets:
......@@ -194,7 +199,7 @@ def monitor_parallel_loading(process_list, parent_conn, lines_per_dataset):
progressbar.
"""
total_lines = sum(c for c in lines_per_dataset.values())
lines_per_dataset = {ds:0 for ds in lines_per_dataset}
lines_per_dataset = {ds: 0 for ds in lines_per_dataset}
printProgressBar(0, total_lines, decimals=2, length=50)
try:
processes = [p for p in process_list if p.is_alive()]
......@@ -236,6 +241,3 @@ def load_data(yaml_file):
process_list, parent_conn, lines_per_dataset = start_parallel_loading(grompy_conf["datasets"])
monitor_parallel_loading(process_list, parent_conn, lines_per_dataset)
import sqlalchemy
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Wageningen Environmental Research
# Allard de Wit (allard.dewit@wur.nl), April 2021
import multiprocessing as mp
import traceback
import sqlalchemy.exc
from sqlalchemy import MetaData, Table, Column, Integer, Date, Float, Text, create_engine
def take_first(iterable):
for i in iterable:
return i
......@@ -67,4 +74,26 @@ def prepare_db(dsn, table_name, bands):
pass
tbl.create()
return engine
\ No newline at end of file
return engine
class Process(mp.Process):
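"""Drop-in replacement for multiprocessing.Process that reports exceptions raised in the child.
The exception and its formatted traceback are sent back to the parent through a Pipe
and can be inspected via the `exception` property after the child has finished.
"""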
def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._parent_conn, self._child_conn = mp.Pipe()
self._exception = None
def run(self):
try:
mp.Process.run(self)
self._child_conn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._child_conn.send((e, tb))
# raise e # You can still raise this exception if you need to
@property
def exception(self):
if self._parent_conn.poll():
self._exception = self._parent_conn.recv()
return self._exception
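A minimal usage sketch of this Process subclass (illustrative only, not part of the commit): the parent starts a worker, waits for it, and then checks the exception property to detect a failure that occurred in the child.

def faulty_worker():
    raise ValueError("something went wrong in the child")

if __name__ == "__main__":
    p = Process(target=faulty_worker)
    p.start()
    p.join()
    if p.exception is not None:
        error, trace_back = p.exception
        print("child process failed:", error)
        print(trace_back)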
from csv import DictReader
from pathlib import Path
from shutil import copyfile
import time
import pandas as pd
import sqlalchemy as sa
cwd = Path(__file__).parent
csv_dir = cwd / "Optisch"
template_db = cwd / "sentinel2_observations_template.db3"
db_s2_observations = cwd / "sentinel2_observations_2019.db3"
csv_s2_observations = cwd / "sentinel2_observations_2019.csv"
dummy_date = "19000101"
mean_files = {
"NDVI": csv_dir / "zonal_stats_mean_2019_ADC.csv",
"B02": csv_dir / "zonal_stats_mean_B02_2019_ADC.csv",
"B03": csv_dir / "zonal_stats_mean_B03_2019_ADC.csv",
"B04": csv_dir / "zonal_stats_mean_B04_2019_ADC.csv",
"B05": csv_dir / "zonal_stats_mean_B05_2019_ADC.csv",
"B06": csv_dir / "zonal_stats_mean_B06_2019_ADC.csv",
"B07": csv_dir / "zonal_stats_mean_B07_2019_ADC.csv",
"B08": csv_dir / "zonal_stats_mean_B08_2019_ADC.csv",
"B11": csv_dir / "zonal_stats_mean_B11_2019_ADC.csv",
"B12": csv_dir / "zonal_stats_mean_B12_2019_ADC.csv",
"B8A": csv_dir / "zonal_stats_mean_B8A_2019_ADC.csv",
}
# Print iterations progress
def printProgressBar(iteration, total, prefix='', suffix='', decimals=1, length=100, fill='█', printEnd="\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end = printEnd)
# Print New Line on Complete
if iteration == total:
print()
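A short usage sketch of the call-in-a-loop pattern described in the docstring (illustrative only; it relies on the time module already imported at the top of this script):

items = list(range(200))
printProgressBar(0, len(items), prefix='Progress:', suffix='Complete', length=50)
for i, _ in enumerate(items, start=1):
    time.sleep(0.01)  # stand-in for the real work per item
    printProgressBar(i, len(items), prefix='Progress:', suffix='Complete', length=50)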
def take_first(iterable):
for i in iterable:
return i
def count_lines(files):
"""Checks the number of lines in the input CSV files.
They should all be the same; otherwise an error is raised.
"""
print("Checking file row counts...")
counts = {}
for band, fname in files.items():
with open(fname) as my_file:
c = sum(1 for _ in my_file)
counts[fname] = c
print(f" - {fname}: {c}")
if len(set(counts.values())) > 1:
msg = "CSV files do not have the same number of rows!"
raise RuntimeError(msg)
return take_first(counts.values())
def process_rows(rows):
df = pd.DataFrame()
fieldIDs = []
for column_name, row in rows.items():
fieldIDs.append(int(row.pop("field_ID")))
count = row.pop("count")
recs = []
for sdate, value in row.items():
value = float(value)
if value == 0.:
continue
recs.append({"day": sdate, "value": float(value), "band": column_name})
if not recs: # only zero (null) values for the column
# We add one dummy record to make sure we can create the dataframe properly
recs.append({"day": dummy_date, "value": None, "band": column_name})
df_tmp = pd.DataFrame(recs)
try:
df_tmp["day"] = pd.to_datetime(df_tmp.day)
except Exception:  # leave the day column as-is if the dates cannot be parsed
pass
df = pd.concat([df, df_tmp])
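# reshape the long (day, band, value) records into wide form: one row per day, one column per band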
df = df.pivot(index="day", columns="band", values="value")
df.reset_index(inplace=True)
if len(set(fieldIDs)) > 1:
msg = f"FieldIDs are not the same for this row: {fieldIDs}"
raise RuntimeError(msg)
df["fieldID"] = fieldIDs[0]
ix = (df.day == pd.to_datetime(dummy_date))
if any(ix):
df = df[~ix]
return df
def write_to_SQLite(mean_csv_readers, nlines):
copyfile(template_db, db_s2_observations)
engine = sa.create_engine(f"sqlite:///{db_s2_observations}")
printProgressBar(0, nlines-1, prefix='Progress:', suffix='Complete', length=50)
this_line = 0
while True:
try:
rows = {column_name:next(reader) for column_name, reader in mean_csv_readers.items()}
df = process_rows(rows)
df.to_sql("s2_observations", engine, if_exists="append", index=False)
this_line += 1
if this_line % 1000 == 0:
printProgressBar(this_line, nlines, prefix='Progress:', suffix='Complete', length=50)
except StopIteration:
break
def main():
nlines = count_lines(mean_files)
mean_csv_readers = {}
for column_name, csv_fname in mean_files.items():
mean_csv_readers[column_name] = DictReader(open(csv_fname))
t1 = time.time()
write_to_SQLite(mean_csv_readers, nlines)
print(f"Processing {nlines} lines to SQLite took {time.time()-t1} seconds.")
if __name__ == "__main__":
main()
\ No newline at end of file
from pathlib import Path
import geopandas as gpd
import pandas as pd
import sqlalchemy as sa
import numpy as np
def main():
fname_percelen = Path.cwd() / "BRP" / "gewaspercelen_2019.shp"
# fname_percelen = Path.cwd() / "BRP" / "BRP_10rows.shp"
df = gpd.read_file(fname_percelen)
df["area_ha"] = df.geometry.area/1e4
df = df.set_index("fieldid")
fname_counts = Path.cwd() / "Optisch" / "perceelscount.csv"
df_counts = pd.read_csv(fname_counts)
df_counts.set_index("field_ID", inplace=True)
df["pixcount"] = df_counts.pixcount
df_out = pd.DataFrame({"fieldID": df.index,
"year": df.year,
"pixcount": df.pixcount,
"area_ha": df.area_ha,
"cat_gewasc": df.cat_gewasc.apply(str),
"gws_gewasc": df.gws_gewasc.astype(np.int32),
"gws_gewas": df.gws_gewas.apply(str),
"provincie": df.provincie.apply(str),
"gemeente": df.gemeente.apply(str),
"regio": df.regio.apply(str),
"pc4": df.PC4.apply(str),
"woonplaats": df.woonplaats.apply(str),
"waterschap": df.waterschap.apply(str),
})
fname_percelen_db = Path.cwd() / "sentinel2_observations_2019.db3"
dsn = f"sqlite:///{fname_percelen_db}"
engine = sa.create_engine(dsn)
df_out.to_sql("perceels_info", engine, if_exists="append", index=False)
if __name__ == "__main__":
main()