#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 14 13:43:45 2021

export the stationlist to database

@author: geopeter
"""
import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import configparser
Clemens Berteld's avatar
Clemens Berteld committed
from api.GetAverageData import get_interpolation_data_for_point
# Load the database and interpolation settings once at import time.
# ConfigParser.read() silently ignores a missing file, so both required
# sections are checked explicitly to fail with a clear message.
cfg = configparser.ConfigParser()
cfg.read('../config.ini')
assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
assert "INTERPOLATION" in cfg, "missing INTERPOLATION in config.ini"
param_postgres = cfg["POSTGRES"]
param_interpol = cfg["INTERPOLATION"]
# get_average_data_for_point(52.5, 13.4)

# Create the database "temperatures_berteld_morstein"
def create_db(cursor, db_name):
    """Create a new PostgreSQL database named *db_name*.

    Args:
        cursor: Open psycopg2 cursor. The caller in this file switches the
            connection to autocommit before creating a database.
        db_name: Name of the database to create. It is quoted as an SQL
            identifier, so the name is taken literally (case preserved).
    """
    print("Creating DB: ", db_name)
    # sql.Identifier quotes the name properly instead of pasting it into the
    # statement text.
    create_db_query = sql.SQL("CREATE DATABASE {};").format(sql.Identifier(db_name))
    cursor.execute(create_db_query)
    print('Done')
# Drop the database if it exists
def drop_db(cursor, db_name):
    """Drop the database *db_name* if it exists.

    Database errors are printed and swallowed: this is a best-effort cleanup.

    Args:
        cursor: Open psycopg2 cursor.
        db_name: Name of the database to drop.
    """
    print("Drop DB: ", db_name)

    try:
        if dbexists(cursor, db_name):
            drop_query = sql.SQL("DROP DATABASE {};").format(sql.Identifier(db_name))
            cursor.execute(drop_query)
    except psycopg2.DatabaseError as error:
        # Nothing to clean up in that case. Exceptions have no ``.message``
        # attribute in Python 3, so print the exception itself.
        print(error)
# Check whether the database "temperatures_berteld_morstein" exists
def dbexists(cursor, db_name):
    """Return True if a database named *db_name* is listed in pg_database.

    Args:
        cursor: Open psycopg2 cursor.
        db_name: Exact database name to look for.

    Returns:
        True if the database exists, False if it does not or if the lookup
        failed with a database error.
    """
    try:
        # Exact match with a bound parameter: LIKE would treat the "_" in the
        # name as a single-character wildcard.
        cursor.execute("SELECT datname FROM pg_database WHERE datname = %s;", (db_name,))
        db_found = cursor.fetchone() is not None
    except psycopg2.DatabaseError as error:
        # NOTE(review): the original handler body is not visible in this view;
        # callers test the result for truth, so report "not existing".
        print(error)
        return False

    if db_found:
        print('DB existing')
        print('Updating data')
    return db_found
# Create the database "temperatures_berteld_morstein" if it does not exist yet
def check_for_db_existence(cursor, db_name):
    """Make sure the database *db_name* exists, creating it when missing.

    Args:
        cursor: Open psycopg2 cursor.
        db_name: Name of the database to check or create.

    Returns:
        True if the database already existed, False if it was just created.
    """
    print("Checking for database existence")
    if dbexists(cursor, db_name):
        return True

    # Only create the database when it is missing.
    create_db(cursor, db_name)
    return False
# Check whether a table with the given name exists
def check_for_stations_existence(cursor, table_name):
    """Return True if INFORMATION_SCHEMA.TABLES lists a table *table_name*.

    Args:
        cursor: Open psycopg2 cursor.
        table_name: Table name to look for (exact match).

    Returns:
        True if at least one matching table exists, False otherwise or when
        the lookup fails with a psycopg2 error.
    """
    # The name is bound as a parameter instead of being formatted into the
    # statement text.
    query = "SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = %s"
    try:
        cursor.execute(query, (table_name,))
        return cursor.fetchone() is not None
    except psycopg2.Error:
        return False
# Create table "stations" in the connected database and install the PostGIS extension
def create_table(station_list, cursor):
    """Create the table "stations" and install the PostGIS extension.

    Besides the fixed metadata columns, one NUMERIC column is created for
    every column label of *station_list* that starts with "18", "19" or "20".

    Args:
        station_list: Object whose iteration yields the column labels
            (presumably a pandas DataFrame - confirm against the caller).
        cursor: Open psycopg2 cursor on the target database.
    """
    print('Creating table stations')
    base_columns = ['station_id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT', 'transparent BOOL']
    column_defs = [sql.SQL(definition) for definition in base_columns]

    for column in list(station_list):
        label = str(column)
        if label.startswith(('18', '19', '20')):
            # Numeric names such as 2018 are only valid as quoted identifiers.
            column_defs.append(sql.SQL('{} NUMERIC').format(sql.Identifier(label)))

    query = sql.SQL("CREATE TABLE stations ({});").format(sql.SQL(', ').join(column_defs))

    cursor.execute(query)
    # IF NOT EXISTS keeps this working when the extension is already
    # installed in the database.
    cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
    print('Done')
# Load the matrix coordinates from the CSV file and write them into table "stations" in db "temperatures_berteld_morstein"
def insert_empty_matrix_into_db(cursor):
    """Insert the matrix points from the matrix CSV file into "stations".

    The file name is derived from the configured ``matrix_density``. The
    first line of the file is skipped; every other line is split on ";" and
    inserted as one row with country 'Germany'.

    Args:
        cursor: Open psycopg2 cursor on the target database.

    Returns:
        The number of matrix points that were inserted.
    """
    print('Inserting empty matrix into database')
    density = param_interpol['matrix_density']

    # The statement is identical for every row, so it is composed only once.
    # Values are passed separately to execute() (avoids SQL injection).
    insert_query = sql.SQL("INSERT INTO STATIONS (station_id, transparent, lon, lat, country) "
                           "VALUES ({id}, {transparent}, {lon}, {lat}, 'Germany');").format(
        id=sql.Placeholder(),
        transparent=sql.Placeholder(),
        lon=sql.Placeholder(),
        lat=sql.Placeholder(),
    )

    with open('matrix_{}x{}_4326_with_transparency.csv'.format(density, density), 'r') as matrix_file:
        rows = matrix_file.readlines()[1:]
        for row in rows:
            fields = row.split(';')
            # Matrix points get ids prefixed with "9999".
            station_id = int("9999" + fields[0].replace('"', ''))
            transparency = fields[1]
            longitude = float(fields[3])
            latitude = float(fields[2].replace('\n', ''))
            cursor.execute(insert_query, (station_id, transparency, longitude, latitude))
        print('Done')
        return len(rows)
# Calculate the interpolation data for the matrix using the corresponding API function and write it to the database in bulk
def create_matrix_data(cursor, amount_points):
    """Compute interpolation data for all matrix points and store it in bulk.

    Matrix points are the rows of "stations" whose ``file`` column is NULL.
    For each one, ``get_interpolation_data_for_point`` supplies a mapping of
    year to value, which is written to the matching year column.

    Args:
        cursor: Open psycopg2 cursor on the target database.
        amount_points: Expected number of matrix points; only used for the
            progress output.
    """
    print('Calculating interpolation data for matrix')
    cursor.execute("SELECT station_id, lon, lat FROM stations WHERE file is NULL;")
    matrix_points = cursor.fetchall()

    # Guard the progress arithmetic: a count below 5 would make the step
    # round to 0 (modulo by zero), and a count of 0 would divide by zero.
    total_points = amount_points if amount_points > 0 else len(matrix_points)
    progress_step = max(1, round(total_points / 10))

    update_data = []
    for i, (station_id, lon, lat) in enumerate(matrix_points):
        interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'), cursor)
        update_data.extend(
            {'year': year, 'value': value, 'id': station_id}
            for year, value in interpolation_data.items()
        )
        if i % progress_step == 0:
            finished = i / total_points * 100
            print(round(finished), end="% ... ")
    print('', end=' 100%, Done. \n')

    # NOTE(review): the quoted "%(year)s" only forms a valid column name when
    # the year values are adapted as bare numbers - confirm the key type
    # returned by get_interpolation_data_for_point.
    query = sql.SQL("""UPDATE stations SET "%(year)s" = %(value)s WHERE station_id = %(id)s; """)
    print('Writing interpolation data to database')
    psycopg2.extras.execute_batch(cursor, query, update_data)   # Multiple times faster than using execute() in a for loop
# Delete all existing data from the database, then insert the station data in bulk.
def createInsertStatement(cursor, station_list):
    """Build one multi-row INSERT statement for the table "stations".

    Args:
        cursor: psycopg2 cursor; its ``mogrify`` renders every row as an
            escaped SQL value tuple.
        station_list: pandas DataFrame whose column labels are the target
            column names and whose rows are the values to insert.

    Returns:
        The complete ``INSERT INTO stations (...) VALUES (...),(...);`` string.
    """
    df_columns = list(station_list)

    # Integers like 2018 are not valid bare column names, so every label is
    # wrapped in double quotes. format() also accepts non-string labels.
    columns = ",".join('"{}"'.format(column) for column in df_columns)

    # One %s placeholder per column, e.g. "(%s,%s,%s)".
    value_template = "({})".format(",".join(["%s"] * len(df_columns)))
    values = ",".join(
        cursor.mogrify(value_template, row.array).decode("utf-8")
        for _, row in station_list.iterrows()
    )

    return "INSERT INTO {} ({}) VALUES {};".format('stations', columns, values)

Clemens Berteld's avatar
Clemens Berteld committed

def insert_data(station_list, cursor):
    """Replace the contents of table "stations" with the rows of *station_list*.

    Nothing is deleted or inserted when *station_list* has no rows.

    Args:
        station_list: pandas DataFrame of station data; its index supplies
            the ``station_id`` column.
        cursor: Open psycopg2 cursor on the target database.
    """
    print('Inserting data into database')
    if len(station_list) > 0:
        cursor.execute("DELETE FROM stations;")

        # assign() works on a copy, so the caller's DataFrame is not given an
        # extra column as a side effect.
        prepared = station_list.assign(station_id=station_list.index).round(decimals=3)

        insert_stmt = createInsertStatement(cursor, prepared)
        cursor.execute(insert_stmt)

    print('Inserting data into database Done')
    db_name, user, pw, host, port = param_postgres['dbName'], param_postgres["user"], param_postgres["password"], param_postgres["host"], param_postgres["port"]
Peter Morstein's avatar
Peter Morstein committed
    
    try:
        connection = psycopg2.connect(database='postgres', user=user, password=pw, host=param_postgres["host"], port=port)
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # Needs to be in AUTOCOMMIT mode for creating database
        with connection.cursor() as cursor:
Peter Morstein's avatar
Peter Morstein committed
            new_db = False if check_for_db_existence(cursor, db_name) else True
    
        station_exists = False
        connection = psycopg2.connect(database=db_name, user=user, password=pw, host=param_postgres["host"], port=port)
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # Needs to be in AUTOCOMMIT mode for creating database
        with connection.cursor() as cursor:
Peter Morstein's avatar
Peter Morstein committed
            station_exists = check_for_stations_existence(cursor, "stations")
Peter Morstein's avatar
Peter Morstein committed
            if station_exists == False: 
Peter Morstein's avatar
Peter Morstein committed
                create_table(station_list, cursor)        
Peter Morstein's avatar
Peter Morstein committed
            
Peter Morstein's avatar
Peter Morstein committed
            insert_data(station_list, cursor)
            amount_points = insert_empty_matrix_into_db(cursor)
Peter Morstein's avatar
Peter Morstein committed
    
Peter Morstein's avatar
Peter Morstein committed
            # connection = psycopg2.connect(database=db_name, user=user, password=pw, host=param_postgres["host"], port=port,
            #                       keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)
            with connection.cursor() as cursor:
                create_matrix_data(cursor, amount_points)
        
        print('Installation successful. You can run the api.py now.')
    except(Exception, psycopg2.DatabaseError) as error:
        print(error)
Peter Morstein's avatar
Peter Morstein committed
    
    finally:
        if connection:
            connection.close()