#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 14 13:43:45 2021

Export the station list to the database.

@author: geopeter
"""

import configparser

import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from api.GetAverageData import get_interpolation_data_for_point

cfg = configparser.ConfigParser()
cfg.read('../config.ini')
assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
assert "INTERPOLATION" in cfg, "missing INTERPOLATION in config.ini"
param_postgres = cfg["POSTGRES"]
param_interpol = cfg["INTERPOLATION"]

stationGPD = None

# get_average_data_for_point(52.5, 13.4)


# Create DB "temperatures_berteld_morstein"
def create_db(cursor):
    print("Creating DB:", param_postgres['dbName'])
    # Identifiers cannot be passed as query parameters, so quote the database
    # name safely with sql.Identifier instead of plain string formatting.
    create_db_query = sql.SQL("CREATE DATABASE {};").format(sql.Identifier(param_postgres['dbName']))
    cursor.execute(create_db_query)
    print('Done')


def drop_db(db_name):
    print("Drop DB:", db_name)
    try:
        # Connect to the maintenance database "postgres": PostgreSQL refuses to
        # drop a database while we are connected to it, and DROP DATABASE cannot
        # run inside a transaction, hence autocommit.
        with psycopg2.connect(database='postgres', user=param_postgres["user"],
                              password=param_postgres["password"],
                              host=param_postgres["host"], port=param_postgres["port"]) as connection:
            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            with connection.cursor() as cursor:
                if dbexists(cursor):
                    cursor.execute(sql.SQL("DROP DATABASE {};").format(sql.Identifier(db_name)))
    except psycopg2.DatabaseError as error:
        # do nothing, because the test db is clean
        print(error)
        return


# Checks if database "temperatures_berteld_morstein" exists
def dbexists(cursor):
    try:
        cursor.execute("SELECT datname FROM pg_database WHERE datname = %s;", (param_postgres['dbName'],))
        if cursor.fetchone() is None:
            print('No DB found')
            return False
        print('DB existing')
        print('Updating data')
        return True
    except (Exception, psycopg2.DatabaseError):
        print('No DB found')
        return False


# If database "temperatures_berteld_morstein" does not exist, create it
def check_for_db_existence(cursor):
    print("Checking for database existence")
    if dbexists(cursor):
        return True
    create_db(cursor)
    return False
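# For reference, a config.ini along these lines is assumed by the module-level
# parsing above. The key names are taken from the lookups in this script; the
# values are illustrative placeholders only:
#
#   [POSTGRES]
#   dbName = temperatures_berteld_morstein
#   user = postgres
#   password = postgres
#   host = localhost
#   port = 5432
#
#   [INTERPOLATION]
#   matrix_density = 100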
# Connect to DB "temperatures_berteld_morstein" to create table "stations". Also installs PostGIS
def create_table(station_list, cursor):
    print('Creating table stations')
    df_columns = list(station_list)
    columns = ['station_id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT', 'transparent BOOL']
    for column in df_columns:
        # Year columns (19xx/20xx) are not valid identifiers on their own, so double-quote them
        if str(column).startswith('19') or str(column).startswith('20'):
            columns.append('"{}" NUMERIC'.format(column))
    query = sql.SQL("CREATE TABLE stations ({});".format(', '.join(columns)))
    # print(query)
    cursor.execute(query)
    cursor.execute('CREATE EXTENSION postgis;')
    print('Done')


# Loading matrix coordinates from csv and writing them into table "stations" in db "temperatures_berteld_morstein"
def insert_empty_matrix_into_db(cursor):
    print('Inserting empty matrix into database')
    matrix_density = param_interpol['matrix_density']
    with open('matrix_{}x{}_4326_with_transparency.csv'.format(matrix_density, matrix_density), 'r') as matrix:
        matrix_data = matrix.readlines()

    query = sql.SQL("INSERT INTO stations (station_id, transparent, lon, lat, country) "
                    "VALUES ({id}, {transparent}, {lon}, {lat}, 'Germany');").format(id=sql.Placeholder(),
                                                                                     transparent=sql.Placeholder(),
                                                                                     lon=sql.Placeholder(),
                                                                                     lat=sql.Placeholder())
    matrix_points = 0
    for line in matrix_data[1:]:  # skip the header line
        matrix_points += 1
        data = line.split(';')
        point_id = int("9999" + data[0].replace('"', ''))
        transparent = data[1]
        lon = float(data[3])
        lat = float(data[2].replace('\n', ''))
        # Passed as the second argument of cursor.execute() (avoids SQL injection)
        values = (point_id, transparent, lon, lat)
        # print(query.as_string(cursor))
        # print(values)
        cursor.execute(query, values)
    print('Done')
    return matrix_points


# Calculating interpolation data for the matrix using the according function from the API, and writing it into the database in bulk
def create_matrix_data(cursor, amount_points):
    print('Calculating interpolation data for matrix')
    # start_time = time.time()
    cursor.execute("SELECT station_id, lon, lat FROM stations WHERE file IS NULL;")
    matrix_points = cursor.fetchall()
    update_data = []
    progress_step = max(1, round(amount_points / 10))  # guard against modulo by zero for fewer than 10 points
    for i, point in enumerate(matrix_points):
        station_id, lon, lat = point
        interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'), cursor)
        for year, value in interpolation_data.items():
            # "year" must be an integer: it is rendered inside the double quotes
            # of the column name below, and a string would render as "'2018'"
            update_data.append({'year': int(year), 'value': value, 'id': station_id})
        if i % progress_step == 0:
            finished = i / amount_points * 100
            print(round(finished), end="% ... ")
    print('', end=' 100%, Done. \n')

    query = sql.SQL("""UPDATE stations SET "%(year)s" = %(value)s WHERE station_id = %(id)s; """)
    print('Writing interpolation data to database')
    psycopg2.extras.execute_batch(cursor, query, update_data)  # Multiple times faster than using execute() in a for loop
    # print((time.time() - start_time), 'seconds')
    print('Done')
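# A minimal, self-contained sketch (not called anywhere in this script) of the
# execute_batch pattern used by create_matrix_data and insert_data: instead of
# one server round trip per execute(), the parameter sets are mogrified and
# sent in pages, which is why it is multiple times faster than execute() in a
# loop. Table and column names here are illustrative only.
def _execute_batch_sketch(cursor):
    rows = [{'id': 1, 'value': 9.5}, {'id': 2, 'value': 10.1}]
    psycopg2.extras.execute_batch(
        cursor,
        "UPDATE demo_table SET demo_col = %(value)s WHERE id = %(id)s;",
        rows,
        page_size=100,  # how many statements are joined per round trip
    )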
insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values) return insert_stmt def insert_data(station_list, cursor): print('Inserting data into database') if len(station_list) > 0: # print(stationList) cursor.execute("DELETE FROM stations;") #df_columns = list(station_list) # create (col1,col2,...) # As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups #columns = ['"' + column + '"' for column in df_columns] # for column in df_columns: # columns.append('"' + column + '"') #columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '') station_list["station_id"] = station_list.index station_list = station_list.round(decimals=3) # create VALUES('%s', '%s",...) one '%s' per column #values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) # create INSERT INTO table (columns) VALUES('%s',...) #insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values) insert_stmt = createInsertStatement(station_list) print(insert_stmt) psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values) print('Done') def export(station_list): db_name, user, pw, host, port = param_postgres['dbName'], param_postgres["user"], param_postgres["password"], param_postgres["host"], param_postgres["port"] with psycopg2.connect(database='postgres', user=user, password=pw, host=param_postgres["host"], port=port) as connection: connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database with connection.cursor() as cursor: new_db = False if check_for_db_existence(cursor) else True with psycopg2.connect(database=db_name, user=user, password=pw, host=param_postgres["host"], port=port) as connection: with connection.cursor() as cursor: if new_db: create_table(station_list, cursor) insert_data(station_list, cursor) amount_points = insert_empty_matrix_into_db(cursor) with psycopg2.connect(database=db_name, user=user, password=pw, host=param_postgres["host"], port=port, keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5) as connection: with connection.cursor() as cursor: create_matrix_data(cursor, amount_points) print('Installation successful. You can run the api.py now.')