#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 14 13:43:45 2021

Export the station list to the database.

The module creates the target PostgreSQL database and its ``stations`` table
when missing, inserts the station data, adds the points of the clipped matrix
CSV file and finally fills those matrix points with interpolated values.

@author: geopeter
"""

import time
from collections import defaultdict
from contextlib import contextmanager

import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import configparser

from dataacquisition.GetAverageData import get_interpolation_data_for_point

cfg = configparser.ConfigParser()
cfg.read('config.ini')

# NOTE: kept as an assert so the import-time failure mode (AssertionError)
# stays unchanged for existing callers.
assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
param_postgres = cfg["POSTGRES"]
param_interpol = cfg["INTERPOLATION"]
stationGPD = None

# Database used for administrative statements (CREATE/DROP DATABASE and the
# existence lookup); those cannot be issued from inside the target database.
_MAINTENANCE_DB = 'postgres'


@contextmanager
def _open_connection(db_name, autocommit=False, **connect_kwargs):
    """Yield a connection to ``db_name`` and always close it afterwards.

    psycopg2's own ``with connection:`` only ends the transaction and leaves
    the connection open, so the close is done explicitly here.

    Args:
        db_name: Name of the database to connect to.
        autocommit: When True the connection runs in autocommit mode, which is
            required for CREATE DATABASE / DROP DATABASE. Otherwise the work is
            committed on success and rolled back when an exception escapes.
        **connect_kwargs: Extra keyword arguments passed on to
            ``psycopg2.connect`` (for example the TCP keepalive settings).
    """
    connection = psycopg2.connect(dbname=db_name,
                                  user=param_postgres["user"],
                                  password=param_postgres["password"],
                                  host=param_postgres["host"],
                                  port=param_postgres["port"],
                                  **connect_kwargs)
    try:
        if autocommit:
            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            yield connection
        else:
            with connection:  # commit on success, rollback on error
                yield connection
    finally:
        connection.close()


def create_db(db_name):
    """Create the database ``db_name`` through the maintenance database."""
    print("Create DB: ", db_name)
    # CREATE DATABASE cannot run inside a transaction block -> autocommit.
    with _open_connection(_MAINTENANCE_DB, autocommit=True) as connection:
        with connection.cursor() as cursor:
            create_db_query = sql.SQL("CREATE DATABASE {};").format(sql.Identifier(db_name))
            cursor.execute(create_db_query)


def drop_db(db_name):
    """Drop the database ``db_name`` if it exists.

    Database errors are printed instead of raised, because a failing drop
    only means there was nothing left to clean up.
    """
    if not dbexists(db_name):
        return

    print("Drop DB: ", db_name)
    try:
        # A database cannot be dropped while connected to it, and DROP DATABASE
        # cannot run inside a transaction block; therefore use the maintenance
        # database in autocommit mode.
        with _open_connection(_MAINTENANCE_DB, autocommit=True) as connection:
            with connection.cursor() as cursor:
                cursor.execute(sql.SQL("DROP DATABASE {};").format(sql.Identifier(db_name)))
    except psycopg2.DatabaseError as error:
        # do nothing, because test db is clean
        print(error)


def dbexists(db_name):
    """Return True if a database named exactly ``db_name`` exists.

    Any database error (including a failing connection) is reported as
    "does not exist" by returning False.
    """
    try:
        with _open_connection(_MAINTENANCE_DB) as connection:
            with connection.cursor() as cursor:
                # Exact match with a bound parameter; LIKE would treat the
                # underscores of the database name as wildcards.
                cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s;", (db_name,))
                return cursor.fetchone() is not None
    except psycopg2.Error:
        return False


def check_for_db_existence(station_list, db_name):
    """Create database ``db_name`` and its ``stations`` table when missing."""
    print("Checking for database existence")
    if dbexists(db_name):
        print('DB existing')
        return

    create_db(db_name)
    create_table(station_list, db_name)


def create_table(station_list, db_name):
    """Create the ``stations`` table and enable PostGIS in ``db_name``.

    Besides the fixed columns, one NUMERIC column is created for every column
    of ``station_list`` whose name starts with "19" or "20" (the year columns).
    """
    column_defs = [sql.SQL(definition) for definition in
                   ('id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT')]
    # Year names such as 2018 are only valid column names when quoted.
    column_defs.extend(
        sql.SQL("{} NUMERIC").format(sql.Identifier(str(column)))
        for column in list(station_list)
        if str(column).startswith(('19', '20'))
    )
    query = sql.SQL("CREATE TABLE stations ({});").format(sql.SQL(", ").join(column_defs))

    with _open_connection(db_name) as connection:
        with connection.cursor() as cursor:
            cursor.execute(query)
            # IF NOT EXISTS: the extension may already come with the template database.
            cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')


def _parse_matrix_line(line):
    """Turn one ``id;lon;lat`` line of the matrix CSV into ``(id, lon, lat)``.

    The id gets the prefix 9999, as in the original import, presumably to keep
    matrix points apart from real station ids.
    """
    fields = line.strip().split(';')
    point_id = int("9999" + fields[0].replace('"', ''))
    return point_id, float(fields[1]), float(fields[2])


def insert_empty_matrix_into_db():
    """Insert the points of the clipped matrix CSV file as value-less stations.

    The file ``clipped_matrix_<density>x<density>.csv`` is read from the
    current working directory; its first line is treated as header.
    """
    print('Inserting empty matrix into database')
    matrix_density = param_interpol['matrix_density']

    with open('clipped_matrix_{}x{}.csv'.format(matrix_density, matrix_density), 'r') as matrix:
        next(matrix, None)  # skip the header line
        rows = [_parse_matrix_line(line) for line in matrix if line.strip()]

    # Values are passed as parameters, never formatted into the statement.
    query = "INSERT INTO STATIONS (id, lon, lat, country) VALUES (%s, %s, %s, 'Germany');"
    with _open_connection(param_postgres['dbName']) as connection:
        with connection.cursor() as cursor:
            psycopg2.extras.execute_batch(cursor, query, rows)


def create_matrix_data():
    """Fill the matrix points (stations without a file) with interpolated values.

    For every such point ``get_interpolation_data_for_point`` is asked for its
    data; the returned pairs are written to the columns named after the first
    element of each pair (the year).
    """
    print('Calculating interpolation data for matrix')
    with _open_connection(param_postgres['dbName'],
                          keepalives=1, keepalives_idle=30,
                          keepalives_interval=10, keepalives_count=5) as connection:
        with connection.cursor() as cursor:
            cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;")
            matrix_points = cursor.fetchall()

            # Column names cannot be bound as parameters. Points sharing the
            # same set of years therefore share one composed UPDATE statement
            # and are sent together as a batch.
            rows_by_years = defaultdict(list)
            for point_id, lon, lat in matrix_points:
                interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'))
                items = list(interpolation_data.items())
                if not items:
                    continue  # nothing to write for this point
                years = tuple(year for year, _ in items)
                values = [value for _, value in items]
                rows_by_years[years].append((*values, point_id))

            for years, rows in rows_by_years.items():
                assignments = sql.SQL(", ").join(
                    sql.SQL("{} = {}").format(sql.Identifier(str(year)), sql.Placeholder())
                    for year in years
                )
                query = sql.SQL("UPDATE stations SET {} WHERE id = {};").format(
                    assignments, sql.Placeholder())
                # Multiple times faster than using execute() in a for loop
                psycopg2.extras.execute_batch(cursor, query, rows)


def insert_data(station_list, db_name):
    """Replace the content of ``stations`` with the rows of ``station_list``.

    Nothing is changed when ``station_list`` is empty. Values are rounded to
    three decimals before they are written.
    """
    print('Inserting data into database')
    if len(station_list) == 0:
        return

    # As integers like 2018, 2017, etc. are not possible as plain column
    # names, every column name is emitted as a quoted identifier.
    column_names = [str(column) for column in list(station_list)]
    insert_stmt = sql.SQL("INSERT INTO stations ({}) VALUES ({})").format(
        sql.SQL(", ").join(sql.Identifier(name) for name in column_names),
        sql.SQL(", ").join(sql.Placeholder() * len(column_names)),  # one %s per column
    )
    station_list = station_list.round(decimals=3)

    with _open_connection(db_name) as connection:
        with connection.cursor() as cursor:
            cursor.execute("DELETE FROM stations;")
            psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values)


def export(station_list):
    """Run the complete export: database, station data and matrix data."""
    check_for_db_existence(station_list, param_postgres['dbName'])
    insert_data(station_list, param_postgres['dbName'])
    insert_empty_matrix_into_db()
    create_matrix_data()
    print('Installation successful. You can run the api.py now.')