opt in Incremental to external schedulers · Issue #515 · dlt-hub/dlt · GitHub
opt in Incremental to external schedulers #515
Closed
@rudolfix

Description


Background
By default, dlt `Incremental` uses dlt state to track last values. This is often unnecessary when an external scheduler is present. The case we want to support is the Airflow scheduler: users should be able to opt a dlt resource that uses `Incremental` in to taking the start and end date from the Airflow task context, skipping dlt state.

Tasks

    • add `allow_external_schedulers` to the `Incremental` constructor (default `False`). If the flag is set, external schedulers are probed for the current load range
    • allow enabling and disabling via `resource.incremental.allow_external_schedulers` (mind that `resource.incremental` may also be a wrapper, so make sure the wrapper passes the flag to the inner incremental; you must write a test for this, as we test similar behavior for the primary key)
    • bind the range in the `bind` method: set the start and end date from the context
    • implement the Airflow context helper (see below)
    • write a test in `test_airflow_wrapper` that makes sure the correct values are passed at run time. Ideally we'd test a few DAG runs for backfill and forward schedules; also test disabling/enabling at the resource level per task (point 2)
    • this needs to be documented: in `incremental.md` in the docs, in the Airflow wrapper docstring, and in the "Deploying on Airflow Composer" walkthrough (@rudolfix will help with this)
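The opt-in flow described in the tasks above could look roughly like this. This is a minimal sketch, not dlt's actual implementation; the probe registry, the fake Airflow context, and all names here are assumptions for illustration:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Optional

# Hypothetical probe registry: each probe returns a (start, end) tuple when its
# scheduler context is active, or None when it is not.
_SCHEDULER_PROBES: list[Callable[[], Optional[tuple]]] = []


@dataclass
class Incremental:
    cursor_path: str
    initial_value: Any = None
    allow_external_schedulers: bool = False  # opt-in flag, default off
    start_value: Any = None
    end_value: Any = None

    def bind(self) -> "Incremental":
        # Only when opted in do we probe external schedulers for the load
        # range; otherwise the real implementation would fall back to dlt state.
        if self.allow_external_schedulers:
            for probe in _SCHEDULER_PROBES:
                found = probe()
                if found is not None:
                    self.start_value, self.end_value = found
                    break
        return self


# Stand-in for the Airflow helper: pretend a task context with an
# execution window is present.
_fake_context = {
    "execution_date": datetime(2023, 7, 1, tzinfo=timezone.utc),
    "next_execution_date": datetime(2023, 7, 2, tzinfo=timezone.utc),
}


def airflow_probe():
    if _fake_context is None:  # no task context: probe declines
        return None
    return _fake_context["execution_date"], _fake_context["next_execution_date"]


_SCHEDULER_PROBES.append(airflow_probe)

# Opted in: the range comes from the "scheduler"; opted out: it stays unset.
opted_in = Incremental("updated_at", allow_external_schedulers=True).bind()
opted_out = Incremental("updated_at").bind()
```

The key design point is that the default stays `False`, so existing pipelines keep reading last values from dlt state unless a resource explicitly opts in.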

Airflow Helper
After making sure the task context is present, take:

execution_date = ti.execution_date
next_execution_date = ti.next_execution_date

and set them as the `start_value` and `end_value` of the incremental. Be smart with the type: get the expected type from the type hint of the `Incremental` (which is a generic!); if it is not present, use the type of `initial_value`, `start_value`, or `last_value`.

  • in case of datetime, generate a UTC-timezone pendulum datetime from what Airflow passes
  • in case of float or int, generate a unix timestamp
  • in case of str, generate an ISO string in UTC
  • in case of date, generate a date
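The coercion rules above can be sketched as a small helper. This is illustrative only (the helper name is made up, and stdlib `datetime` stands in for pendulum here):

```python
from datetime import date, datetime, timezone


def coerce_to(expected_type: type, value: datetime):
    """Coerce an Airflow-supplied datetime to the type the Incremental expects.

    Sketch of the rules described above, not dlt's actual implementation.
    """
    utc = value.astimezone(timezone.utc)
    if expected_type is datetime:
        return utc                             # timezone-aware UTC datetime
    if expected_type in (int, float):
        return expected_type(utc.timestamp())  # unix timestamp
    if expected_type is str:
        return utc.isoformat()                 # ISO 8601 string in UTC
    if expected_type is date:
        return utc.date()                      # plain date
    raise TypeError(f"unsupported cursor type: {expected_type!r}")
```

Note that `datetime` must be checked before `date` would be in an `isinstance`-based variant, since `datetime` subclasses `date`; exact `is` comparisons sidestep that here.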

This needs to be unit tested. Why? Because people use many different datetime representations.
