"""Data Acquisition for DWD global monthly Air Temperature Author: Peter Morstein """ import pandas as pd from ftplib import FTP import pickle import numpy as np import ExportToWorldShape as exportToWorldShape import ExportToDatabase as exportToDatabase """ example files: dwd gauge monthly mean: https://opendata.dwd.de/climate_environment/CDC/observations_global/CLIMAT/monthly/qc/air_temperature_mean/historical/10961_195301_201812.txt """ stationURL = "https://opendata.dwd.de/climate_environment/CDC/help/stations_list_CLIMAT_data.txt" dwdFtpServer = "opendata.dwd.de" dwdFtpUri = "/climate_environment/CDC/observations_global/CLIMAT/monthly/qc/air_temperature_mean/historical/" countryAnnualTemp = pd.DataFrame([]) stationGPD = None # load the all available stations from DWD service # @return: complete list of available dwd stations def loadDWDGauges(): global stationList print("load DWD Gauges") # load station list from dwd stationList = pd.read_csv(stationURL, delimiter=";", skiprows=0, usecols=[0,2,3,5], names=["id","lon","lat","country"], header=0, encoding="ISO-8859-1 ") stationList = stationList.dropna(how="any", axis=0) stationList['country'] = stationList['country'].str.strip() stationList['lon'] = stationList['lon'].str.strip() stationList['lat'] = stationList['lat'].str.strip() # rename countries to merge with geopandas world shape file stationList.loc[stationList['country']=="Korea, Dem. People's Rep.", 'country'] = 'South Korea' stationList.loc[stationList['country']=="Slovakia (Slovak. Rep.)", 'country'] = 'Slovakia' stationList.loc[stationList['country']=="Slowenia", 'country'] = 'Slovenia' stationList.loc[stationList['country']=="Russian Federation", 'country'] = 'Russia' stationList.loc[stationList['country']=="Bosnia and Herzegowina", 'country'] = 'Bosnia and Herz.' stationList.loc[stationList['country']=="Slovakia (Slovak. Rep.)", 'country'] = 'Slovakia' stationList.loc[stationList['country']=="Croatia/Hrvatska", 'country'] = 'Croatia' stationList.loc[stationList['country']=="Moldova, Rep. Of", 'country'] = 'Moldova' stationList.loc[stationList['country']=="United Kingdom of Great Britain and N.-Ireland ", 'country'] = 'United Kingdom' stationList.loc[stationList['country']=="Czech Republic", 'country'] = 'Czechia' stationList.loc[stationList['country']=="Somalia", 'country'] = 'Somaliland' stationList.loc[stationList['country']=="Iran (Islamic Rep. of)", 'country'] = 'Iran' stationList.loc[stationList['country']=="Mauretania", 'country'] = 'Mauritania' stationList.loc[stationList['country']=="Central African Republic", 'country'] = 'Central African Rep.' stationList.loc[stationList['country']=="South Sudan", 'country'] = 'S. Sudan' stationList.loc[stationList['country']=="Dem. Republic of the Congo", 'country'] = 'Dem. Rep. Congo' stationList.loc[stationList['country']=="Mauretania", 'country'] = 'Somalia' stationList.loc[stationList['country']=="Syrian Arab Rep.", 'country'] = 'Syria' stationList.loc[stationList['country']=="Australien, SW-Pazifik", 'country'] = 'Australia' stationList.loc[stationList['country']=="Western-Sahara",'country'] = "W. Sahara" # load station file names from DWD an join the filename with the stationList def loadAndJoinDWDClimateFilenames(): global stationList print("load dwd climate filenames") # load climate files from dwd dwdFTP = FTP(dwdFtpServer) dwdFTP.login() dwdFTP.cwd(dwdFtpUri) fileList = pd.DataFrame({'id':[],"file":[]}) ftpIds = [] ftpFileNames = [] for file_name in dwdFTP.nlst(): gaugeID = file_name.split("_")[0] if gaugeID in stationList["id"].tolist(): ftpFileNames.append(file_name) ftpIds.append(file_name.split("_")[0]) fileList = pd.DataFrame({'id':ftpIds,"file":ftpFileNames}) ftpIds.clear() ftpFileNames.clear() dwdFTP.quit() # filter climate files list by longest timeseries # (because: there are multiple timeseries-files per station with same historical values) longestSeries = pd.DataFrame() for index, ftpFiles in fileList.groupby("id", axis=0): longestSeries = longestSeries.append(ftpFiles.iloc[-1]) fileList.drop(fileList.index, inplace=True) # concat climate files with station list stationList = stationList.set_index("id").join(longestSeries.set_index("id"), on="id") stationList = stationList.dropna(axis=0, how="any") stationList = stationList[stationList.country!=""] # with open("stationList.pickle","wb") as pf: # pickle.dump(stationList, pf) # here we have to try some interpolations for missing values def fillMissingData(annualData): months = ["Jan", "Feb", "Mrz","Apr","Mai","Jun","Jul","Aug","Sep","Okt","Nov","Dez"] for y in range(0,len(annualData)): # check month for nan values for m in range(0,len(months)): #print(annualData.iloc[y].loc[months[m]]) if np.isnan(annualData.iloc[y].loc[months[m]]): prevYear = None nextYear = None prevMonth = m-1 nextMonth = m+1 if y >= 1: prevYear = y-1 if y < len(annualData)-1: nextYear = y+1 averageList = [] if prevYear != None: averageList.append(annualData.iloc[prevYear].loc[months[m]]) if nextYear != None: averageList.append(annualData.iloc[nextYear].loc[months[m]]) if prevMonth >= 0: averageList.append(annualData.iloc[y].loc[months[prevMonth]]) if prevMonth < 0 and prevYear != None: prevMonth = len(months)-1 averageList.append(annualData.iloc[prevYear].loc[months[prevMonth]]) if nextMonth < len(months): averageList.append(annualData.iloc[y].loc[months[nextMonth]]) if nextMonth >= len(months) and nextYear!=None: nextMonth = 0 averageList.append(annualData.iloc[nextYear].loc[months[nextMonth]]) annualData.iat[y,m] = np.round(np.nanmean(averageList),2) annualData["mean"] = np.round(annualData.iloc[:,0:11].mean(axis=1,skipna=True),2) return annualData # load Temperatures from DWD gauges def loadTemperatureFromDWDGauges(): global climateCountry global stationList global annualData global worldTemperature print("load station temperatures") for index, gaugeCountry in stationList.groupby("country", axis=0): print(index,": ",len(gaugeCountry.country)," gauges to load") gaugeURLs = "https://"+dwdFtpServer+dwdFtpUri+gaugeCountry.file gaugeIds = gaugeCountry.index i = 0 for gid, gurl in zip(gaugeIds, gaugeURLs): annualData = pd.read_csv(gurl, delimiter=";") annualData = annualData.set_index("Jahr") annualData["mean"] = annualData.mean(axis=1) #annualData = fillMissingData(annualData) for dataIndex, annualMean in annualData.iterrows(): try: stationList.at[gid, dataIndex] = annualMean["mean"] except: continue i += 1 if i % 10 == 0: finished = i/len(gaugeCountry.country) * 100 print(np.round(finished), end="% ... ") stationList.columns = stationList.columns.astype(str) stationList = stationList.sort_index(axis=1, ascending=False) def start(): global stationList print("___ DWD Acquisition start___") loadDWDGauges() stationList = stationList.loc[stationList['country']=="Germany"] loadAndJoinDWDClimateFilenames() loadTemperatureFromDWDGauges() with open("./pickle/stationList_germany.pickle", "wb") as pickleFile: pickle.dump(stationList, pickleFile) # export station list to different outputs #exportToWorldShape.export(stationList) exportToDatabase.export(stationList) print("___DWD Acquisition finished___") if __name__ == '__main__': start()