Skip to content
Snippets Groups Projects
DwdAcquisition.py 8.62 KiB
Newer Older
Peter Morstein's avatar
Peter Morstein committed
"""Data Acquisition for DWD global monthly Air Temperature 
Author: Peter Morstein
"""

import pandas as pd
from ftplib import FTP
import pickle
import numpy as np
import ExportToWorldShape as exportToWorldShape
import ExportToDatabase as exportToDatabase
Clemens Berteld's avatar
Clemens Berteld committed
from IPython.display import display
from pip._internal.utils.misc import tabulate

""" 
example files:
dwd gauge monthly mean: https://opendata.dwd.de/climate_environment/CDC/observations_global/CLIMAT/monthly/qc/air_temperature_mean/historical/10961_195301_201812.txt
"""
Peter Morstein's avatar
Peter Morstein committed

stationURL = "https://opendata.dwd.de/climate_environment/CDC/help/stations_list_CLIMAT_data.txt"
dwdFtpServer = "opendata.dwd.de"
dwdFtpUri = "/climate_environment/CDC/observations_global/CLIMAT/monthly/qc/air_temperature_mean/historical/"
countryAnnualTemp = pd.DataFrame([])
stationGPD = None
Peter Morstein's avatar
Peter Morstein committed

# load the all available stations from DWD service
# @return: complete list of available dwd stations
Peter Morstein's avatar
Peter Morstein committed
def loadDWDGauges():
    global stationList
    print("load DWD Gauges")
Peter Morstein's avatar
Peter Morstein committed
    # load station list from dwd
    stationList = pd.read_csv(stationURL, delimiter=";", skiprows=0, usecols=[0,2,3,5], names=["id","lon","lat","country"], header=0, encoding="ISO-8859-1 ")
    stationList = stationList.dropna(how="any", axis=0) 
    stationList['country'] = stationList['country'].str.strip()
    stationList['lon'] = stationList['lon'].str.strip()
    stationList['lat'] = stationList['lat'].str.strip()
    
Peter Morstein's avatar
Peter Morstein committed
    # rename countries to merge with geopandas world shape file
Peter Morstein's avatar
Peter Morstein committed
    stationList.loc[stationList['country']=="Korea, Dem. People's Rep.", 'country'] = 'South Korea'
    stationList.loc[stationList['country']=="Slovakia (Slovak. Rep.)", 'country'] = 'Slovakia'
    stationList.loc[stationList['country']=="Slowenia", 'country'] = 'Slovenia'
    stationList.loc[stationList['country']=="Russian Federation", 'country'] = 'Russia'
    stationList.loc[stationList['country']=="Bosnia and Herzegowina", 'country'] = 'Bosnia and Herz.'
    stationList.loc[stationList['country']=="Slovakia (Slovak. Rep.)", 'country'] = 'Slovakia'
    stationList.loc[stationList['country']=="Croatia/Hrvatska", 'country'] = 'Croatia'
    stationList.loc[stationList['country']=="Moldova, Rep. Of", 'country'] = 'Moldova'
    stationList.loc[stationList['country']=="United Kingdom of Great Britain and N.-Ireland ", 'country'] = 'United Kingdom'
    stationList.loc[stationList['country']=="Czech Republic", 'country'] = 'Czechia'
    stationList.loc[stationList['country']=="Somalia", 'country'] = 'Somaliland'
    stationList.loc[stationList['country']=="Iran (Islamic Rep. of)", 'country'] = 'Iran'
    stationList.loc[stationList['country']=="Mauretania", 'country'] = 'Mauritania'
    stationList.loc[stationList['country']=="Central African Republic", 'country'] = 'Central African Rep.'
    stationList.loc[stationList['country']=="South Sudan", 'country'] = 'S. Sudan'
    stationList.loc[stationList['country']=="Dem. Republic of the Congo", 'country'] = 'Dem. Rep. Congo'
    stationList.loc[stationList['country']=="Mauretania", 'country'] = 'Somalia'
    stationList.loc[stationList['country']=="Syrian Arab Rep.", 'country'] = 'Syria'
    stationList.loc[stationList['country']=="Australien, SW-Pazifik", 'country'] = 'Australia'
    stationList.loc[stationList['country']=="Western-Sahara",'country'] = "W. Sahara"
    

# load station file names from DWD an join the filename with the stationList
def loadAndJoinDWDClimateFilenames():
    global stationList
    print("load dwd climate filenames")
Peter Morstein's avatar
Peter Morstein committed
    # load climate files from dwd
    dwdFTP = FTP(dwdFtpServer)
    dwdFTP.login()
    dwdFTP.cwd(dwdFtpUri)
    
    fileList = pd.DataFrame({'id':[],"file":[]})
    ftpIds = []
    ftpFileNames = []
Peter Morstein's avatar
Peter Morstein committed
    for file_name in dwdFTP.nlst():
        
        gaugeID = file_name.split("_")[0]
        
        if gaugeID in stationList["id"].tolist():
            ftpFileNames.append(file_name)
            ftpIds.append(file_name.split("_")[0])
            
Peter Morstein's avatar
Peter Morstein committed
    fileList = pd.DataFrame({'id':ftpIds,"file":ftpFileNames})
    ftpIds.clear()
    ftpFileNames.clear()
    
    dwdFTP.quit()
    
Peter Morstein's avatar
Peter Morstein committed
    # filter climate files list by longest timeseries 
    # (because: there are multiple timeseries-files per station with same historical values)
Peter Morstein's avatar
Peter Morstein committed
    longestSeries = pd.DataFrame()
    for index, ftpFiles in fileList.groupby("id", axis=0):
        longestSeries = longestSeries.append(ftpFiles.iloc[-1])
    fileList.drop(fileList.index, inplace=True)
    
    # concat climate files with station list
    stationList = stationList.set_index("id").join(longestSeries.set_index("id"), on="id")
    stationList = stationList.dropna(axis=0, how="any")
    stationList = stationList[stationList.country!=""]
    
    # with open("stationList.pickle","wb") as pf:
    #      pickle.dump(stationList, pf)

# here we have to try some interpolations for missing values
Peter Morstein's avatar
Peter Morstein committed
def fillMissingData(annualData):
    months = ["Jan", "Feb", "Mrz","Apr","Mai","Jun","Jul","Aug","Sep","Okt","Nov","Dez"]
    
    for y in range(0,len(annualData)):
        
        # check month for nan values
        for m in range(0,len(months)):
            #print(annualData.iloc[y].loc[months[m]])
            if np.isnan(annualData.iloc[y].loc[months[m]]):
                
                prevYear = None
                nextYear = None
                prevMonth = m-1
                nextMonth = m+1
                
                if y >= 1:
                    prevYear = y-1
                if y < len(annualData)-1:
                    nextYear = y+1
                
                averageList = []
                if prevYear != None:
                    averageList.append(annualData.iloc[prevYear].loc[months[m]])
                
                if nextYear != None:
                    averageList.append(annualData.iloc[nextYear].loc[months[m]])
                
                if prevMonth >= 0:
                    averageList.append(annualData.iloc[y].loc[months[prevMonth]])
                
                if prevMonth < 0 and prevYear != None:
                     prevMonth = len(months)-1
                     averageList.append(annualData.iloc[prevYear].loc[months[prevMonth]])
                
                if nextMonth < len(months):
                    averageList.append(annualData.iloc[y].loc[months[nextMonth]])
                
                if nextMonth >= len(months) and nextYear!=None:
                     nextMonth = 0
                     averageList.append(annualData.iloc[nextYear].loc[months[nextMonth]])
                                        
                annualData.iat[y,m] = np.round(np.nanmean(averageList),2)

    annualData["mean"] = np.round(annualData.iloc[:,0:11].mean(axis=1,skipna=True),2)
    
    return annualData

# load Temperatures from DWD gauges
Peter Morstein's avatar
Peter Morstein committed
def loadTemperatureFromDWDGauges():
    global climateCountry
    global stationList
    global annualData
    global worldTemperature
    
    print("load station temperatures")
Peter Morstein's avatar
Peter Morstein committed
    
    for index, gaugeCountry in stationList.groupby("country", axis=0):
        
        print(index,": ",len(gaugeCountry.country)," gauges to load")
Peter Morstein's avatar
Peter Morstein committed
        gaugeURLs = "https://"+dwdFtpServer+dwdFtpUri+gaugeCountry.file
        gaugeIds = gaugeCountry.index
Peter Morstein's avatar
Peter Morstein committed
        for gid, gurl in zip(gaugeIds, gaugeURLs):
            annualData = pd.read_csv(gurl, delimiter=";")
            annualData = annualData.set_index("Jahr")
Peter Morstein's avatar
Peter Morstein committed
            annualData["mean"] = annualData.mean(axis=1)
            #annualData = fillMissingData(annualData)
Peter Morstein's avatar
Peter Morstein committed
            
            for dataIndex, annualMean in annualData.iterrows():
                try:
                    stationList.at[gid, dataIndex] = annualMean["mean"]
                except:
                    continue
            i += 1
Peter Morstein's avatar
Peter Morstein committed
            
            if i % 10 == 0:
                finished = i/len(gaugeCountry.country) * 100
                
                print(np.round(finished), end="% ... ")
Peter Morstein's avatar
Peter Morstein committed
            
    stationList.columns = stationList.columns.astype(str)
    stationList = stationList.sort_index(axis=1, ascending=False)
def start():
Peter Morstein's avatar
Peter Morstein committed
    global stationList
    print("___ DWD Acquisition start___")
Peter Morstein's avatar
Peter Morstein committed
    
    loadDWDGauges()
    stationList = stationList.loc[stationList['country'] == "Germany"]
Clemens Berteld's avatar
Clemens Berteld committed
    loadAndJoinDWDClimateFilenames()
    loadTemperatureFromDWDGauges()

    # with open("./pickle/stationList_germany.pickle", "wb") as pickleFile:
    #     pickle.dump(stationList, pickleFile)
        # stationList = pickle.load(pickleFile)
        # stationList = pd.read_pickle('./pickle/stationList_germany.pickle')


Peter Morstein's avatar
Peter Morstein committed
    
    # export station list to different outputs
    #exportToWorldShape.export(stationList)
    exportToDatabase.export(stationList)
Peter Morstein's avatar
Peter Morstein committed
    
    print("___DWD Acquisition finished___")
if __name__ == '__main__':