diff --git a/dataacquisition/DwdAcquisition.py b/dataacquisition/DwdAcquisition.py index 1959456dd76b2fcf07d9389485a80d0318c6c66f..b22851d9a75142d66624573f4e1f41dd5cf19d95 100644 --- a/dataacquisition/DwdAcquisition.py +++ b/dataacquisition/DwdAcquisition.py @@ -177,10 +177,12 @@ def loadTemperatureFromDWDGauges(): continue i += 1 - if i % 10 == 0: + if i % round((len(gaugeCountry.country) / 10)) == 0: finished = i/len(gaugeCountry.country) * 100 print(np.round(finished), end="% ... ") + if i == (len(gaugeCountry.country)): + print('', end=' 100%, Done. \n') stationList.columns = stationList.columns.astype(str) stationList = stationList.sort_index(axis=1, ascending=False) diff --git a/dataacquisition/ExportToDatabase.py b/dataacquisition/ExportToDatabase.py index 09f5604146981d2c7d91cdac26b711d5348691b6..284f403c92e007ffbe5b6371b3c61989253ceab2 100644 --- a/dataacquisition/ExportToDatabase.py +++ b/dataacquisition/ExportToDatabase.py @@ -25,15 +25,11 @@ stationGPD = None # Create DB "temperatures_berteld_morstein" -def create_db(db_name): - print("Create DB: ", db_name) - connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) - connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database - with connection.cursor() as cursor: - create_db_query = sql.SQL("""CREATE DATABASE {};""".format(db_name)) - cursor.execute(create_db_query) - cursor.close() - connection.close() +def create_db(cursor): + print("Creating DB: ", param_postgres['dbName']) + create_db_query = sql.SQL("""CREATE DATABASE {};""".format(param_postgres['dbName'])) + cursor.execute(create_db_query) + print('Done') def drop_db(db_name): @@ -50,31 +46,31 @@ def drop_db(db_name): # Checks, if database "temperatures_berteld_morstein" exists -def dbexists(db_name): +def dbexists(cursor): try: - with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: - with connection.cursor() as cursor: - cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(db_name)) - - db_exists = cursor.fetchall()[0] - return True + cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(param_postgres['dbName'])) + db_exists = cursor.fetchall()[0] + print('DB existing') + print('Updating data') + return True except(Exception, psycopg2.DatabaseError) as error: - # do nothing, because test db is clean + print('No DB found') return False # If database "temperatures_berteld_morstein" not exists, create it -def check_for_db_existence(station_list, db_name): +def check_for_db_existence(cursor): print("Checking for database existence") - if dbexists(db_name): - print('DB existing') + if dbexists(cursor): + return True else: - create_db(db_name) - create_table(station_list, db_name) + create_db(cursor) + return False # Connect to DB "temperatures_berteld_morstein" to create table "temperatures". Also installs PostGIS -def create_table(station_list, db_name): +def create_table(station_list, cursor): + print('Creating table stations') df_columns = list(station_list) columns = ['id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT'] for column in df_columns: @@ -82,93 +78,109 @@ def create_table(station_list, db_name): columns.append('"{}" NUMERIC'.format(column)) columns_clean = str(columns).strip('[]').replace("'", "") - with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: - with connection.cursor() as cursor: - query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean)) - #print(query) - cursor.execute(query) - cursor.execute('CREATE EXTENSION postgis;') + query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean)) + #print(query) + cursor.execute(query) + cursor.execute('CREATE EXTENSION postgis;') + print('Done') # Loading matrix coordinates from csv and writing it into table "stations" in db "temperatures_berteld_morstein" -def insert_empty_matrix_into_db(): +def insert_empty_matrix_into_db(cursor): print('Inserting empty matrix into database') matrix_density = param_interpol['matrix_density'] - with psycopg2.connect(database=param_postgres['dbName'], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: - with connection.cursor() as cursor: - with open('clipped_matrix_{}x{}.csv'.format(matrix_density, matrix_density), 'r') as matrix: - matrix_data = matrix.readlines() - for line in matrix_data[1:]: - values = '' # Used in second parameter of cursor.execute() (Avoids SQL injection) - data = line.split(';') - id = int("9999" + data[0].replace('"', '')) - lon = float(data[1]) - lat = float(data[2].replace('\n', '')) - for n in [id, lon, lat]: - values = (*values, n) # adding n to existing tuple - - query = sql.SQL("INSERT INTO STATIONS (id, lon, lat, country) " - "VALUES ({id}, {lon}, {lat}, 'Germany');").format(id=sql.Placeholder(), lon=sql.Placeholder(), lat=sql.Placeholder()) - # print(query.as_string(cursor)) - # print(values) - cursor.execute(query, values) + + with open('clipped_matrix_{}x{}.csv'.format(matrix_density, matrix_density), 'r') as matrix: + matrix_data = matrix.readlines() + matrix_points = 0 + for line in matrix_data[1:]: + matrix_points += 1 + values = '' # Used in second parameter of cursor.execute() (Avoids SQL injection) + data = line.split(';') + id = int("9999" + data[0].replace('"', '')) + lon = float(data[1]) + lat = float(data[2].replace('\n', '')) + for n in [id, lon, lat]: + values = (*values, n) # adding n to existing tuple + + query = sql.SQL("INSERT INTO STATIONS (id, lon, lat, country) " + "VALUES ({id}, {lon}, {lat}, 'Germany');").format(id=sql.Placeholder(), lon=sql.Placeholder(), lat=sql.Placeholder()) + # print(query.as_string(cursor)) + # print(values) + cursor.execute(query, values) + print('Done') + return matrix_points # Calculating interpolation data for matrix using the according function from the API, and writing it into the database in bulk -def create_matrix_data(): +def create_matrix_data(cursor, amount_points): print('Calculating interpolation data for matrix') # start_time = time.time() - with psycopg2.connect(database=param_postgres['dbName'], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"], - keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5) as connection: - with connection.cursor() as cursor: - cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;") - matrix_points = cursor.fetchall() - update_data = [] - for point in matrix_points: - id = point[0] - lon = point[1] - lat = point[2] - interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*')) - - for year, value in interpolation_data.items(): - update_data.append({'year': year, 'value': value, 'id': id}) - - query = sql.SQL("""UPDATE stations SET "%(year)s" = %(value)s WHERE id = %(id)s; """) - psycopg2.extras.execute_batch(cursor, query, update_data) # Multiple times faster than using execute() in a for loop + cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;") + matrix_points = cursor.fetchall() + update_data = [] + for i, point in enumerate(matrix_points): + id = point[0] + lon = point[1] + lat = point[2] + interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'), cursor) + + for year, value in interpolation_data.items(): + update_data.append({'year': year, 'value': value, 'id': id}) + if i % round((amount_points / 10)) == 0: + finished = i / amount_points * 100 + print(round(finished), end="% ... ") + print('', end=' 100%, Done. \n') + + query = sql.SQL("""UPDATE stations SET "%(year)s" = %(value)s WHERE id = %(id)s; """) + print('Writing interpolation data to database') + psycopg2.extras.execute_batch(cursor, query, update_data) # Multiple times faster than using execute() in a for loop # print((time.time() - start_time), 'seconds') + print('Done') # Dumping all existing data from database. Inserting station data into database in bulk. -def insert_data(station_list, db_name): +def insert_data(station_list, cursor): print('Inserting data into database') - with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: - with connection.cursor() as cursor: - - if len(station_list) > 0: - # print(stationList) - cursor.execute("DELETE FROM stations;") + if len(station_list) > 0: + # print(stationList) + cursor.execute("DELETE FROM stations;") - df_columns = list(station_list) - # create (col1,col2,...) + df_columns = list(station_list) + # create (col1,col2,...) - # As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups - columns = ['"' + column + '"' for column in df_columns] - # for column in df_columns: - # columns.append('"' + column + '"') - columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '') - station_list = station_list.round(decimals=3) + # As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups + columns = ['"' + column + '"' for column in df_columns] + # for column in df_columns: + # columns.append('"' + column + '"') + columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '') + station_list = station_list.round(decimals=3) - # create VALUES('%s', '%s",...) one '%s' per column - values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) + # create VALUES('%s', '%s",...) one '%s' per column + values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) - # create INSERT INTO table (columns) VALUES('%s',...) - insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values) - psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values) + # create INSERT INTO table (columns) VALUES('%s',...) + insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values) + psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values) + print('Done') def export(station_list): - check_for_db_existence(station_list, param_postgres['dbName']) - insert_data(station_list, param_postgres['dbName']) - insert_empty_matrix_into_db() - create_matrix_data() + db_name, user, pw, host, port = param_postgres['dbName'], param_postgres["user"], param_postgres["password"], param_postgres["host"], param_postgres["port"] + + with psycopg2.connect(database='postgres', user=user, password=pw, host=param_postgres["host"], port=port) as connection: + connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database + with connection.cursor() as cursor: + new_db = False if check_for_db_existence(cursor) else True + + with psycopg2.connect(database=db_name, user=user, password=pw, host=param_postgres["host"], port=port) as connection: + with connection.cursor() as cursor: + if new_db: create_table(station_list, cursor) + insert_data(station_list, cursor) + amount_points = insert_empty_matrix_into_db(cursor) + + with psycopg2.connect(database=db_name, user=user, password=pw, host=param_postgres["host"], port=port, + keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5) as connection: + with connection.cursor() as cursor: + create_matrix_data(cursor, amount_points) print('Installation successful. You can run the api.py now.') diff --git a/dataacquisition/GetAverageData.py b/dataacquisition/GetAverageData.py index 188a830b0f27e0f14d7cfc59b3fa89d468570d2f..bebb4fcc9d7c09eed5fff513c4e0b50cbdab7627 100644 --- a/dataacquisition/GetAverageData.py +++ b/dataacquisition/GetAverageData.py @@ -80,17 +80,15 @@ def calc_idw(neighbours, years): # Collecting preparation data and execute interpolation -def get_interpolation_data_for_point(lat, lon, columns): - with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: - with connection.cursor() as cursor: - if '*' in str(columns): - year_columns = get_year_columns(cursor) - else: - year_columns = (str(columns).replace("""SQL('""", "").replace('"', '').replace("')", "")).split(',') - neighbours = get_neighbours(cursor, lat, lon, columns) - avg_data = calc_idw(neighbours, year_columns) - # print(avg_data) - return avg_data +def get_interpolation_data_for_point(lat, lon, columns, cursor): + if '*' in str(columns): + year_columns = get_year_columns(cursor) + else: + year_columns = (str(columns).replace("""SQL('""", "").replace('"', '').replace("')", "")).split(',') + neighbours = get_neighbours(cursor, lat, lon, columns) + avg_data = calc_idw(neighbours, year_columns) + # print(avg_data) + return avg_data # get_average_data_for_point(52.5, 13.4) diff --git a/dataacquisition/api.py b/dataacquisition/api.py index 2366bfead036fefd646234941d37b53b987c6d24..350becc2fa110636f8e33feee46b8fc9cf0f87b5 100644 --- a/dataacquisition/api.py +++ b/dataacquisition/api.py @@ -41,7 +41,9 @@ def index(): if 'lat' in request.args or 'lng' in request.args: lat = request.args['lat'] lon = request.args['lon'] - return get_interpolation_data_for_point(lat, lon, columns) + with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: + with connection.cursor() as cursor: + return get_interpolation_data_for_point(lat, lon, columns, cursor) else: if 'id' in request.args: diff --git a/dataacquisition/sandbox.py b/dataacquisition/sandbox.py index 1cb8907a5b42369202f8951ff226699337757b67..4fff5afc85cda81d63dbb80bdffd71a62361b29e 100644 --- a/dataacquisition/sandbox.py +++ b/dataacquisition/sandbox.py @@ -1,58 +1,10 @@ -import configparser +import time -import psycopg2 -from psycopg2 import sql -cfg = configparser.ConfigParser() -cfg.read('config.ini') +top = 101 -assert "POSTGRES" in cfg, "missing POSTGRES in config.ini" +for i in range(0, top): + time.sleep(0.1) + if i % 10 == 0: + finished = i / top * 100 -param_postgres = cfg["POSTGRES"] -param_interpol = cfg["INTERPOLATION"] -# -# query = sql.SQL("select {0} from {1} where {2} LIKE {3} {4}").format(sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]), sql.Identifier('table'), sql.Identifier('coulumn'), sql.Placeholder(), -# sql.SQL('AND 1 = 1')) -# -# with psycopg2.connect(database='temperatures_berteld_morstein', user='postgres', password='postgres', host='localhost', port=5432) as connection: -# with connection.cursor() as cursor: -# print(query.as_string(cursor)) - - -# weights = [2, 1.25, 0.75, 0.6, 0.4] -# neighbours = [{'2015': 'NaN', '2010': 9.333, '2014': 'NaN', '1998': 'NaN', 'distance': 0.0223606797750006}, {'2015': 11.233, '2010': 8.883, '2014': 11.458, '1998': 10.05, 'distance': 0.0300000000000011}, {'2015': 11.133, '2010': 8.767, '2014': 11.467, '1998': 'NaN', 'distance': 0.108166538263921}, {'2015': 10.667, '2010': 8.208, '2014': 11.025, '1998': 9.8, 'distance': 0.111803398874988}, {'2015': 10.667, '2010': 8.35, '2014': 10.908, '1998': 'NaN', 'distance': 0.176918060129539}] -# -# for weight, neighbour in zip(weights, neighbours): -# print(weight, neighbour['2015']) - - -# list_a = [1,2,3,4,5,6] -# list_b = [x * 2 for x in list_a if x % 2 == 0] -# print(list_b) - -# list_a = [1,2,3,4,5,6,7,8,9,10] -# new_list = list_a[0::2] -# print(new_list) - - -def get_neighbours(lat, lon, columns): - values = '' # Used in second parameter of cursor.execute() (Avoids SQL injection) - for n in [lat, lon]: - values = (*values, n) # adding n to existing tuple - with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: - with connection.cursor() as cursor: - query = sql.SQL(""" - SELECT array_to_json(array_agg(row_to_json(t))) from ( - SELECT {columns}, ST_Distance(ST_MakePoint(lat, lon), ST_MakePoint({lon}, {lat})) AS distance - FROM stations - WHERE file IS NOT NULL - ORDER BY distance - LIMIT {amount_neighbours} - ) t; - """).format(columns=columns, lon=sql.Placeholder(), lat=sql.Placeholder(), amount_neighbours=sql.SQL(param_interpol["amount_neighbours"])) - cursor.execute(query, values) - neighbours = cursor.fetchall()[0][0] - print(neighbours) - # return neighbours - - -get_neighbours(53.42, 9.24, sql.SQL('"2015", "2014", "2010"')) \ No newline at end of file + print(round(finished), end="% ... ") diff --git a/dataacquisition/sandbox_sqlite.py b/dataacquisition/sandbox_sqlite.py new file mode 100644 index 0000000000000000000000000000000000000000..15c862fe5f78cd0c88bc9feff75307795c144bfe --- /dev/null +++ b/dataacquisition/sandbox_sqlite.py @@ -0,0 +1,22 @@ +import os +import sqlite3 +from pathlib import Path +from sqlite3 import Error +cwd = Path(os.path.dirname(os.path.abspath(__file__))) +print(cwd) + + +def create_connection(db_file): + """ create a database connection to a SQLite database """ + conn = None + try: + conn = sqlite3.connect(db_file) + print(sqlite3.version) + except Error as e: + print(e) + finally: + if conn: + conn.close() + + +create_connection('temperatures.db') # ':memory:' for saving in RAM