RESTAPI: paginator defined at client level not inherited by resources · Issue #2586 · dlt-hub/dlt · GitHub
RESTAPI: paginator defined at client level not inherited by resources #2586
Open
@kanenorman

Description

dlt version

dlt 1.10.0

Describe the problem

TL;DR: Pagination does not work as expected when the paginator is defined only at the client level, because it is not inherited by child resources.

Expected Behavior

According to the documentation, the RESTAPI source should inherit the paginator from the client configuration and make subsequent paginated requests accordingly.

Actual Behavior

If the paginator is defined only at the client level:

  • Only a single request is made (e.g., returning 100 items if page[limit]=100 is specified).
  • No additional paginated requests are performed.
  • As a result, only a partial dataset is returned.
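The difference between the expected and the actual behavior can be sketched without any network access. The snippet below is a minimal illustration, not dlt's implementation: `FAKE_PAGES`, `fetch`, `paginate_json_link`, and `paginate_single_page` are hypothetical names, and the three canned pages stand in for an API that exposes the next page under `links.next`.

```python
from typing import Any, Dict, Iterator, List, Optional

Page = Dict[str, Any]

# Hypothetical in-memory "API": three pages chained through links.next.
FAKE_PAGES: Dict[str, Page] = {
    "/trips?page=1": {"data": [1, 2], "links": {"next": "/trips?page=2"}},
    "/trips?page=2": {"data": [3, 4], "links": {"next": "/trips?page=3"}},
    "/trips?page=3": {"data": [5], "links": {}},
}


def fetch(url: str) -> Page:
    """Stand-in for an HTTP GET; returns the canned page for `url`."""
    return FAKE_PAGES[url]


def paginate_json_link(first_url: str) -> Iterator[List[int]]:
    """Expected behavior: follow links.next until no next link is present."""
    url: Optional[str] = first_url
    while url is not None:
        page = fetch(url)
        yield page["data"]
        url = page.get("links", {}).get("next")


def paginate_single_page(first_url: str) -> Iterator[List[int]]:
    """Observed behavior: fetch the first page and ignore any next link."""
    yield fetch(first_url)["data"]


all_items = [i for chunk in paginate_json_link("/trips?page=1") for i in chunk]
first_only = [i for chunk in paginate_single_page("/trips?page=1") for i in chunk]
print(len(all_items), len(first_only))  # prints "5 2"
```

With the real API, the same gap shows up as 100 rows instead of 5,514.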

Workaround

To ensure proper pagination, the paginator must be manually set on each resource endpoint or as a resource default, even if the resource endpoints share the same client.
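For the resource-default variant of the workaround, a configuration along these lines should be enough. This is a sketch that assumes the `resource_defaults` key of the REST API config accepts an `endpoint` section whose `paginator` is merged into every resource; auth and the resolved `route_id` parameter are omitted for brevity, and the `json_link_paginator` variable is only a convenience name.

```python
from typing import Any, Dict

# Same paginator settings as in the reproduction, defined once.
json_link_paginator: Dict[str, Any] = {
    "type": "json_link",
    "next_url_path": "links.next",
}

config: Dict[str, Any] = {
    "client": {
        "base_url": "https://api-v3.mbta.com/",
        # No paginator here: the client-level setting is what gets ignored.
    },
    "resource_defaults": {
        # Assumption: defaults placed under "endpoint" apply to every resource.
        "endpoint": {"paginator": json_link_paginator},
    },
    "resources": [
        {
            "name": "trips",
            "endpoint": {
                "path": "trips",
                "data_selector": "data",
                "params": {"page[limit]": 100},
            },
        },
    ],
}
```

Compared with repeating the paginator on each endpoint, this keeps a single definition while still avoiding the client-level setting.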

Steps to reproduce

Step 1: Get an API Key

Request a free API key from MBTA's API portal.

Step 2: Reproduce the Bug

Run the following code with the paginator configured at the client level.
Only a single request is made, returning 100 items, which matches the page[limit] parameter in the query. No additional paginated requests follow, which is the unexpected behavior.

from typing import Generator, List, Dict, Any

import dlt
from dlt.sources.rest_api import RESTAPIConfig
from dlt.sources.rest_api import rest_api_resources
from dlt.sources.helpers.rest_client.auth import APIKeyAuth


@dlt.resource()
def routes() -> Generator[List[Dict[str, Any]], Any, Any]:
    yield [{"route_ids": "Red,Blue"}]


@dlt.source
def api_source():
    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api-v3.mbta.com/",
            "auth": APIKeyAuth(
                name="X-API-Key",
                api_key="YOUR_API_KEY_FROM_STEP_1",
                location="header",
            ),
            # Paginator Configured at Client Level
            "paginator": {
                "type": "json_link",
                "next_url_path": "links.next",
            },
        },
        "resources": [
            {
                "name": "trips",
                "primary_key": "id",
                "write_disposition": "merge",
                "endpoint": {
                    "method": "GET",
                    "data_selector": "data",
                    "path": "trips?filter[route]={route_id}",
                    "params": {
                        "page[limit]": 100,
                        "route_id": {
                            "type": "resolve",
                            "resource": "routes",
                            "field": "route_ids",
                        },
                    },
                },
            },
            routes(),
        ],
    }
    resources = rest_api_resources(config)
    yield from resources


def main() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="rest_api",
        destination="duckdb",
        dataset_name="rest_api_data",
    )

    load_info = pipeline.run(api_source())
    df = pipeline.dataset(dataset_type="default").trips.df()

    print(load_info)
    print(df.head())
    print(f"Number of rows loaded: {df.shape[0]}")


if __name__ == "__main__":
    main()

Output:

/Users/kanenorman/Desktop/dlt_issue/.venv/bin/python /Users/kanenorman/Desktop/dlt_issue/main.py 
Pipeline rest_api load step completed in 0.09 seconds
1 load package(s) were loaded to destination duckdb and into dataset rest_api_data
The duckdb destination used duckdb:////Users/kanenorman/Desktop/dlt_issue/rest_api.duckdb location to store data
Load package 1746138130.322604 is LOADED and contains no failed jobs
   attributes__bikes_allowed  ...         _dlt_id
0                          0  ...  YtRifBngmlSRbw
1                          0  ...  kXTy4Xd4vfU4Mw
2                          0  ...  44u5mwFK2gF8kQ
3                          0  ...  dGFXOYmZHFfPCg
4                          0  ...  xcBn6PrbHclwtw

[5 rows x 20 columns]
Number of rows loaded: 100

Process finished with exit code 0

Step 3: Drop DuckDB database

Remove the DuckDB database so that each run starts fresh.

rm rest_api.duckdb
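If `rm` is not available, or the file may not exist yet, the same cleanup can be done from Python. A small sketch, assuming the database file sits in the current working directory under the name shown in the output above; `drop_database` is a hypothetical helper name.

```python
from pathlib import Path


def drop_database(path: str = "rest_api.duckdb") -> bool:
    """Delete the DuckDB file if present; return True when a file was removed."""
    db_file = Path(path)
    existed = db_file.exists()
    # missing_ok avoids an error on the very first run, when no file exists yet.
    db_file.unlink(missing_ok=True)
    return existed


if __name__ == "__main__":
    print("removed" if drop_database() else "nothing to remove")
```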

Step 4: Workaround

Now configure the paginator at the endpoint level instead.
Pagination then works correctly and 5,514 items are returned, which is the expected total.

from typing import Generator, List, Dict, Any

import dlt
from dlt.sources.rest_api import RESTAPIConfig
from dlt.sources.rest_api import rest_api_resources
from dlt.sources.helpers.rest_client.auth import APIKeyAuth


@dlt.resource()
def routes() -> Generator[List[Dict[str, Any]], Any, Any]:
    yield [{"route_ids": "Red,Blue"}]


@dlt.source
def api_source():
    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api-v3.mbta.com/",
            "auth": APIKeyAuth(
                name="X-API-Key",
                api_key="YOUR_API_KEY_FROM_STEP_1",
                location="header",
            ),
        },
        "resources": [
            {
                "name": "trips",
                "primary_key": "id",
                "write_disposition": "merge",
                "endpoint": {
                    "method": "GET",
                    "data_selector": "data",
                    "path": "trips?filter[route]={route_id}",
                    "params": {
                        "page[limit]": 100,
                        "route_id": {
                            "type": "resolve",
                            "resource": "routes",
                            "field": "route_ids",
                        },
                    },
                    # Paginator Configured at Endpoint Level
                    "paginator": {
                        "type": "json_link",
                        "next_url_path": "links.next",
                    },
                },
            },
            routes(),
        ],
    }
    resources = rest_api_resources(config)
    yield from resources


def main() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="rest_api",
        destination="duckdb",
        dataset_name="rest_api_data",
    )

    load_info = pipeline.run(api_source())
    df = pipeline.dataset(dataset_type="default").trips.df()

    print(load_info)
    print(df.head())
    print(f"Number of rows loaded: {df.shape[0]}")


if __name__ == "__main__":
    main()

Output:

Pipeline rest_api load step completed in 0.53 seconds
1 load package(s) were loaded to destination duckdb and into dataset rest_api_data
The duckdb destination used duckdb:////Users/kanenorman/Desktop/dlt_issue/rest_api.duckdb location to store data
Load package 1746137805.367842 is LOADED and contains no failed jobs
   attributes__bikes_allowed  ...         _dlt_id
0                          0  ...  0ovhG0CiQ8Ahww
1                          0  ...  ZPhYMwSV8di44w
2                          0  ...  +0a9RqkR/Vw+jA
3                          0  ...  HdqBkf55XoiP6w
4                          0  ...  M/QVQZChN44SDg

[5 rows x 20 columns]
Number of rows loaded: 5514

Operating system

macOS

Runtime environment

Local

Python version

3.12

dlt data source

https://api-v3.mbta.com/

dlt destination

DuckDB

Other deployment details

No response

Additional information

I'm still debugging, but I believe the issue originates in the expand_and_index_resources function, specifically during the call to _setup_single_entity_endpoint().

The path in the reproduction ends with the {route_id} placeholder, so it is presumably classified as a single-entity path. If pagination is not explicitly configured at the resource endpoint level (or resource default level), endpoint.get("paginator") then returns None and a SinglePagePaginator() is assigned, even when the client is correctly configured with a JsonLinkPaginator().

def _setup_single_entity_endpoint(endpoint: Endpoint) -> Endpoint:
    """Tries to guess if the endpoint refers to a single entity and when detected:
    * if `data_selector` was not specified (or is None), "$" is selected
    * if `paginator` was not specified (or is None), SinglePagePaginator is selected

    Endpoint is modified in place and returned
    """
    if single_entity_path(endpoint["path"]):
        if endpoint.get("data_selector") is None:
            endpoint["data_selector"] = "$"
        if endpoint.get("paginator") is None:
            endpoint["paginator"] = SinglePagePaginator()
    return endpoint
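To make the suspected path concrete, here is a standalone sketch that mimics the function above on plain dictionaries. It is not dlt's code: `looks_like_single_entity` is a hypothetical stand-in for `single_entity_path`, its regex is only my assumption about the heuristic (a `{placeholder}` in the last path segment), and the `SINGLE_PAGE` string stands in for a `SinglePagePaginator()` instance.

```python
import re
from pathlib import PurePosixPath
from typing import Any, Dict

SINGLE_PAGE = "single_page"  # placeholder for SinglePagePaginator()


def looks_like_single_entity(path: str) -> bool:
    """Assumed heuristic: the last path segment contains a {placeholder}."""
    last_segment = PurePosixPath(path).name
    return re.search(r"\{[a-zA-Z0-9._-]+\}", last_segment) is not None


def setup_single_entity_endpoint(endpoint: Dict[str, Any]) -> Dict[str, Any]:
    """Mirror of the logic above; the client config is never consulted."""
    if looks_like_single_entity(endpoint["path"]):
        if endpoint.get("data_selector") is None:
            endpoint["data_selector"] = "$"
        if endpoint.get("paginator") is None:
            endpoint["paginator"] = SINGLE_PAGE
    return endpoint


# Endpoint from the reproduction: the paginator lives only on the client.
trips = setup_single_entity_endpoint(
    {"path": "trips?filter[route]={route_id}", "data_selector": "data"}
)
print(trips["paginator"])  # prints "single_page"
```

Under this assumption the query-string placeholder is enough to trigger the single-entity branch, which would explain why only one page is fetched.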

Proposed Fix

Pass the client configuration into build_resource_dependency_graph() so that the paginator can be inherited from the client when it is not specified at the resource endpoint level (or the resource default level). This would prevent a valid client-level paginator from being overridden by a SinglePagePaginator() during setup.
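The lookup order I have in mind can be sketched as follows. `resolve_paginator` is a hypothetical helper, not an existing dlt function, and the dictionaries are simplified versions of the config sections: the endpoint wins, then the resource defaults, then the client, and only when all three are empty would the single-page fallback apply.

```python
from typing import Any, Dict, Optional


def resolve_paginator(
    client: Dict[str, Any],
    resource_defaults: Dict[str, Any],
    endpoint: Dict[str, Any],
) -> Optional[Any]:
    """Return the most specific paginator, or None when none is configured."""
    candidates = (
        endpoint,                               # most specific
        resource_defaults.get("endpoint", {}),  # shared resource defaults
        client,                                 # least specific
    )
    for section in candidates:
        paginator = section.get("paginator")
        if paginator is not None:
            return paginator
    return None


client_cfg = {"paginator": {"type": "json_link", "next_url_path": "links.next"}}
inherited = resolve_paginator(client_cfg, {}, {"path": "trips"})
print(inherited["type"])  # prints "json_link"
```

A single-entity fallback would then be assigned only when this lookup returns None.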

Metadata

Labels

question, wontfix

Status

In Progress
