Skip to content
Snippets Groups Projects
Commit b0abf22d authored by Peter Morstein's avatar Peter Morstein
Browse files

add config.ini; add tests; create and drop DB

parent 3bdd1fcb
No related branches found
No related tags found
No related merge requests found
......@@ -11,43 +11,66 @@ import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
# import pandas as pd
# import numpy as np
import configparser
stationGPD = None
# Connect to DB "postgres" to check for database "temperatures_berteld_morstein"
def check_for_db_existence(stationList):
print("Checking for database existence")
with psycopg2.connect(database='postgres', user='postgres', password='postgres', host='localhost', port=5432) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE 'temperatures_berteld_morstein';")
cfg = configparser.ConfigParser()
cfg.read('config.ini')
try:
db_exists = cursor.fetchall()[0]
print('DB existing')
assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
except IndexError: # DB temperatures_berteld_morstein doesn't exist
print('DB not existing')
try:
create_db(connection, cursor)
create_table(stationList)
print('Successfully created database and table')
except (Exception, psycopg2.DatabaseError) as error:
print(error)
param_postgres = cfg["POSTGRES"]
stationGPD = None
# Use existing connection to DB "postgres" to create DB "temperatures_berteld_morstein"
def create_db(connection, cursor):
def create_db(dbName):
print("Create DB: ",dbName)
connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database
create_db_query = sql.SQL("CREATE DATABASE temperatures_berteld_morstein;")
print(create_db_query)
cursor.execute(create_db_query)
with connection.cursor() as cursor:
create_db_query = sql.SQL("""CREATE DATABASE {};""".format(dbName))
cursor.execute(create_db_query)
connection.close()
def drop_db(dbName):
if dbexists(dbName):
print("Drop DB: ",dbName)
try:
connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database
with connection.cursor() as cursor:
cursor.execute("DROP DATABASE {};".format(dbName));
connection.close()
except(psycopg2.DatabaseError) as error:
# do nothing, because test db is clean
print(error.message)
return
def dbexists(dbName):
try:
connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
cursor = connection.cursor()
cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(dbName))
db_exists = cursor.fetchall()[0]
connection.close()
return True
except(Exception, psycopg2.DatabaseError) as error:
# do nothing, because test db is clean
return False
# Connect to DB "postgres" to check for database "temperatures_berteld_morstein"
def check_for_db_existence(stationList, dbName):
print("Checking for database existence")
if dbexists(dbName):
print('DB existing exists')
else:
create_db(dbName)
create_table(stationList, dbName)
# Connect to DB "temperatures_berteld_morstein" to create table "temperatures"
def create_table(stationList):
def create_table(stationList, dbName):
df_columns = list(stationList)
columns = ['id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT']
for column in df_columns:
......@@ -55,15 +78,15 @@ def create_table(stationList):
columns.append('"{}" NUMERIC'.format(column))
columns_clean = str(columns).strip('[]').replace("'", "")
with psycopg2.connect(database='temperatures_berteld_morstein', user='postgres', password='postgres', host='localhost', port=5432) as connection:
with psycopg2.connect(database=dbName, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
with connection.cursor() as cursor:
query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean))
print(query)
#print(query)
cursor.execute(query)
def insert_data(stationList):
with psycopg2.connect(database='temperatures_berteld_morstein', user='postgres', password='postgres', host='localhost', port=5432) as connection:
def insert_data(stationList, dbName):
with psycopg2.connect(database=dbName, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
with connection.cursor() as cursor:
if len(stationList) > 0:
......@@ -87,7 +110,6 @@ def insert_data(stationList):
insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values)
psycopg2.extras.execute_batch(cursor, insert_stmt, stationList.values)
def export(stationList):
check_for_db_existence(stationList)
insert_data(stationList)
check_for_db_existence(stationList, param_postgres['dbName'])
insert_data(stationList, param_postgres['dbName'])
......@@ -3,6 +3,14 @@ import json
import psycopg2
from flask import Flask, jsonify, request
from psycopg2 import sql
import configparser
cfg = configparser.ConfigParser()
cfg.read('config.ini')
assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
param_postgres = cfg["POSTGRES"]
app = Flask(__name__)
app.config['ENV'] = "development"
......@@ -46,7 +54,7 @@ def index():
"{} "
") t;").format(columns, wheres)
with psycopg2.connect(database='temperatures_berteld_morstein', user='postgres', password='postgres', host='localhost', port=5432) as connection:
with psycopg2.connect(database=param_postgres["dbName"], user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
with connection.cursor() as cursor:
print(query.as_string(cursor))
print(values)
......
[POSTGRES]
host = localhost
port = 5432
user = postgres
password = postgres
dbName = temperatures_berteld_morstein
\ No newline at end of file
......@@ -9,17 +9,83 @@ Created on Wed Jul 14 13:45:28 2021
import unittest
import pickle
import ExportToDatabase as cut
import configparser
import psycopg2
import time
cfg = configparser.ConfigParser()
cfg.read('config.ini')
class TestExportToDatabase(unittest.TestCase):
stationList = None
assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
def testExport(self):
global stationList
with open("./pickle/stationList_germany.pickle", "rb") as pickleFile:
stationList = pickle.load(pickleFile)
stationList = cut.export(stationList)
param_postgres = cfg["POSTGRES"]
stationList = None
testDB = 'weathertestdb'
class TestExportToDatabase(unittest.TestCase):
def testDBExists(self):
print("__Test DB Exists")
#given
cut.drop_db(testDB)
#test
exists = cut.dbexists(testDB)
self.assertFalse(exists)
#finished
print("Test OKAY__")
def testCreateDB(self):
print("__Test CreateDB")
try:
# given
cut.drop_db(testDB)
# test
cut.create_db(testDB)
self.assertTrue(cut.dbexists(testDB))
# finished
cut.drop_db(testDB)
print("Test OKAY__")
except(Exception, psycopg2.DatabaseError) as error:
self.assertRaises(error)
print("Test FAILED__")
def testCreateTable(self):
print("__Test CreateTable")
try:
# given
cut.drop_db(testDB)
cut.create_db(testDB)
# test
with open("./pickle/stationList_with_temperature.pickle", "rb") as pickleFile:
stationList = pickle.load(pickleFile)
stationList = stationList.loc[stationList['country']=="Germany"]
cut.create_table(stationList, testDB)
connection = psycopg2.connect(dbname=testDB, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
cursor = connection.cursor()
columns = cursor.execute("SELECT count(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'stations';")
self.assertTrue(cursor.fetchall()[0][0]>=50)
# finished
connection.close()
cut.drop_db(testDB)
print("Test OKAY__")
except(Exception, psycopg2.DatabaseError) as error:
connection.close()
self.assertRaises(error.message)
print("Test FAILED__")
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment