Description
Background
dlt
Incremental by default uses dlt state to track last values. this is often not necessary if external scheduler is present. the case we want to support is Airflow scheduler. Users should be able to opt in a dlt resource that uses Incremental to take start and end date from the Airflow task context and skip using dlt state
Tasks
-
- add
allow_external_schedulers
to Incremental constructor (the default is False). if flag is set, external schedulers will be probed to take current load range
- add
-
- allow to disable and enable via resource.incremental.allow_external_schedulers (mind that resource.incremental may also be a wrapper so make sure that wrapper is passing it to inner incremental - you must write test for it, like we test similar behavior for primary key)
-
- bind the range in
bind
method: set start and end date from context
- bind the range in
-
- implement Airflow context helper see below
-
- write test in
test_airflow_wrapper
where we make sure that correct values are passed at run time. ideally we'd test a few dag runs for backfill and forward schedule. also test disable/enable at the resource level per task (2)
- write test in
-
- this needs to be documented: in incremental.md in docs, in airflow wrapper docstring and in deploying on Airflow Composer in walkthroughs (@rudolfix will help with this)
Airflow Helper
After making sure that task context is present, take:
execution_date = ti.execution_date
next_execution_date = ti.next_execution_date
and set them as start_value and end_value of the incremental. be smart with the type: get the type from type hint of the Incremental (which is a generic!) if not present use initial_value
or start_value
or last_value
type to get the expected type.
- in case of date time generate a UTC timezone pendulum datetime from what Airflow is passing
- in case of float or int, generate unix timestamp
- in case of str generate ISO string for UTC
- in case of date, generate date
this needs to be unit tested. why we do that? because people use different datetime representation
Metadata
Metadata
Assignees
Labels
Type
Projects
Status