# NOTE(review): removed web-scrape residue ("You need to sign in or sign up
# before continuing.", "Newer", "Older") that was never part of the source file.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 14 13:43:45 2021
export the stationlist to database
@author: geopeter
"""
import configparser

import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
#from api.GetAverageData import get_interpolation_data_for_point
# Load database credentials from config.ini (expected next to this script).
cfg = configparser.ConfigParser()
cfg.read('config.ini')  # without this the parser stays empty and the assert below always fails
assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
param_postgres = cfg["POSTGRES"]
# NOTE(review): insert_empty_matrix_into_db() reads param_interpol['matrix_density'],
# but param_interpol was never defined in this file. Assuming it comes from an
# INTERPOLATION section of config.ini — TODO confirm the section name.
param_interpol = cfg["INTERPOLATION"] if "INTERPOLATION" in cfg else {}

# get_average_data_for_point(52.5, 13.4)
def create_db(cursor):
    """Create the database named in config (param_postgres['dbName']).

    The cursor must belong to a connection in AUTOCOMMIT mode, because
    CREATE DATABASE cannot run inside a transaction block.
    """
    print("Creating DB: ", param_postgres['dbName'])
    # sql.Identifier quotes the name safely instead of raw str.format interpolation
    create_db_query = sql.SQL("CREATE DATABASE {};").format(sql.Identifier(param_postgres['dbName']))
    cursor.execute(create_db_query)
    print('Done')

def drop_db(db_name):
    """Drop the database *db_name* if it exists (best effort, used for tests).

    Connects to the maintenance database 'postgres' because PostgreSQL refuses
    to drop the database the current session is connected to. Any database
    error is printed and swallowed (a clean test DB is an acceptable state).
    """
    try:
        with psycopg2.connect(database='postgres', user=param_postgres["user"],
                              password=param_postgres["password"],
                              host=param_postgres["host"], port=param_postgres["port"]) as connection:
            # DROP DATABASE cannot run inside a transaction block
            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            with connection.cursor() as cursor:
                print("Drop DB: ", db_name)
                # IF EXISTS replaces the old broken dbexists() pre-check;
                # sql.Identifier quotes the name safely
                cursor.execute(sql.SQL("DROP DATABASE IF EXISTS {};").format(sql.Identifier(db_name)))
    except psycopg2.DatabaseError as error:
        # do nothing, because test db is clean
        print(error)  # psycopg2 errors have no .message attribute in Python 3
        return

# Checks, if database "temperatures_berteld_morstein" exists
def dbexists(cursor):
    """Return True when the database named in config exists, else False.

    Prints the same status messages as before; any database error is
    swallowed and treated as "database missing".
    """
    try:
        # Parameterized query instead of string formatting (avoids SQL injection);
        # exact match replaces the LIKE with no wildcards, which was equivalent.
        cursor.execute("SELECT datname FROM pg_database WHERE datname = %s;",
                       (param_postgres['dbName'],))
        if cursor.fetchone() is not None:
            print('DB existing')
            print('Updating data')
            return True
    except (Exception, psycopg2.DatabaseError):
        # deliberate best-effort: a failed lookup counts as "not found"
        pass
    print('No DB found')
    return False
# If database "temperatures_berteld_morstein" not exists, create it
def check_for_db_existence(cursor):
    """Ensure the configured database exists.

    Returns True when it already existed, False when it had to be created.
    """
    print("Checking for database existence")
    already_there = dbexists(cursor)
    if not already_there:
        create_db(cursor)
    return already_there

# Connect to DB "temperatures_berteld_morstein" to create table "temperatures". Also installs PostGIS
def create_table(station_list, cursor):
    """Create the 'stations' table with one NUMERIC column per year found in
    station_list's columns, then install the PostGIS extension."""
    print('Creating table stations')
    column_defs = ['id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT']
    for column in list(station_list):
        # year columns look like 19xx / 20xx; quote them, since bare numbers
        # are not valid SQL identifiers
        if str(column).startswith(('19', '20')):
            column_defs.append('"{}" NUMERIC'.format(column))
    query = sql.SQL("""CREATE TABLE stations ({});""".format(', '.join(column_defs)))
    cursor.execute(query)
    cursor.execute('CREATE EXTENSION postgis;')
    print('Done')
# Loading matrix coordinates from csv and writing it into table "stations" in db "temperatures_berteld_morstein"
def insert_empty_matrix_into_db(cursor):
    """Insert the interpolation-matrix grid points (no temperature values yet)
    into the stations table.

    Reads 'clipped_matrix_{d}x{d}.csv' (semicolon-separated, header skipped)
    and returns the number of points inserted.
    """
    print('Inserting empty matrix into database')
    matrix_density = param_interpol['matrix_density']
    # Build the parameterized statement once, outside the loop
    query = sql.SQL("INSERT INTO STATIONS (id, lon, lat, country) "
                    "VALUES ({id}, {lon}, {lat}, 'Germany');").format(
        id=sql.Placeholder(), lon=sql.Placeholder(), lat=sql.Placeholder())
    matrix_points = 0
    with open('clipped_matrix_{}x{}.csv'.format(matrix_density, matrix_density), 'r') as matrix:
        for line in matrix.readlines()[1:]:  # skip the CSV header line
            matrix_points += 1
            data = line.split(';')
            # "9999" prefix presumably keeps matrix ids disjoint from real
            # station ids — TODO confirm against station data
            point_id = int("9999" + data[0].replace('"', ''))
            lon = float(data[1])
            lat = float(data[2].replace('\n', ''))
            # Plain tuple instead of the old string-unpacking trick; still a
            # bound-parameter execute (avoids SQL injection)
            cursor.execute(query, (point_id, lon, lat))
    print('Done')
    return matrix_points
# Calculating interpolation data for matrix using the according function from the API, and writing it into the database in bulk
def create_matrix_data(cursor, amount_points):
    """Interpolate temperature values for every matrix point (rows with
    file IS NULL) and bulk-write them into the stations table.

    NOTE(review): depends on get_interpolation_data_for_point, whose import is
    commented out at the top of this file — re-enable it before running.
    """
    print('Calculating interpolation data for matrix')
    cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;")
    matrix_points = cursor.fetchall()
    updates_per_year = {}  # year -> [{'value': ..., 'id': ...}, ...]
    # max(1, ...) avoids a modulo-by-zero when there are fewer than 10 points
    progress_step = max(1, round(amount_points / 10))
    for i, (point_id, lon, lat) in enumerate(matrix_points):
        interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'), cursor)
        for year, value in interpolation_data.items():
            updates_per_year.setdefault(year, []).append({'value': value, 'id': point_id})
        if i % progress_step == 0:
            print(round(i / amount_points * 100), end="% ... ")
    print('', end=' 100%, Done. \n')
    print('Writing interpolation data to database')
    # A column name cannot be passed as a bind parameter (it would be quoted as
    # a string literal), so build one statement per year with a safely quoted
    # identifier and batch the row updates for that year.
    for year, rows in updates_per_year.items():
        query = sql.SQL("""UPDATE stations SET {} = %(value)s WHERE id = %(id)s; """).format(
            sql.Identifier(str(year)))
        psycopg2.extras.execute_batch(cursor, query, rows)  # multiple times faster than execute() in a loop
    print('Done')
# Dumping all existing data from database. Inserting station data into database in bulk.
def createInsertStatement(station_list):
    """Build the parameterized INSERT statement for the stations table.

    station_list: DataFrame whose columns match the stations table.
    Returns e.g. 'INSERT INTO stations ("id", "lon", "2018") VALUES(%s,%s,%s)'.

    Previously the raw Python list repr of the columns was interpolated into
    the SQL (producing '([...])'), which is not valid SQL. Year columns like
    2018 must be double-quoted, since bare numbers are not valid identifiers.
    The old no-op .astype(str) and unused local rounding were removed; rounding
    of the actual values happens in insert_data().
    """
    df_columns = [str(column) for column in list(station_list)]
    columns = ', '.join('"{}"'.format(column) for column in df_columns)
    # one %s placeholder per column
    values = "VALUES({})".format(",".join(["%s"] * len(df_columns)))
    return 'INSERT INTO stations ({}) {}'.format(columns, values)
def insert_data(station_list, cursor):
    """Replace the contents of the stations table with station_list.

    Deletes all existing rows, rounds the values to three decimals, and
    bulk-inserts the DataFrame rows. Does nothing for an empty DataFrame.
    """
    if len(station_list) > 0:
        # full refresh: wipe existing rows before re-inserting
        cursor.execute("DELETE FROM stations;")
        station_list = station_list.round(decimals=3)
        insert_stmt = createInsertStatement(station_list)
        # execute_batch is multiple times faster than execute() in a for loop
        psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values)
        print('Done')

def export(station_list):
    """Top-level entry point: create the database/table if needed, load the
    station data and the empty interpolation matrix, then compute and store
    the matrix interpolation values.

    NOTE(review): psycopg2's connection context manager commits/rolls back but
    does not close the connection — the three connections below stay open.
    """
    db_name, user, pw, host, port = (param_postgres['dbName'], param_postgres["user"],
                                     param_postgres["password"], param_postgres["host"],
                                     param_postgres["port"])
    # Step 1: maintenance connection to check for / create the target database.
    with psycopg2.connect(database='postgres', user=user, password=pw, host=host, port=port) as connection:
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # Needs to be in AUTOCOMMIT mode for creating database
        with connection.cursor() as cursor:
            new_db = not check_for_db_existence(cursor)
    # Step 2: create the table on first run, then load stations + empty matrix.
    with psycopg2.connect(database=db_name, user=user, password=pw, host=host, port=port) as connection:
        with connection.cursor() as cursor:
            if new_db:
                create_table(station_list, cursor)
            insert_data(station_list, cursor)
            amount_points = insert_empty_matrix_into_db(cursor)
    # Step 3: long-running interpolation; TCP keepalives stop idle timeouts
    # from killing the connection mid-computation.
    with psycopg2.connect(database=db_name, user=user, password=pw, host=host, port=port,
                          keepalives=1, keepalives_idle=30, keepalives_interval=10,
                          keepalives_count=5) as connection:
        with connection.cursor() as cursor:
            create_matrix_data(cursor, amount_points)
    print('Installation successful. You can run the api.py now.')