Description
dlt version
dlt 1.10.0
Describe the problem
TL;DR: Pagination does not work as expected when the paginator is defined only at the client level, because it is not inherited by child resources.
Expected Behavior
According to the documentation, the REST API source should inherit the paginator from the client configuration and make subsequent paginated requests accordingly.
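To make the expectation concrete, here is a minimal, self-contained sketch of what following a `links.next` URL looks like. It does not use dlt; the `FAKE_API` dictionary, its URLs, and `fetch_all` are hypothetical stand-ins for the real HTTP calls.

```python
from typing import Any, Dict, List, Optional

# Hypothetical in-memory "API": each URL maps to one page of a JSON:API-style response.
FAKE_API: Dict[str, Dict[str, Any]] = {
    "/trips?page=1": {"data": [{"id": 1}, {"id": 2}], "links": {"next": "/trips?page=2"}},
    "/trips?page=2": {"data": [{"id": 3}, {"id": 4}], "links": {"next": "/trips?page=3"}},
    "/trips?page=3": {"data": [{"id": 5}], "links": {}},
}


def fetch_all(first_url: str) -> List[Dict[str, Any]]:
    """Collect items from every page by following links.next until it is absent."""
    items: List[Dict[str, Any]] = []
    url: Optional[str] = first_url
    while url is not None:
        page = FAKE_API[url]
        items.extend(page["data"])
        # A missing "next" link ends the loop.
        url = page.get("links", {}).get("next")
    return items


all_items = fetch_all("/trips?page=1")
first_page_only = FAKE_API["/trips?page=1"]["data"]
print(len(all_items), len(first_page_only))
```

Stopping after `first_page_only` is the analogue of the behavior reported in this issue; `all_items` is the analogue of what a working paginator should return.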
Actual Behavior
If the paginator is defined only at the client level:
- Only a single request is made (e.g., returning 100 items if page[limit]=100 is specified).
- No additional paginated requests are performed.
- As a result, only a partial dataset is returned.
Workaround
To ensure proper pagination, the paginator must be manually set on each resource endpoint or as a resource default, even if the resource endpoints share the same client.
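A minimal sketch of the resource-default variant of the workaround, assuming the `resource_defaults` key of `RESTAPIConfig` accepts an `endpoint` section with a `paginator`. The dictionary below is a plain Python fragment (auth and the `routes` resource are omitted for brevity), not a complete source.

```python
from typing import Any, Dict

# Declared once and reused, so every resource endpoint gets the same paginator
# instead of relying on the client-level setting.
json_link_paginator: Dict[str, Any] = {
    "type": "json_link",
    "next_url_path": "links.next",
}

config: Dict[str, Any] = {
    "client": {
        "base_url": "https://api-v3.mbta.com/",
        # No paginator here: the client-level setting is what fails to propagate.
    },
    "resource_defaults": {
        "endpoint": {
            "paginator": json_link_paginator,
        },
    },
    "resources": [
        {
            "name": "trips",
            "endpoint": {
                "path": "trips?filter[route]={route_id}",
                "data_selector": "data",
                "params": {
                    "page[limit]": 100,
                    "route_id": {
                        "type": "resolve",
                        "resource": "routes",
                        "field": "route_ids",
                    },
                },
            },
        },
    ],
}
```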
Steps to reproduce
Step 1: Get an API Key
Request a free API key from MBTA's API portal.
Step 2: Reproduce the Bug
Run the following code with the paginator configured at the client level.
Only a single request will be made, returning 100 items, which matches the page[limit] parameter in the query. No additional paginated requests are made, which is the unexpected behavior.
from typing import Generator, List, Dict, Any

import dlt
from dlt.sources.rest_api import RESTAPIConfig
from dlt.sources.rest_api import rest_api_resources
from dlt.sources.helpers.rest_client.auth import APIKeyAuth


@dlt.resource()
def routes() -> Generator[List[Dict[str, Any]], Any, Any]:
    yield [{"route_ids": "Red,Blue"}]


@dlt.source
def api_source():
    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api-v3.mbta.com/",
            "auth": APIKeyAuth(
                name="X-API-Key",
                api_key="YOUR_API_KEY_FROM_STEP_1",
                location="header",
            ),
            # Paginator Configured at Client Level
            "paginator": {
                "type": "json_link",
                "next_url_path": "links.next",
            },
        },
        "resources": [
            {
                "name": "trips",
                "primary_key": "id",
                "write_disposition": "merge",
                "endpoint": {
                    "method": "GET",
                    "data_selector": "data",
                    "path": "trips?filter[route]={route_id}",
                    "params": {
                        "page[limit]": 100,
                        "route_id": {
                            "type": "resolve",
                            "resource": "routes",
                            "field": "route_ids",
                        },
                    },
                },
            },
            routes(),
        ],
    }
    resources = rest_api_resources(config)
    yield from resources


def main() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="rest_api",
        destination="duckdb",
        dataset_name="rest_api_data",
    )
    load_info = pipeline.run(api_source())
    df = pipeline.dataset(dataset_type="default").trips.df()
    print(load_info)
    print(df.head())
    print(f"Number of rows loaded: {df.shape[0]}")


if __name__ == "__main__":
    main()
Output:
/Users/kanenorman/Desktop/dlt_issue/.venv/bin/python /Users/kanenorman/Desktop/dlt_issue/main.py
Pipeline rest_api load step completed in 0.09 seconds
1 load package(s) were loaded to destination duckdb and into dataset rest_api_data
The duckdb destination used duckdb:////Users/kanenorman/Desktop/dlt_issue/rest_api.duckdb location to store data
Load package 1746138130.322604 is LOADED and contains no failed jobs
attributes__bikes_allowed ... _dlt_id
0 0 ... YtRifBngmlSRbw
1 0 ... kXTy4Xd4vfU4Mw
2 0 ... 44u5mwFK2gF8kQ
3 0 ... dGFXOYmZHFfPCg
4 0 ... xcBn6PrbHclwtw
[5 rows x 20 columns]
Number of rows loaded: 100
Process finished with exit code 0
Step 3: Drop DuckDB database
Remove the DuckDB database so that each run starts from a clean state:
rm rest_api.duckdb
Step 4: Workaround
Now configure the paginator at the endpoint level instead.
You'll observe that pagination works correctly and 5,514 items are returned, which is the expected and correct total.
from typing import Generator, List, Dict, Any

import dlt
from dlt.sources.rest_api import RESTAPIConfig
from dlt.sources.rest_api import rest_api_resources
from dlt.sources.helpers.rest_client.auth import APIKeyAuth


@dlt.resource()
def routes() -> Generator[List[Dict[str, Any]], Any, Any]:
    yield [{"route_ids": "Red,Blue"}]


@dlt.source
def api_source():
    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api-v3.mbta.com/",
            "auth": APIKeyAuth(
                name="X-API-Key",
                api_key="YOUR_API_KEY_FROM_STEP_1",
                location="header",
            ),
        },
        "resources": [
            {
                "name": "trips",
                "primary_key": "id",
                "write_disposition": "merge",
                "endpoint": {
                    "method": "GET",
                    "data_selector": "data",
                    "path": "trips?filter[route]={route_id}",
                    "params": {
                        "page[limit]": 100,
                        "route_id": {
                            "type": "resolve",
                            "resource": "routes",
                            "field": "route_ids",
                        },
                    },
                    # Paginator Configured at Endpoint Level
                    "paginator": {
                        "type": "json_link",
                        "next_url_path": "links.next",
                    },
                },
            },
            routes(),
        ],
    }
    resources = rest_api_resources(config)
    yield from resources


def main() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="rest_api",
        destination="duckdb",
        dataset_name="rest_api_data",
    )
    load_info = pipeline.run(api_source())
    df = pipeline.dataset(dataset_type="default").trips.df()
    print(load_info)
    print(df.head())
    print(f"Number of rows loaded: {df.shape[0]}")


if __name__ == "__main__":
    main()
Output:
Pipeline rest_api load step completed in 0.53 seconds
1 load package(s) were loaded to destination duckdb and into dataset rest_api_data
The duckdb destination used duckdb:////Users/kanenorman/Desktop/dlt_issue/rest_api.duckdb location to store data
Load package 1746137805.367842 is LOADED and contains no failed jobs
attributes__bikes_allowed ... _dlt_id
0 0 ... 0ovhG0CiQ8Ahww
1 0 ... ZPhYMwSV8di44w
2 0 ... +0a9RqkR/Vw+jA
3 0 ... HdqBkf55XoiP6w
4 0 ... M/QVQZChN44SDg
[5 rows x 20 columns]
Number of rows loaded: 5514
Operating system
macOS
Runtime environment
Local
Python version
3.12
dlt data source
dlt destination
DuckDB
Other deployment details
No response
Additional information
I'm still debugging, but I believe the issue originates in the expand_and_index_resources function, specifically during the call to _setup_single_entity_endpoint().
If pagination is not explicitly configured at the resource endpoint level (or resource default level), endpoint.get("paginator") returns None, so a SinglePagePaginator() is assigned, even when the client is correctly configured with a JsonLinkPaginator().
def _setup_single_entity_endpoint(endpoint: Endpoint) -> Endpoint:
    """Tries to guess if the endpoint refers to a single entity and when detected:
    * if `data_selector` was not specified (or is None), "$" is selected
    * if `paginator` was not specified (or is None), SinglePagePaginator is selected

    Endpoint is modified in place and returned
    """
    if single_entity_path(endpoint["path"]):
        if endpoint.get("data_selector") is None:
            endpoint["data_selector"] = "$"
        if endpoint.get("paginator") is None:
            endpoint["paginator"] = SinglePagePaginator()
    return endpoint
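The following standalone sketch mimics the quoted logic without importing dlt, to show how the client-level paginator can end up ignored. `looks_like_single_entity` is a hypothetical stand-in for `single_entity_path`; it assumes a path is treated as a single-entity path when its last segment contains a `{placeholder}`, which may differ from the library's actual rule. Plain strings stand in for paginator objects.

```python
import re
from pathlib import PurePosixPath
from typing import Any, Dict


def looks_like_single_entity(path: str) -> bool:
    """Assumed heuristic: the last path segment contains a {placeholder}."""
    last_segment = PurePosixPath(path).name
    return re.search(r"\{[A-Za-z_][A-Za-z0-9_.]*\}", last_segment) is not None


def setup_single_entity_endpoint(endpoint: Dict[str, Any]) -> Dict[str, Any]:
    """Same branching as the quoted function; it never looks at the client."""
    if looks_like_single_entity(endpoint["path"]):
        if endpoint.get("data_selector") is None:
            endpoint["data_selector"] = "$"
        if endpoint.get("paginator") is None:
            endpoint["paginator"] = "single_page"
    return endpoint


# The client has a paginator, but the setup function has no access to it.
client = {"paginator": "json_link"}
trips = setup_single_entity_endpoint(
    {"path": "trips?filter[route]={route_id}", "data_selector": "data"}
)
plain_list = setup_single_entity_endpoint({"path": "trips"})

print(trips["paginator"])       # single_page
print("paginator" in plain_list)  # False
```

Under this assumption, the `{route_id}` placeholder in the query string is enough for the `trips` endpoint to be classified as a single-entity endpoint, so it receives the single-page paginator regardless of what the client specifies.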
Proposed Fix
Include the client in build_resource_dependency_graph() so that the paginator can be inherited from the client when not specified at the resource endpoint level (or resource default endpoint). This would prevent overwriting a valid client-level paginator with a SinglePagePaginator() during setup.
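One way to picture the intended precedence is a small resolution helper that prefers the endpoint's paginator, then the client's, and only then falls back to the single-page default. This is an illustration of the ordering, not a patch: `resolve_paginator` and the string placeholders are hypothetical and are not part of dlt.

```python
from typing import Any, Dict, Optional


def resolve_paginator(
    endpoint: Dict[str, Any],
    client: Dict[str, Any],
    is_single_entity: bool,
) -> Optional[str]:
    """Pick a paginator: endpoint first, then client, then single-page fallback."""
    if endpoint.get("paginator") is not None:
        return endpoint["paginator"]
    if client.get("paginator") is not None:
        # The client-level setting wins over the single-entity guess.
        return client["paginator"]
    return "single_page" if is_single_entity else None


client_with_paginator = {"paginator": "json_link"}
client_without_paginator: Dict[str, Any] = {}

inherited = resolve_paginator({}, client_with_paginator, is_single_entity=True)
overridden = resolve_paginator({"paginator": "offset"}, client_with_paginator, True)
fallback = resolve_paginator({}, client_without_paginator, is_single_entity=True)

print(inherited, overridden, fallback)
```

With this ordering, the single-page default applies only when neither the endpoint nor the client names a paginator.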