# DwdAcquisition.py (leftover GitLab page chrome removed so the file parses as Python)
"""Data Acquisition for DWD global monthly Air Temperature 
Author: Peter Morstein
"""

import pandas as pd
from ftplib import FTP
import numpy as np
#import ExportToWorldShape as exportToWorldShape
import ExportToDatabase as exportToDatabase

""" 
example files:
dwd gauge monthly mean: https://opendata.dwd.de/climate_environment/CDC/observations_global/CLIMAT/monthly/qc/air_temperature_mean/historical/10961_195301_201812.txt
"""

# URL of the list of all CLIMAT stations (id, coordinates, country)
stationURL = "https://opendata.dwd.de/climate_environment/CDC/help/stations_list_CLIMAT_data.txt"
# FTP host and directory holding the historical monthly mean air-temperature files
dwdFtpServer = "opendata.dwd.de"
dwdFtpUri = "/climate_environment/CDC/observations_global/CLIMAT/monthly/qc/air_temperature_mean/historical/"
# accumulator frames filled/used by the functions below
countryAnnualTemp = pd.DataFrame([])
stationGPD = None  # NOTE(review): never assigned in this file chunk — possibly unused

# load the all available stations from DWD service
# @return: complete list of available dwd stations
def loadDWDGauges():
    """Download the list of available DWD CLIMAT stations.

    Reads the station list from ``stationURL`` keeping only id, latitude,
    longitude and country, drops incomplete rows and strips the padding
    whitespace the source file puts around text fields.

    Returns:
        pd.DataFrame: columns ``id``, ``lat``, ``lon``, ``country``
        (lat/lon still strings). Also stored in the module-global
        ``stationList`` as a side effect.
    """
    global stationList
    print("load DWD Gauges")
    # BUGFIX: the encoding name had a trailing space ("ISO-8859-1 ") and only
    # worked through Python's codec-name normalization; use the exact name.
    # (skiprows=0 was a no-op and has been dropped.)
    stationList = pd.read_csv(
        stationURL,
        delimiter=";",
        usecols=[0, 2, 3, 5],
        names=["id", "lat", "lon", "country"],
        header=0,
        encoding="ISO-8859-1",
    )
    stationList = stationList.dropna(how="any", axis=0)
    # the source file pads its fields with whitespace
    for col in ("country", "lon", "lat"):
        stationList[col] = stationList[col].str.strip()

    return stationList

def filterDWDGauges(country="Germany", minLat=40):
    """Filter the global ``stationList`` down to one country's gauges.

    Args:
        country: country name to keep (default "Germany", the original
            hard-coded value).
        minLat: keep only stations strictly north of this latitude
            (default 40, the original hard-coded value).

    Returns:
        The filtered DataFrame (also written back to the module-global
        ``stationList``).
    """
    global stationList
    # BUGFIX: .copy() — the row selection returns a view-like slice, and the
    # to_numeric assignment below triggered a SettingWithCopyWarning.
    stationList = stationList.loc[stationList['country'] == country].copy()

    # latitude arrives as a string; unparsable values become NaN and are
    # dropped by the > comparison below (NaN > minLat is False)
    stationList['lat'] = pd.to_numeric(stationList['lat'], errors='coerce', downcast='float')

    stationList = stationList.loc[stationList['lat'] > minLat]

    return stationList

# load station file names from DWD an join the filename with the stationList
# load station file names from DWD and join the filename with the stationList
def loadAndJoinDWDClimateFilenames():
    """List the climate data files on the DWD FTP server and attach each
    station's file name to the global ``stationList``.

    Side effects: replaces the module-global ``stationList`` with a frame
    indexed by station id and extended by a ``file`` column; stations
    without a matching file (or with an empty country) are dropped.
    """
    global stationList
    print("load dwd climate filenames")

    # set gives O(1) membership tests inside the listing loop (the original
    # rebuilt a list per file); astype(str) matches the string ids parsed
    # out of the file names
    knownIds = set(stationList["id"].astype(str))

    ftpIds = []
    ftpFileNames = []
    dwdFTP = FTP(dwdFtpServer)
    try:
        dwdFTP.login()
        dwdFTP.cwd(dwdFtpUri)
        # file names look like "<stationId>_<from>_<to>.txt"
        for file_name in dwdFTP.nlst():
            gaugeID = file_name.split("_")[0]
            if gaugeID in knownIds:
                ftpFileNames.append(file_name)
                ftpIds.append(gaugeID)
    finally:
        # BUGFIX: close the FTP connection even if listing fails
        dwdFTP.quit()

    fileList = pd.DataFrame({'id': ftpIds, "file": ftpFileNames})

    # There are multiple time-series files per station with the same
    # historical values; keep only the last (longest) file per station.
    # BUGFIX: DataFrame.append was removed in pandas 2.0 — use groupby.last().
    longestSeries = fileList.groupby("id", as_index=False).last()

    # join the file names onto the station list and drop stations without one
    stationList = stationList.set_index("id").join(longestSeries.set_index("id"), on="id")
    stationList = stationList.dropna(axis=0, how="any")
    stationList = stationList[stationList.country != ""]


# here we have to try some interpolations for missing values
def fillMissingData(annualData):
    months = ["Jan", "Feb", "Mrz","Apr","Mai","Jun","Jul","Aug","Sep","Okt","Nov","Dez"]
    
    for y in range(0,len(annualData)):
        
        # check month for nan values
        for m in range(0,len(months)):
            #print(annualData.iloc[y].loc[months[m]])
            if np.isnan(annualData.iloc[y].loc[months[m]]):
                
                prevYear = None
                nextYear = None
                prevMonth = m-1
                nextMonth = m+1
                
                if y >= 1:
                    prevYear = y-1
                if y < len(annualData)-1:
                    nextYear = y+1
                
                averageList = []
                if prevYear != None:
                    averageList.append(annualData.iloc[prevYear].loc[months[m]])
                
                if nextYear != None:
                    averageList.append(annualData.iloc[nextYear].loc[months[m]])
                
                if prevMonth >= 0:
                    averageList.append(annualData.iloc[y].loc[months[prevMonth]])
                
                if prevMonth < 0 and prevYear != None:
                     prevMonth = len(months)-1
                     averageList.append(annualData.iloc[prevYear].loc[months[prevMonth]])
                
                if nextMonth < len(months):
                    averageList.append(annualData.iloc[y].loc[months[nextMonth]])
                
                if nextMonth >= len(months) and nextYear!=None:
                     nextMonth = 0
                     averageList.append(annualData.iloc[nextYear].loc[months[nextMonth]])
                                        
                annualData.iat[y,m] = np.round(np.nanmean(averageList),2)

    annualData["mean"] = np.round(annualData.iloc[:,0:11].mean(axis=1,skipna=True),2)
    
    return annualData


# load Temperatures from DWD gauges
# load Temperatures from DWD gauges
def loadTemperatureFromDWDGauges():
    """Download each station's monthly series and store annual means.

    For every row of the global ``stationList``, downloads the station's
    CLIMAT file via HTTPS, computes the annual mean per year and writes it
    into ``stationList`` as one column per year. Prints progress per country.

    Side effects: rewrites the module-globals ``stationList`` and
    ``annualData``. (The unused ``climateCountry``/``worldTemperature``
    global declarations were removed.)
    """
    global stationList
    global annualData

    print("load station temperatures")

    for country, gaugeCountry in stationList.groupby("country"):
        gaugeCount = len(gaugeCountry.index)
        print(country, ": ", gaugeCount, " gauges to load")

        gaugeURLs = "https://" + dwdFtpServer + dwdFtpUri + gaugeCountry.file

        # BUGFIX: round(count / 10) is 0 for countries with fewer than 5
        # gauges, so "i % step" raised ZeroDivisionError; clamp step to >= 1.
        progressStep = max(1, round(gaugeCount / 10))

        for i, (gid, gurl) in enumerate(zip(gaugeCountry.index, gaugeURLs), start=1):
            annualData = pd.read_csv(gurl, delimiter=";")
            annualData = annualData.set_index("Jahr")
            annualData["mean"] = annualData.mean(axis=1)

            for year, annualMean in annualData.iterrows():
                try:
                    stationList.at[gid, year] = annualMean["mean"]
                except Exception:
                    # best-effort: skip years that cannot be written
                    # (narrowed from a bare except:, which also swallowed
                    # KeyboardInterrupt/SystemExit)
                    continue

            if i % progressStep == 0:
                print(np.round(i / gaugeCount * 100), end="% ... ")
            if i == gaugeCount:
                print('', end=' 100%, Done. \n')

    stationList.columns = stationList.columns.astype(str)
    stationList = stationList.sort_index(axis=1, ascending=False)


def start():
    """Run the full DWD acquisition pipeline and export the result."""
    global stationList

    print("___ DWD Acquisition start___")

    # pipeline: station list -> country filter -> FTP file names -> temperatures
    loadDWDGauges()
    filterDWDGauges()
    loadAndJoinDWDClimateFilenames()
    loadTemperatureFromDWDGauges()

    # hand the assembled station list over to the database exporter
    exportToDatabase.export(stationList)

    print("___DWD Acquisition finished___")


# run the acquisition pipeline when executed as a script
if __name__ == '__main__':
    start()