#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 14 13:43:45 2021

Export the station list to the database.

@author: geopeter
"""
import configparser
import os
import sys

import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

# Make the project root importable before importing from the api package.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from api.GetAverageData import get_interpolation_data_for_point

cfg = configparser.ConfigParser()
cfg.read('../config.ini')

assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
assert "INTERPOLATION" in cfg, "missing INTERPOLATION in config.ini"
param_postgres = cfg["POSTGRES"]
param_interpol = cfg["INTERPOLATION"]

stationGPD = None


# Create DB "temperatures_berteld_morstein"
def create_db(cursor, db_name):
    print("Creating DB:", db_name)
    # Database names cannot be passed as query parameters, so quote them
    # safely with sql.Identifier instead of plain string formatting.
    cursor.execute(sql.SQL("CREATE DATABASE {};").format(sql.Identifier(db_name)))
    print('Done')


def drop_db(cursor, db_name):
    print("Drop DB:", db_name)
    try:
        if dbexists(cursor, db_name):
            cursor.execute(sql.SQL("DROP DATABASE {};").format(sql.Identifier(db_name)))
    except psycopg2.DatabaseError as error:
        # Do nothing, because the test db is clean.
        print(error)


# Checks if database "temperatures_berteld_morstein" exists
def dbexists(cursor, db_name):
    try:
        cursor.execute("SELECT datname FROM pg_database WHERE datname = %s;", (db_name,))
        if cursor.fetchone():
            print('DB existing')
            print('Updating data')
            return True
        print('No DB found')
        return False
    except (Exception, psycopg2.DatabaseError):
        print('No DB found')
        return False


# If database "temperatures_berteld_morstein" does not exist, create it
def check_for_db_existence(cursor, db_name):
    print("Checking for database existence")
    if dbexists(cursor, db_name):
        return True
    else:
        create_db(cursor, db_name)
        return False


def check_for_stations_existence(cursor, table_name):
    try:
        cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_name = %s;", (table_name,))
        return cursor.fetchone() is not None
    except (Exception, psycopg2.DatabaseError):
        return False
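
# The helpers above build statements with psycopg2.sql so that identifiers and
# values are escaped through separate mechanisms. A minimal sketch of the
# pattern (the table/column names here are purely illustrative):
#
#     query = sql.SQL("SELECT {col} FROM {tbl} WHERE station_id = {val};").format(
#         col=sql.Identifier("lon"),
#         tbl=sql.Identifier("stations"),
#         val=sql.Placeholder())
#     cursor.execute(query, (10001,))
#
# sql.Identifier double-quotes names, while sql.Placeholder renders a %s and
# defers the value to the driver's parameter binding.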

# Connect to DB "temperatures_berteld_morstein" to create the table "stations".
# Also installs PostGIS.
def create_table(station_list, cursor):
    print('Creating table stations')
    df_columns = list(station_list)
    columns = ['station_id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT', 'transparent BOOL']
    for column in df_columns:
        # Year columns (18xx, 19xx, 20xx) hold one temperature value each.
        if str(column).startswith(('18', '19', '20')):
            columns.append('"{}" NUMERIC'.format(column))
    cursor.execute(sql.SQL("CREATE TABLE stations ({});".format(', '.join(columns))))
    cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
    print('Done')


# Loads the matrix coordinates from csv and writes them into the table "stations"
# in db "temperatures_berteld_morstein"
def insert_empty_matrix_into_db(cursor):
    print('Inserting empty matrix into database')
    matrix_density = param_interpol['matrix_density']
    with open('matrix_{}x{}_4326_with_transparency.csv'.format(matrix_density, matrix_density), 'r') as matrix:
        matrix_data = matrix.readlines()
    matrix_points = 0
    for line in matrix_data[1:]:  # skip the header row
        matrix_points += 1
        data = line.strip().split(';')
        # Prefix matrix ids with "9999" so they cannot collide with real station ids.
        point_id = int("9999" + data[0].replace('"', ''))
        transparent = data[1]
        lat = float(data[2])
        lon = float(data[3])
        # Values are passed as query parameters, which avoids SQL injection.
        query = ("INSERT INTO stations (station_id, transparent, lon, lat, country) "
                 "VALUES (%s, %s, %s, %s, 'Germany');")
        cursor.execute(query, (point_id, transparent, lon, lat))
    print('Done')
    return matrix_points


# Calculates interpolation data for the matrix using the according function from
# the API and writes it into the database in bulk.
def create_matrix_data(cursor, amount_points):
    print('Calculating interpolation data for matrix')
    cursor.execute("SELECT station_id, lon, lat FROM stations WHERE file IS NULL;")
    matrix_points = cursor.fetchall()
    update_data = []
    progress_step = max(1, round(amount_points / 10))  # guard against a zero step for tiny matrices
    for i, (point_id, lon, lat) in enumerate(matrix_points):
        interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'), cursor)
        for year, value in interpolation_data.items():
            update_data.append({'year': year, 'value': value, 'id': point_id})
        if i % progress_step == 0:
            print(round(i / amount_points * 100), end="% ... ")
    print('', end=' 100%, Done. \n')

    # The year is substituted inside double quotes, so it is read back as a
    # quoted column identifier ("1881", "1882", ...); this relies on the
    # years being numeric.
    query = sql.SQL("""UPDATE stations SET "%(year)s" = %(value)s WHERE station_id = %(id)s;""")
    print('Writing interpolation data to database')
    psycopg2.extras.execute_batch(cursor, query, update_data)  # Multiple times faster than execute() in a for loop
    print('Done')
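
# For reference, a sketch of the same update with the column name composed via
# sql.Identifier instead of the quoted-placeholder trick above. It loses the
# execute_batch speed-up, but makes the identifier handling explicit. This
# helper is illustrative only and is not called anywhere in this script.
def update_matrix_data_per_row(cursor, update_data):
    for row in update_data:
        query = sql.SQL("UPDATE stations SET {col} = %(value)s WHERE station_id = %(id)s;").format(
            col=sql.Identifier(str(row['year'])))
        cursor.execute(query, row)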
insert_stmt = """INSERT INTO {} ({}) VALUES {};""".format('stations', columns, values) return insert_stmt def insert_data(station_list, cursor): print('Inserting data into database') if len(station_list) > 0: cursor.execute("DELETE FROM stations;") station_list["station_id"] = station_list.index station_list = station_list.round(decimals=3) insert_stmt = createInsertStatement(cursor, station_list) cursor.execute(insert_stmt) print('Inserting data into database Done') def export(station_list): db_name, user, pw, host, port = param_postgres['dbName'], param_postgres["user"], param_postgres["password"], param_postgres["host"], param_postgres["port"] try: connection = psycopg2.connect(database='postgres', user=user, password=pw, host=param_postgres["host"], port=port) connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database with connection.cursor() as cursor: new_db = False if check_for_db_existence(cursor, db_name) else True connection = psycopg2.connect(database=db_name, user=user, password=pw, host=param_postgres["host"], port=port) connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database with connection.cursor() as cursor: if new_db: create_table(station_list, cursor) insert_data(station_list, cursor) amount_points = insert_empty_matrix_into_db(cursor) create_matrix_data(cursor, amount_points) print('Installation successful. You can run the api.py now.') except(Exception, psycopg2.DatabaseError) as error: print(error) finally: if connection: connection.commit() connection.close()