diff --git a/dataacquisition/ExportToDatabase.py b/dataacquisition/ExportToDatabase.py index 8523ef4e2ea80a8bbfc7340e0577cdad57ca8b97..0c18685140f3dc9eb21f3941a03f30163abd93a3 100644 --- a/dataacquisition/ExportToDatabase.py +++ b/dataacquisition/ExportToDatabase.py @@ -22,78 +22,82 @@ param_postgres = cfg["POSTGRES"] stationGPD = None + # Use existing connection to DB "postgres" to create DB "temperatures_berteld_morstein" -def create_db(dbName): - print("Create DB: ",dbName) +def create_db(db_name): + print("Create DB: ", db_name) connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database with connection.cursor() as cursor: - create_db_query = sql.SQL("""CREATE DATABASE {};""".format(dbName)) + create_db_query = sql.SQL("""CREATE DATABASE {};""".format(db_name)) cursor.execute(create_db_query) connection.close() - -def drop_db(dbName): - if dbexists(dbName): - print("Drop DB: ",dbName) + + +def drop_db(db_name): + if dbexists(db_name): + print("Drop DB: ", db_name) try: connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) - connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database + connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database with connection.cursor() as cursor: - cursor.execute("DROP DATABASE {};".format(dbName)); + cursor.execute("DROP DATABASE {};".format(db_name)) connection.close() - except(psycopg2.DatabaseError) as error: + except psycopg2.DatabaseError as error: # do nothing, because test db is clean print(error.message) return -def dbexists(dbName): - try: - connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) - cursor = connection.cursor() - cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(dbName)) - - db_exists = cursor.fetchall()[0] - connection.close() - return True - except(Exception, psycopg2.DatabaseError) as error: - # do nothing, because test db is clean - return False + +def dbexists(db_name): + try: + connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) + cursor = connection.cursor() + cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(db_name)) + + db_exists = cursor.fetchall()[0] + connection.close() + return True + except(Exception, psycopg2.DatabaseError) as error: + # do nothing, because test db is clean + return False # Connect to DB "postgres" to check for database "temperatures_berteld_morstein" -def check_for_db_existence(stationList, dbName): +def check_for_db_existence(station_list, db_name): print("Checking for database existence") - if dbexists(dbName): + if dbexists(db_name): print('DB existing exists') else: - create_db(dbName) - create_table(stationList, dbName) + create_db(db_name) + create_table(station_list, db_name) + # Connect to DB "temperatures_berteld_morstein" to create table "temperatures" -def create_table(stationList, dbName): - df_columns = list(stationList) +def create_table(station_list, db_name): + df_columns = list(station_list) columns = ['id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT'] for column in df_columns: if str(column).startswith('19') or str(column).startswith('20'): columns.append('"{}" NUMERIC'.format(column)) columns_clean = str(columns).strip('[]').replace("'", "") - with psycopg2.connect(database=dbName, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: + with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: with connection.cursor() as cursor: query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean)) #print(query) cursor.execute(query) -def insert_data(stationList, dbName): - with psycopg2.connect(database=dbName, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: +def insert_data(station_list, db_name): + with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: with connection.cursor() as cursor: - if len(stationList) > 0: + if len(station_list) > 0: # print(stationList) cursor.execute("DELETE FROM stations;") - df_columns = list(stationList) + df_columns = list(station_list) # create (col1,col2,...) # As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups @@ -101,15 +105,16 @@ def insert_data(stationList, dbName): for column in df_columns: columns.append('"' + column + '"') columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '') - stationList = stationList.round(decimals=3) + station_list = station_list.round(decimals=3) # create VALUES('%s', '%s",...) one '%s' per column values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) # create INSERT INTO table (columns) VALUES('%s',...) insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values) - psycopg2.extras.execute_batch(cursor, insert_stmt, stationList.values) + psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values) + -def export(stationList): - check_for_db_existence(stationList, param_postgres['dbName']) - insert_data(stationList, param_postgres['dbName']) +def export(station_list): + check_for_db_existence(station_list, param_postgres['dbName']) + insert_data(station_list, param_postgres['dbName']) diff --git a/dataacquisition/GetAverageData.py b/dataacquisition/GetAverageData.py new file mode 100644 index 0000000000000000000000000000000000000000..38d17169c9835a7d51ac7941ac231280a55bf75c --- /dev/null +++ b/dataacquisition/GetAverageData.py @@ -0,0 +1,71 @@ +import configparser + +import psycopg2 +from psycopg2 import sql + +cfg = configparser.ConfigParser() +cfg.read('config.ini') +assert "POSTGRES" in cfg, "missing POSTGRES in config.ini" +param_postgres = cfg["POSTGRES"] + + +def get_year_columns(cursor): + columns = [] + query = sql.SQL("SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'stations';") + cursor.execute(query) + results = cursor.fetchall() + for result in results: + try: + columns.append(int(result[0])) + except ValueError: + pass + return columns + + +def get_neighbours(cursor, lat, lon, columns): + values = '' # Used in second parameter of cursor.execute() (Avoids SQL injection) + for n in [lat, lon]: + values = (*values, n) # adding n to existing tuple + + query = sql.SQL(""" + SELECT array_to_json(array_agg(row_to_json(t))) from ( + SELECT {columns} FROM stations + ORDER BY ST_Distance(ST_MakePoint(lon, lat), ST_MakePoint({lon}, {lat})) + LIMIT 10 + ) t; + """).format(columns=columns, lon=sql.Placeholder(), lat=sql.Placeholder()) + cursor.execute(query, values) + neighbours = cursor.fetchall()[0][0] + return neighbours + + +def calc_averages(neighbours, years): + averages = {} + for year in years: + values = [] + # print(neighbours) + for neighbour in neighbours: + + # print(neighbour[str(year)]) + if not neighbour[str(year)] == 'NaN': values.append(neighbour[str(year)]) + avg = round(sum(values) / len(values), 3) + averages[year] = avg + return averages + + +def get_average_data_for_point(lat, lon, columns): + with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: + with connection.cursor() as cursor: + print(columns) + if '*' in str(columns): + year_columns = get_year_columns(cursor) + else: + year_columns = (str(columns).replace("""SQL('""", "").replace('"', '').replace("')", "")).split(',') + print(year_columns) + neighbours = get_neighbours(cursor, lat, lon, columns) + avg_data = calc_averages(neighbours, year_columns) + print(avg_data) + return avg_data + + +# get_average_data_for_point(52.5, 13.4) diff --git a/dataacquisition/api.py b/dataacquisition/api.py index aaa673ae1b45cd546553fd9428406327f4b36262..d4d984c020563dd0bcf20a9d68793d4ac0a40a9e 100644 --- a/dataacquisition/api.py +++ b/dataacquisition/api.py @@ -4,6 +4,7 @@ import psycopg2 from flask import Flask, jsonify, request from psycopg2 import sql import configparser +from GetAverageData import get_average_data_for_point cfg = configparser.ConfigParser() cfg.read('config.ini') @@ -25,12 +26,6 @@ def index(): wheres = sql.SQL('') # where filters values = '' # Used in second parameter of cursor.execute() (Avoids SQL injection) - if 'id' in request.args: - station_id = request.args['id'] - wheres = wheres + (sql.SQL("AND id = {values} ").format(column=sql.Identifier('stations', 'id'), values=sql.Placeholder())) - for n in [int(station_id)]: - values = (*values, n) # adding n to existing tuple - if 'years' in request.args: years = request.args['years'].split(',') years_clean = [] @@ -39,29 +34,41 @@ def index(): years_clean.append('"' + year + '"') years_clean = str(years_clean).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '') - columns = sql.SQL('id, ') + sql.SQL(years_clean) - - # Just for development - # if 'country' in request.args: - # country = request.args['country'] - # wheres = wheres + (sql.SQL("AND LOWER({column}) LIKE {values} ").format(column=sql.Identifier('stations', 'country'), values=sql.Placeholder())) - # for n in [country]: - # values = (*values, n) # adding n to existing tuple - - query = sql.SQL("SELECT array_to_json(array_agg(row_to_json(t))) from (" - "SELECT {} FROM stations " - "WHERE lon IS NOT NULL " # Unnecessary filter, just so the real filters can always be written with AND - "{} " - ") t;").format(columns, wheres) - - with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: - with connection.cursor() as cursor: - print(query.as_string(cursor)) - print(values) - cursor.execute(query, values) - - results = cursor.fetchall()[0][0] - return jsonify(results) + columns = sql.SQL(years_clean) + + if 'lat' in request.args or 'lng' in request.args: + lat = request.args['lat'] + lon = request.args['lon'] + return get_average_data_for_point(lat, lon, columns) + + else: + if 'id' in request.args: + station_id = request.args['id'] + wheres = wheres + (sql.SQL("AND id = {values} ").format(column=sql.Identifier('stations', 'id'), values=sql.Placeholder())) + for n in [int(station_id)]: + values = (*values, n) # adding n to existing tuple + + # Just for development + # if 'country' in request.args: + # country = request.args['country'] + # wheres = wheres + (sql.SQL("AND LOWER({column}) LIKE {values} ").format(column=sql.Identifier('stations', 'country'), values=sql.Placeholder())) + # for n in [country]: + # values = (*values, n) # adding n to existing tuple + + query = sql.SQL("SELECT array_to_json(array_agg(row_to_json(t))) from (" + "SELECT id, {} FROM stations " + "WHERE lon IS NOT NULL " # Unnecessary filter, just so the real filters can always be written with AND + "{} " + ") t;").format(columns, wheres) + + with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: + with connection.cursor() as cursor: + print(query.as_string(cursor)) + print(values) + cursor.execute(query, values) + + results = cursor.fetchall()[0][0] + return jsonify(results) app.run(host='127.0.0.1', port=42000)