Skip to content
Snippets Groups Projects
ExportToDatabase.py 8.15 KiB
Newer Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 14 13:43:45 2021

Export the station list to the PostgreSQL database.

@author: geopeter
"""
import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import configparser
from dataacquisition.GetAverageData import get_interpolation_data_for_point
# Settings are read once, at import time, from config.ini in the current working
# directory. ConfigParser.read silently ignores a missing file, hence the check below.
cfg = configparser.ConfigParser()
cfg.read('config.ini')
# Validate explicitly rather than with `assert`, which is stripped under `python -O`.
_missing_sections = [name for name in ("POSTGRES", "INTERPOLATION") if name not in cfg]
if _missing_sections:
    raise KeyError("missing {} in config.ini".format(", ".join(_missing_sections)))
param_postgres = cfg["POSTGRES"]  # keys used in this module: user, password, host, port, dbName
param_interpol = cfg["INTERPOLATION"]  # keys used in this module: matrix_density


def create_db(db_name):
    """Create the PostgreSQL database *db_name* (e.g. "temperatures_berteld_morstein").

    The statement is issued from the server's ``postgres`` maintenance database.
    The name is quoted with ``sql.Identifier``, so the database is created with
    exactly the given spelling and case. This matches the name later passed to
    ``psycopg2.connect``.

    :param db_name: name of the database to create.
    :raises psycopg2.Error: if connecting or the CREATE DATABASE statement fails.
    """
    print("Create DB: ", db_name)
    connection = psycopg2.connect(dbname='postgres',
                                  user=param_postgres["user"],
                                  password=param_postgres["password"],
                                  host=param_postgres["host"],
                                  port=param_postgres["port"])
    try:
        # CREATE DATABASE cannot run inside a transaction block, so AUTOCOMMIT is required.
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with connection.cursor() as cursor:
            cursor.execute(sql.SQL("CREATE DATABASE {};").format(sql.Identifier(db_name)))
    finally:
        # Close the connection even when CREATE DATABASE fails.
        connection.close()


def drop_db(db_name):
    """Drop the database *db_name* if it exists (best effort).

    Errors reported by the server while dropping are printed and otherwise ignored.

    :param db_name: name of the database to drop.
    """
    if not dbexists(db_name):
        return
    print("Drop DB: ", db_name)
    try:
        # PostgreSQL refuses to drop the database you are connected to, and DROP DATABASE
        # cannot run inside a transaction block: use the maintenance DB in AUTOCOMMIT mode.
        connection = psycopg2.connect(dbname='postgres',
                                      user=param_postgres["user"],
                                      password=param_postgres["password"],
                                      host=param_postgres["host"],
                                      port=param_postgres["port"])
        try:
            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            with connection.cursor() as cursor:
                cursor.execute(sql.SQL("DROP DATABASE {};").format(sql.Identifier(db_name)))
        finally:
            connection.close()
    except psycopg2.DatabaseError as error:
        # do nothing, because test db is clean.
        # Python 3 exceptions have no `.message` attribute; print the exception itself.
        print(error)

# Checks whether a database (e.g. "temperatures_berteld_morstein") exists
def dbexists(db_name):
    """Return True if a database named exactly *db_name* exists on the configured server.

    The name is looked up in ``pg_database`` from the ``postgres`` maintenance
    database. The result therefore does not depend on being able to connect to
    *db_name* itself, and a missing database is a normal query result, not an
    error.

    :param db_name: database name to look for (exact, case-sensitive match).
    :raises psycopg2.Error: if the server cannot be reached or the query fails.
    """
    connection = psycopg2.connect(dbname='postgres',
                                  user=param_postgres["user"],
                                  password=param_postgres["password"],
                                  host=param_postgres["host"],
                                  port=param_postgres["port"])
    try:
        with connection.cursor() as cursor:
            # Parameterized equality: no SQL injection, and no LIKE wildcards ('_' matches any char).
            cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s;", (db_name,))
            return cursor.fetchone() is not None
    finally:
        connection.close()


# If the database (e.g. "temperatures_berteld_morstein") does not exist, create it
def check_for_db_existence(station_list, db_name):
    """Create database *db_name* and its ``stations`` table unless the database already exists.

    :param station_list: station data; its column names determine the table's year columns.
    :param db_name: name of the database to check / create.
    """
    print("Checking for database existence")
    if dbexists(db_name):
        print('DB existing')
    else:
        create_db(db_name)
        create_table(station_list, db_name)

# Connect to the database to create table "stations". Also installs PostGIS.
def create_table(station_list, db_name):
    """Create the ``stations`` table in *db_name* and enable the PostGIS extension.

    The fixed columns are id, lon, lat, country and file. In addition, every column
    of *station_list* whose name starts with "19" or "20" (the year columns) becomes
    a NUMERIC column of the same name.

    :param station_list: station data, presumably a pandas DataFrame; only its
        column names are used.
    :param db_name: database to connect to.
    """
    column_defs = [sql.SQL(definition) for definition in
                   ('id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT')]
    for column in list(station_list):
        name = str(column)
        if name.startswith(('19', '20')):
            # Names like 2018 are not valid bare identifiers; Identifier double-quotes them safely.
            column_defs.append(sql.SQL("{} NUMERIC").format(sql.Identifier(name)))
    create_query = sql.SQL("CREATE TABLE stations ({});").format(sql.SQL(", ").join(column_defs))

    connection = psycopg2.connect(database=db_name,
                                  user=param_postgres["user"],
                                  password=param_postgres["password"],
                                  host=param_postgres["host"],
                                  port=param_postgres["port"])
    try:
        # `with connection` commits on success and rolls back on error, but does not close.
        with connection, connection.cursor() as cursor:
            cursor.execute(create_query)
            cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
    finally:
        connection.close()


# Loading matrix coordinates from csv and writing them into table "stations" of the configured database
def insert_empty_matrix_into_db():
    """Insert the interpolation-matrix points as rows of the ``stations`` table.

    Reads ``clipped_matrix_<d>x<d>.csv``, where ``d`` is ``matrix_density`` from the
    INTERPOLATION config section. The file is ';'-separated; its first line is a
    header and is skipped, and blank lines are ignored. Columns are: id (possibly
    double-quoted), lon, lat.

    Each point is inserted with ``id = int("9999" + id)`` and country 'Germany'.
    Its ``file`` and year columns are left NULL, so ``create_matrix_data`` (which
    selects rows with ``file IS NULL``) can fill them.
    """
    print('Inserting empty matrix into database')
    matrix_density = param_interpol['matrix_density']
    matrix_path = 'clipped_matrix_{}x{}.csv'.format(matrix_density, matrix_density)

    points = []
    with open(matrix_path, 'r') as matrix:
        next(matrix, None)  # skip the header line
        for line in matrix:
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing empty line
            data = line.split(';')
            # NOTE(review): the "9999" prefix presumably keeps matrix ids apart from real station ids — confirm.
            point_id = int("9999" + data[0].replace('"', ''))
            # float() ignores surrounding whitespace, including the trailing newline.
            points.append((point_id, float(data[1]), float(data[2])))

    # Values are passed separately from the SQL text (avoids SQL injection).
    query = "INSERT INTO STATIONS (id, lon, lat, country) VALUES (%s, %s, %s, 'Germany');"
    connection = psycopg2.connect(database=param_postgres['dbName'],
                                  user=param_postgres["user"],
                                  password=param_postgres["password"],
                                  host=param_postgres["host"],
                                  port=param_postgres["port"])
    try:
        with connection, connection.cursor() as cursor:
            psycopg2.extras.execute_batch(cursor, query, points)  # far fewer round trips than execute() per row
    finally:
        connection.close()


# Calculating interpolation data for the matrix using the corresponding function from the API, and writing it into the database in bulk
def create_matrix_data():
    """Fill the year columns of all matrix points with interpolated values.

    Matrix points are the rows of ``stations`` whose ``file`` is NULL. For each one,
    ``get_interpolation_data_for_point(lat, lon, sql.SQL('*'))`` is called. Every
    ``year -> value`` pair it returns is written to the column named after that
    year in the point's row. All updates run in one transaction.
    """
    print('Calculating interpolation data for matrix')
    # Keepalives presumably stop the idle connection from being dropped while the
    # (potentially slow) interpolation runs — TODO confirm.
    connection = psycopg2.connect(database=param_postgres['dbName'],
                                  user=param_postgres["user"],
                                  password=param_postgres["password"],
                                  host=param_postgres["host"],
                                  port=param_postgres["port"],
                                  keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)
    try:
        with connection, connection.cursor() as cursor:
            cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;")
            matrix_points = cursor.fetchall()

            # Group by year: the column name is part of the SQL text, so each year needs its
            # own statement template, while the values stay parameterized.
            updates_by_year = {}
            for point_id, lon, lat in matrix_points:
                interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'))
                for year, value in interpolation_data.items():
                    updates_by_year.setdefault(str(year), []).append((value, point_id))

            for year, rows in updates_by_year.items():
                # Identifier renders e.g. "2018" correctly whether the year arrived as int or str.
                query = sql.SQL("UPDATE stations SET {} = %s WHERE id = %s;").format(sql.Identifier(year))
                psycopg2.extras.execute_batch(cursor, query, rows)  # multiple times faster than execute() in a loop
    finally:
        connection.close()


# Dumping all existing data from the database. Inserting station data into the database in bulk.
def insert_data(station_list, db_name):
    """Replace the contents of table ``stations`` with the rows of *station_list*.

    Every column of *station_list* is inserted into the ``stations`` column of the
    same name, after rounding the data to 3 decimals. The old rows are deleted in
    the same transaction, so a failed insert leaves the previous data in place.
    An empty *station_list* leaves the table untouched.

    :param station_list: station data, presumably a pandas DataFrame (``round`` and
        ``values`` are used) whose column names all exist in ``stations``.
    :param db_name: database to connect to.
    """
    print('Inserting data into database')
    if len(station_list) == 0:
        return

    df_columns = list(station_list)
    # Year columns like 2018 are not valid bare identifiers; Identifier double-quotes every name.
    columns = sql.SQL(", ").join([sql.Identifier(str(column)) for column in df_columns])
    # One %s placeholder per column; the values are passed separately by execute_batch.
    placeholders = sql.SQL(", ").join([sql.Placeholder()] * len(df_columns))
    insert_stmt = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        sql.Identifier('stations'), columns, placeholders)
    rows = station_list.round(decimals=3).values

    connection = psycopg2.connect(database=db_name,
                                  user=param_postgres["user"],
                                  password=param_postgres["password"],
                                  host=param_postgres["host"],
                                  port=param_postgres["port"])
    try:
        with connection, connection.cursor() as cursor:
            cursor.execute("DELETE FROM stations;")
            psycopg2.extras.execute_batch(cursor, insert_stmt, rows)
    finally:
        connection.close()

def export(station_list):
    """Set up the configured database and load all data into it.

    Steps:
    1. Create the database and its ``stations`` table if they do not exist yet.
    2. Replace the station rows with *station_list*.
    3. Add the interpolation-matrix points.
    4. Fill the matrix points' year columns with interpolated values.

    :param station_list: station data to export.
    """
    db_name = param_postgres['dbName']
    check_for_db_existence(station_list, db_name)
    insert_data(station_list, db_name)
    insert_empty_matrix_into_db()
    create_matrix_data()
    print('Installation successful. You can run the api.py now.')