Graphs with GraphIn inputs fail when no input is provided, even if the downstream op has a default · Issue #30581 · dagster-io/dagster · GitHub
Open
@seanmabli

Description

What's the issue?

When a GraphIn defines a graph input that feeds into an op input with a default value, executing a job built from that graph with .to_job() fails if no value is provided for the input. The job raises an error saying the input is required, even though the downstream op is designed to handle a missing input by falling back to its default.

This behavior is inconsistent with what happens when the same graph is nested inside another graph: there, no input is passed either, yet the op's default is respected as expected.

What did you expect to happen?

I expected graph_input to be treated as optional at the graph level, since the op it feeds into declares a default value and can execute without external input. The job built directly from the graph (first_job) should have run just as the wrapped version (second_job) does.
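One way to state the expected rule: a top-level graph input should only be required when at least one op input it maps to has no default. Below is a minimal sketch under that assumption; `OpInput`, `required_graph_inputs`, and `_NO_DEFAULT` are hypothetical names and not part of Dagster's API.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Sequence

# Sentinel meaning "no default declared". A sentinel is needed because
# default_value=None (as in process_data) is a real, valid default.
_NO_DEFAULT = object()


@dataclass(frozen=True)
class OpInput:
    """Hypothetical description of an op input that a graph input maps to."""
    op_name: str
    input_name: str
    default: Any = _NO_DEFAULT

    @property
    def has_default(self) -> bool:
        return self.default is not _NO_DEFAULT


def required_graph_inputs(mappings: Mapping[str, Sequence[OpInput]]) -> set[str]:
    """Return the graph inputs that must be supplied by the caller.

    A graph input is required only if at least one op input it feeds
    lacks a default; otherwise every consumer can fall back to its default.
    """
    return {
        graph_input
        for graph_input, targets in mappings.items()
        if any(not target.has_default for target in targets)
    }
```

For the reproduction, `required_graph_inputs({"graph_input": [OpInput("process_data", "input_data", default=None)]})` returns an empty set, i.e. `graph_input` would be optional under this rule.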

How to reproduce?

The code below is a simple example of the problem:

import pandas as pd
from dagster import (
    op, 
    graph,
    GraphIn, 
    GraphOut, 
    In, 
    Out, 
    Nothing, 
    Definitions
)
from typing import Union

@op(
    ins={
        "input_data": In(
            dagster_type=Union[pd.DataFrame, Nothing], 
            default_value=None
        )
    },
    out=Out(pd.DataFrame)
)
def process_data(input_data):
    if input_data is None:
        return pd.DataFrame({
            'id': [1, 2, 3],
            'name': ['Alice', 'Bob', 'Charlie'],
            'value': [10, 20, 30]
        })
    else:
        processed = input_data.copy()
        if 'value' in processed.columns:
            processed['value'] = processed['value'] * 2
        return processed

@graph(
    ins={
        "graph_input": GraphIn(description="Input data for processing")
    },
    out={
        "graph_output": GraphOut(description="Processed output data")
    }
)
def data_processing_graph(graph_input):
    result = process_data(input_data=graph_input)
    return {"graph_output": result}

first_job = data_processing_graph.to_job(name="first_job")

@graph
def wrapper_graph():
    result = data_processing_graph()
    return result

second_job = wrapper_graph.to_job(name="second_job")

defs = Definitions(
    jobs=[first_job, second_job]
)

if __name__ == "__main__":
    print("RUNNING FIRST JOB")
    try:
        result1 = first_job.execute_in_process()
        print(result1)
    except Exception as e:
        print(e)
    print("RUNNING SECOND JOB")
    try:
        result2 = second_job.execute_in_process()
        print(result2)
    except Exception as e:
        print(e)

Output:

RUNNING FIRST JOB
In top-level graph of job 'first_job', input graph_input must get a value from the inputs section of its configuration.
RUNNING SECOND JOB
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - RUN_START - Started execution of run for "second_job".
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - ENGINE_EVENT - Executing steps in process (pid: 7769)
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - data_processing_graph.process_data - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - data_processing_graph.process_data - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - LOGS_CAPTURED - Started capturing logs in process (pid: 7769).
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - data_processing_graph.process_data - STEP_START - Started execution of step "data_processing_graph.process_data".
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - data_processing_graph.process_data - STEP_INPUT - Got input "input_data" of type "DataFrame?". (Type check passed).
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - data_processing_graph.process_data - STEP_OUTPUT - Yielded output "result" of type "DataFrame". (Type check passed).
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - data_processing_graph.process_data - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - data_processing_graph.process_data - STEP_SUCCESS - Finished execution of step "data_processing_graph.process_data" in 8.53ms.
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - ENGINE_EVENT - Finished steps in process (pid: 7769) in 25ms
2025-06-08 19:46:04 -0400 - dagster - DEBUG - second_job - b9315291-7940-4288-bf16-a1620d0eccb2 - 7769 - RUN_SUCCESS - Finished execution of run for "second_job".
<dagster._core.execution.execute_in_process_result.ExecuteInProcessResult object at 0x11b268a30>

Dagster version

dagster, version 1.10.18

Deployment type

Local

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

Metadata

Assignees: No one assigned
Labels: type: bug (Something isn't working)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests