"""Data Acquisition for DWD global monthly Air Temperature Author: Peter Morstein """ import pandas as pd from ftplib import FTP import pickle import numpy as np import ExportToWorldShape as exportToWorldShape import ExportToDatabase as exportToDatabase from IPython.display import display from pip._internal.utils.misc import tabulate """ example files: dwd gauge monthly mean: https://opendata.dwd.de/climate_environment/CDC/observations_global/CLIMAT/monthly/qc/air_temperature_mean/historical/10961_195301_201812.txt """ stationURL = "https://opendata.dwd.de/climate_environment/CDC/help/stations_list_CLIMAT_data.txt" dwdFtpServer = "opendata.dwd.de" dwdFtpUri = "/climate_environment/CDC/observations_global/CLIMAT/monthly/qc/air_temperature_mean/historical/" countryAnnualTemp = pd.DataFrame([]) stationGPD = None # load the all available stations from DWD service # @return: complete list of available dwd stations def loadDWDGauges(): global stationList print("load DWD Gauges") # load station list from dwd stationList = pd.read_csv(stationURL, delimiter=";", skiprows=0, usecols=[0,2,3,5], names=["id","lon","lat","country"], header=0, encoding="ISO-8859-1 ") stationList = stationList.dropna(how="any", axis=0) stationList['country'] = stationList['country'].str.strip() stationList['lon'] = stationList['lon'].str.strip() stationList['lat'] = stationList['lat'].str.strip() # rename countries to merge with geopandas world shape file stationList.loc[stationList['country']=="Korea, Dem. People's Rep.", 'country'] = 'South Korea' stationList.loc[stationList['country']=="Slovakia (Slovak. Rep.)", 'country'] = 'Slovakia' stationList.loc[stationList['country']=="Slowenia", 'country'] = 'Slovenia' stationList.loc[stationList['country']=="Russian Federation", 'country'] = 'Russia' stationList.loc[stationList['country']=="Bosnia and Herzegowina", 'country'] = 'Bosnia and Herz.' stationList.loc[stationList['country']=="Slovakia (Slovak. Rep.)", 'country'] = 'Slovakia' stationList.loc[stationList['country']=="Croatia/Hrvatska", 'country'] = 'Croatia' stationList.loc[stationList['country']=="Moldova, Rep. Of", 'country'] = 'Moldova' stationList.loc[stationList['country']=="United Kingdom of Great Britain and N.-Ireland ", 'country'] = 'United Kingdom' stationList.loc[stationList['country']=="Czech Republic", 'country'] = 'Czechia' stationList.loc[stationList['country']=="Somalia", 'country'] = 'Somaliland' stationList.loc[stationList['country']=="Iran (Islamic Rep. of)", 'country'] = 'Iran' stationList.loc[stationList['country']=="Mauretania", 'country'] = 'Mauritania' stationList.loc[stationList['country']=="Central African Republic", 'country'] = 'Central African Rep.' stationList.loc[stationList['country']=="South Sudan", 'country'] = 'S. Sudan' stationList.loc[stationList['country']=="Dem. Republic of the Congo", 'country'] = 'Dem. Rep. Congo' stationList.loc[stationList['country']=="Mauretania", 'country'] = 'Somalia' stationList.loc[stationList['country']=="Syrian Arab Rep.", 'country'] = 'Syria' stationList.loc[stationList['country']=="Australien, SW-Pazifik", 'country'] = 'Australia' stationList.loc[stationList['country']=="Western-Sahara",'country'] = "W. Sahara" # load station file names from DWD an join the filename with the stationList def loadAndJoinDWDClimateFilenames(): global stationList print("load dwd climate filenames") # load climate files from dwd dwdFTP = FTP(dwdFtpServer) dwdFTP.login() dwdFTP.cwd(dwdFtpUri) fileList = pd.DataFrame({'id':[],"file":[]}) ftpIds = [] ftpFileNames = [] for file_name in dwdFTP.nlst(): gaugeID = file_name.split("_")[0] if gaugeID in stationList["id"].tolist(): ftpFileNames.append(file_name) ftpIds.append(file_name.split("_")[0]) fileList = pd.DataFrame({'id':ftpIds,"file":ftpFileNames}) ftpIds.clear() ftpFileNames.clear() dwdFTP.quit() # filter climate files list by longest timeseries # (because: there are multiple timeseries-files per station with same historical values) longestSeries = pd.DataFrame() for index, ftpFiles in fileList.groupby("id", axis=0): longestSeries = longestSeries.append(ftpFiles.iloc[-1]) fileList.drop(fileList.index, inplace=True) # concat climate files with station list stationList = stationList.set_index("id").join(longestSeries.set_index("id"), on="id") stationList = stationList.dropna(axis=0, how="any") stationList = stationList[stationList.country!=""] # with open("stationList.pickle","wb") as pf: # pickle.dump(stationList, pf) # here we have to try some interpolations for missing values def fillMissingData(annualData): months = ["Jan", "Feb", "Mrz","Apr","Mai","Jun","Jul","Aug","Sep","Okt","Nov","Dez"] for y in range(0,len(annualData)): # check month for nan values for m in range(0,len(months)): #print(annualData.iloc[y].loc[months[m]]) if np.isnan(annualData.iloc[y].loc[months[m]]): prevYear = None nextYear = None prevMonth = m-1 nextMonth = m+1 if y >= 1: prevYear = y-1 if y < len(annualData)-1: nextYear = y+1 averageList = [] if prevYear != None: averageList.append(annualData.iloc[prevYear].loc[months[m]]) if nextYear != None: averageList.append(annualData.iloc[nextYear].loc[months[m]]) if prevMonth >= 0: averageList.append(annualData.iloc[y].loc[months[prevMonth]]) if prevMonth < 0 and prevYear != None: prevMonth = len(months)-1 averageList.append(annualData.iloc[prevYear].loc[months[prevMonth]]) if nextMonth < len(months): averageList.append(annualData.iloc[y].loc[months[nextMonth]]) if nextMonth >= len(months) and nextYear!=None: nextMonth = 0 averageList.append(annualData.iloc[nextYear].loc[months[nextMonth]]) annualData.iat[y,m] = np.round(np.nanmean(averageList),2) annualData["mean"] = np.round(annualData.iloc[:,0:11].mean(axis=1,skipna=True),2) return annualData # load Temperatures from DWD gauges def loadTemperatureFromDWDGauges(): global climateCountry global stationList global annualData global worldTemperature print("load station temperatures") for index, gaugeCountry in stationList.groupby("country", axis=0): print(index,": ",len(gaugeCountry.country)," gauges to load") gaugeURLs = "https://"+dwdFtpServer+dwdFtpUri+gaugeCountry.file gaugeIds = gaugeCountry.index i = 0 for gid, gurl in zip(gaugeIds, gaugeURLs): annualData = pd.read_csv(gurl, delimiter=";") annualData = annualData.set_index("Jahr") annualData["mean"] = annualData.mean(axis=1) #annualData = fillMissingData(annualData) for dataIndex, annualMean in annualData.iterrows(): try: stationList.at[gid, dataIndex] = annualMean["mean"] except: continue i += 1 if i % 10 == 0: finished = i/len(gaugeCountry.country) * 100 print(np.round(finished), end="% ... ") stationList.columns = stationList.columns.astype(str) stationList = stationList.sort_index(axis=1, ascending=False) def start(): global stationList print("___ DWD Acquisition start___") loadDWDGauges() stationList = stationList.loc[stationList['country'] == "Germany"] loadAndJoinDWDClimateFilenames() loadTemperatureFromDWDGauges() # with open("./pickle/stationList_germany.pickle", "wb") as pickleFile: # pickle.dump(stationList, pickleFile) # stationList = pickle.load(pickleFile) # stationList = pd.read_pickle('./pickle/stationList_germany.pickle') # export station list to different outputs #exportToWorldShape.export(stationList) exportToDatabase.export(stationList) print("___DWD Acquisition finished___") if __name__ == '__main__': start()