
# Bug: Missing columns when using table prefix with MSSQL in dlt #2688

NunoTrigo1986 opened this issue May 27, 2025 · 0 comments
### dlt version

1.11.0

### Describe the problem

I've encountered a bug in dlt when working with MSSQL databases that affects how columns are handled, both with and without a table-name prefix.

Issue 1: Unexpected column without prefix

When loading tables without changing table names:

  • A table called 'person' ends up with 21 columns instead of the expected 20
  • These should be 18 original columns + 2 dlt columns ('_dlt_load_id' and '_dlt_id')
  • There's an unexpected additional column called 'signatur_vtext'
  • For context, the original table has a column called 'signature' containing images (see the schema-inspection sketch after this list)
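
The column set dlt actually created can be checked from the pipeline's stored schema after a load. A minimal sketch, assuming the pipeline name `trafikk_app_load` from the code samples below and dlt's standard schema accessors:

```python
import dlt

# Attach to the local pipeline state created by the run
# (pipeline name taken from the code samples in this issue).
pipeline = dlt.attach(pipeline_name="trafikk_app_load")

# The default schema holds the table and column definitions dlt inferred
# and used when creating the MSSQL tables.
person_columns = pipeline.default_schema.tables["person"]["columns"]
print(len(person_columns))     # expected 20, observed 21
print(sorted(person_columns))  # shows 'signatur_vtext' plus the _dlt_* columns

# For the prefixed run (Issue 2), inspect "raw_person" instead.
```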

Issue 2: Missing columns with prefix

When adding a prefix to tables (e.g., 'raw_person' instead of 'person'):

  • The destination table has only 18 columns on the destination server
  • This count includes the standard dlt columns ('_dlt_load_id' and '_dlt_id')
  • Two original columns ('persontype' and 'forekort') are missing
  • One of these missing columns was empty in the source, but the other contained values

I've tried adding each table to the sql_database source separately, but it didn't resolve the issue; an alternative way of applying the prefix is sketched below.
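
The prefix can also be applied by renaming the tables on the sql_database resources directly, instead of wrapping each one in a new generator. This is a minimal sketch rather than a confirmed fix: it relies on `apply_hints(table_name=...)`, omits the `ff` transform, and is only meant to help narrow down whether the wrapper resources are what drops the columns:

```python
import dlt
from dlt.sources.sql_database import sql_database  # import path assumed for dlt 1.x

source = sql_database(
    table_names=["ff", "kjoretoy", "person", "fartoy", "vei", "gjenpart",
                 "overtredelse", "politibetjent", "statuslogg"],
)

# Rename the destination tables in place; the original column set is untouched.
for name, resource in source.resources.items():
    resource.apply_hints(table_name="raw_" + name, write_disposition="replace")

pipeline = dlt.pipeline(
    pipeline_name="trafikk_app_load",
    destination="mssql",
    dataset_name="source_data_trafikkapp",
)
pipeline.run(source)
```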

### Code Samples

### Version 1: Without Table Prefix

```python
# Imports assumed for a self-contained sample; sql_database ships with dlt 1.x,
# dlt_assets/DagsterDltResource come from the dagster-dlt integration.
import dlt
import pandas as pd
from dlt.sources.sql_database import sql_database

from dagster import AssetExecutionContext
from dagster_dlt import DagsterDltResource, dlt_assets

# hente_kommune (coordinate-to-municipality lookup) and CustomDagsterDltTranslator
# are project-specific helpers that are not shown in this issue.


@dlt.source()
def trafikkapp_source():
    # Get the base sql_database source
    base_source = sql_database(
        table_names=["ff","kjoretoy", "person","fartoy","vei","gjenpart","overtredelse","politibetjent","statuslogg"],
    )
    
    # Extract specific tables
    ff_resource = base_source.resources["ff"]
    kjoretoy = base_source.resources["kjoretoy"] 
    person = base_source.resources["person"]
    fartoy = base_source.resources["fartoy"]
    vei = base_source.resources["vei"]
    gjenpart = base_source.resources["gjenpart"]
    overtredelse = base_source.resources["overtredelse"]
    politibetjent = base_source.resources["politibetjent"]
    statuslogg = base_source.resources["statuslogg"]  

    @dlt.resource(
        name="ff",
        primary_key="id",
        write_disposition="replace"
    )
    def transform_ff():
        # Process the original ff data in batches
        for batch in ff_resource:
            # Handle different batch formats
            if not batch:  # Skip empty batches
                continue

            # Convert batch to DataFrame with proper handling
            try:
                # If batch is a list of dictionaries
                if isinstance(batch, list):
                    if not batch:  # Empty list
                        continue
                    df = pd.DataFrame(batch)
                # If batch is a single dictionary/record
                elif isinstance(batch, dict):
                    df = pd.DataFrame([batch])  # Wrap in list
                # If batch is already a DataFrame
                elif isinstance(batch, pd.DataFrame):
                    df = batch
                else:
                    # Convert other formats to DataFrame
                    df = pd.DataFrame(batch)

            except ValueError as e:
                if "scalar values" in str(e):
                    # Handle scalar values by creating a single-row DataFrame
                    df = pd.DataFrame([batch])
                else:
                    raise e


            # Apply transformations only if coordinates exist
            if 'lengdegrad' in df.columns and 'breddegrad' in df.columns:
                # Filter rows with valid coordinates
                valid_coords = df['lengdegrad'].notna() & df['breddegrad'].notna()

                if valid_coords.any():
                    # Apply transformation only to rows with valid coordinates
                    df.loc[valid_coords, 'Geodata'] = df.loc[valid_coords].apply(
                        lambda row: hente_kommune(row['lengdegrad'], row['breddegrad']), axis=1
                    )

                    # Expand Geodata and rename
                    if 'Geodata' in df.columns:
                        geodata_expanded = df['Geodata'].apply(pd.Series)
                        df = df.join(geodata_expanded).drop('Geodata', axis=1)
                        if 'Navn' in df.columns:
                            df = df.rename(columns={'Navn': 'Kommune'})

            # Yield the transformed data as records
            yield df.to_dict('records')

    return [transform_ff, kjoretoy, person, fartoy, vei, gjenpart, overtredelse, politibetjent, statuslogg]


@dlt_assets(
    dlt_source=trafikkapp_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="trafikk_app_load",
        destination='mssql',
        dataset_name="source_data_trafikkapp"
    ),
    name="trafikkapp_selected_tables",
    group_name="source_data_trafikkapp",
    dagster_dlt_translator=CustomDagsterDltTranslator()
)
def trafikkapp_database_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    """Load selected tables from trafikkapp database"""
    yield from dlt.run(context=context)
```

### Version 2: With Table Prefix

```python
# (same imports as in Version 1)
@dlt.source(parallelized=True)
def trafikkapp_source():
    prefix = "raw_"
    # Get the base sql_database source
    base_source = sql_database(
        table_names=["ff", "kjoretoy", "person", "fartoy", "vei", "gjenpart", "overtredelse", "politibetjent", "statuslogg"],
    )

    simple_tables = ["kjoretoy", "person", "fartoy", "vei", "gjenpart", "overtredelse", "politibetjent", "statuslogg"]

    def make_resource(table_name):
        # Access the resource directly from base_source
        raw_resource = base_source.resources[table_name]
        # raw_resource.apply_hints(write_disposition="replace")

        @dlt.resource(name=prefix + table_name, write_disposition="replace")
        def resource_func():
            yield from raw_resource

        return resource_func

    # Create resources
    resources = [make_resource(table) for table in simple_tables]
    # NOTE: shadowed by the local raw_ff_resource inside raw_transform_ff below
    raw_ff_resource = base_source.resources["ff"]

    @dlt.resource(
        name=prefix +"ff",
        primary_key="id",
        write_disposition="merge"
    )
    def raw_transform_ff():
        ff_source = sql_database(table_names=["ff"])
        raw_ff_resource = ff_source.resources["ff"]
        # Process the original ff data in batches
        for batch in raw_ff_resource:
            # Handle different batch formats
            if not batch:  # Skip empty batches
                continue

            # Convert batch to DataFrame with proper handling
            try:
                # If batch is a list of dictionaries
                if isinstance(batch, list):
                    if not batch:  # Empty list
                        continue
                    df = pd.DataFrame(batch)
                # If batch is a single dictionary/record
                elif isinstance(batch, dict):
                    df = pd.DataFrame([batch])  # Wrap in list
                # If batch is already a DataFrame
                elif isinstance(batch, pd.DataFrame):
                    df = batch
                else:
                    # Convert other formats to DataFrame
                    df = pd.DataFrame(batch)

            except ValueError as e:
                if "scalar values" in str(e):
                    # Handle scalar values by creating a single-row DataFrame
                    df = pd.DataFrame([batch])
                else:
                    raise e


            # Apply transformations only if coordinates exist
            if 'lengdegrad' in df.columns and 'breddegrad' in df.columns:
                # Filter rows with valid coordinates
                valid_coords = df['lengdegrad'].notna() & df['breddegrad'].notna()

                if valid_coords.any():
                    # Apply transformation only to rows with valid coordinates
                    df.loc[valid_coords, 'Geodata'] = df.loc[valid_coords].apply(
                        lambda row: hente_kommune(row['lengdegrad'], row['breddegrad']), axis=1
                    )

                    # Expand Geodata and rename
                    if 'Geodata' in df.columns:
                        geodata_expanded = df['Geodata'].apply(pd.Series)
                        df = df.join(geodata_expanded).drop('Geodata', axis=1)
                        if 'Navn' in df.columns:
                            df = df.rename(columns={'Navn': 'Kommune'})

            # Yield the transformed data as records
            yield df.to_dict('records')

    return [raw_transform_ff] + resources


@dlt_assets(
    dlt_source=trafikkapp_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="trafikk_app_load",
        destination='mssql',
        dataset_name="source_data_trafikkapp"
    ),
    name="trafikkapp_selected_tables",
    group_name="source_data_trafikkapp",
    dagster_dlt_translator=CustomDagsterDltTranslator()
)
def trafikkapp_database_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    """Load selected tables from trafikkapp database"""
    yield from dlt.run(context=context)
```

### Expected behavior

_No response_

### Steps to reproduce

Code given above.

### Operating system

Linux

### Runtime environment

Local

### Python version

3.10

### dlt data source

_No response_

### dlt destination

_No response_

### Other deployment details

MSSQL via Microsoft ODBC Driver 18 for SQL Server
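
For completeness, a minimal sketch of how the ODBC Driver 18 connection is typically passed to dlt; host, database, user and password are placeholders, and the `sql_database` import path is the dlt 1.x one:

```python
import dlt
from dlt.sources.sql_database import sql_database  # import path assumed for dlt 1.x

# SQLAlchemy-style connection string for Microsoft ODBC Driver 18 for SQL Server.
source_conn = (
    "mssql+pyodbc://user:password@source-host/source_db"
    "?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
)

source = sql_database(credentials=source_conn, table_names=["person"])

pipeline = dlt.pipeline(
    pipeline_name="trafikk_app_load",
    destination="mssql",  # destination credentials supplied via secrets.toml / env vars
    dataset_name="source_data_trafikkapp",
)
```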

### Additional information

_No response_