#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Wed Jul 14 13:43:45 2021 export the stationlist to database @author: geopeter """ import psycopg2 import psycopg2.extras from psycopg2 import sql from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import configparser cfg = configparser.ConfigParser() cfg.read('config.ini') assert "POSTGRES" in cfg, "missing POSTGRES in config.ini" param_postgres = cfg["POSTGRES"] stationGPD = None # Use existing connection to DB "postgres" to create DB "temperatures_berteld_morstein" def create_db(db_name): print("Create DB: ", db_name) connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database with connection.cursor() as cursor: create_db_query = sql.SQL("""CREATE DATABASE {};""".format(db_name)) cursor.execute(create_db_query) connection.close() def drop_db(db_name): if dbexists(db_name): print("Drop DB: ", db_name) try: connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Needs to be in AUTOCOMMIT mode for creating database with connection.cursor() as cursor: cursor.execute("DROP DATABASE {};".format(db_name)) connection.close() except psycopg2.DatabaseError as error: # do nothing, because test db is clean print(error.message) return def dbexists(db_name): try: connection = psycopg2.connect(dbname='postgres', user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) cursor = connection.cursor() cursor.execute("SELECT datname FROM pg_database WHERE datname LIKE '{}';".format(db_name)) db_exists = cursor.fetchall()[0] connection.close() return True except(Exception, psycopg2.DatabaseError) as error: # do nothing, because test db is clean return False # Connect to DB "postgres" to check for database "temperatures_berteld_morstein" def check_for_db_existence(station_list, db_name): print("Checking for database existence") if dbexists(db_name): print('DB existing exists') else: create_db(db_name) create_table(station_list, db_name) # Connect to DB "temperatures_berteld_morstein" to create table "temperatures" def create_table(station_list, db_name): df_columns = list(station_list) columns = ['id INTEGER', 'lon NUMERIC', 'lat NUMERIC', 'country TEXT', 'file TEXT'] for column in df_columns: if str(column).startswith('19') or str(column).startswith('20'): columns.append('"{}" NUMERIC'.format(column)) columns_clean = str(columns).strip('[]').replace("'", "") with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: with connection.cursor() as cursor: query = sql.SQL("""CREATE TABLE stations ({});""".format(columns_clean)) #print(query) cursor.execute(query) def insert_data(station_list, db_name): with psycopg2.connect(database=db_name, user=param_postgres["user"], password=param_postgres["password"], host=param_postgres["host"], port=param_postgres["port"]) as connection: with connection.cursor() as cursor: if len(station_list) > 0: # print(stationList) cursor.execute("DELETE FROM stations;") df_columns = list(station_list) # create (col1,col2,...) # As integers like 2018, 2017, etc. are not possible as column names, double quotes have to be added. This requires some tricks and cleanups columns = [] for column in df_columns: columns.append('"' + column + '"') columns = str(columns).replace('[', '').replace(']', '').replace("'", "").replace('\n', '').replace(' ', '') station_list = station_list.round(decimals=3) # create VALUES('%s', '%s",...) one '%s' per column values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) # create INSERT INTO table (columns) VALUES('%s',...) insert_stmt = """INSERT INTO {} ({}) {}""".format('stations', columns, values) psycopg2.extras.execute_batch(cursor, insert_stmt, station_list.values) def export(station_list): check_for_db_existence(station_list, param_postgres['dbName']) insert_data(station_list, param_postgres['dbName'])