From 8c176765572b771061abd61627cda28c33ed9ebf Mon Sep 17 00:00:00 2001
From: Clemens Berteld <clemens@berteld.com>
Date: Mon, 13 Sep 2021 08:10:13 +0200
Subject: [PATCH] API works. Interpolated data can now be requested for any
 coordinates. Values are calculated as a plain average of the ten nearest
 stations, which could still be improved.

---
 dataacquisition/ExportToDatabase.py | 81 +++++++++++++++--------------
 dataacquisition/GetAverageData.py   | 71 +++++++++++++++++++++++++
 dataacquisition/api.py              | 65 ++++++++++++-----------
 3 files changed, 150 insertions(+), 67 deletions(-)
 create mode 100644 dataacquisition/GetAverageData.py

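Note on the averaging (not applied by this patch): calc_averages() weighs all
ten neighbours equally, which is the "could be improved" part of the subject.
Below is a minimal, hypothetical sketch of inverse-distance weighting. It
assumes each neighbour row also carries its distance to the query point,
e.g. by adding "ST_Distance(ST_MakePoint(lon, lat), ST_MakePoint({lon},
{lat})) AS dist" to the SELECT list in get_neighbours():

    def calc_idw_averages(neighbours, years):
        # Hypothetical refinement: closer stations contribute more strongly
        averages = {}
        for year in years:
            num, den = 0.0, 0.0
            for neighbour in neighbours:
                value = neighbour[str(year)]
                if value is None or value == 'NaN':
                    continue
                weight = 1.0 / max(neighbour['dist'], 1e-9)  # guard against zero distance
                num += weight * value
                den += weight
            if den > 0:
                averages[year] = round(num / den, 3)
        return averages
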
diff --git a/dataacquisition/ExportToDatabase.py b/dataacquisition/ExportToDatabase.py
index 8523ef4..0c18685 100644
--- a/dataacquisition/ExportToDatabase.py
+++ b/dataacquisition/ExportToDatabase.py
@@ -22,78 +22,82 @@ param_postgres = cfg["POSTGRES"]
 
 stationGPD = None
 
+
 # Use existing connection to DB "postgres" to create DB "temperatures_berteld_morstein"
-def create_db(dbName):
-    print("Create DB: ",dbName)
+def create_db(db_name):
+    print("Create DB: ", db_name)
     connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
     connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # Needs to be in AUTOCOMMIT mode for creating database
     with connection.cursor() as cursor:
-        create_db_query = sql.SQL("""CREATE DATABASE {};""".format(dbName))
+        create_db_query = sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name))  # Identifier quoting instead of string interpolation
         cursor.execute(create_db_query)
         connection.close()
-        
-def drop_db(dbName):
-    if dbexists(dbName):
-        print("Drop DB: ",dbName)
+
+
+def drop_db(db_name):
+    if dbexists(db_name):
+        print("Drop DB: ", db_name)
         try:
             connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
-            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database
+            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # Needs to be in AUTOCOMMIT mode for creating database
             with connection.cursor() as cursor:
-                cursor.execute("DROP DATABASE {};".format(dbName));
+                cursor.execute("DROP DATABASE {};".format(db_name))
                 connection.close()
-        except(psycopg2.DatabaseError) as error:
+        except psycopg2.DatabaseError as error:
             # do nothing, because test db is clean
             print(error)
             return
 
-def dbexists(dbName):
-        try:
-            connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
-            cursor = connection.cursor()
-            cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(dbName))
-            
-            db_exists = cursor.fetchall()[0]
-            connection.close()
-            return True
-        except(Exception, psycopg2.DatabaseError) as error:
-            # do nothing, because test db is clean
-            return False
+
+def dbexists(db_name):
+    try:
+        connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
+        cursor = connection.cursor()
+        cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(db_name))
+
+        db_exists = cursor.fetchall()[0]  # raises IndexError if no row matched, caught below
+        connection.close()
+        return True
+    except (Exception, psycopg2.DatabaseError):
+        # no matching row or connection problem, so treat the database as missing
+        return False
 
 
 # Connect to DB "postgres" to check for database "temperatures_berteld_morstein"
-def check_for_db_existence(stationList, dbName):
+def check_for_db_existence(station_list, db_name):
     print("Checking for database existence")
-    if dbexists(dbName):
+    if dbexists(db_name):
         print('DB exists')
     else: 
-         create_db(dbName)
-         create_table(stationList, dbName)
+        create_db(db_name)
+        create_table(station_list, db_name)
+
 
 # Connect to DB "temperatures_berteld_morstein" to create table "temperatures"
-def create_table(stationList, dbName):
-    df_columns = list(stationList)
+def create_table(station_list, db_name):
+    df_columns = list(station_list)
     columns = ['id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT']
     for column in df_columns:
         if str(column).startswith('19') or str(column).startswith('20'):
             columns.append('"{}" NUMERIC'.format(column))
     columns_clean = str(columns).strip('[]').replace("'", "")
 
-    with psycopg2.connect(database=dbName, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
+    with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
         with connection.cursor() as cursor:
             query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean))
             #print(query)
             cursor.execute(query)
 
 
-def insert_data(stationList, dbName):
-    with psycopg2.connect(database=dbName, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
+def insert_data(station_list, db_name):
+    with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
         with connection.cursor() as cursor:
 
-            if len(stationList) > 0:
+            if len(station_list) > 0:
                 # print(stationList)
                 cursor.execute("DELETE FROM stations;")
 
-                df_columns = list(stationList)
+                df_columns = list(station_list)
                 # create (col1,col2,...)
 
                 # As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups
@@ -101,15 +105,16 @@ def insert_data(stationList, dbName):
                 for column in df_columns:
                     columns.append('"' + column + '"')
                 columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '')
-                stationList = stationList.round(decimals=3)
+                station_list = station_list.round(decimals=3)
 
                 # create VALUES('%s', '%s",...) one '%s' per column
                 values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
 
                 # create INSERT INTO table (columns) VALUES('%s',...)
                 insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values)
-                psycopg2.extras.execute_batch(cursor, insert_stmt, stationList.values)
+                psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values)
+
 
-def export(stationList):
-    check_for_db_existence(stationList, param_postgres['dbName'])
-    insert_data(stationList, param_postgres['dbName'])
+def export(station_list):
+    check_for_db_existence(station_list, param_postgres['dbName'])
+    insert_data(station_list, param_postgres['dbName'])
diff --git a/dataacquisition/GetAverageData.py b/dataacquisition/GetAverageData.py
new file mode 100644
index 0000000..38d1716
--- /dev/null
+++ b/dataacquisition/GetAverageData.py
@@ -0,0 +1,72 @@
+import configparser
+
+import psycopg2
+from psycopg2 import sql
+
+cfg = configparser.ConfigParser()
+cfg.read('config.ini')
+assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
+param_postgres = cfg["POSTGRES"]
+
+
+def get_year_columns(cursor):
+    columns = []
+    query = sql.SQL("SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'stations';")
+    cursor.execute(query)
+    results = cursor.fetchall()
+    for result in results:
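+        # Year columns parse as ints; other columns (id, lon, lat, ...) raise ValueError and are skipped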
+        try:
+            columns.append(int(result[0]))
+        except ValueError:
+            pass
+    return columns
+
+
+def get_neighbours(cursor, lat, lon, columns):
+    values = ()  # Parameter tuple for cursor.execute() (avoids SQL injection)
+    for n in [lon, lat]:  # order must match the placeholders below: {lon} first, then {lat}
+        values = (*values, n)  # adding n to existing tuple
+
+    query = sql.SQL("""
+                SELECT array_to_json(array_agg(row_to_json(t))) from (
+                    SELECT {columns} FROM stations 
+                    ORDER BY ST_Distance(ST_MakePoint(lon, lat), ST_MakePoint({lon}, {lat})) 
+                    LIMIT 10
+                ) t;
+                    """).format(columns=columns, lon=sql.Placeholder(), lat=sql.Placeholder())
+    cursor.execute(query, values)
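+    # The query returns a single cell: a JSON array with one object per neighbouring station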
+    neighbours = cursor.fetchall()[0][0]
+    return neighbours
+
+
+def calc_averages(neighbours, years):
+    # Average each year's values over the neighbouring stations, skipping missing data
+    averages = {}
+    for year in years:
+        values = []
+        for neighbour in neighbours:
+            value = neighbour[str(year)]
+            if value is not None and value != 'NaN':
+                values.append(value)
+        if values:  # avoid ZeroDivisionError when no neighbour has data for this year
+            averages[year] = round(sum(values) / len(values), 3)
+    return averages
+
+
+def get_average_data_for_point(lat, lon, columns):
+    # Interpolate values for the given coordinates from the ten nearest stations
+    with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
+        with connection.cursor() as cursor:
+            if '*' in str(columns):  # no year filter requested, so use every year column
+                year_columns = get_year_columns(cursor)
+            else:
+                # Recover the plain column names from the composed sql.SQL object
+                year_columns = str(columns).replace("""SQL('""", "").replace('"', '').replace("')", "").split(',')
+            neighbours = get_neighbours(cursor, lat, lon, columns)
+            avg_data = calc_averages(neighbours, year_columns)
+            return avg_data
+
+
+# Example: get_average_data_for_point(52.5, 13.4, sql.SQL('*'))
diff --git a/dataacquisition/api.py b/dataacquisition/api.py
index aaa673a..d4d984c 100644
--- a/dataacquisition/api.py
+++ b/dataacquisition/api.py
@@ -4,6 +4,7 @@ import psycopg2
 from flask import Flask, jsonify, request
 from psycopg2 import sql
 import configparser
+from GetAverageData import get_average_data_for_point
 
 cfg = configparser.ConfigParser()
 cfg.read('config.ini')
@@ -25,12 +26,6 @@ def index():
     wheres = sql.SQL('')        # where filters
     values = ''                 # Used in second parameter of cursor.execute() (Avoids SQL injection)
 
-    if 'id' in request.args:
-        station_id = request.args['id']
-        wheres = wheres + (sql.SQL("AND id = {values} ").format(column=sql.Identifier('stations', 'id'), values=sql.Placeholder()))
-        for n in [int(station_id)]:
-            values = (*values, n)  # adding n to existing tuple
-
     if 'years' in request.args:
         years = request.args['years'].split(',')
         years_clean = []
@@ -39,29 +34,41 @@ def index():
             years_clean.append('"' + year + '"')
         years_clean = str(years_clean).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '')
 
-        columns = sql.SQL('id, ') + sql.SQL(years_clean)
-
-    # Just for development
-    # if 'country' in request.args:
-    #     country = request.args['country']
-    #     wheres = wheres + (sql.SQL("AND LOWER({column}) LIKE {values} ").format(column=sql.Identifier('stations', 'country'), values=sql.Placeholder()))
-    #     for n in [country]:
-    #         values = (*values, n)  # adding n to existing tuple
-
-    query = sql.SQL("SELECT array_to_json(array_agg(row_to_json(t))) from ("
-                    "SELECT {} FROM stations "
-                    "WHERE lon IS NOT NULL "    # Unnecessary filter, just so the real filters can always be written with AND
-                    "{} "
-                    ") t;").format(columns, wheres)
-
-    with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
-        with connection.cursor() as cursor:
-            print(query.as_string(cursor))
-            print(values)
-            cursor.execute(query, values)
-
-            results = cursor.fetchall()[0][0]
-            return jsonify(results)
+        columns = sql.SQL(years_clean)
+
+    if 'lat' in request.args and 'lon' in request.args:
+        lat = float(request.args['lat'])  # cast early so malformed input fails here, not in the SQL
+        lon = float(request.args['lon'])
+        return jsonify(get_average_data_for_point(lat, lon, columns))
+
+    else:
+        if 'id' in request.args:
+            station_id = request.args['id']
+            wheres = wheres + (sql.SQL("AND id = {values} ").format(column=sql.Identifier('stations', 'id'), values=sql.Placeholder()))
+            for n in [int(station_id)]:
+                values = (*values, n)  # adding n to existing tuple
+
+        # Just for development
+        # if 'country' in request.args:
+        #     country = request.args['country']
+        #     wheres = wheres + (sql.SQL("AND LOWER({column}) LIKE {values} ").format(column=sql.Identifier('stations', 'country'), values=sql.Placeholder()))
+        #     for n in [country]:
+        #         values = (*values, n)  # adding n to existing tuple
+
+        query = sql.SQL("SELECT array_to_json(array_agg(row_to_json(t))) from ("
+                        "SELECT id, {} FROM stations "
+                        "WHERE lon IS NOT NULL "    # Unnecessary filter, just so the real filters can always be written with AND
+                        "{} "
+                        ") t;").format(columns, wheres)
+
+        with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
+            with connection.cursor() as cursor:
+                print(query.as_string(cursor))
+                print(values)
+                cursor.execute(query, values)
+
+                results = cursor.fetchall()[0][0]
+                return jsonify(results)
 
 
 app.run(host='127.0.0.1', port=42000)
-- 
GitLab