title: Bug: Missing columns when using table prefix with MSSQL in dlt · Issue #2688 · dlt-hub/dlt
### dlt version

1.11.0

### Describe the problem
I've encountered a bug in dlt when working with MSSQL databases that affects how columns are handled, both with and without table prefixes applied.
**Issue 1: Unexpected column without prefix**

When loading tables without changing table names:

- A table called `person` ends up with 21 columns instead of the expected 20
- The expected 20 are the 18 original columns plus the 2 standard dlt columns (`_dlt_load_id` and `_dlt_id`)
- There is an unexpected additional column called `signatur_vtext`

For context, the original table has a column called `signature` containing images.
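One way to see where `signatur_vtext` comes from is to dump the schema dlt inferred for the `person` table. This is a minimal sketch, assuming the pipeline has already run locally under the name used in the code samples below:

```python
import dlt

# Attach to the existing local pipeline state by name; "trafikk_app_load"
# is the pipeline name from the code samples below
pipeline = dlt.pipeline(pipeline_name="trafikk_app_load", destination="mssql")

# Dump the inferred schema; the 'person' table entry lists every column dlt
# created, which should show how 'signatur_vtext' was added
print(pipeline.default_schema.to_pretty_yaml())
```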
**Issue 2: Missing columns with prefix**

When adding a prefix to tables (e.g., `raw_person` instead of `person`):

- The destination table on the new server has only 18 columns
- That count includes the standard dlt columns (`_dlt_load_id` and `_dlt_id`)
- Two original columns (`persontype` and `forekort`) are missing
- One of the missing columns was empty in the source, but the other contained values
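A quick way to confirm which columns actually landed in the destination is to query `INFORMATION_SCHEMA` through the pipeline's SQL client. A sketch, assuming the same pipeline name as in the code samples:

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="trafikk_app_load", destination="mssql")

# List the columns of the prefixed table as they exist on the MSSQL server;
# 'raw_person' is the prefixed table name from Issue 2
with pipeline.sql_client() as client:
    rows = client.execute_sql(
        "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
        "WHERE TABLE_NAME = %s ORDER BY ORDINAL_POSITION",
        "raw_person",
    )
    print([row[0] for row in rows])
```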
I've tried adding each table to the sql_database source separately, but it didn't resolve the issue.
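For reference, the per-table attempt looked roughly like this (a hypothetical sketch, not the exact code that was run):

```python
# Hypothetical sketch of the per-table attempt: one sql_database() source per
# table instead of a single source listing all table_names
from dlt.sources.sql_database import sql_database

person_source = sql_database(table_names=["person"])
person = person_source.resources["person"]
```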
### Code Samples

### Version 1: Without Table Prefix
```python
import dlt
import pandas as pd

# Import paths below are assumptions based on dlt 1.x and the dagster-dlt
# integration; adjust to match the project's actual setup.
from dlt.sources.sql_database import sql_database
from dagster import AssetExecutionContext
from dagster_dlt import DagsterDltResource, dlt_assets

# hente_kommune() and CustomDagsterDltTranslator are project-specific helpers
# defined elsewhere and not shown here.


@dlt.source()
def trafikkapp_source():
    # Get the base sql_database source
    base_source = sql_database(
        table_names=[
            "ff", "kjoretoy", "person", "fartoy", "vei",
            "gjenpart", "overtredelse", "politibetjent", "statuslogg",
        ],
    )

    # Extract specific tables
    ff_resource = base_source.resources["ff"]
    kjoretoy = base_source.resources["kjoretoy"]
    person = base_source.resources["person"]
    fartoy = base_source.resources["fartoy"]
    vei = base_source.resources["vei"]
    gjenpart = base_source.resources["gjenpart"]
    overtredelse = base_source.resources["overtredelse"]
    politibetjent = base_source.resources["politibetjent"]
    statuslogg = base_source.resources["statuslogg"]

    @dlt.resource(name="ff", primary_key="id", write_disposition="replace")
    def transform_ff():
        # Process the original ff data in batches
        for batch in ff_resource:
            if not batch:  # Skip empty batches
                continue
            # Convert batch to DataFrame with proper handling
            try:
                if isinstance(batch, list):  # List of dictionaries
                    if not batch:  # Empty list
                        continue
                    df = pd.DataFrame(batch)
                elif isinstance(batch, dict):  # Single dictionary/record
                    df = pd.DataFrame([batch])  # Wrap in list
                elif isinstance(batch, pd.DataFrame):  # Already a DataFrame
                    df = batch
                else:
                    # Convert other formats to DataFrame
                    df = pd.DataFrame(batch)
            except ValueError as e:
                if "scalar values" in str(e):
                    # Handle scalar values by creating a single-row DataFrame
                    df = pd.DataFrame([batch])
                else:
                    raise

            # Apply transformations only if coordinates exist
            if 'lengdegrad' in df.columns and 'breddegrad' in df.columns:
                # Filter rows with valid coordinates
                valid_coords = df['lengdegrad'].notna() & df['breddegrad'].notna()
                if valid_coords.any():
                    # Apply transformation only to rows with valid coordinates
                    df.loc[valid_coords, 'Geodata'] = df.loc[valid_coords].apply(
                        lambda row: hente_kommune(row['lengdegrad'], row['breddegrad']),
                        axis=1,
                    )

            # Expand Geodata into columns and rename 'Navn' to 'Kommune'
            if 'Geodata' in df.columns:
                geodata_expanded = df['Geodata'].apply(pd.Series)
                df = df.join(geodata_expanded).drop('Geodata', axis=1)
                if 'Navn' in df.columns:
                    df = df.rename(columns={'Navn': 'Kommune'})

            # Yield the transformed data as records
            yield df.to_dict('records')

    return [transform_ff, kjoretoy, person, fartoy, vei, gjenpart,
            overtredelse, politibetjent, statuslogg]


@dlt_assets(
    dlt_source=trafikkapp_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="trafikk_app_load",
        destination='mssql',
        dataset_name="source_data_trafikkapp",
    ),
    name="trafikkapp_selected_tables",
    group_name="source_data_trafikkapp",
    dagster_dlt_translator=CustomDagsterDltTranslator(),
)
def trafikkapp_database_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    # The 'dlt' parameter shadows the dlt module here, following the
    # dagster-dlt resource convention
    """Load selected tables from trafikkapp database"""
    yield from dlt.run(context=context)
```
### Version 2: With Table Prefix

```python
# (same imports and helpers as Version 1)


@dlt.source(parallelized=True)
def trafikkapp_source():
    prefix = "raw_"

    # Get the base sql_database source
    base_source = sql_database(
        table_names=[
            "ff", "kjoretoy", "person", "fartoy", "vei",
            "gjenpart", "overtredelse", "politibetjent", "statuslogg",
        ],
    )

    simple_tables = ["kjoretoy", "person", "fartoy", "vei", "gjenpart",
                     "overtredelse", "politibetjent", "statuslogg"]

    def make_resource(table_name):
        # Access the resource directly from base_source
        raw_resource = base_source.resources[table_name]
        # raw_resource.apply_hints(write_disposition="replace")

        @dlt.resource(name=prefix + table_name, write_disposition="replace")
        def resource_func():
            yield from raw_resource

        return resource_func

    # Create prefixed resources for the simple tables
    resources = [make_resource(table) for table in simple_tables]

    raw_ff_resource = base_source.resources["ff"]

    @dlt.resource(name=prefix + "ff", primary_key="id", write_disposition="merge")
    def raw_transform_ff():
        # Note: this re-creates the source inside the resource, shadowing the
        # outer raw_ff_resource defined above
        ff_source = sql_database(table_names=["ff"])
        raw_ff_resource = ff_source.resources["ff"]

        # Process the original ff data in batches (identical batch handling
        # and Geodata transformation as transform_ff in Version 1)
        for batch in raw_ff_resource:
            if not batch:  # Skip empty batches
                continue
            try:
                if isinstance(batch, list):
                    if not batch:
                        continue
                    df = pd.DataFrame(batch)
                elif isinstance(batch, dict):
                    df = pd.DataFrame([batch])
                elif isinstance(batch, pd.DataFrame):
                    df = batch
                else:
                    df = pd.DataFrame(batch)
            except ValueError as e:
                if "scalar values" in str(e):
                    df = pd.DataFrame([batch])
                else:
                    raise

            if 'lengdegrad' in df.columns and 'breddegrad' in df.columns:
                valid_coords = df['lengdegrad'].notna() & df['breddegrad'].notna()
                if valid_coords.any():
                    df.loc[valid_coords, 'Geodata'] = df.loc[valid_coords].apply(
                        lambda row: hente_kommune(row['lengdegrad'], row['breddegrad']),
                        axis=1,
                    )

            if 'Geodata' in df.columns:
                geodata_expanded = df['Geodata'].apply(pd.Series)
                df = df.join(geodata_expanded).drop('Geodata', axis=1)
                if 'Navn' in df.columns:
                    df = df.rename(columns={'Navn': 'Kommune'})

            yield df.to_dict('records')

    return [raw_transform_ff] + resources


@dlt_assets(
    dlt_source=trafikkapp_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="trafikk_app_load",
        destination='mssql',
        dataset_name="source_data_trafikkapp",
    ),
    name="trafikkapp_selected_tables",
    group_name="source_data_trafikkapp",
    dagster_dlt_translator=CustomDagsterDltTranslator(),
)
def trafikkapp_database_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    """Load selected tables from trafikkapp database"""
    yield from dlt.run(context=context)
```
### Expected behavior

_No response_

### Steps to reproduce

Code given above.

### Operating system

Linux

### Runtime environment

Local

### Python version

3.10

### dlt data source

_No response_

### dlt destination

_No response_

### Other deployment details

MSSQL with Microsoft ODBC Driver 18 for SQL Server

### Additional information

_No response_