From 4138e75d968351db1fe7e8f90e5a41f8794357cc Mon Sep 17 00:00:00 2001
From: Clemens Berteld <clemens@berteld.com>
Date: Sat, 18 Sep 2021 16:56:36 +0200
Subject: [PATCH] Lots of cleanups. Database connections are pretty efficient
 now.

---
 dataacquisition/DwdAcquisition.py   |   4 +-
 dataacquisition/ExportToDatabase.py | 192 +++++++++++++++-------------
 dataacquisition/GetAverageData.py   |  20 ++-
 dataacquisition/api.py              |   4 +-
 dataacquisition/sandbox.py          |  62 +--------
 dataacquisition/sandbox_sqlite.py   |  22 ++++
 6 files changed, 146 insertions(+), 158 deletions(-)
 create mode 100644 dataacquisition/sandbox_sqlite.py

diff --git a/dataacquisition/DwdAcquisition.py b/dataacquisition/DwdAcquisition.py
index 1959456..b22851d 100644
--- a/dataacquisition/DwdAcquisition.py
+++ b/dataacquisition/DwdAcquisition.py
@@ -177,10 +177,12 @@ def loadTemperatureFromDWDGauges():
                     continue
             i += 1
             
-            if i % 10 == 0:
+            if i % round((len(gaugeCountry.country) / 10)) == 0:
                 finished = i/len(gaugeCountry.country) * 100
                 
                 print(np.round(finished), end="% ... ")
+            if i == (len(gaugeCountry.country)):
+                print('', end=' 100%, Done. \n')
             
     stationList.columns = stationList.columns.astype(str)
     stationList = stationList.sort_index(axis=1, ascending=False)
diff --git a/dataacquisition/ExportToDatabase.py b/dataacquisition/ExportToDatabase.py
index 09f5604..284f403 100644
--- a/dataacquisition/ExportToDatabase.py
+++ b/dataacquisition/ExportToDatabase.py
@@ -25,15 +25,11 @@ stationGPD = None
 
 
 # Create DB "temperatures_berteld_morstein"
-def create_db(db_name):
-    print("Create DB: ", db_name)
-    connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
-    connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # Needs to be in AUTOCOMMIT mode for creating database
-    with connection.cursor() as cursor:
-        create_db_query = sql.SQL("""CREATE DATABASE {};""".format(db_name))
-        cursor.execute(create_db_query)
-        cursor.close()
-        connection.close()
+def create_db(cursor):  # cursor must come from an AUTOCOMMIT connection (required for CREATE DATABASE)
+    print("Creating DB: ", param_postgres['dbName'])
+    # Compose the name as a quoted identifier instead of str.format()-ing it into the SQL text
+    cursor.execute(sql.SQL("CREATE DATABASE {};").format(sql.Identifier(param_postgres['dbName'])))
+    print('Done')
 
 
 def drop_db(db_name):
@@ -50,31 +46,31 @@ def drop_db(db_name):
 
 
 # Checks, if database "temperatures_berteld_morstein" exists
-def dbexists(db_name):
+def dbexists(cursor):  # True when the configured database is listed in pg_database, False otherwise
     try:
-        with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
-            with connection.cursor() as cursor:
-                cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(db_name))
-
-                db_exists = cursor.fetchall()[0]
-                return True
+        cursor.execute("SELECT datname FROM pg_database WHERE datname = %s;", (param_postgres['dbName'],))  # exact match: LIKE would treat '_' as a wildcard
+        cursor.fetchall()[0]  # raises IndexError when no row matched, which the handler below reports as "not found"
+        print('DB existing')
+        print('Updating data')
+        return True
     except(Exception, psycopg2.DatabaseError) as error:
-        # do nothing, because test db is clean
+        print('No DB found')
         return False
 
 
 # If database "temperatures_berteld_morstein" not exists, create it
-def check_for_db_existence(station_list, db_name):
+def check_for_db_existence(cursor):
     print("Checking for database existence")
-    if dbexists(db_name):
-        print('DB existing')
+    if dbexists(cursor):
+        return True
     else: 
-        create_db(db_name)
-        create_table(station_list, db_name)
+        create_db(cursor)
+        return False
 
 
 # Connect to DB "temperatures_berteld_morstein" to create table "temperatures". Also installs PostGIS
-def create_table(station_list, db_name):
+def create_table(station_list, cursor):
+    print('Creating table stations')
     df_columns = list(station_list)
     columns = ['id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT']
     for column in df_columns:
@@ -82,93 +78,109 @@ def create_table(station_list, db_name):
             columns.append('"{}" NUMERIC'.format(column))
     columns_clean = str(columns).strip('[]').replace("'", "")
 
-    with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
-        with connection.cursor() as cursor:
-            query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean))
-            #print(query)
-            cursor.execute(query)
-            cursor.execute('CREATE EXTENSION postgis;')
+    query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean))
+    #print(query)
+    cursor.execute(query)
+    cursor.execute('CREATE EXTENSION postgis;')
+    print('Done')
 
 
 # Loading matrix coordinates from csv and writing it into table "stations" in db "temperatures_berteld_morstein"
-def insert_empty_matrix_into_db():
+def insert_empty_matrix_into_db(cursor):
     print('Inserting empty matrix into database')
     matrix_density = param_interpol['matrix_density']
-    with psycopg2.connect(database=param_postgres['dbName'], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
-        with connection.cursor() as cursor:
-            with open('clipped_matrix_{}x{}.csv'.format(matrix_density, matrix_density), 'r') as matrix:
-                matrix_data = matrix.readlines()
-                for line in matrix_data[1:]:
-                    values = ''  # Used in second parameter of cursor.execute() (Avoids SQL injection)
-                    data = line.split(';')
-                    id = int("9999" + data[0].replace('"', ''))
-                    lon = float(data[1])
-                    lat = float(data[2].replace('\n', ''))
-                    for n in [id, lon, lat]:
-                        values = (*values, n)  # adding n to existing tuple
-
-                    query = sql.SQL("INSERT INTO STATIONS (id, lon, lat, country) "
-                                    "VALUES ({id}, {lon}, {lat}, 'Germany');").format(id=sql.Placeholder(), lon=sql.Placeholder(), lat=sql.Placeholder())
-                    # print(query.as_string(cursor))
-                    # print(values)
-                    cursor.execute(query, values)
+
+    # The statement is identical for every point, so compose it once; the values are bound by psycopg2 (avoids SQL injection)
+    query = sql.SQL("INSERT INTO STATIONS (id, lon, lat, country) "
+                    "VALUES ({id}, {lon}, {lat}, 'Germany');").format(id=sql.Placeholder(), lon=sql.Placeholder(), lat=sql.Placeholder())
+
+    rows = []
+    with open('clipped_matrix_{}x{}.csv'.format(matrix_density, matrix_density), 'r') as matrix:
+        next(matrix, None)  # skip the header line
+        for line in matrix:
+            if not line.strip():
+                continue  # tolerate blank lines, e.g. an empty last line
+            data = line.split(';')
+            # Matrix ids are the csv id (possibly double-quoted) with the prefix 9999 prepended
+            point_id = int("9999" + data[0].replace('"', ''))
+            rows.append((point_id, float(data[1]), float(data[2])))  # float() ignores the trailing newline
+
+    # One batched call instead of one execute() per line, like the other bulk writers in this module
+    psycopg2.extras.execute_batch(cursor, query, rows)
+    print('Done')
+    # Number of matrix points written; export() hands it to create_matrix_data() for the progress output
+    return len(rows)
 
 
 # Calculating interpolation data for matrix using the according function from the API, and writing it into the database in bulk
-def create_matrix_data():
+def create_matrix_data(cursor, amount_points):  # amount_points is only used to scale the progress output
     print('Calculating interpolation data for matrix')
     # start_time = time.time()
-    with psycopg2.connect(database=param_postgres['dbName'], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"],
-                          keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5) as connection:
-        with connection.cursor() as cursor:
-            cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;")
-            matrix_points = cursor.fetchall()
-            update_data = []
-            for point in matrix_points:
-                id = point[0]
-                lon = point[1]
-                lat = point[2]
-                interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'))
-
-                for year, value in interpolation_data.items():
-                    update_data.append({'year': year, 'value': value, 'id': id})
-
-            query = sql.SQL("""UPDATE stations SET "%(year)s" = %(value)s WHERE id = %(id)s; """)
-            psycopg2.extras.execute_batch(cursor, query, update_data)   # Multiple times faster than using execute() in a for loop
+    cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;")
+    matrix_points = cursor.fetchall()
+    update_data = []
+    for i, point in enumerate(matrix_points):
+        id = point[0]
+        lon = point[1]
+        lat = point[2]
+        interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'), cursor)
+
+        for year, value in interpolation_data.items():
+            update_data.append({'year': year, 'value': value, 'id': id})
+        if i % max(1, round(amount_points / 10)) == 0:  # round() yields 0 for five points or fewer; max(1, ...) avoids the modulo by zero
+            finished = i / max(1, amount_points) * 100  # same guard for a zero point count
+            print(round(finished), end="% ... ")
+    print('', end=' 100%, Done. \n')
+
+    query = sql.SQL("""UPDATE stations SET "%(year)s" = %(value)s WHERE id = %(id)s; """)
+    print('Writing interpolation data to database')
+    psycopg2.extras.execute_batch(cursor, query, update_data)   # Multiple times faster than using execute() in a for loop
     # print((time.time() - start_time), 'seconds')
+    print('Done')
 
 
 # Dumping all existing data from database. Inserting station data into database in bulk.
-def insert_data(station_list, db_name):
+def insert_data(station_list, cursor):
     print('Inserting data into database')
-    with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
-        with connection.cursor() as cursor:
-
-            if len(station_list) > 0:
-                # print(stationList)
-                cursor.execute("DELETE FROM stations;")
+    if len(station_list) > 0:
+        # print(stationList)
+        cursor.execute("DELETE FROM stations;")
 
-                df_columns = list(station_list)
-                # create (col1,col2,...)
+        df_columns = list(station_list)
+        # create (col1,col2,...)
 
-                # As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups
-                columns = ['"' + column + '"' for column in df_columns]
-                # for column in df_columns:
-                #     columns.append('"' + column + '"')
-                columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '')
-                station_list = station_list.round(decimals=3)
+        # As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups
+        columns = ['"' + column + '"' for column in df_columns]
+        # for column in df_columns:
+        #     columns.append('"' + column + '"')
+        columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '')
+        station_list = station_list.round(decimals=3)
 
-                # create VALUES('%s', '%s",...) one '%s' per column
-                values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
+        # create VALUES('%s', '%s",...) one '%s' per column
+        values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
 
-                # create INSERT INTO table (columns) VALUES('%s',...)
-                insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values)
-                psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values)
+        # create INSERT INTO table (columns) VALUES('%s',...)
+        insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values)
+        psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values)
+    print('Done')
 
 
 def export(station_list):
-    check_for_db_existence(station_list, param_postgres['dbName'])
-    insert_data(station_list, param_postgres['dbName'])
-    insert_empty_matrix_into_db()
-    create_matrix_data()
+    from contextlib import closing  # "with connection" alone only ends the transaction; closing() also closes the connection
+    db_name, user, pw, host, port = param_postgres['dbName'], param_postgres["user"], param_postgres["password"], param_postgres["host"], param_postgres["port"]
+    # Step 1: maintenance database, used to check for / create the target database
+    with closing(psycopg2.connect(database='postgres', user=user, password=pw, host=host, port=port)) as connection:
+        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # Needs to be in AUTOCOMMIT mode for creating database
+        with connection.cursor() as cursor:
+            new_db = not check_for_db_existence(cursor)
+    # Step 2: target database; the bare "connection" item commits on success and rolls back on error
+    with closing(psycopg2.connect(database=db_name, user=user, password=pw, host=host, port=port)) as connection, connection, connection.cursor() as cursor:
+        if new_db:
+            create_table(station_list, cursor)
+        insert_data(station_list, cursor)
+        amount_points = insert_empty_matrix_into_db(cursor)
+    # Step 3: the interpolation run gets its own connection with TCP keepalives enabled
+    with closing(psycopg2.connect(database=db_name, user=user, password=pw, host=host, port=port,
+                                  keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)) as connection, connection, connection.cursor() as cursor:
+        create_matrix_data(cursor, amount_points)
     print('Installation successful. You can run the api.py now.')
diff --git a/dataacquisition/GetAverageData.py b/dataacquisition/GetAverageData.py
index 188a830..bebb4fc 100644
--- a/dataacquisition/GetAverageData.py
+++ b/dataacquisition/GetAverageData.py
@@ -80,17 +80,15 @@ def calc_idw(neighbours, years):
 
 
 # Collecting preparation data and execute interpolation
-def get_interpolation_data_for_point(lat, lon, columns):
-    with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
-        with connection.cursor() as cursor:
-            if '*' in str(columns):
-                year_columns = get_year_columns(cursor)
-            else:
-                year_columns = (str(columns).replace("""SQL('""", "").replace('"', '').replace("')", "")).split(',')
-            neighbours = get_neighbours(cursor, lat, lon, columns)
-            avg_data = calc_idw(neighbours, year_columns)
-            # print(avg_data)
-            return avg_data
+def get_interpolation_data_for_point(lat, lon, columns, cursor):
+    if '*' in str(columns):
+        year_columns = get_year_columns(cursor)
+    else:
+        year_columns = (str(columns).replace("""SQL('""", "").replace('"', '').replace("')", "")).split(',')
+    neighbours = get_neighbours(cursor, lat, lon, columns)
+    avg_data = calc_idw(neighbours, year_columns)
+    # print(avg_data)
+    return avg_data
 
 
 # get_average_data_for_point(52.5, 13.4)
diff --git a/dataacquisition/api.py b/dataacquisition/api.py
index 2366bfe..350becc 100644
--- a/dataacquisition/api.py
+++ b/dataacquisition/api.py
@@ -41,7 +41,9 @@ def index():
     if 'lat' in request.args or 'lng' in request.args:
         lat = request.args['lat']
         lon = request.args['lon']
-        return get_interpolation_data_for_point(lat, lon, columns)
+        with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
+            with connection.cursor() as cursor:
+                return get_interpolation_data_for_point(lat, lon, columns, cursor)
 
     else:
         if 'id' in request.args:
diff --git a/dataacquisition/sandbox.py b/dataacquisition/sandbox.py
index 1cb8907..4fff5af 100644
--- a/dataacquisition/sandbox.py
+++ b/dataacquisition/sandbox.py
@@ -1,58 +1,10 @@
-import configparser
+import time
 
-import psycopg2
-from psycopg2 import sql
-cfg = configparser.ConfigParser()
-cfg.read('config.ini')
+top = 101
 
-assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
+for i in range(0, top):
+    time.sleep(0.1)
+    if i % 10 == 0:
+        finished = i / top * 100
 
-param_postgres = cfg["POSTGRES"]
-param_interpol = cfg["INTERPOLATION"]
-#
-# query = sql.SQL("select {0} from {1} where {2} LIKE {3} {4}").format(sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]), sql.Identifier('table'), sql.Identifier('coulumn'), sql.Placeholder(),
-#                                                                      sql.SQL('AND 1 = 1'))
-#
-# with psycopg2.connect(database='temperatures_berteld_morstein', user='postgres', password='postgres', host='localhost', port=5432) as connection:
-#     with connection.cursor() as cursor:
-#         print(query.as_string(cursor))
-
-
-# weights = [2, 1.25, 0.75, 0.6, 0.4]
-# neighbours = [{'2015': 'NaN', '2010': 9.333, '2014': 'NaN', '1998': 'NaN', 'distance': 0.0223606797750006}, {'2015': 11.233, '2010': 8.883, '2014': 11.458, '1998': 10.05, 'distance': 0.0300000000000011}, {'2015': 11.133, '2010': 8.767, '2014': 11.467, '1998': 'NaN', 'distance': 0.108166538263921}, {'2015': 10.667, '2010': 8.208, '2014': 11.025, '1998': 9.8, 'distance': 0.111803398874988}, {'2015': 10.667, '2010': 8.35, '2014': 10.908, '1998': 'NaN', 'distance': 0.176918060129539}]
-#
-# for weight, neighbour in zip(weights, neighbours):
-#     print(weight, neighbour['2015'])
-
-
-# list_a = [1,2,3,4,5,6]
-# list_b = [x * 2 for x in list_a if x % 2 == 0]
-# print(list_b)
-
-# list_a = [1,2,3,4,5,6,7,8,9,10]
-# new_list = list_a[0::2]
-# print(new_list)
-
-
-def get_neighbours(lat, lon, columns):
-    values = ''  # Used in second parameter of cursor.execute() (Avoids SQL injection)
-    for n in [lat, lon]:
-        values = (*values, n)  # adding n to existing tuple
-    with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
-        with connection.cursor() as cursor:
-            query = sql.SQL("""
-                        SELECT array_to_json(array_agg(row_to_json(t))) from (
-                            SELECT {columns}, ST_Distance(ST_MakePoint(lat, lon), ST_MakePoint({lon}, {lat})) AS distance 
-                            FROM stations 
-                            WHERE file IS NOT NULL 
-                            ORDER BY distance 
-                            LIMIT {amount_neighbours}
-                        ) t;
-                            """).format(columns=columns, lon=sql.Placeholder(), lat=sql.Placeholder(), amount_neighbours=sql.SQL(param_interpol["amount_neighbours"]))
-            cursor.execute(query, values)
-            neighbours = cursor.fetchall()[0][0]
-            print(neighbours)
-    # return neighbours
-
-
-get_neighbours(53.42, 9.24, sql.SQL('"2015", "2014", "2010"'))
\ No newline at end of file
+        print(round(finished), end="% ... ")
diff --git a/dataacquisition/sandbox_sqlite.py b/dataacquisition/sandbox_sqlite.py
new file mode 100644
index 0000000..15c862f
--- /dev/null
+++ b/dataacquisition/sandbox_sqlite.py
@@ -0,0 +1,22 @@
+import os
+import sqlite3
+from pathlib import Path
+from sqlite3 import Error
+cwd = Path(os.path.dirname(os.path.abspath(__file__)))
+print(cwd)
+
+
+def create_connection(db_file):
+    """Open a connection to the SQLite database db_file, print the SQLite library version, then close it again."""
+    conn = None
+    try:
+        conn = sqlite3.connect(db_file)
+        print(sqlite3.sqlite_version)  # sqlite3.version (the module version) is deprecated since Python 3.12
+    except Error as e:
+        print(e)
+    finally:
+        if conn:
+            conn.close()
+
+
+create_connection('temperatures.db')    # ':memory:' for saving in RAM
-- 
GitLab