Skip to content
Snippets Groups Projects
Commit 4138e75d authored by Clemens Berteld's avatar Clemens Berteld
Browse files

Lots of cleanups. Database connections are pretty efficient now.

parent 4d421157
No related branches found
No related tags found
No related merge requests found
......@@ -177,10 +177,12 @@ def loadTemperatureFromDWDGauges():
continue
i += 1
if i % 10 == 0:
if i % round((len(gaugeCountry.country) / 10)) == 0:
finished = i/len(gaugeCountry.country) * 100
print(np.round(finished), end="% ... ")
if i == (len(gaugeCountry.country)):
print('', end=' 100%, Done. \n')
stationList.columns = stationList.columns.astype(str)
stationList = stationList.sort_index(axis=1, ascending=False)
......
......@@ -25,15 +25,11 @@ stationGPD = None
# Create DB "temperatures_berteld_morstein"
def create_db(db_name):
    """Create the PostgreSQL database *db_name*.

    Connects to the maintenance database 'postgres' (a database cannot be
    created from within itself) and switches the connection to AUTOCOMMIT,
    because CREATE DATABASE is not allowed inside a transaction block.
    """
    print("Create DB: ", db_name)
    connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
    try:
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # Needs to be in AUTOCOMMIT mode for creating database
        with connection.cursor() as cursor:
            # sql.Identifier quotes the name safely; the previous
            # str.format() into the SQL text was open to SQL injection.
            # (Note: identifier quoting makes the name case-sensitive.)
            create_db_query = sql.SQL("CREATE DATABASE {};").format(sql.Identifier(db_name))
            cursor.execute(create_db_query)
    finally:
        # Always release the connection; the original leaked it when
        # execute() raised.
        connection.close()
def create_db(cursor):
    """Create the database named in config using an already-open cursor.

    The cursor must belong to a connection in AUTOCOMMIT mode, because
    CREATE DATABASE cannot run inside a transaction block.
    """
    print("Creating DB: ", param_postgres['dbName'])
    # sql.Identifier quotes the database name safely; the previous
    # str.format() into the SQL text was open to SQL injection.
    # (Note: identifier quoting makes the name case-sensitive.)
    create_db_query = sql.SQL("CREATE DATABASE {};").format(sql.Identifier(param_postgres['dbName']))
    cursor.execute(create_db_query)
    print('Done')
def drop_db(db_name):
......@@ -50,31 +46,31 @@ def drop_db(db_name):
# Checks, if database "temperatures_berteld_morstein" exists
# NOTE(review): scraped commit diff — the old dbexists(db_name) and the new
# dbexists(cursor) revisions are interleaved below; both signatures and both
# bodies appear. Returns True when the database row is found, False otherwise.
def dbexists(db_name):
def dbexists(cursor):
try:
# --- old revision: opened its own connection per call ---
with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(db_name))
db_exists = cursor.fetchall()[0]
return True
# --- new revision: reuses the caller's cursor ---
cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(param_postgres['dbName']))
# fetchall()[0] raises IndexError when no row matches; that falls through
# to the except below, which reports the database as missing.
db_exists = cursor.fetchall()[0]
print('DB existing')
print('Updating data')
return True
except(Exception, psycopg2.DatabaseError) as error:
# do nothing, because test db is clean
print('No DB found')
return False
# If database "temperatures_berteld_morstein" not exists, create it
# NOTE(review): scraped commit diff — old (station_list, db_name) and new
# (cursor) revisions are interleaved below.
# Returns True when the database already exists, False after creating it.
def check_for_db_existence(station_list, db_name):
def check_for_db_existence(cursor):
print("Checking for database existence")
# --- old revision ---
if dbexists(db_name):
print('DB existing')
# --- new revision ---
if dbexists(cursor):
return True
else:
# Old revision also created the table here; the new revision leaves
# table creation to the caller (see export()).
create_db(db_name)
create_table(station_list, db_name)
create_db(cursor)
return False
# Connect to DB "temperatures_berteld_morstein" to create table "temperatures". Also installs PostGIS
# NOTE(review): scraped commit diff — old (station_list, db_name) and new
# (station_list, cursor) revisions are interleaved; a diff hunk header
# ("......@@ ...") survives mid-function below.
def create_table(station_list, db_name):
def create_table(station_list, cursor):
print('Creating table stations')
# One NUMERIC column per station-list column (year columns), appended to
# the fixed id/lon/lat/country/file schema.
df_columns = list(station_list)
columns = ['id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT']
for column in df_columns:
......@@ -82,93 +78,109 @@ def create_table(station_list, db_name):
columns.append('"{}" NUMERIC'.format(column))
# Render the Python list as a bare SQL column list: strip brackets/quotes.
columns_clean = str(columns).strip('[]').replace("'", "")
# --- old revision: own connection ---
with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
with connection.cursor() as cursor:
query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean))
#print(query)
cursor.execute(query)
cursor.execute('CREATE EXTENSION postgis;')
# --- new revision: caller-supplied cursor ---
query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean))
#print(query)
cursor.execute(query)
cursor.execute('CREATE EXTENSION postgis;')
print('Done')
# Loading matrix coordinates from csv and writing it into table "stations" in db "temperatures_berteld_morstein"
# NOTE(review): scraped commit diff — old no-arg and new (cursor) revisions are
# interleaved below. The new revision also counts inserted points and returns
# that count for progress reporting in create_matrix_data().
def insert_empty_matrix_into_db():
def insert_empty_matrix_into_db(cursor):
print('Inserting empty matrix into database')
matrix_density = param_interpol['matrix_density']
# --- old revision: own connection ---
with psycopg2.connect(database=param_postgres['dbName'], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
with connection.cursor() as cursor:
with open('clipped_matrix_{}x{}.csv'.format(matrix_density, matrix_density), 'r') as matrix:
matrix_data = matrix.readlines()
# Skip the CSV header row; each data line is "id";lon;lat
for line in matrix_data[1:]:
values = '' # Used in second parameter of cursor.execute() (Avoids SQL injection)
data = line.split(';')
# Matrix points get a "9999"-prefixed id to keep them distinct from
# real station ids.
id = int("9999" + data[0].replace('"', ''))
lon = float(data[1])
lat = float(data[2].replace('\n', ''))
for n in [id, lon, lat]:
# Unpacking the initial '' yields an empty tuple, so this builds
# the (id, lon, lat) parameter tuple incrementally.
values = (*values, n) # adding n to existing tuple
query = sql.SQL("INSERT INTO STATIONS (id, lon, lat, country) "
"VALUES ({id}, {lon}, {lat}, 'Germany');").format(id=sql.Placeholder(), lon=sql.Placeholder(), lat=sql.Placeholder())
# print(query.as_string(cursor))
# print(values)
cursor.execute(query, values)
# --- new revision: caller-supplied cursor, counts points ---
with open('clipped_matrix_{}x{}.csv'.format(matrix_density, matrix_density), 'r') as matrix:
matrix_data = matrix.readlines()
matrix_points = 0
for line in matrix_data[1:]:
matrix_points += 1
values = '' # Used in second parameter of cursor.execute() (Avoids SQL injection)
data = line.split(';')
id = int("9999" + data[0].replace('"', ''))
lon = float(data[1])
lat = float(data[2].replace('\n', ''))
for n in [id, lon, lat]:
values = (*values, n) # adding n to existing tuple
query = sql.SQL("INSERT INTO STATIONS (id, lon, lat, country) "
"VALUES ({id}, {lon}, {lat}, 'Germany');").format(id=sql.Placeholder(), lon=sql.Placeholder(), lat=sql.Placeholder())
# print(query.as_string(cursor))
# print(values)
cursor.execute(query, values)
print('Done')
return matrix_points
# Calculating interpolation data for matrix using the according function from the API, and writing it into the database in bulk
# NOTE(review): scraped commit diff — old no-arg and new (cursor, amount_points)
# revisions are interleaved below. Matrix points are the rows with file IS NULL
# (real stations have a source file); each gets IDW-interpolated year values.
def create_matrix_data():
def create_matrix_data(cursor, amount_points):
print('Calculating interpolation data for matrix')
# start_time = time.time()
# --- old revision: own connection with TCP keepalives for the long run ---
with psycopg2.connect(database=param_postgres['dbName'], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"],
keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;")
matrix_points = cursor.fetchall()
update_data = []
for point in matrix_points:
id = point[0]
lon = point[1]
lat = point[2]
interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'))
for year, value in interpolation_data.items():
update_data.append({'year': year, 'value': value, 'id': id})
# NOTE(review): %(year)s is substituted inside double quotes; psycopg2
# renders parameters as quoted literals, so this likely does not produce
# a valid column identifier — verify against a live database.
query = sql.SQL("""UPDATE stations SET "%(year)s" = %(value)s WHERE id = %(id)s; """)
psycopg2.extras.execute_batch(cursor, query, update_data) # Multiple times faster than using execute() in a for loop
# --- new revision: caller-supplied cursor, with 10%-step progress output ---
cursor.execute("SELECT id, lon, lat FROM stations WHERE file is NULL;")
matrix_points = cursor.fetchall()
update_data = []
for i, point in enumerate(matrix_points):
id = point[0]
lon = point[1]
lat = point[2]
interpolation_data = get_interpolation_data_for_point(lat, lon, sql.SQL('*'), cursor)
for year, value in interpolation_data.items():
update_data.append({'year': year, 'value': value, 'id': id})
if i % round((amount_points / 10)) == 0:
finished = i / amount_points * 100
print(round(finished), end="% ... ")
print('', end=' 100%, Done. \n')
query = sql.SQL("""UPDATE stations SET "%(year)s" = %(value)s WHERE id = %(id)s; """)
print('Writing interpolation data to database')
psycopg2.extras.execute_batch(cursor, query, update_data) # Multiple times faster than using execute() in a for loop
# print((time.time() - start_time), 'seconds')
print('Done')
# Dumping all existing data from database. Inserting station data into database in bulk.
# NOTE(review): scraped commit diff — old (station_list, db_name) and new
# (station_list, cursor) revisions are interleaved; most body lines appear
# twice, once per revision.
def insert_data(station_list, db_name):
def insert_data(station_list, cursor):
print('Inserting data into database')
# --- old revision: own connection ---
with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
with connection.cursor() as cursor:
if len(station_list) > 0:
# print(stationList)
cursor.execute("DELETE FROM stations;")
# --- new revision: caller-supplied cursor ---
if len(station_list) > 0:
# print(stationList)
cursor.execute("DELETE FROM stations;")
df_columns = list(station_list)
# create (col1,col2,...)
df_columns = list(station_list)
# create (col1,col2,...)
# As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups
columns = ['"' + column + '"' for column in df_columns]
# for column in df_columns:
# columns.append('"' + column + '"')
columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '')
station_list = station_list.round(decimals=3)
# As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups
columns = ['"' + column + '"' for column in df_columns]
# for column in df_columns:
# columns.append('"' + column + '"')
columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '')
station_list = station_list.round(decimals=3)
# create VALUES('%s', '%s",...) one '%s' per column
values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
# create VALUES('%s', '%s",...) one '%s' per column
values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
# create INSERT INTO table (columns) VALUES('%s',...)
insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values)
psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values)
# create INSERT INTO table (columns) VALUES('%s',...)
insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values)
psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values)
print('Done')
# Orchestrates the full installation: DB existence check/creation, table
# creation, station insert, matrix insert, and matrix interpolation.
# NOTE(review): scraped commit diff — the old four-line body and the new
# multi-connection body are interleaved below.
def export(station_list):
# --- old revision ---
check_for_db_existence(station_list, param_postgres['dbName'])
insert_data(station_list, param_postgres['dbName'])
insert_empty_matrix_into_db()
create_matrix_data()
# --- new revision: one connection to 'postgres' (AUTOCOMMIT) for the
# existence check / creation, then connections to the target DB for the
# data load and for the long-running interpolation (TCP keepalives).
# NOTE(review): 'host' is unpacked but param_postgres["host"] is still used
# inline in the connect calls — harmless inconsistency.
db_name, user, pw, host, port = param_postgres['dbName'], param_postgres["user"], param_postgres["password"], param_postgres["host"], param_postgres["port"]
with psycopg2.connect(database='postgres', user=user, password=pw, host=param_postgres["host"], port=port) as connection:
connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database
with connection.cursor() as cursor:
new_db = False if check_for_db_existence(cursor) else True
with psycopg2.connect(database=db_name, user=user, password=pw, host=param_postgres["host"], port=port) as connection:
with connection.cursor() as cursor:
if new_db: create_table(station_list, cursor)
insert_data(station_list, cursor)
amount_points = insert_empty_matrix_into_db(cursor)
with psycopg2.connect(database=db_name, user=user, password=pw, host=param_postgres["host"], port=port,
keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5) as connection:
with connection.cursor() as cursor:
create_matrix_data(cursor, amount_points)
print('Installation successful. You can run the api.py now.')
......@@ -80,17 +80,15 @@ def calc_idw(neighbours, years):
# Collecting preparation data and execute interpolation
def get_interpolation_data_for_point(lat, lon, columns):
    """Return IDW-interpolated yearly values for the given coordinate.

    `columns` is a psycopg2 sql.SQL fragment; anything containing '*'
    means "all year columns".
    """
    with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
        with connection.cursor() as cursor:
            if '*' in str(columns):
                year_columns = get_year_columns(cursor)
            else:
                # Recover plain column names from the SQL('...') wrapper text.
                stripped = str(columns).replace("""SQL('""", "").replace('"', '').replace("')", "")
                year_columns = stripped.split(',')
            neighbours = get_neighbours(cursor, lat, lon, columns)
            return calc_idw(neighbours, year_columns)
def get_interpolation_data_for_point(lat, lon, columns, cursor):
    """Return IDW-interpolated yearly values for (lat, lon), reusing an
    already-open cursor.

    `columns` is a psycopg2 sql.SQL fragment; anything containing '*'
    means "all year columns".
    """
    if '*' in str(columns):
        year_columns = get_year_columns(cursor)
    else:
        # Recover plain column names from the SQL('...') wrapper text.
        stripped = str(columns).replace("""SQL('""", "").replace('"', '').replace("')", "")
        year_columns = stripped.split(',')
    neighbours = get_neighbours(cursor, lat, lon, columns)
    return calc_idw(neighbours, year_columns)
# get_average_data_for_point(52.5, 13.4)
......@@ -41,7 +41,9 @@ def index():
if 'lat' in request.args or 'lng' in request.args:
lat = request.args['lat']
lon = request.args['lon']
return get_interpolation_data_for_point(lat, lon, columns)
with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
with connection.cursor() as cursor:
return get_interpolation_data_for_point(lat, lon, columns, cursor)
else:
if 'id' in request.args:
......
import configparser
import time
import psycopg2
from psycopg2 import sql
# Load connection/interpolation settings from config.ini.
# NOTE(review): the scrape interleaves a separate progress-print experiment
# (top / assert / for-loop lines, continued by a stray print further down)
# with the config loading; the commented-out blocks below are scratch
# experiments kept from the original file.
cfg = configparser.ConfigParser()
cfg.read('config.ini')
top = 101
assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
for i in range(0, top):
time.sleep(0.1)
if i % 10 == 0:
finished = i / top * 100
param_postgres = cfg["POSTGRES"]
param_interpol = cfg["INTERPOLATION"]
#
# query = sql.SQL("select {0} from {1} where {2} LIKE {3} {4}").format(sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]), sql.Identifier('table'), sql.Identifier('coulumn'), sql.Placeholder(),
# sql.SQL('AND 1 = 1'))
#
# with psycopg2.connect(database='temperatures_berteld_morstein', user='postgres', password='postgres', host='localhost', port=5432) as connection:
# with connection.cursor() as cursor:
# print(query.as_string(cursor))
# weights = [2, 1.25, 0.75, 0.6, 0.4]
# neighbours = [{'2015': 'NaN', '2010': 9.333, '2014': 'NaN', '1998': 'NaN', 'distance': 0.0223606797750006}, {'2015': 11.233, '2010': 8.883, '2014': 11.458, '1998': 10.05, 'distance': 0.0300000000000011}, {'2015': 11.133, '2010': 8.767, '2014': 11.467, '1998': 'NaN', 'distance': 0.108166538263921}, {'2015': 10.667, '2010': 8.208, '2014': 11.025, '1998': 9.8, 'distance': 0.111803398874988}, {'2015': 10.667, '2010': 8.35, '2014': 10.908, '1998': 'NaN', 'distance': 0.176918060129539}]
#
# for weight, neighbour in zip(weights, neighbours):
# print(weight, neighbour['2015'])
# list_a = [1,2,3,4,5,6]
# list_b = [x * 2 for x in list_a if x % 2 == 0]
# print(list_b)
# list_a = [1,2,3,4,5,6,7,8,9,10]
# new_list = list_a[0::2]
# print(new_list)
# Fetch the N nearest stations (with real data, file IS NOT NULL) to a
# coordinate, as a JSON array with their selected year columns and distance.
def get_neighbours(lat, lon, columns):
values = '' # Used in second parameter of cursor.execute() (Avoids SQL injection)
for n in [lat, lon]:
# Unpacking the initial '' yields an empty tuple, so this builds the
# (lat, lon) parameter tuple incrementally.
values = (*values, n) # adding n to existing tuple
with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
with connection.cursor() as cursor:
# NOTE(review): the placeholders are named {lon}/{lat} but are filled
# positionally from values=(lat, lon), so both ST_MakePoint calls
# receive (lat, lon) — consistent, though confusingly named.
query = sql.SQL("""
SELECT array_to_json(array_agg(row_to_json(t))) from (
SELECT {columns}, ST_Distance(ST_MakePoint(lat, lon), ST_MakePoint({lon}, {lat})) AS distance
FROM stations
WHERE file IS NOT NULL
ORDER BY distance
LIMIT {amount_neighbours}
) t;
""").format(columns=columns, lon=sql.Placeholder(), lat=sql.Placeholder(), amount_neighbours=sql.SQL(param_interpol["amount_neighbours"]))
cursor.execute(query, values)
neighbours = cursor.fetchall()[0][0]
print(neighbours)
# return neighbours
# Ad-hoc smoke test call; note the return above is commented out, so this
# only prints the neighbours.
get_neighbours(53.42, 9.24, sql.SQL('"2015", "2014", "2010"'))
\ No newline at end of file
# NOTE(review): stray line from the interleaved progress-print experiment.
print(round(finished), end="% ... ")
import os
import sqlite3
from pathlib import Path
from sqlite3 import Error
# Directory containing this script; printed for debugging only.
cwd = Path(os.path.dirname(os.path.abspath(__file__)))
print(cwd)
def create_connection(db_file):
    """Create (and immediately close) a connection to a SQLite database.

    Used as a connectivity smoke test: prints the SQLite library version on
    success, prints the error on failure, and always returns None.
    """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        # Bug fix: sqlite3.version was the DB-API module version (and is
        # deprecated since Python 3.12, removed in 3.14); sqlite_version is
        # the actual SQLite library version, which is what this check means.
        print(sqlite3.sqlite_version)
    except Error as e:
        print(e)
    finally:
        # Close even when connect succeeded but a later statement failed.
        if conn:
            conn.close()
create_connection('temperatures.db') # ':memory:' for saving in RAM
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment