Skip to content

NASA Earthdata Data Connector Documentation

Documentation for the terrakit.download.data_connectors.nasa_earthdata data connector module.

terrakit.download.data_connectors.nasa_earthdata

NASA_EarthData

Bases: Connector

Class to interact with the NASA EarthData connector for listing collections and fetching data.

Source code in terrakit/download/data_connectors/nasa_earthdata.py
class NASA_EarthData(Connector):
    """
    Class to interact with the NASA EarthData connector for listing
    collections and fetching data.
    """

    def __init__(self):
        """
        Initialise the connector and load the list of available collections.
        """
        self.connector_type = "nasa_earthdata"
        # Names of the collections this connector can serve.
        self.collections: list[Any] = load_and_list_collections(
            connector_type="nasa_earthdata"
        )
        # Full JSON metadata for each collection.
        self.collections_details = load_and_list_collections(
            as_json=True, connector_type="nasa_earthdata"
        )
        # STAC client scoped to the LP DAAC cloud sub-catalog (LPCLOUD).
        self.lp_search = connect_to_stac(
            stac_url="https://cmr.earthdata.nasa.gov/stac/", subcatalog_name="LPCLOUD"
        )

    def list_collections(self) -> list:
        """
        Returns the current list of collections for the NASA EarthData connector.

        Returns:
            list: The list of collections managed by the class.
        """
        logger.info("Listing available collections")
        return self.collections

    def find_data(
        self,
        data_collection_name,
        date_start,
        date_end,
        area_polygon=None,
        bbox=None,
        bands=None,
        maxcc=100,
        data_connector_spec=None,
    ) -> Union[tuple[list[Any], list[dict[str, Any]]], tuple[None, None]]:
        """
        Finds data items in the specified collection, date range, and area.

        Args:
            data_collection_name (str): The name of the data collection to search.
            date_start (str): The start date for the search (YYYY-MM-DD).
            date_end (str): The end date for the search (YYYY-MM-DD).
            area_polygon (list, optional): Polygon defining the area of interest. Defaults to None.
            bbox (list, optional): Bounding box defining the area of interest [west, south, east, north]. Defaults to None.
            bands (list, optional): Accepted for interface parity with other
                connectors; not used by this search. Defaults to None.
            maxcc (int, optional): Maximum cloud cover percentage. Defaults to 100.
            data_connector_spec (dict, optional): Additional data connector specifications. Defaults to None.

        Returns:
            tuple: A tuple containing unique dates and the list of data items.

        Raises:
            TerrakitValidationError: If the NASA_EARTH_BEARER_TOKEN credential is not set.
        """

        # Check credentials have been set correctly.
        if "NASA_EARTH_BEARER_TOKEN" not in os.environ:
            raise TerrakitValidationError(
                message="Error: Missing credentials 'NASA_EARTH_BEARER_TOKEN'. Please update .env with correct credentials."
            )

        # Check data_collection_name exists in self.collections.
        check_collection_exists(data_collection_name, self.collections)

        logger.info("Listing NASA Earthdata data")

        items = find_items(
            self.lp_search,
            bbox,
            date_start,
            date_end,
            collections=[data_collection_name],
            limit=250,
        )

        if maxcc:
            # Treat a missing "eo:cloud_cover" property as 0% so the item is
            # kept; comparing None with an int would raise TypeError.
            items = [
                item
                for item in items
                if item["properties"].get("eo:cloud_cover", 0) < maxcc
            ]
        # Project each item down to the fields callers actually consume.
        items = [
            {
                "id": item["id"],
                "properties": {
                    "datetime": item["properties"]["datetime"],
                    "eo:cloud_cover": item["properties"].get("eo:cloud_cover"),
                },
            }
            for item in items
        ]
        unique_dates = sorted(
            {item["properties"]["datetime"].split("T")[0] for item in items}
        )

        logger.info(f"Found {len(unique_dates)} unique dates:  {unique_dates}")

        return unique_dates, items

    def get_data(
        self,
        data_collection_name,
        date_start,
        date_end,
        area_polygon=None,
        bbox=None,
        bands=None,
        maxcc=100,
        data_connector_spec=None,
        save_file=None,
        working_dir=".",
    ) -> Union[xr.DataArray, None]:
        """
        Fetches data from NASA EarthData connector for the specified collection, date range, area, and bands.

        Args:
            data_collection_name (str): The name of the data collection to fetch.
            date_start (str): The start date for the search (YYYY-MM-DD).
            date_end (str): The end date for the search (YYYY-MM-DD).
            area_polygon (list, optional): Polygon defining the area of interest. Defaults to None.
            bbox (list, optional): Bounding box defining the area of interest [west, south, east, north]. Defaults to None.
            bands (list, optional): List of bands to fetch. Defaults to None (no bands).
            maxcc (int, optional): Maximum cloud cover percentage. Defaults to 100.
            data_connector_spec (dict, optional): Additional data connector specifications. Defaults to None.
            save_file (str, optional): Path to save the fetched data. Defaults to None.
            working_dir (str, optional): Working directory for temporary files. Defaults to ".".

        Returns:
            xr.DataArray: The fetched data with dimensions (time, band, y, x),
            or None if no items matched the query.

        Raises:
            TerrakitValidationError: If the NASA_EARTH_BEARER_TOKEN credential is not set.
            TerrakitValueError: If the collection name is not known to this connector.
        """
        bands = bands if bands is not None else []

        # Check credentials have been set correctly.
        if "NASA_EARTH_BEARER_TOKEN" not in os.environ:
            raise TerrakitValidationError(
                message="Error: Missing credentials 'NASA_EARTH_BEARER_TOKEN'. Please update .env with correct credentials."
            )

        # Check data_collection_name exists in self.collections.
        # NOTE(review): find_data delegates this to check_collection_exists,
        # which may raise a different exception type -- consider unifying.
        if data_collection_name not in self.collections:
            raise TerrakitValueError(
                message=f"Invalid collection '{data_collection_name}'. Please choose from one of the following collection {self.collections}"
            )

        check_area_polygon(
            area_polygon=area_polygon, connector_type=self.connector_type
        )
        # Temporary AWS credentials for direct S3 access to the data.
        temp_creds_req = get_temp_creds()

        session = boto3.Session(
            aws_access_key_id=temp_creds_req["accessKeyId"],
            aws_secret_access_key=temp_creds_req["secretAccessKey"],
            aws_session_token=temp_creds_req["sessionToken"],
            region_name="us-west-2",
        )

        # NOTE(review): NASA_EARTH_BEARER_TOKEN here is a module-level name,
        # not os.environ["NASA_EARTH_BEARER_TOKEN"]; presumably it is loaded
        # from the environment at import time -- confirm the two stay in sync.
        common_gdal_opts = dict(
            GDAL_DISABLE_READDIR_ON_OPEN="TRUE",
            GDAL_HTTP_COOKIEFILE=os.path.expanduser("~/cookies.txt"),
            GDAL_HTTP_COOKIEJAR=os.path.expanduser("~/cookies.txt"),
        )
        if NASA_EARTH_BEARER_TOKEN:
            rio_env = rio.Env(
                AWSSession(session),
                GDAL_HTTP_AUTH="BEARER",  # pragma: allowlist secret
                GDAL_HTTP_BEARER=NASA_EARTH_BEARER_TOKEN,
                **common_gdal_opts,
            )
        else:
            rio_env = rio.Env(AWSSession(session), **common_gdal_opts)

        with rio_env:
            results: list[dict[str, Any]] = find_items(
                self.lp_search,
                bbox,
                date_start,
                date_end,
                collections=[data_collection_name],
                limit=250,
            )
            if maxcc:
                # Treat a missing "eo:cloud_cover" property as 0% so the item
                # is kept; comparing None with an int would raise TypeError.
                results = [
                    item
                    for item in results
                    if item["properties"].get("eo:cloud_cover", 0) < maxcc
                ]
            unique_dates: list[str] = sorted(
                {item["properties"]["datetime"].split("T")[0] for item in results}
            )
            # Nothing matched: bail out instead of reaching xr.concat([]),
            # which would raise. (The previous "is None" check was dead code:
            # sorted() and a list comprehension never return None.)
            if not results:
                logger.warning("Warning: no items found for the requested query")
                return None

            ds: xr.DataArray
            ds_list: list[Any] = []
            for udate in unique_dates:
                # All items acquired on this calendar date.
                date_items = [
                    item
                    for item in results
                    if item["properties"]["datetime"].split("T")[0] == udate
                ]
                # One worker thread per requested band (at least one, since
                # joblib rejects n_jobs=0).
                num_threads = max(1, len(bands))
                ans = Parallel(n_jobs=num_threads, prefer="threads")(
                    delayed(get_band)(date_items, b, bbox, temp_creds_req, working_dir)
                    for b in tqdm(bands)
                )
                da = xr.concat(ans, dim="band")

                data_date_datetime = datetime.strptime(udate, "%Y-%m-%d")
                da = da.assign_coords({"band": bands, "time": data_date_datetime})

                ds_list.append(da)
            ds = xr.concat(ds_list, dim="time")

            save_data_array_to_file(ds, save_file)
            # Clean up the temporary VRT link files produced by get_band.
            for file_to_delete in glob.glob(f"{working_dir}/links_*.vrt", recursive=True):
                try:
                    os.remove(file_to_delete)
                except OSError as err:
                    # Lazy %-style arg: the original call passed err as a
                    # positional with no placeholder, so it was never rendered.
                    logger.error("Error while deleting file: %s", err)

            return ds

list_collections

Returns the current list of collections for the NASA EarthData connector.

Returns:

Name Type Description
list list

The list of collections managed by the class.

Source code in terrakit/download/data_connectors/nasa_earthdata.py
def list_collections(self) -> list:
    """
    Returns the current list of collections for the NASA EarthData connector.

    Returns:
        list: The list of collections managed by the class.
    """
    logger.info("Listing available collections")
    return self.collections

find_data

Finds data items in the specified collection, date range, and area.

Parameters:

Name Type Description Default
data_collection_name str

The name of the data collection to search.

required
date_start str

The start date for the search (YYYY-MM-DD).

required
date_end str

The end date for the search (YYYY-MM-DD).

required
area_polygon list

Polygon defining the area of interest. Defaults to None.

None
bbox list

Bounding box defining the area of interest [west, south, east, north]. Defaults to None.

None
bands list

List of bands to retrieve. Defaults to [].

[]
maxcc int

Maximum cloud cover percentage. Defaults to 100.

100
data_connector_spec dict

Additional data connector specifications. Defaults to None.

None

Returns:

Name Type Description
tuple Union[tuple[list[Any], list[dict[str, Any]]], tuple[None, None]]

A tuple containing unique dates and the list of data items.

Source code in terrakit/download/data_connectors/nasa_earthdata.py
def find_data(
    self,
    data_collection_name,
    date_start,
    date_end,
    area_polygon=None,
    bbox=None,
    bands=None,
    maxcc=100,
    data_connector_spec=None,
) -> Union[tuple[list[Any], list[dict[str, Any]]], tuple[None, None]]:
    """
    Finds data items in the specified collection, date range, and area.

    Args:
        data_collection_name (str): The name of the data collection to search.
        date_start (str): The start date for the search (YYYY-MM-DD).
        date_end (str): The end date for the search (YYYY-MM-DD).
        area_polygon (list, optional): Polygon defining the area of interest. Defaults to None.
        bbox (list, optional): Bounding box defining the area of interest [west, south, east, north]. Defaults to None.
        bands (list, optional): Accepted for interface parity with other
            connectors; not used by this search. Defaults to None.
        maxcc (int, optional): Maximum cloud cover percentage. Defaults to 100.
        data_connector_spec (dict, optional): Additional data connector specifications. Defaults to None.

    Returns:
        tuple: A tuple containing unique dates and the list of data items.

    Raises:
        TerrakitValidationError: If the NASA_EARTH_BEARER_TOKEN credential is not set.
    """

    # Check credentials have been set correctly.
    if "NASA_EARTH_BEARER_TOKEN" not in os.environ:
        raise TerrakitValidationError(
            message="Error: Missing credentials 'NASA_EARTH_BEARER_TOKEN'. Please update .env with correct credentials."
        )

    # Check data_collection_name exists in self.collections.
    check_collection_exists(data_collection_name, self.collections)

    logger.info("Listing NASA Earthdata data")

    items = find_items(
        self.lp_search,
        bbox,
        date_start,
        date_end,
        collections=[data_collection_name],
        limit=250,
    )

    if maxcc:
        # Treat a missing "eo:cloud_cover" property as 0% so the item is
        # kept; comparing None with an int would raise TypeError.
        items = [
            item
            for item in items
            if item["properties"].get("eo:cloud_cover", 0) < maxcc
        ]
    # Project each item down to the fields callers actually consume.
    items = [
        {
            "id": item["id"],
            "properties": {
                "datetime": item["properties"]["datetime"],
                "eo:cloud_cover": item["properties"].get("eo:cloud_cover"),
            },
        }
        for item in items
    ]
    unique_dates = sorted(
        {item["properties"]["datetime"].split("T")[0] for item in items}
    )

    logger.info(f"Found {len(unique_dates)} unique dates:  {unique_dates}")

    return unique_dates, items

get_data

Fetches data from NASA EarthData connector for the specified collection, date range, area, and bands.

Parameters:

Name Type Description Default
data_collection_name str

The name of the data collection to fetch.

required
date_start str

The start date for the search (YYYY-MM-DD).

required
date_end str

The end date for the search (YYYY-MM-DD).

required
area_polygon list

Polygon defining the area of interest. Defaults to None.

None
bbox list

Bounding box defining the area of interest [west, south, east, north]. Defaults to None.

None
bands list

List of bands to fetch. Defaults to [].

[]
maxcc int

Maximum cloud cover percentage. Defaults to 100.

100
data_connector_spec dict

Additional data connector specifications. Defaults to None.

None
save_file str

Path to save the fetched data. Defaults to None.

None
working_dir str

Working directory for temporary files. Defaults to ".".

'.'

Returns:

Name Type Description
xarray Union[DataArray, None]

An xarray DataArray containing the fetched data with dimensions (time, band, y, x).

Source code in terrakit/download/data_connectors/nasa_earthdata.py
def get_data(
    self,
    data_collection_name,
    date_start,
    date_end,
    area_polygon=None,
    bbox=None,
    bands=None,
    maxcc=100,
    data_connector_spec=None,
    save_file=None,
    working_dir=".",
) -> Union[xr.DataArray, None]:
    """
    Fetches data from NASA EarthData connector for the specified collection, date range, area, and bands.

    Args:
        data_collection_name (str): The name of the data collection to fetch.
        date_start (str): The start date for the search (YYYY-MM-DD).
        date_end (str): The end date for the search (YYYY-MM-DD).
        area_polygon (list, optional): Polygon defining the area of interest. Defaults to None.
        bbox (list, optional): Bounding box defining the area of interest [west, south, east, north]. Defaults to None.
        bands (list, optional): List of bands to fetch. Defaults to None (no bands).
        maxcc (int, optional): Maximum cloud cover percentage. Defaults to 100.
        data_connector_spec (dict, optional): Additional data connector specifications. Defaults to None.
        save_file (str, optional): Path to save the fetched data. Defaults to None.
        working_dir (str, optional): Working directory for temporary files. Defaults to ".".

    Returns:
        xr.DataArray: The fetched data with dimensions (time, band, y, x),
        or None if no items matched the query.

    Raises:
        TerrakitValidationError: If the NASA_EARTH_BEARER_TOKEN credential is not set.
        TerrakitValueError: If the collection name is not known to this connector.
    """
    bands = bands if bands is not None else []

    # Check credentials have been set correctly.
    if "NASA_EARTH_BEARER_TOKEN" not in os.environ:
        raise TerrakitValidationError(
            message="Error: Missing credentials 'NASA_EARTH_BEARER_TOKEN'. Please update .env with correct credentials."
        )

    # Check data_collection_name exists in self.collections.
    # NOTE(review): find_data delegates this to check_collection_exists,
    # which may raise a different exception type -- consider unifying.
    if data_collection_name not in self.collections:
        raise TerrakitValueError(
            message=f"Invalid collection '{data_collection_name}'. Please choose from one of the following collection {self.collections}"
        )

    check_area_polygon(
        area_polygon=area_polygon, connector_type=self.connector_type
    )
    # Temporary AWS credentials for direct S3 access to the data.
    temp_creds_req = get_temp_creds()

    session = boto3.Session(
        aws_access_key_id=temp_creds_req["accessKeyId"],
        aws_secret_access_key=temp_creds_req["secretAccessKey"],
        aws_session_token=temp_creds_req["sessionToken"],
        region_name="us-west-2",
    )

    # NOTE(review): NASA_EARTH_BEARER_TOKEN here is a module-level name,
    # not os.environ["NASA_EARTH_BEARER_TOKEN"]; presumably it is loaded
    # from the environment at import time -- confirm the two stay in sync.
    common_gdal_opts = dict(
        GDAL_DISABLE_READDIR_ON_OPEN="TRUE",
        GDAL_HTTP_COOKIEFILE=os.path.expanduser("~/cookies.txt"),
        GDAL_HTTP_COOKIEJAR=os.path.expanduser("~/cookies.txt"),
    )
    if NASA_EARTH_BEARER_TOKEN:
        rio_env = rio.Env(
            AWSSession(session),
            GDAL_HTTP_AUTH="BEARER",  # pragma: allowlist secret
            GDAL_HTTP_BEARER=NASA_EARTH_BEARER_TOKEN,
            **common_gdal_opts,
        )
    else:
        rio_env = rio.Env(AWSSession(session), **common_gdal_opts)

    with rio_env:
        results: list[dict[str, Any]] = find_items(
            self.lp_search,
            bbox,
            date_start,
            date_end,
            collections=[data_collection_name],
            limit=250,
        )
        if maxcc:
            # Treat a missing "eo:cloud_cover" property as 0% so the item
            # is kept; comparing None with an int would raise TypeError.
            results = [
                item
                for item in results
                if item["properties"].get("eo:cloud_cover", 0) < maxcc
            ]
        unique_dates: list[str] = sorted(
            {item["properties"]["datetime"].split("T")[0] for item in results}
        )
        # Nothing matched: bail out instead of reaching xr.concat([]),
        # which would raise. (The previous "is None" check was dead code:
        # sorted() and a list comprehension never return None.)
        if not results:
            logger.warning("Warning: no items found for the requested query")
            return None

        ds: xr.DataArray
        ds_list: list[Any] = []
        for udate in unique_dates:
            # All items acquired on this calendar date.
            date_items = [
                item
                for item in results
                if item["properties"]["datetime"].split("T")[0] == udate
            ]
            # One worker thread per requested band (at least one, since
            # joblib rejects n_jobs=0).
            num_threads = max(1, len(bands))
            ans = Parallel(n_jobs=num_threads, prefer="threads")(
                delayed(get_band)(date_items, b, bbox, temp_creds_req, working_dir)
                for b in tqdm(bands)
            )
            da = xr.concat(ans, dim="band")

            data_date_datetime = datetime.strptime(udate, "%Y-%m-%d")
            da = da.assign_coords({"band": bands, "time": data_date_datetime})

            ds_list.append(da)
        ds = xr.concat(ds_list, dim="time")

        save_data_array_to_file(ds, save_file)
        # Clean up the temporary VRT link files produced by get_band.
        for file_to_delete in glob.glob(f"{working_dir}/links_*.vrt", recursive=True):
            try:
                os.remove(file_to_delete)
            except OSError as err:
                # Lazy %-style arg: the original call passed err as a
                # positional with no placeholder, so it was never rendered.
                logger.error("Error while deleting file: %s", err)

        return ds