#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 14 13:43:45 2021
Export the station list to the PostgreSQL database.
@author: geopeter
"""
import configparser
import time

import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from dataacquisition.GetAverageData import get_interpolation_data_for_point
# Database connection settings, read once at import time from config.ini
# (the path is resolved relative to the current working directory).
cfg = configparser.ConfigParser()
cfg.read('config.ini')
# Explicit check instead of `assert`: asserts are stripped under `python -O`,
# which would turn a missing section into a confusing KeyError later on.
if "POSTGRES" not in cfg:
    raise RuntimeError("missing POSTGRES in config.ini")
param_postgres = cfg["POSTGRES"]
# NOTE(review): not used by the functions visible in this section; kept for
# any other code that may reference it.
stationGPD = None

# Use the maintenance DB "postgres" to create the project database
# (e.g. "temperatures_berteld_morstein").
def create_db(db_name):
    """Create the PostgreSQL database *db_name*.

    Connects to the maintenance database "postgres" using the credentials from
    the [POSTGRES] section of config.ini.

    :param db_name: name of the database to create. It is quoted as an SQL
        identifier, so the database is created with exactly this spelling
        (the same spelling later passed to ``psycopg2.connect``).
    :raises psycopg2.Error: if the connection fails or the database cannot be
        created (e.g. it already exists).
    """
    print("Create DB: ", db_name)
    connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"],
                                  password=param_postgres["password"],
                                  host=param_postgres["host"], port=param_postgres["port"])
    try:
        # Needs to be in AUTOCOMMIT mode: CREATE DATABASE cannot run inside a
        # transaction block.
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with connection.cursor() as cursor:
            # Compose the name as an identifier instead of splicing raw text
            # into the statement.
            create_db_query = sql.SQL("CREATE DATABASE {};").format(sql.Identifier(db_name))
            cursor.execute(create_db_query)
    finally:
        # Release the server connection explicitly.
        connection.close()

def drop_db(db_name):
    """Drop the database *db_name* if it exists (best effort).

    Database errors are printed and otherwise ignored, so callers such as test
    clean-up code are not interrupted.

    :param db_name: name of the database to drop.
    """
    if not dbexists(db_name):
        return
    print("Drop DB: ", db_name)
    connection = None
    try:
        # A database cannot be dropped while connected to it, so go through the
        # maintenance DB "postgres" instead of db_name itself.
        connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"],
                                      password=param_postgres["password"],
                                      host=param_postgres["host"], port=param_postgres["port"])
        # DROP DATABASE cannot run inside a transaction block.
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with connection.cursor() as cursor:
            cursor.execute(sql.SQL("DROP DATABASE {};").format(sql.Identifier(db_name)))
    except psycopg2.DatabaseError as error:
        # Do nothing beyond reporting; psycopg2 exceptions have no `.message`
        # attribute on Python 3, so print the exception itself.
        print(error)
    finally:
        if connection is not None:
            connection.close()

def dbexists(db_name):
    """Return True if a database named exactly *db_name* exists on the server.

    The name is looked up in the ``pg_database`` catalog through the
    maintenance database "postgres". Any psycopg2 error (e.g. server
    unreachable, authentication failure) is reported as ``False``.

    :param db_name: database name to look for (exact, case-sensitive match).
    :returns: True if the database exists, otherwise False.
    """
    connection = None
    try:
        connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"],
                                      password=param_postgres["password"],
                                      host=param_postgres["host"], port=param_postgres["port"])
        with connection.cursor() as cursor:
            # Parameterized equality: the previous string-formatted LIKE treated
            # "_" and "%" in the name as wildcards and allowed SQL injection.
            cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s;", (db_name,))
            return cursor.fetchone() is not None
    except psycopg2.Error:
        return False
    finally:
        if connection is not None:
            connection.close()
def check_for_db_existence(station_list, db_name):
    """Ensure database *db_name* exists, creating it on first use.

    When the database is missing it is created and its "stations" table is
    built from the columns of *station_list*. An existing database is left as
    it is.

    :param station_list: station table whose columns define the table layout.
    :param db_name: name of the database to check or create.
    """
    print("Checking for database existence")
    already_present = dbexists(db_name)
    if already_present:
        print('DB existing exists')
        return
    # First run: build the database and its table.
    create_db(db_name)
    create_table(station_list, db_name)
def create_table(station_list, db_name):
    """Create the "stations" table in database *db_name* and enable PostGIS.

    The table always gets the columns id, lon, lat, country and file. In
    addition, every column of *station_list* whose name starts with "19" or
    "20" (a year) is added as a NUMERIC column named after it.

    :param station_list: table-like object whose iteration yields its column
        names (presumably a pandas DataFrame; verify against caller).
    :param db_name: database in which the table is created.
    """
    column_defs = [
        sql.SQL('id INTEGER'),
        sql.SQL('lon NUMERIC'),
        sql.SQL('lat NUMERIC'),
        sql.SQL('country TEXT'),
        sql.SQL('file TEXT'),
    ]
    for column in list(station_list):
        name = str(column)
        if name.startswith(('19', '20')):
            # Year names are not valid bare identifiers, so quote them.
            column_defs.append(sql.SQL('{} NUMERIC').format(sql.Identifier(name)))
    create_query = sql.SQL('CREATE TABLE stations ({});').format(sql.SQL(', ').join(column_defs))

    with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"],
                          host=param_postgres["host"], port=param_postgres["port"]) as connection:
        # The cursor was previously used without being opened, and the
        # CREATE TABLE statement was built but never executed.
        with connection.cursor() as cursor:
            cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
            cursor.execute(create_query)
def _read_matrix_points(path):
    """Parse the semicolon-separated grid file at *path* into (id, lon, lat) tuples.

    The first (header) line and blank lines are skipped. The id is the first
    field with double quotes removed, prefixed with "9999" and converted to
    int; the second and third fields are lon and lat.
    """
    points = []
    with open(path, 'r') as matrix:
        next(matrix, None)  # skip the header row
        for line in matrix:
            if not line.strip():
                continue  # tolerate blank lines, e.g. at the end of the file
            fields = line.split(';')
            point_id = int("9999" + fields[0].replace('"', ''))
            # float() ignores the trailing newline on the last field.
            points.append((point_id, float(fields[1]), float(fields[2])))
    return points


def insert_empty_matrix_into_db():
    """Insert the points of 'clipped_matrix_25x25.csv' as rows of "stations".

    Each point becomes a row with id, lon, lat and country 'Germany'; the
    other columns (file, years) stay NULL.
    """
    rows = _read_matrix_points('clipped_matrix_25x25.csv')
    # Values are passed as query parameters (avoids SQL injection).
    query = "INSERT INTO STATIONS (id, lon, lat, country) VALUES (%s, %s, %s, 'Germany');"
    with psycopg2.connect(database=param_postgres['dbName'], user=param_postgres["user"],
                          password=param_postgres["password"], host=param_postgres["host"],
                          port=param_postgres["port"]) as connection:
        with connection.cursor() as cursor:
            # Batched like the other inserts in this module instead of one
            # round trip per row.
            psycopg2.extras.execute_batch(cursor, query, rows)
    print('Inserted empty matrix into database')
def create_matrix_data():
    """Fill the year columns of every row without a ``file`` with interpolated values.

    For each row of "stations" where ``file`` is NULL (e.g. rows inserted by
    insert_empty_matrix_into_db, which leaves it unset), the values returned by
    get_interpolation_data_for_point for its lat/lon are written into the
    matching year columns.
    """
    with psycopg2.connect(database=param_postgres['dbName'], user=param_postgres["user"],
                          password=param_postgres["password"], host=param_postgres["host"],
                          port=param_postgres["port"],
                          keepalives=1, keepalives_idle=30, keepalives_interval=10,
                          keepalives_count=5) as connection:
        with connection.cursor() as cursor:
            cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;")
            matrix_points = cursor.fetchall()

            # The year is a column name, so it must be composed as an SQL
            # identifier rather than passed as a query parameter (a parameter
            # is rendered as a literal, e.g. '2018', inside the quotes).
            # Updates are therefore grouped per year, one statement per group.
            updates_by_year = {}
            for point_id, lon, lat in matrix_points:
                interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'))
                for year, value in interpolation_data.items():
                    updates_by_year.setdefault(year, []).append({'value': value, 'id': point_id})

            for year, update_data in updates_by_year.items():
                query = sql.SQL("UPDATE stations SET {} = %(value)s WHERE id = %(id)s;").format(
                    sql.Identifier(str(year)))
                # execute_batch is ~2x faster than execute() in a loop.
                psycopg2.extras.execute_batch(cursor, query.as_string(cursor), update_data)

def insert_data(station_list, db_name):
    """Insert every row of *station_list* into the "stations" table of *db_name*.

    The column names of *station_list* are used as target columns, quoted as
    SQL identifiers so that year names such as "2018" work. Values are rounded
    to 3 decimals before insertion. An empty *station_list* inserts nothing.

    :param station_list: station table (presumably a pandas DataFrame with
        string column names; verify against caller).
    :param db_name: database containing the "stations" table.
    """
    if len(station_list) == 0:
        return

    df_columns = list(station_list)
    # Compose identifiers/placeholders instead of hand-quoting via str(list)
    # and replace() hacks, which also stripped spaces inside column names.
    columns = sql.SQL(', ').join([sql.Identifier(column) for column in df_columns])
    placeholders = sql.SQL(', ').join([sql.Placeholder()] * len(df_columns))
    insert_stmt = sql.SQL("INSERT INTO stations ({}) VALUES ({})").format(columns, placeholders)

    station_list = station_list.round(decimals=3)
    with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"],
                          host=param_postgres["host"], port=param_postgres["port"]) as connection:
        with connection.cursor() as cursor:
            psycopg2.extras.execute_batch(cursor, insert_stmt.as_string(cursor), station_list.values)

def export(station_list):
    """Export *station_list* and the interpolated grid to the configured database.

    Steps: ensure the database and table exist, insert the stations, insert
    the grid points from the matrix file, then fill them with interpolated
    values. The duration of the last step is printed in seconds.
    """
    db_name = param_postgres['dbName']
    check_for_db_existence(station_list, db_name)
    insert_data(station_list, db_name)
    insert_empty_matrix_into_db()
    # perf_counter is monotonic, so the measurement is not skewed by system
    # clock adjustments the way time.time() can be.
    start_time = time.perf_counter()
    create_matrix_data()
    print((time.perf_counter() - start_time), 'seconds')