Add metadata columns to the RDBMS contrib #2440
This seems like a cool addition; however, I'm not completely sold on it yet. It really feels like a specific wrapper around `post_copy`, but it's executed alongside it.
The end goal you're trying to achieve is to easily modify a set series of metadata columns after performing a Redshift Copy. This feels like a finer-grained version of `post_copy`.
By no means am I rejecting this feature. Let's continue this chat to determine if this is something we can better fit into a streamlined process, without introducing a lot of complexity.
Thanks for submitting!
luigi/contrib/redshift.py
Outdated
@@ -37,6 +37,66 @@
    "Will crash at runtime if postgres functionality is used.")


class _MetadataColumnsMixin(object):
This isn't specific to Redshift, as I believe it could also function in Postgres. Possibly move this mixin to `rdbms.py`?
luigi/contrib/redshift.py
Outdated
@@ -321,6 +381,9 @@ def create_table(self, connection):
            raise ValueError("create_table() found no columns for %r"
                             % self.table)

        if self.do_enable_metadata_columns:
rename to enable_metadata_columns
luigi/contrib/redshift.py
Outdated
@@ -338,6 +401,9 @@ def run(self):
        self.copy(cursor, path)
        self.post_copy(cursor)

        if self.do_enable_metadata_columns:
            self.post_copy_metacolumns(cursor)
I'm a bit confused about how this would interact with `post_copy`. Seems like you would never run both together.
I've highlighted an example at the PR level displaying a use-case where we have both cases that run.
You would be right to say that it's a wrapper around I would agree that they are pretty much the same but we use Alongside this, we also want to be able to run generic updates. For instance, I've created a gist that may better demonstrate how we use it: https://gist.github.com/cabouffard/73cfc798bef9ec09fc91807b99a6c5a9 That said, having this separated in two different methods, allow us to not have to override the content of
Totally agreed with this statement!
I feel like I haven't sold you on this one, is that the case @dlstadther? :)
Hey @cabouffard After thinking on this all morning, I think my hesitations here are more-so So, let's continue with getting your metadata feature pushed through. I believe I would like to see the Mixin moved to rdbms (assuming it's also applicable to Postgres, which I believe is the case). Can you also mesh some of your gist together in a docstring for the Mixin? I believe another hesitation I had (prior to your gist) was lack of implementation clarity. I very much appreciate all your recent PRs! Sorry for the delay addressing them to completion!
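To make the interaction discussed in this thread concrete, here is a minimal, self-contained sketch of the call order shown in the `run()` diff: `copy`, then `post_copy`, then the opt-in metadata step. `FakeCursor`, `CopySketch`, the table name, and the SQL strings are hypothetical stand-ins, not luigi's actual classes; only the method names and the `enable_metadata_columns` flag come from the discussion.

```python
class FakeCursor(object):
    """Hypothetical cursor that records statements instead of running them."""

    def __init__(self):
        self.executed = []

    def execute(self, query):
        self.executed.append(query)


class CopySketch(object):
    """Illustrative stand-in for a copy task (not luigi's real class)."""

    table = 'reports'               # hypothetical table name
    enable_metadata_columns = True  # opt-in flag; assumed to default to False upstream

    def copy(self, cursor, path):
        cursor.execute("COPY {0} FROM '{1}'".format(self.table, path))

    def post_copy(self, cursor):
        # Task-specific work that individual tasks are free to override.
        cursor.execute('ANALYZE {0}'.format(self.table))

    def post_copy_metacolumns(self, cursor):
        # Shared metadata update that should not have to live in post_copy.
        cursor.execute('UPDATE {0} SET created_tz = CURRENT_TIMESTAMP '
                       'WHERE created_tz IS NULL'.format(self.table))

    def run(self, cursor, path):
        self.copy(cursor, path)
        self.post_copy(cursor)
        if self.enable_metadata_columns:
            self.post_copy_metacolumns(cursor)


cursor = FakeCursor()
CopySketch().run(cursor, 's3://bucket/key')  # hypothetical path
```

Under these assumptions both hooks run: a task can override `post_copy` for its own needs while the metadata update stays in a shared base class.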
8394978
to
22d4912
Compare
@dlstadther No worries mate! I've added a whole bunch of text, some extra documentation, and changed some parts of the code following the suggestions you've provided. Is there anything else that you would like me to do? I'm happy to hear any suggestions!
@dlstadther Hey, what's the latest on this one? :)
Sorry for my delayed response here @cabouffard. I'm pretty sure I'm all good here, but let me review real quick once more this afternoon :)
I found a few grammatical issues, a syntax improvement, and one core question.
Other than that, I'm good with this. Particularly, since the default behavior of Redshift Copy will not change.
luigi/contrib/rdbms.py
Outdated
@@ -27,7 +27,125 @@
logger = logging.getLogger('luigi-interface')


class CopyToTable(luigi.task.MixinNaiveBulkComplete, luigi.Task):
class _MetadataColumnsMixin(object):
    """Provide an additinal behavior that adds columns and values to tables
typo in additional
luigi/contrib/rdbms.py
Outdated
Example:

This is a use example or how this mixin could come handy and how to use
Are you trying to say `This is an example use-case of how this mixin could come in handy.`?
luigi/contrib/rdbms.py
Outdated
def update_report_execution_date_query(self):
    query = 'UPDATE {0} ' \
            "SET date_param = DATE '{1}' " \
            'WHERE date_param IS NULL'.format(self.table, self.date)
Can you use the same quote style for these lines? Rather than going back and forth between single and double quote.
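A sketch of the same query with one quote style throughout. The standalone function and its `table` and `date` arguments are illustrative assumptions; the PR defines this as a method that reads `self.table` and `self.date`. Double quotes are used for every Python literal so that the single quotes SQL needs around the date stay unescaped.

```python
import datetime


def update_report_execution_date_query(table, date):
    # Adjacent literals are joined by the parentheses, so no backslashes are needed.
    return ("UPDATE {0} "
            "SET date_param = DATE '{1}' "
            "WHERE date_param IS NULL").format(table, date)


# Hypothetical table name and date, for illustration only.
query = update_report_execution_date_query('reports', datetime.date(2018, 6, 1))
```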
luigi/contrib/rdbms.py
Outdated
class RedshiftCopyCSVToTableFromS3(CommonMetaColumnsBehavior, redshift.S3CopyToTable):
    "We have some business override heres that would only add noise to the
`here` (not `heres`)
luigi/contrib/rdbms.py
Outdated
for column in self.metadata_columns:
    if len(column) == 0:
        logger.info('Unable to infer column information from column {column} for {table}'.format(column=column, table=self.table))
        break
Should this raise an exception?
I was torn on that decision. Raising an exception would halt the pipeline and keep other tasks from running, while breaking out of the function would simply not add the meta_columns that the user expects.
Breaking would proceed with the execution of the pipeline but may yield potentially unexpected results, while raising would halt everything from running.
I think that you raise a fair point and I'll implement an exception, as this seems to be how we manage that type of case throughout that contrib.
I agree that neither solution is ideal. But I'm more comfortable being explicit about what must be done to achieve the happy path. Anything straying from that should raise an exception.
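A minimal sketch of the explicit behavior agreed on here, assuming a standalone helper. The name `validate_metadata_columns` and the error messages are hypothetical; in the PR the check lives inside the mixin. The silent `break` is replaced with a `ValueError`, so a misconfigured task fails loudly instead of producing a table without the expected columns.

```python
def validate_metadata_columns(metadata_columns, table):
    """Return the columns unchanged, or raise ValueError if any is unusable."""
    if len(metadata_columns) == 0:
        raise ValueError('No metadata columns defined for {table}'.format(table=table))
    for column in metadata_columns:
        if len(column) == 0:
            raise ValueError(
                'Unable to infer column information from column {column} '
                'for {table}'.format(column=column, table=table))
    return list(metadata_columns)


# Hypothetical inputs for illustration.
columns = validate_metadata_columns([('created_tz', 'TIMESTAMP')], 'reports')

try:
    validate_metadata_columns([()], 'reports')
    raised = False
except ValueError:
    raised = True
```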
test/contrib/rdbms_test.py
Outdated
# Using Redshift as the test bed since Redshift implements RDBMS. Could have
# opted for PSQL but I'm less familar with that contrib.
Perhaps make this comment a docstring to add emphasis. I almost asked why Redshift was being used, but then I saw this comment.
The goal of this feature is to allow metadata columns to exist for specific tables created by the Redshift contrib related tasks.

Given the scenario where we would always have to have a `created_tz` column at the end of every table generated by that contrib, we could do the following:

```python
class UpdateTableTask(redshift.S3CopyToTable):
    def metadata_columns(self):
        return [('created_tz', 'TIMESTAMP')]

    def post_queries(self):
        query = 'UPDATE {0} ' \
                'SET created_tz = CURRENT_TIMESTAMP ' \
                'WHERE created_tz IS NULL'.format(self.table)
        return query
```

Adding a layer of abstraction over this feature, you could easily add many default behaviors for specific tables, for versioning the table and more.

This feature is opt-in since we don't want it to break other people's pipelines after integrating it.
As suggested in the code review, there are multiple other DBs that could benefit from this change. Currently, only PSQL and Redshift implement RDBMS, but others may implement this class and inherit the new behavior.
We've been internally using this feature for Redshift, but moving it to the RDBMS contrib and adding this behavior to PSQL could have unexpected side effects. This commit tests that the feature works correctly under Redshift and PSQL.
If the count of metadata_columns is 0 and we're expecting to add them to the table, then we raise an error because that is an invalid flow. The contributor is required to provide metadata_columns values if we want to add those columns to the table.
22d4912
to
70d7023
Compare
@dlstadther Hey ya, we're all green and I've added your suggestions / fixed typos! :)
Thanks for all your work here!
* Add metadata column feature to Redshift

The goal of this feature is to allow metadata columns to exist for specific tables created by the Redshift contrib related tasks.

Given the scenario where we would always have to have a `created_tz` column at the end of every table generated by that contrib, we could do the following:

```python
class UpdateTableTask(redshift.S3CopyToTable):
    def metadata_columns(self):
        return [('created_tz', 'TIMESTAMP')]

    def post_queries(self):
        query = 'UPDATE {0} ' \
                'SET created_tz = CURRENT_TIMESTAMP ' \
                'WHERE created_tz IS NULL'.format(self.table)
        return query
```

Adding a layer of abstraction over this feature, you could easily add many default behaviors for specific tables, for versioning the table and more.

This feature is opt-in since we don't want it to break other people's pipelines after integrating it.

* Move the Metadata Columns implementation to the RDBMS

As suggested in the code review, there are multiple other DBs that could benefit from this change. Currently, only PSQL and Redshift implement RDBMS, but others may implement this class and inherit the new behavior.

* Add tests for the Metadata behaviors

We've been internally using this feature for Redshift, but moving it to the RDBMS contrib and adding this behavior to PSQL could have unexpected side effects. This commit tests that the feature works correctly under Redshift and PSQL.

* Add additional documentation on how to use the new mixin

* Raise ValueError on invalid metadata_columns for RDBMS

If the count of metadata_columns is 0 and we're expecting to add them to the table, then we raise an error because that is an invalid flow. The contributor is required to provide metadata_columns values if we want to add those columns to the table.
The goal of this feature is to allow metadata columns to exist for specific tables created for Redshift.
Given the scenario where we would always have to have a `created_tz` column at the end of every table generated by that contrib, we could do the following:
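As a self-contained illustration of that scenario, the sketch below uses an in-memory `sqlite3` database as a stand-in for Redshift, so it runs without a cluster or luigi. The helper names `add_metadata_columns` and `stamp_created_tz`, the `reports` table, and the use of `ALTER TABLE` are assumptions made for the example; the `created_tz` column, its `TIMESTAMP` type, and the `UPDATE` statement follow the commit message in this PR.

```python
import sqlite3

# Same shape as the (name, type) pairs used in the PR's example.
METADATA_COLUMNS = [('created_tz', 'TIMESTAMP')]


def add_metadata_columns(connection, table, columns):
    # Append each metadata column to the end of an existing table.
    for name, sql_type in columns:
        connection.execute(
            'ALTER TABLE {0} ADD COLUMN {1} {2}'.format(table, name, sql_type))


def stamp_created_tz(connection, table):
    # Fill the metadata column only for rows that do not have a value yet.
    connection.execute(
        'UPDATE {0} SET created_tz = CURRENT_TIMESTAMP '
        'WHERE created_tz IS NULL'.format(table))


connection = sqlite3.connect(':memory:')
connection.execute('CREATE TABLE reports (id INTEGER)')
connection.executemany('INSERT INTO reports (id) VALUES (?)', [(1,), (2,)])

add_metadata_columns(connection, 'reports', METADATA_COLUMNS)
stamp_created_tz(connection, 'reports')

rows = connection.execute(
    'SELECT id, created_tz FROM reports ORDER BY id').fetchall()
```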
Adding a layer of abstraction over this feature, you could easily add many default behaviors for specific tables, for versioning the table and more.
This feature is opt-in since we don't want it to break other people's pipelines after integrating it.
Do we think that this would be a feature that you guys would be interested in merging? I'm unsure if this becomes too specific for our use-case, but it should help some other people to have that type of behavior if need be.
If that's something that you guys are interested in merging, then I'll spend some time in writing a proper testing suite!
Let me know!