Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 14 13:45:28 2021
@author: geopeter
import dataacquisition.ExportToDatabase as cut
import configparser
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from psycopg2 import sql
# Connection settings used by the tests are taken from the POSTGRES section
# of a ConfigParser instance.
# NOTE(review): no cfg.read(...) call is visible in this excerpt; a parser that
# has read nothing has no POSTGRES section, so the assert below would fail.
# Confirm that the read call exists in the full file.
cfg = configparser.ConfigParser()
assert "POSTGRES" in cfg, "missing POSTGRES in config.ini"
param_postgres = cfg["POSTGRES"]
# Module-level placeholder; the tests visible here load their own local stationList.
stationList = None
# Name of the scratch database that the tests drop and create.
testDB = 'weathertestdb'
# Test case exercising the functions of `cut` against a PostgreSQL server
# reached through psycopg2, using the settings in param_postgres.
class TestExportToDatabase(unittest.TestCase):
# NOTE(review): the statements below use `self`, but no `def ...(self):` header,
# no `try:`/`finally:` lines and no indentation are visible in this excerpt.
# They appear to have been lost; restore them from the original file.
# Visible intent: drop the test database, then assert that cut.dbexists()
# reports it as absent.
print("__Test DB Exists")
#given
check = False
connection = psycopg2.connect(database="postgres", user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with connection.cursor() as cursor:
# NOTE(review): drop_db receives the connection here, whereas every other
# drop_db call visible in this file passes a cursor. Confirm which is intended.
cut.drop_db(connection, testDB)
# NOTE(review): `connection` is rebound here and no close() of the first
# connection is visible before the rebind.
connection = psycopg2.connect(database="postgres", user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
with connection.cursor() as cursor:
exists = cut.dbexists(cursor, testDB)
self.assertFalse(exists)
# `check` records that the assertion above was reached.
check = True
if connection:
connection.close()
self.assertTrue(check)
#finished
print("Test OKAY__")
print("__Test CreateDB")
try:
# given
check = False
connection = psycopg2.connect(database="postgres", user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with connection.cursor() as cursor:
cut.drop_db(cursor, testDB)
#test
cut.create_db(cursor, testDB)
check = True
# finished
cut.drop_db(cursor, testDB)
if connection:
connection.close()
# # finished
# with psycopg2.connect(database="postgres", user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection:
# connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
# with connection.cursor() as cursor:
self.assertTrue(check)
print("Test OKAY__")
except(Exception, psycopg2.DatabaseError) as error:
self.assertRaises(error)
print("Test FAILED__")
def testForStationExistence(self):
    """Check that the recreated test database has no ``stations`` table.

    Recreates the test database through ``dropDB`` and ``createDB``, connects
    to it and asserts that ``cut.check_for_stations_existence`` returns a
    falsy value for the table name ``"stations"``.
    """
    print("__Test Check for Table Existence")
    # given
    check = False
    self.dropDB()
    self.createDB()
    # Connect before entering the try block: when connect() itself fails there
    # is nothing to close, and the real error must not be masked by an
    # UnboundLocalError raised from the finally clause.
    connection = psycopg2.connect(
        dbname=testDB,
        user=param_postgres["user"],
        password=param_postgres["password"],
        host=param_postgres["host"],
        port=param_postgres["port"],
    )
    try:
        with connection.cursor() as cursor:
            exists = cut.check_for_stations_existence(cursor, "stations")
            self.assertFalse(exists)
            # Records that the assertion above was actually reached.
            check = True
    finally:
        connection.close()
    self.assertTrue(check)
print("__Test CreateTable")
try:
# given
connection = psycopg2.connect(dbname="postgres", user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with connection.cursor() as cursor:
cut.drop_db(cursor, testDB)
cut.create_db(cursor, testDB)
if connection:
connection.close()
with open("./pickle/stationList_with_temperature.pickle", "rb") as pickleFile:
stationList = pickle.load(pickleFile)
stationList = stationList.loc[stationList['country']=="Germany"]
connection = psycopg2.connect(dbname=testDB, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cursor = connection.cursor()
columns = cursor.execute("SELECT count(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'stations';")
# finished´
if connection:
connection.close()
connection = psycopg2.connect(dbname="postgres", user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"])
connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with connection.cursor() as cursor:
cut.drop_db(cursor, testDB)
if connection:
connection.close()
print("Test OKAY__")
except(Exception, psycopg2.DatabaseError) as error:
self.assertRaises(error.message)
print("Test FAILED__")
def testCreateInsertStatement(self):
    """Check the shape of the statement built by ``cut.createInsertStatement``.

    Loads the pickled station list, keeps the rows whose ``country`` equals
    ``"Germany"``, copies the index into a ``station_id`` column and asserts
    that the generated statement starts with ``INSERT INTO`` and ends with
    ``');``.
    """
    print("__Test CreateInsertStatement")
    # pickle.load can execute arbitrary code; the fixture must be a trusted file.
    with open("./pickle/stationList_with_temperature.pickle", "rb") as pickleFile:
        stationList = pickle.load(pickleFile)
    stationList = stationList.loc[stationList['country'] == "Germany"]
    stationList["station_id"] = stationList.index

    connection = psycopg2.connect(
        dbname=param_postgres["dbName"],
        user=param_postgres["user"],
        password=param_postgres["password"],
        host=param_postgres["host"],
        port=param_postgres["port"],
    )
    try:
        with connection.cursor() as cursor:
            insert_stmt = cut.createInsertStatement(cursor, stationList)
    finally:
        # `with connection.cursor()` closes only the cursor; the connection
        # itself has to be closed explicitly or it leaks.
        connection.close()

    self.assertTrue(insert_stmt.startswith("INSERT INTO"))
    self.assertTrue(insert_stmt.endswith("');"))
def _testIntegrationTest(self):
    """Check that the configured database holds a populated ``stations`` table.

    Connects to the database named by ``param_postgres["dbName"]``, asserts
    that ``cut.check_for_stations_existence`` reports the ``stations`` table
    and that ``SELECT * FROM stations`` returns more than one row.

    The leading underscore keeps unittest's default ``test*`` discovery from
    collecting this method.
    """
    print("__Test Integrationtest")
    # pickle.load can execute arbitrary code; the fixture must be a trusted file.
    with open("./pickle/stationList_with_temperature.pickle", "rb") as pickleFile:
        stationList = pickle.load(pickleFile)
    stationList = stationList.loc[stationList['country'] == "Germany"]
    # cut.drop_db(param_postgres['dbName']);
    stationList["station_id"] = stationList.index
    stationList.columns = stationList.columns.astype(str)
    # NOTE(review): stationList is prepared above but not passed to anything in
    # the visible code. An export call may be missing here; confirm.

    connection = psycopg2.connect(
        dbname=param_postgres["dbName"],
        user=param_postgres["user"],
        password=param_postgres["password"],
        host=param_postgres["host"],
        port=param_postgres["port"],
    )
    try:
        with connection.cursor() as cursor:
            stationsExists = cut.check_for_stations_existence(cursor, "stations")
            self.assertTrue(stationsExists)
            cursor.execute(sql.SQL("SELECT * FROM stations"))
            result = cursor.fetchall()
    finally:
        # `with connection.cursor()` closes only the cursor; the connection
        # itself has to be closed explicitly or it leaks.
        connection.close()

    # assertGreater reports both values on failure, unlike assertTrue(a > b).
    self.assertGreater(len(result), 1)
def dropDB(self):
    """Drop the test database via an autocommit connection to ``postgres``.

    The connection is always closed, even when ``cut.drop_db`` raises.
    """
    connection = psycopg2.connect(
        dbname="postgres",
        user=param_postgres["user"],
        password=param_postgres["password"],
        host=param_postgres["host"],
        port=param_postgres["port"],
    )
    try:
        # Autocommit is presumably needed because drop_db issues DROP DATABASE,
        # which PostgreSQL refuses inside a transaction block. Verify against
        # the drop_db implementation.
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with connection.cursor() as cursor:
            cut.drop_db(cursor, testDB)
    finally:
        connection.close()
def createDB(self):
    """Create the test database via an autocommit connection to ``postgres``.

    The connection is always closed, even when ``cut.create_db`` raises.
    """
    # Connect before entering the try block: when connect() itself fails there
    # is nothing to close, and the real error must not be masked by an
    # UnboundLocalError raised from the finally clause.
    connection = psycopg2.connect(
        dbname="postgres",
        user=param_postgres["user"],
        password=param_postgres["password"],
        host=param_postgres["host"],
        port=param_postgres["port"],
    )
    try:
        # Autocommit is presumably needed because create_db issues CREATE
        # DATABASE, which PostgreSQL refuses inside a transaction block. Verify
        # against the create_db implementation.
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with connection.cursor() as cursor:
            cut.create_db(cursor, testDB)
    finally:
        connection.close()
# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()