diff --git a/scripts/export.py b/scripts/export.py index 6a239e3dc..3a634202b 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -14,6 +14,8 @@ ERA5LandExporter, ) +from src.exporters import FEWSNetKenyaLivelihoodExporter + from scripts.utils import get_data_path @@ -159,8 +161,14 @@ def export_kenya_boundaries(): exporter.export() +def export_fewsnet_shapefiles(): + data_path = get_data_path() + exporter = FEWSNetKenyaLivelihoodExporter(data_path) + exporter.export() + + if __name__ == "__main__": - export_era5_land() + # export_era5_land() # export_era5() # export_vhi() # export_chirps() @@ -169,3 +177,4 @@ def export_kenya_boundaries(): # export_esa() # export_s5() # export_kenya_boundaries() + export_fewsnet_shapefiles() diff --git a/scripts/preprocess.py b/scripts/preprocess.py index b613fe0f5..8256ce627 100644 --- a/scripts/preprocess.py +++ b/scripts/preprocess.py @@ -1,6 +1,7 @@ import sys sys.path.append("..") + from src.preprocess import ( VHIPreprocessor, CHIRPSPreprocessor, @@ -11,6 +12,7 @@ SRTMPreprocessor, ERA5MonthlyMeanPreprocessor, KenyaASALMask, + FEWSNetLivelihoodPreprocessor, ) from src.preprocess.admin_boundaries import KenyaAdminPreprocessor @@ -137,15 +139,32 @@ def preprocess_era5(): processor.preprocess(subset_str="kenya", regrid=regrid_path) +def preprocess_livelihood_zones(): + data_path = get_data_path() + + from pathlib import Path + + data_path = Path("/Volumes/Lees_Extend/data/ecmwf_sowc/data/") + + regrid_path = data_path / "interim/chirps_preprocessed/data_kenya.nc" + assert regrid_path.exists(), f"{regrid_path} not available" + + processor = FEWSNetLivelihoodPreprocessor(data_path) + processor.preprocess( + reference_nc_filepath=regrid_path, country_to_preprocess="kenya" + ) + + if __name__ == "__main__": - process_vci_2018() - process_precip_2018() - process_era5POS_2018() - process_gleam() - process_esa_cci_landcover() - preprocess_srtm() - preprocess_era5() - preprocess_kenya_boundaries(selection="level_1") - preprocess_kenya_boundaries(selection="level_2") - preprocess_kenya_boundaries(selection="level_3") - preprocess_asal_mask() + # process_vci_2018() + # process_precip_2018() + # process_era5POS_2018() + # process_gleam() + # process_esa_cci_landcover() + # preprocess_srtm() + # preprocess_era5() + # preprocess_kenya_boundaries(selection="level_1") + # preprocess_kenya_boundaries(selection="level_2") + # preprocess_kenya_boundaries(selection="level_3") + # preprocess_asal_mask() + preprocess_livelihood_zones() diff --git a/src/exporters/__init__.py b/src/exporters/__init__.py index 8a93ff95e..2cd0ea603 100644 --- a/src/exporters/__init__.py +++ b/src/exporters/__init__.py @@ -8,6 +8,7 @@ from .srtm import SRTMExporter from .esa_cci import ESACCIExporter from .admin_boundaries import KenyaAdminExporter +from .fewsnet_shapefiles import FEWSNetKenyaLivelihoodExporter __all__ = [ "ERA5Exporter", @@ -20,4 +21,5 @@ "ESACCIExporter", "ERA5LandExporter", "KenyaAdminExporter", + "FEWSNetKenyaLivelihoodExporter", ] diff --git a/src/exporters/fewsnet_shapefiles.py b/src/exporters/fewsnet_shapefiles.py new file mode 100644 index 000000000..657820ddc --- /dev/null +++ b/src/exporters/fewsnet_shapefiles.py @@ -0,0 +1,55 @@ +from pathlib import Path +import os + +from .base import BaseExporter + + +class FEWSNetExporter(BaseExporter): + """Export FEWSNet data + + https://fews.net/ + + TODO: need to use Selenium to navigate this page? + https://fews.net/data + """ + + data_str: str + + def __init__(self, data_folder: Path = Path("data")) -> None: + super().__init__(data_folder) + # write the download to landcover + self.output_dir = self.raw_folder / "boundaries" / self.data_str + if not self.output_dir.exists(): + self.output_dir.mkdir(parents=True, exist_ok=True) + + def wget_file(self, url_path: str) -> None: + output_file = self.output_dir / url_path.split("/")[-1] + if output_file.exists(): + print(f"{output_file} already exists! Skipping") + return None + + os.system(f"wget {url_path} -P {self.output_dir.as_posix()}") + + def unzip(self, fname: Path) -> None: + print(f"Unzipping {fname.name}") + + os.system(f"unzip {fname.as_posix()} -d {self.output_dir.resolve().as_posix()}") + print(f"{fname.name} unzipped!") + + +class FEWSNetKenyaLivelihoodExporter(FEWSNetExporter): + data_str = "livelihood_zones" + url: str = "http://shapefiles.fews.net.s3.amazonaws.com/LHZ/FEWS_NET_LH_World.zip" + + def export(self) -> None: + """Export functionality for the FEWSNET Livelihood Zones as .shp files + """ + + fname = self.url.split("/")[-1] + # check if the file already exists + if (self.output_dir / fname).exists(): + print("Data already downloaded!") + + else: + self.wget_file(url_path=self.url) + self.unzip(fname=(self.output_dir / fname)) diff --git a/src/preprocess/__init__.py b/src/preprocess/__init__.py index 77d915dff..a1769df6f 100644 --- a/src/preprocess/__init__.py +++ b/src/preprocess/__init__.py @@ -7,6 +7,7 @@ from .esa_cci import ESACCIPreprocessor from .srtm import SRTMPreprocessor from .admin_boundaries import KenyaAdminPreprocessor, KenyaASALMask +from .fewsnet_shapefiles import FEWSNetLivelihoodPreprocessor __all__ = [ "VHIPreprocessor", @@ -19,4 +20,5 @@ "SRTMPreprocessor", "KenyaAdminPreprocessor", "KenyaASALMask", + "FEWSNetLivelihoodPreprocessor", ] diff --git a/src/preprocess/fewsnet_shapefiles.py b/src/preprocess/fewsnet_shapefiles.py new file mode 100644 index 000000000..d5712ec1f --- /dev/null +++ b/src/preprocess/fewsnet_shapefiles.py @@ -0,0 +1,198 @@ +from pathlib import Path +import xarray as xr +from .base import BasePreProcessor +from .utils import SHPtoXarray + +from typing import Optional, Dict + +gpd = None +GeoDataFrame = None + + +class FEWSNetPreprocesser(BasePreProcessor): + """ Preprocesses the FEWSNetwork shapefile data + """ + + country_code_mapping: Dict = { + "AF": "Afghanistan", + "AO": "Angola", + "BF": "Burkina Faso", + "BI": "Burundi", + "CF": "CAR", + "DJ": "Djibouti", + "TZ": "Tanzania", + "ZW": "Zimbabwe", + "ZM": "Zambia", + "YE": "Yemen", + "UG": "Uganda", + "TD": "Chad", + "SV": "El Salvador", + "SN": "Senegal", + "SO": "Somalia", + "SL": "Sierra Leone", + "SD": "Sudan", + "NI": "Nicaragua", + "NG": "Nigeria", + "ET": "Ethiopia", + "NE": "Niger", + "GN": "Guinea", + "MZ": "Mozambique", + "HN": "Honduras", + "MW": "Malawi", + "ML": "Mali", + "MR": "Mauritania", + "HT": "Haiti", + "MG": "Madagascar", + "LR": "Liberia", + "KE": "Kenya", + "LS": "Lesotho", + "CD": "DR Congo", + "SS": "South Sudan", + "RW": "Rwanda", + "NI": "Nicaragua", + "TJ": "Tajikistan", + "GT": "Guatemala", + } + + dataset: str + analysis = True + + def __init__(self, data_folder: Path = Path("data")): + super().__init__(data_folder) + + # try and import geopandas + print("The FEWSNet preprocessor requires the geopandas package") + global gpd + if gpd is None: + import geopandas as gpd + global GeoDataFrame + if GeoDataFrame is None: + from geopandas.geodataframe import GeoDataFrame + + def get_filename(self, var_name: str, country: str) -> str: # type: ignore + new_filename = f"{var_name}_{country}.nc" + return new_filename + + +class FEWSNetLivelihoodPreprocessor(FEWSNetPreprocesser): + dataset = "livelihood_zones" + + def __init__(self, data_folder: Path = Path("data")) -> None: + super().__init__(data_folder) + self.base_raw_dir = self.raw_folder / "boundaries" / self.dataset + + def _preprocess_single( + self, + shp_filepath: Path, + reference_nc_filepath: Path, + var_name: str, + lookup_colname: str, + save: bool = True, + country_to_preprocess: Optional[str] = None, + ) -> None: + """ Preprocess .shp admin boundary files into an `.nc` + file with the same shape as reference_nc_filepath. + + Will create categorical .nc file which will specify + which admin region each pixel is in. + + Arguments + ---------- + shp_filepath: Path + The path to the shapefile + + reference_nc_filepath: Path + The path to the netcdf file with the shape + (must have been run through Preprocessors prior to using) + + var_name: str + the name of the Variable in the xr.Dataset and the name + of the output filename - {var_name}_{self.country}.nc + + lookup_colname: str + the column name to lookup in the shapefile + (read in as geopandas.GeoDataFrame) + + country_to_preprocess: Optional[str] = None + the country you want to preprocess + + """ + assert "interim" in reference_nc_filepath.parts, ( + "Expected " "the target data to have been preprocessed by the pipeline" + ) + + # MUST have a target dataset to create the same shape + target_ds = xr.ones_like(xr.open_dataset(reference_nc_filepath)) + data_var = [d for d in target_ds.data_vars][0] + da = target_ds[data_var] + + # turn the shapefile into a categorical variable (like landcover) + gdf = gpd.read_file(shp_filepath) # type: ignore + shp_to_nc = SHPtoXarray() + + # if supply a country_to_preprocess then only create .nc file for that country + country_lookup = dict( + zip(self.country_code_mapping.values(), self.country_code_mapping.keys()) + ) + if country_to_preprocess is not None: + if country_to_preprocess.capitalize() not in country_lookup.keys(): + assert False, ( + f"Expecting to have one of: \n{country_lookup.keys()}" + f"\nYou supplied: {country_to_preprocess.capitalize()}" + "\nDoes this definitely exist?" + ) + country_code_list = [country_lookup[country_to_preprocess.capitalize()]] + else: + country_code_list = gdf.COUNTRY.unique() + + for country_code in country_code_list: + gdf_country = gdf.loc[gdf.COUNTRY == country_code] + + # create a unique filename for each country + country_str = ( + self.country_code_mapping[country_code].lower().replace(" ", "_") + ) + filename = self.get_filename(var_name, country_str) + if (self.out_dir / filename).exists(): + print( + "** Data already preprocessed! **\nIf you need to " + "process again then move or delete existing file" + f" at: {(self.out_dir / filename).as_posix()}" + ) + continue + + ds = shp_to_nc._to_xarray( + da=da, gdf=gdf_country, var_name=var_name, lookup_colname=lookup_colname + ) + + # save the data + print(f"Saving to {self.out_dir}") + + if self.analysis is True: + assert self.out_dir.parts[-2] == "analysis", ( + "self.analysis should" + "be True and the output directory should be analysis" + ) + + ds.to_netcdf(self.out_dir / filename) + # save key info columns + gdf_country[ + ["OBJECTID", "FNID", "LZNUM", "LZCODE", "LZNAMEEN", "CLASS"] + ].to_csv(self.out_dir / f"{country_str}_lookup_dict.csv") + + print( + f"** {(self.out_dir / filename).as_posix()} and lookup_dict saved! **" + ) + + def preprocess( + self, reference_nc_filepath: Path, country_to_preprocess: Optional[str] = None + ) -> None: + """Preprocess FEWSNet Livelihood Zone shapefiles into xarray objects + """ + self._preprocess_single( + shp_filepath=self.base_raw_dir / "FEWS_NET_LH_World.shp", + lookup_colname="LZCODE", + reference_nc_filepath=reference_nc_filepath, + var_name="livelihood_zone", + country_to_preprocess=country_to_preprocess, + ) diff --git a/src/preprocess/utils.py b/src/preprocess/utils.py index c1e4928f9..cbb9be461 100644 --- a/src/preprocess/utils.py +++ b/src/preprocess/utils.py @@ -10,6 +10,7 @@ Affine = None gpd = None Polygon = None +GeoDataFrame = None def select_bounding_box( @@ -98,6 +99,10 @@ def __init__(self): if Polygon is None: from shapely.geometry import Polygon + global GeoDataFrame + if GeoDataFrame is None: + from geopandas.geodataframe import GeoDataFrame + @staticmethod def transform_from_latlon( lat: xr.DataArray, lon: xr.DataArray @@ -139,70 +144,38 @@ def rasterize( return xr.Dataset({variable_name: (dims, raster)}, coords=spatial_coords) - def shapefile_to_xarray( + def _to_xarray( self, da: xr.DataArray, - shp_path: Path, + gdf: GeoDataFrame, # type: ignore var_name: str = "region", lookup_colname: Optional[str] = None, ) -> xr.Dataset: - """ Create a new coord for the da indicating whether or not it - is inside the shapefile - - Creates a new coord - "var_name" which will have integer values - used to subset da for plotting / analysis - - Arguments: - --------- - :da: xr.DataArray - the `DataArray` with the shape that we want to rasterize the - shapefile onto. - - :shp_path: Path - the path to the .shp file to be converted into a categorical - xr.Dataset. - - :var_name: str = 'region' - the variable name in the new output Dataset - - :lookup_colname: Optional[str] = None - the column that defines the `values` in the lookup - dictionary when defining the (e.g. Region names) - - e.g. 'DISTNAME' in this shapefile below - DISTID DISTNAME geometry - 0 101.0 NAIROBI POLYGON ((36. -1 ... - 1 201.0 KIAMBU POLYGON ((36. -0.7 ... - + """ Returns: ------- :xr.Dataset Dataset with metadata associated with the areas in the shapefile. Stored as `ds.attrs['keys']` & `ds.attrs['values']` - - TODO: add a add_all_cols_as_attrs() function """ - # 1. read in shapefile - gdf = gpd.read_file(shp_path) # type: ignore - # allow the user to see the column headers if lookup_colname is None: print("lookup_colname MUST be provided (see error message below)") - print(gdf.head()) + print(gdf.head()) # type: ignore assert ( - lookup_colname in gdf.columns - ), f"lookup_colname must be one of: {list(gdf.columns)}" + lookup_colname in gdf.columns # type: ignore + ), f"lookup_colname must be one of: {list(gdf.columns)}" # type: ignore # 2. create a list of tuples (shapely.geometry, id) # this allows for many different polygons within a .shp file # (e.g. Admin Regions of Kenya) - shapes = [(shape, n) for n, shape in enumerate(gdf.geometry)] + shapes = [(shape, n) for n, shape in enumerate(gdf.geometry)] # type: ignore # 3. create a new variable set to the id in `shapes` (same shape as da) ds = self.rasterize(shapes=shapes, coords=da.coords, variable_name=var_name) - values = [value for value in gdf[lookup_colname].to_list()] - keys = [str(key) for key in gdf.index.to_list()] + values = [value for value in gdf[lookup_colname].tolist()] # type: ignore + keys = [str(key) for key in gdf.index.tolist()] # type: ignore data_vals = ds[[d for d in ds.data_vars][0]].values unique_values = np.unique(data_vals[~np.isnan(data_vals)]) unique_values = [str(int(v)) for v in unique_values] @@ -228,3 +201,53 @@ def shapefile_to_xarray( print("Are you certain the subset or shapefile are the correct region?") return ds + + def shapefile_to_xarray( + self, + da: xr.DataArray, + shp_path: Path, + var_name: str = "region", + lookup_colname: Optional[str] = None, + ) -> xr.Dataset: + """ Create a new coord for the da indicating whether or not it + is inside the shapefile + + Creates a new coord - "var_name" which will have integer values + used to subset da for plotting / analysis + + Arguments: + --------- + :da: xr.DataArray + the `DataArray` with the shape that we want to rasterize the + shapefile onto. + + :shp_path: Path + the path to the .shp file to be converted into a categorical + xr.Dataset. + + :var_name: str = 'region' + the variable name in the new output Dataset + + :lookup_colname: Optional[str] = None + the column that defines the `values` in the lookup + dictionary when defining the (e.g. Region names) + + e.g. 'DISTNAME' in this shapefile below + DISTID DISTNAME geometry + 0 101.0 NAIROBI POLYGON ((36. -1 ... + 1 201.0 KIAMBU POLYGON ((36. -0.7 ... + + Returns: + ------- + :xr.Dataset + Dataset with metadata associated with the areas in the shapefile. + Stored as `ds.attrs['keys']` & `ds.attrs['values']` + + TODO: add a add_all_cols_as_attrs() function + """ + # 1. read in shapefile + gdf = gpd.read_file(shp_path) # type: ignore + + return self._to_xarray( + da=da, gdf=gdf, var_name=var_name, lookup_colname=lookup_colname + )