From 5be171d1c34eb4deacba9901b2c1c8dd5fbc52ed Mon Sep 17 00:00:00 2001
From: Charles-Andre Bouffard
Date: Tue, 15 May 2018 15:14:40 -0400
Subject: [PATCH 1/5] Add metadata column feature to Redshift

The goal of this feature is to allow metadata columns to exist for specific
tables created by the Redshift contrib tasks.

Given a scenario where we always want a `created_tz` column at the end of
every table generated by that contrib, we could do the following:

```python
class UpdateTableTask(redshift.S3CopyToTable):
    enable_metadata_columns = True

    @property
    def metadata_columns(self):
        return [('created_tz', 'TIMESTAMP')]

    @property
    def metadata_queries(self):
        return ['UPDATE {0} '
                'SET created_tz = CURRENT_TIMESTAMP '
                'WHERE created_tz IS NULL'.format(self.table)]
```

By adding a layer of abstraction on top of this feature, you could easily add
default behaviors for specific tables, such as versioning them, and more.

This feature is opt-in (disabled by default) since we don't want it to break
other people's pipelines after it is integrated.
---
 luigi/contrib/redshift.py | 76 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 75 insertions(+), 1 deletion(-)

diff --git a/luigi/contrib/redshift.py b/luigi/contrib/redshift.py
index a1c5c43172..b9cace61ea 100644
--- a/luigi/contrib/redshift.py
+++ b/luigi/contrib/redshift.py
@@ -37,6 +37,66 @@
                    "Will crash at runtime if postgres functionality is used.")
 
 
+class _MetadataColumnsMixin(object):
+    """
+    This mixin is used to provide an additional behavior that allow a task to
+    add generic metadata columns to every table created for Redshift.
+    """
+    @property
+    def metadata_columns(self):
+        """Returns the default metadata columns.
+
+        Those columns are columns that we want each tables to have by default.
+        """
+        return []
+
+    @property
+    def metadata_queries(self):
+        return []
+
+    @property
+    def enable_metadata_columns(self):
+        return False
+
+    def _add_metadata_columns(self, cursor):
+        for column in self.metadata_columns:
+            if len(column) == 0:
+                logger.info('Unable to infer column information from column {column} for {table}'.format(column=column, table=self.table))
+                break
+
+            column_name = column[0]
+            if not self._column_exists(cursor, column_name):
+                logger.info('Adding missing metadata column {column} to {table}'.format(column=column, table=self.table))
+                self._add_column_to_table(cursor, column)
+
+    def _column_exists(self, cursor, column_name):
+        if '.' in self.table:
+            schema, table = self.table.split('.')
+            query = "SELECT 1 AS column_exists " \
+                    "FROM information_schema.columns " \
+                    "WHERE table_schema = LOWER('{0}') AND table_name = LOWER('{1}') AND column_name = LOWER('{2}') LIMIT 1".format(schema, table, column_name)
+        else:
+            query = "SELECT 1 AS column_exists " \
+                    "FROM information_schema.columns " \
+                    "WHERE table_name = LOWER('{0}') AND column_name = LOWER('{1}') LIMIT 1".format(table, column_name)
+
+        cursor.execute(query)
+        result = cursor.fetchone()
+        return bool(result)
+
+    def _add_column_to_table(self, cursor, column):
+        if len(column) == 1:
+            raise ValueError("_add_column_to_table() column type not specified for {column}".format(column=column[0]))
+        elif len(column) == 2:
+            query = "ALTER TABLE {table} ADD COLUMN {column}".format(table=self.table, column=' '.join(column))
+        elif len(column) == 3:
+            query = "ALTER TABLE {table} ADD COLUMN {column} ENCODE {encoding}".format(table=self.table, column=' '.join(column[0:2]), encoding=column[3])
+        else:
+            raise ValueError("_add_column_to_table() found no matching behavior for {column}".format(column=column))
+
+        cursor.execute(query)
+
+
 class _CredentialsMixin():
     """
     This mixin is used to provide the same credential properties
@@ -141,7 +201,7 @@ class RedshiftTarget(postgres.PostgresTarget):
     use_db_timestamps = False
 
 
-class S3CopyToTable(rdbms.CopyToTable, _CredentialsMixin):
+class S3CopyToTable(rdbms.CopyToTable, _CredentialsMixin, _MetadataColumnsMixin):
     """
     Template task for inserting a data set into Redshift from s3.
 
@@ -373,6 +433,9 @@ def run(self):
         self.copy(cursor, path)
         self.post_copy(cursor)
 
+        if self.enable_metadata_columns:
+            self.post_copy_metacolumns(cursor)
+
         # update marker table
         output.touch(connection)
         connection.commit()
@@ -472,6 +535,9 @@ def init_copy(self, connection):
             logger.info("Creating table %s", self.table)
             self.create_table(connection)
 
+        if self.enable_metadata_columns:
+            self._add_metadata_columns(connection.cursor())
+
         if self.do_truncate_table:
             logger.info("Truncating table %s", self.table)
             self.truncate_table(connection)
@@ -488,6 +554,14 @@ def post_copy(self, cursor):
         for query in self.queries:
             cursor.execute(query)
 
+    def post_copy_metacolums(self, cursor):
+        """
+        Performs post-copy to fill metadata columns.
+        """
+        logger.info('Executing post copy metadata queries')
+        for query in self.metadata_queries:
+            cursor.execute(query)
+
 
 class S3CopyJSONToTable(S3CopyToTable, _CredentialsMixin):
     """

From 528e0b138278da9695a9a93bb1704e413a7859bb Mon Sep 17 00:00:00 2001
From: Charles-Andre Bouffard
Date: Mon, 9 Jul 2018 13:34:30 -0400
Subject: [PATCH 2/5] Move the Metadata Columns implementation to the RDBMS

As suggested in the code review, there are multiple other DBs that could
benefit from this change. Currently, only PSQL and Redshift implement the
RDBMS base class, but others may implement it and inherit the new behavior.
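As a minimal, hypothetical sketch of what this move buys a Postgres pipeline:
the task name, connection settings and rows below are made up, and only the
`enable_metadata_columns` / `metadata_columns` / `metadata_queries` hooks
introduced by this series are assumed.

```python
import luigi
from luigi.contrib import postgres


class EventsToPostgres(postgres.CopyToTable):
    """Hypothetical task that copies rows and stamps a `created_tz` column."""
    host = 'localhost'
    database = 'analytics'
    user = 'luigi'
    password = 'not-a-real-password'
    table = 'events'
    columns = [('event_name', 'TEXT'),
               ('event_count', 'INT')]

    # Opt in to the behavior now inherited from rdbms.CopyToTable.
    enable_metadata_columns = True

    @property
    def metadata_columns(self):
        # (name, type) tuples; the column is only added if it is missing.
        return [('created_tz', 'TIMESTAMP')]

    @property
    def metadata_queries(self):
        # Backfills the metadata column after the copy.
        return ["UPDATE {0} SET created_tz = CURRENT_TIMESTAMP "
                "WHERE created_tz IS NULL".format(self.table)]

    def rows(self):
        yield ('page_view', 42)
        yield ('signup', 7)
```

Note that the Postgres copy path only picks up `metadata_queries` once
PATCH 3/5 below wires `post_copy_metacolumns` into `postgres.py`.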
--- luigi/contrib/rdbms.py | 66 ++++++++++++++++++++++++++++++++++++++- luigi/contrib/redshift.py | 62 +----------------------------------- 2 files changed, 66 insertions(+), 62 deletions(-) diff --git a/luigi/contrib/rdbms.py b/luigi/contrib/rdbms.py index d7b82d4c9d..61149aa468 100644 --- a/luigi/contrib/rdbms.py +++ b/luigi/contrib/rdbms.py @@ -27,7 +27,68 @@ logger = logging.getLogger('luigi-interface') -class CopyToTable(luigi.task.MixinNaiveBulkComplete, luigi.Task): +class _MetadataColumnsMixin(object): + """ + This mixin is used to provide an additional behavior that allow a task to + add generic metadata columns to every table created for both PSQL and + Redshift. + """ + @property + def metadata_columns(self): + """Returns the default metadata columns. + + Those columns are columns that we want each tables to have by default. + """ + return [] + + @property + def metadata_queries(self): + return [] + + @property + def enable_metadata_columns(self): + return False + + def _add_metadata_columns(self, cursor): + for column in self.metadata_columns: + if len(column) == 0: + logger.info('Unable to infer column information from column {column} for {table}'.format(column=column, table=self.table)) + break + + column_name = column[0] + if not self._column_exists(cursor, column_name): + logger.info('Adding missing metadata column {column} to {table}'.format(column=column, table=self.table)) + self._add_column_to_table(cursor, column) + + def _column_exists(self, cursor, column_name): + if '.' in self.table: + schema, table = self.table.split('.') + query = "SELECT 1 AS column_exists " \ + "FROM information_schema.columns " \ + "WHERE table_schema = LOWER('{0}') AND table_name = LOWER('{1}') AND column_name = LOWER('{2}') LIMIT 1".format(schema, table, column_name) + else: + query = "SELECT 1 AS column_exists " \ + "FROM information_schema.columns " \ + "WHERE table_name = LOWER('{0}') AND column_name = LOWER('{1}') LIMIT 1".format(table, column_name) + + cursor.execute(query) + result = cursor.fetchone() + return bool(result) + + def _add_column_to_table(self, cursor, column): + if len(column) == 1: + raise ValueError("_add_column_to_table() column type not specified for {column}".format(column=column[0])) + elif len(column) == 2: + query = "ALTER TABLE {table} ADD COLUMN {column}".format(table=self.table, column=' '.join(column)) + elif len(column) == 3: + query = "ALTER TABLE {table} ADD COLUMN {column} ENCODE {encoding}".format(table=self.table, column=' '.join(column[0:2]), encoding=column[3]) + else: + raise ValueError("_add_column_to_table() found no matching behavior for {column}".format(column=column)) + + cursor.execute(query) + + +class CopyToTable(luigi.task.MixinNaiveBulkComplete, _MetadataColumnsMixin, luigi.Task): """ An abstract task for inserting a data set into RDBMS. @@ -120,6 +181,9 @@ def init_copy(self, connection): if hasattr(self, "clear_table"): raise Exception("The clear_table attribute has been removed. Override init_copy instead!") + if self.enable_metadata_columns: + self._add_metadata_columns(connection.cursor()) + def post_copy(self, connection): """ Override to perform custom queries. 
diff --git a/luigi/contrib/redshift.py b/luigi/contrib/redshift.py index b9cace61ea..677023e20b 100644 --- a/luigi/contrib/redshift.py +++ b/luigi/contrib/redshift.py @@ -37,66 +37,6 @@ "Will crash at runtime if postgres functionality is used.") -class _MetadataColumnsMixin(object): - """ - This mixin is used to provide an additional behavior that allow a task to - add generic metadata columns to every table created for Redshift. - """ - @property - def metadata_columns(self): - """Returns the default metadata columns. - - Those columns are columns that we want each tables to have by default. - """ - return [] - - @property - def metadata_queries(self): - return [] - - @property - def enable_metadata_columns(self): - return False - - def _add_metadata_columns(self, cursor): - for column in self.metadata_columns: - if len(column) == 0: - logger.info('Unable to infer column information from column {column} for {table}'.format(column=column, table=self.table)) - break - - column_name = column[0] - if not self._column_exists(cursor, column_name): - logger.info('Adding missing metadata column {column} to {table}'.format(column=column, table=self.table)) - self._add_column_to_table(cursor, column) - - def _column_exists(self, cursor, column_name): - if '.' in self.table: - schema, table = self.table.split('.') - query = "SELECT 1 AS column_exists " \ - "FROM information_schema.columns " \ - "WHERE table_schema = LOWER('{0}') AND table_name = LOWER('{1}') AND column_name = LOWER('{2}') LIMIT 1".format(schema, table, column_name) - else: - query = "SELECT 1 AS column_exists " \ - "FROM information_schema.columns " \ - "WHERE table_name = LOWER('{0}') AND column_name = LOWER('{1}') LIMIT 1".format(table, column_name) - - cursor.execute(query) - result = cursor.fetchone() - return bool(result) - - def _add_column_to_table(self, cursor, column): - if len(column) == 1: - raise ValueError("_add_column_to_table() column type not specified for {column}".format(column=column[0])) - elif len(column) == 2: - query = "ALTER TABLE {table} ADD COLUMN {column}".format(table=self.table, column=' '.join(column)) - elif len(column) == 3: - query = "ALTER TABLE {table} ADD COLUMN {column} ENCODE {encoding}".format(table=self.table, column=' '.join(column[0:2]), encoding=column[3]) - else: - raise ValueError("_add_column_to_table() found no matching behavior for {column}".format(column=column)) - - cursor.execute(query) - - class _CredentialsMixin(): """ This mixin is used to provide the same credential properties @@ -201,7 +141,7 @@ class RedshiftTarget(postgres.PostgresTarget): use_db_timestamps = False -class S3CopyToTable(rdbms.CopyToTable, _CredentialsMixin, _MetadataColumnsMixin): +class S3CopyToTable(rdbms.CopyToTable, _CredentialsMixin): """ Template task for inserting a data set into Redshift from s3. From e721ff9fd5eefe162a7bd33f65df6a4b993f36bc Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 9 Jul 2018 18:02:58 -0400 Subject: [PATCH 3/5] Add tests for the Metadata behaviors We've been internally using this feature for Redshift but moving this to the RDBMS contrib and adding this behavior to PSQL could have unexpected side effects, this takes care of testing if the feature works correctly under Redshift and PSQL. 
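As a distilled, hypothetical sketch of the mocking approach the new test
modules rely on: the dummy task mirrors the fixtures added below, and only
`mock.PropertyMock` patching of the opt-in property is assumed.

```python
import unittest

import mock

import luigi
import luigi.contrib.redshift


class _SketchCopyTask(luigi.contrib.redshift.S3CopyToTable):
    """Hypothetical fixture, modelled on the dummy tasks in the tests below."""
    host = 'dummy_host'
    database = 'dummy_database'
    user = 'dummy_user'
    password = 'dummy_password'
    table = luigi.Parameter(default='dummy_table')
    columns = (('some_text', 'varchar(255)'),)
    copy_options = ''
    prune_table = ''
    prune_column = ''
    prune_date = ''
    aws_access_key_id = 'key'
    aws_secret_access_key = 'secret'

    def s3_load_path(self):
        return 's3://bucket/key'


class MetadataColumnsSketchTest(unittest.TestCase):
    @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns",
                new_callable=mock.PropertyMock, return_value=True)
    @mock.patch("luigi.contrib.redshift.S3CopyToTable._add_metadata_columns")
    @mock.patch("luigi.contrib.redshift.S3CopyToTable.post_copy_metacolumns")
    @mock.patch("luigi.contrib.redshift.RedshiftTarget")
    def test_metadata_hooks_called_when_enabled(self, mock_target, mock_post_copy,
                                                mock_add_columns, mock_enabled):
        # With the target mocked out, run() exercises the copy flow in memory.
        _SketchCopyTask().run()
        self.assertTrue(mock_add_columns.called)
        self.assertTrue(mock_post_copy.called)
```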
--- luigi/contrib/postgres.py | 2 + luigi/contrib/rdbms.py | 20 ++- luigi/contrib/redshift.py | 2 +- test/contrib/postgres_test.py | 52 +++++++ test/contrib/rdbms_test.py | 255 ++++++++++++++++++++++++++++++++++ test/contrib/redshift_test.py | 92 ++++++++++++ 6 files changed, 416 insertions(+), 7 deletions(-) create mode 100644 test/contrib/rdbms_test.py diff --git a/luigi/contrib/postgres.py b/luigi/contrib/postgres.py index ec8b72df16..363cde70b0 100644 --- a/luigi/contrib/postgres.py +++ b/luigi/contrib/postgres.py @@ -326,6 +326,8 @@ def run(self): self.init_copy(connection) self.copy(cursor, tmp_file) self.post_copy(connection) + if self.enable_metadata_columns: + self.post_copy_metacolumns(cursor) except psycopg2.ProgrammingError as e: if e.pgcode == psycopg2.errorcodes.UNDEFINED_TABLE and attempt == 0: # if first attempt fails with "relation not found", try creating table diff --git a/luigi/contrib/rdbms.py b/luigi/contrib/rdbms.py index 61149aa468..ced695d056 100644 --- a/luigi/contrib/rdbms.py +++ b/luigi/contrib/rdbms.py @@ -28,7 +28,8 @@ class _MetadataColumnsMixin(object): - """ + """Provice an additinal behavior that adds columns and values to tables + This mixin is used to provide an additional behavior that allow a task to add generic metadata columns to every table created for both PSQL and Redshift. @@ -49,7 +50,9 @@ def metadata_queries(self): def enable_metadata_columns(self): return False - def _add_metadata_columns(self, cursor): + def _add_metadata_columns(self, connection): + cursor = connection.cursor() + for column in self.metadata_columns: if len(column) == 0: logger.info('Unable to infer column information from column {column} for {table}'.format(column=column, table=self.table)) @@ -65,11 +68,11 @@ def _column_exists(self, cursor, column_name): schema, table = self.table.split('.') query = "SELECT 1 AS column_exists " \ "FROM information_schema.columns " \ - "WHERE table_schema = LOWER('{0}') AND table_name = LOWER('{1}') AND column_name = LOWER('{2}') LIMIT 1".format(schema, table, column_name) + "WHERE table_schema = LOWER('{0}') AND table_name = LOWER('{1}') AND column_name = LOWER('{2}') LIMIT 1;".format(schema, table, column_name) else: query = "SELECT 1 AS column_exists " \ "FROM information_schema.columns " \ - "WHERE table_name = LOWER('{0}') AND column_name = LOWER('{1}') LIMIT 1".format(table, column_name) + "WHERE table_name = LOWER('{0}') AND column_name = LOWER('{1}') LIMIT 1;".format(self.table, column_name) cursor.execute(query) result = cursor.fetchone() @@ -79,14 +82,19 @@ def _add_column_to_table(self, cursor, column): if len(column) == 1: raise ValueError("_add_column_to_table() column type not specified for {column}".format(column=column[0])) elif len(column) == 2: - query = "ALTER TABLE {table} ADD COLUMN {column}".format(table=self.table, column=' '.join(column)) + query = "ALTER TABLE {table} ADD COLUMN {column};".format(table=self.table, column=' '.join(column)) elif len(column) == 3: - query = "ALTER TABLE {table} ADD COLUMN {column} ENCODE {encoding}".format(table=self.table, column=' '.join(column[0:2]), encoding=column[3]) + query = "ALTER TABLE {table} ADD COLUMN {column} ENCODE {encoding};".format(table=self.table, column=' '.join(column[0:2]), encoding=column[2]) else: raise ValueError("_add_column_to_table() found no matching behavior for {column}".format(column=column)) cursor.execute(query) + def post_copy_metacolumns(self, cursor): + logger.info('Executing post copy metadata queries') + for query in self.metadata_queries: + 
cursor.execute(query) + class CopyToTable(luigi.task.MixinNaiveBulkComplete, _MetadataColumnsMixin, luigi.Task): """ diff --git a/luigi/contrib/redshift.py b/luigi/contrib/redshift.py index 677023e20b..6792efff09 100644 --- a/luigi/contrib/redshift.py +++ b/luigi/contrib/redshift.py @@ -476,7 +476,7 @@ def init_copy(self, connection): self.create_table(connection) if self.enable_metadata_columns: - self._add_metadata_columns(connection.cursor()) + self._add_metadata_columns(connection) if self.do_truncate_table: logger.info("Truncating table %s", self.table) diff --git a/test/contrib/postgres_test.py b/test/contrib/postgres_test.py index 5df6888343..eadd6d1018 100644 --- a/test/contrib/postgres_test.py +++ b/test/contrib/postgres_test.py @@ -121,3 +121,55 @@ def test_bulk_complete(self, mock_connect): 'DummyPostgresQuery_2015_01_06_f91a47ec40', ]) self.assertFalse(task.complete()) + + +@attr('postgres') +class TestCopyToTableWithMetaColumns(unittest.TestCase): + @mock.patch("luigi.contrib.postgres.CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.postgres.CopyToTable._add_metadata_columns") + @mock.patch("luigi.contrib.postgres.CopyToTable.post_copy_metacolumns") + @mock.patch("luigi.contrib.postgres.CopyToTable.rows", return_value=['row1', 'row2']) + @mock.patch("luigi.contrib.postgres.PostgresTarget") + @mock.patch('psycopg2.connect') + def test_copy_with_metadata_columns_enabled(self, + mock_connect, + mock_redshift_target, + mock_rows, + mock_add_columns, + mock_update_columns, + mock_metadata_columns_enabled): + + task = DummyPostgresImporter(date=datetime.datetime(1991, 3, 24)) + + mock_cursor = MockPostgresCursor([task.task_id]) + mock_connect.return_value.cursor.return_value = mock_cursor + + task = DummyPostgresImporter(date=datetime.datetime(1991, 3, 24)) + task.run() + + self.assertTrue(mock_add_columns.called) + self.assertTrue(mock_update_columns.called) + + @mock.patch("luigi.contrib.postgres.CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=False) + @mock.patch("luigi.contrib.postgres.CopyToTable._add_metadata_columns") + @mock.patch("luigi.contrib.postgres.CopyToTable.post_copy_metacolumns") + @mock.patch("luigi.contrib.postgres.CopyToTable.rows", return_value=['row1', 'row2']) + @mock.patch("luigi.contrib.postgres.PostgresTarget") + @mock.patch('psycopg2.connect') + def test_copy_with_metadata_columns_disabled(self, + mock_connect, + mock_redshift_target, + mock_rows, + mock_add_columns, + mock_update_columns, + mock_metadata_columns_enabled): + + task = DummyPostgresImporter(date=datetime.datetime(1991, 3, 24)) + + mock_cursor = MockPostgresCursor([task.task_id]) + mock_connect.return_value.cursor.return_value = mock_cursor + + task.run() + + self.assertFalse(mock_add_columns.called) + self.assertFalse(mock_update_columns.called) diff --git a/test/contrib/rdbms_test.py b/test/contrib/rdbms_test.py new file mode 100644 index 0000000000..3127cb2e8d --- /dev/null +++ b/test/contrib/rdbms_test.py @@ -0,0 +1,255 @@ +# Copyright 2012-2015 Spotify AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +We're using Redshift as the test bed since Redshift implements RDBMS. We could +have opted for PSQL but we're less familiar with that contrib and there are +less examples on how to test it. +""" + +import luigi +import luigi.contrib.redshift +import mock + +import unittest + + +# Fake AWS and S3 credentials taken from `../redshift_test.py`. +AWS_ACCESS_KEY = 'key' +AWS_SECRET_KEY = 'secret' + +AWS_ACCOUNT_ID = '0123456789012' +AWS_ROLE_NAME = 'MyRedshiftRole' + +BUCKET = 'bucket' +KEY = 'key' + + +class DummyS3CopyToTableBase(luigi.contrib.redshift.S3CopyToTable): + # Class attributes taken from `DummyPostgresImporter` in + # `../postgres_test.py`. + host = 'dummy_host' + database = 'dummy_database' + user = 'dummy_user' + password = 'dummy_password' + table = luigi.Parameter(default='dummy_table') + columns = luigi.TupleParameter( + default=( + ('some_text', 'varchar(255)'), + ('some_int', 'int'), + ) + ) + + copy_options = '' + prune_table = '' + prune_column = '' + prune_date = '' + + def s3_load_path(self): + return 's3://%s/%s' % (BUCKET, KEY) + + +class DummyS3CopyToTableKey(DummyS3CopyToTableBase): + aws_access_key_id = AWS_ACCESS_KEY + aws_secret_access_key = AWS_SECRET_KEY + + +class TestS3CopyToTableWithMetaColumns(unittest.TestCase): + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable.metadata_columns", new_callable=mock.PropertyMock, return_value=[('created_tz', 'TIMESTAMP')]) + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_check_meta_columns_to_table_if_exists(self, + mock_redshift_target, + mock_metadata_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey(table='my_test_table') + task.run() + + mock_cursor = (mock_redshift_target.return_value + .connect + .return_value + .cursor + .return_value) + + executed_query = mock_cursor.execute.call_args_list[1][0][0] + + expected_output = "SELECT 1 AS column_exists FROM information_schema.columns " \ + "WHERE table_name = LOWER('{table}') " \ + "AND column_name = LOWER('{column}') " \ + "LIMIT 1;".format(table='my_test_table', column='created_tz') + + self.assertEqual(executed_query, expected_output) + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable.metadata_columns", new_callable=mock.PropertyMock, return_value=[('created_tz', 'TIMESTAMP')]) + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_check_meta_columns_to_schematable_if_exists(self, + mock_redshift_target, + mock_metadata_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey(table='test.my_test_table') + task.run() + + mock_cursor = (mock_redshift_target.return_value + .connect + .return_value + .cursor + .return_value) + + executed_query = mock_cursor.execute.call_args_list[2][0][0] + + expected_output = "SELECT 1 AS column_exists FROM information_schema.columns " \ + "WHERE table_schema = LOWER('{schema}') " \ + "AND 
table_name = LOWER('{table}') " \ + "AND column_name = LOWER('{column}') " \ + "LIMIT 1;".format(schema='test', table='my_test_table', column='created_tz') + + self.assertEqual(executed_query, expected_output) + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable.metadata_columns", new_callable=mock.PropertyMock, return_value=[('created_tz', 'TIMESTAMP')]) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._column_exists", return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._add_column_to_table") + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_not_add_if_meta_columns_already_exists(self, + mock_redshift_target, + mock_add_to_table, + mock_columns_exists, + mock_metadata_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey() + task.run() + + self.assertFalse(mock_add_to_table.called) + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable.metadata_columns", new_callable=mock.PropertyMock, return_value=[('created_tz', 'TIMESTAMP')]) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._column_exists", return_value=False) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._add_column_to_table") + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_add_if_meta_columns_not_already_exists(self, + mock_redshift_target, + mock_add_to_table, + mock_columns_exists, + mock_metadata_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey() + task.run() + + self.assertTrue(mock_add_to_table.called) + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable.metadata_columns", new_callable=mock.PropertyMock, return_value=[('created_tz', 'TIMESTAMP')]) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._column_exists", return_value=False) + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_add_regular_column(self, + mock_redshift_target, + mock_columns_exists, + mock_metadata_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey(table='my_test_table') + task.run() + + mock_cursor = (mock_redshift_target.return_value + .connect + .return_value + .cursor + .return_value) + + executed_query = mock_cursor.execute.call_args_list[1][0][0] + + expected_output = "ALTER TABLE {table} " \ + "ADD COLUMN {column} {type};".format(table='my_test_table', column='created_tz', type='TIMESTAMP') + + self.assertEqual(executed_query, expected_output) + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable.metadata_columns", new_callable=mock.PropertyMock, return_value=[('created_tz', 'TIMESTAMP', 'bytedict')]) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._column_exists", return_value=False) + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_add_encoded_column(self, + mock_redshift_target, + mock_columns_exists, + mock_metadata_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey(table='my_test_table') + task.run() + + mock_cursor = (mock_redshift_target.return_value + .connect + .return_value + .cursor + .return_value) + + executed_query = 
mock_cursor.execute.call_args_list[1][0][0] + + expected_output = "ALTER TABLE {table} " \ + "ADD COLUMN {column} {type} ENCODE {encoding};".format(table='my_test_table', column='created_tz', + type='TIMESTAMP', + encoding='bytedict') + + self.assertEqual(executed_query, expected_output) + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable.metadata_columns", new_callable=mock.PropertyMock, return_value=[('created_tz')]) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._column_exists", return_value=False) + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_raise_error_on_no_column_type(self, + mock_redshift_target, + mock_columns_exists, + mock_metadata_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey() + + with self.assertRaises(ValueError): + task.run() + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable.metadata_columns", new_callable=mock.PropertyMock, + return_value=[('created_tz', 'TIMESTAMP', 'bytedict', '42')]) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._column_exists", return_value=False) + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_raise_error_on_invalid_column(self, + mock_redshift_target, + mock_columns_exists, + mock_metadata_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey() + + with self.assertRaises(ValueError): + task.run() + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable.metadata_queries", new_callable=mock.PropertyMock, return_value=['SELECT 1 FROM X', 'SELECT 2 FROM Y']) + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_post_copy_metacolumns(self, + mock_redshift_target, + mock_metadata_queries, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey() + task.run() + + mock_cursor = (mock_redshift_target.return_value + .connect + .return_value + .cursor + .return_value) + + executed_query = mock_cursor.execute.call_args_list[2][0][0] + expected_output = "SELECT 1 FROM X" + self.assertEqual(executed_query, expected_output) + + executed_query = mock_cursor.execute.call_args_list[3][0][0] + expected_output = "SELECT 2 FROM Y" + self.assertEqual(executed_query, expected_output) diff --git a/test/contrib/redshift_test.py b/test/contrib/redshift_test.py index c6b23bf2b1..5433c6d186 100644 --- a/test/contrib/redshift_test.py +++ b/test/contrib/redshift_test.py @@ -80,6 +80,36 @@ def s3_load_path(self): return 's3://%s/%s' % (BUCKET, KEY) +class DummyS3CopyJSONToTableBase(luigi.contrib.redshift.S3CopyJSONToTable): + # Class attributes taken from `DummyPostgresImporter` in + # `../postgres_test.py`. 
+ aws_access_key_id = AWS_ACCESS_KEY + aws_secret_access_key = AWS_SECRET_KEY + + host = 'dummy_host' + database = 'dummy_database' + user = 'dummy_user' + password = 'dummy_password' + table = luigi.Parameter(default='dummy_table') + columns = luigi.TupleParameter( + default=( + ('some_text', 'varchar(255)'), + ('some_int', 'int'), + ) + ) + + copy_options = '' + prune_table = '' + prune_column = '' + prune_date = '' + + jsonpath = '' + copy_json_options = '' + + def s3_load_path(self): + return 's3://%s/%s' % (BUCKET, KEY) + + class DummyS3CopyToTableKey(DummyS3CopyToTableBase): aws_access_key_id = AWS_ACCESS_KEY aws_secret_access_key = AWS_SECRET_KEY @@ -130,6 +160,68 @@ def test_from_config(self): self.assertEqual(self.aws_secret_access_key, "config_secret") +class TestS3CopyToTableWithMetaColumns(unittest.TestCase): + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._add_metadata_columns") + @mock.patch("luigi.contrib.redshift.S3CopyToTable.post_copy_metacolumns") + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_with_metadata_columns_enabled(self, + mock_redshift_target, + mock_add_columns, + mock_update_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey() + task.run() + + self.assertTrue(mock_add_columns.called) + self.assertTrue(mock_update_columns.called) + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=False) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._add_metadata_columns") + @mock.patch("luigi.contrib.redshift.S3CopyToTable.post_copy_metacolumns") + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_copy_with_metadata_columns_disabled(self, + mock_redshift_target, + mock_add_columns, + mock_update_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyToTableKey() + task.run() + + self.assertFalse(mock_add_columns.called) + self.assertFalse(mock_update_columns.called) + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._add_metadata_columns") + @mock.patch("luigi.contrib.redshift.S3CopyToTable.post_copy_metacolumns") + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_json_copy_with_metadata_columns_enabled(self, + mock_redshift_target, + mock_add_columns, + mock_update_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyJSONToTableBase() + task.run() + + self.assertTrue(mock_add_columns.called) + self.assertTrue(mock_update_columns.called) + + @mock.patch("luigi.contrib.redshift.S3CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=False) + @mock.patch("luigi.contrib.redshift.S3CopyToTable._add_metadata_columns") + @mock.patch("luigi.contrib.redshift.S3CopyToTable.post_copy_metacolumns") + @mock.patch("luigi.contrib.redshift.RedshiftTarget") + def test_json_copy_with_metadata_columns_disabled(self, + mock_redshift_target, + mock_add_columns, + mock_update_columns, + mock_metadata_columns_enabled): + task = DummyS3CopyJSONToTableBase() + task.run() + + self.assertFalse(mock_add_columns.called) + self.assertFalse(mock_update_columns.called) + + class TestS3CopyToTable(unittest.TestCase): @mock.patch("luigi.contrib.redshift.RedshiftTarget") def test_copy_missing_creds(self, mock_redshift_target): From 
2eea73f47ec41827c9c55ca303fc0e1b863159ab Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Tue, 10 Jul 2018 10:28:44 -0400 Subject: [PATCH 4/5] Add additional documentation on how to use the new mixin --- luigi/contrib/rdbms.py | 51 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/rdbms.py b/luigi/contrib/rdbms.py index ced695d056..de661c9100 100644 --- a/luigi/contrib/rdbms.py +++ b/luigi/contrib/rdbms.py @@ -28,11 +28,60 @@ class _MetadataColumnsMixin(object): - """Provice an additinal behavior that adds columns and values to tables + """Provide an additional behavior that adds columns and values to tables This mixin is used to provide an additional behavior that allow a task to add generic metadata columns to every table created for both PSQL and Redshift. + + Example: + + This is a use-case example of how this mixin could come handy and how + to use it. + + .. code:: python + + class CommonMetaColumnsBehavior(object): + def update_report_execution_date_query(self): + query = "UPDATE {0} " \ + "SET date_param = DATE '{1}' " \ + "WHERE date_param IS NULL".format(self.table, self.date) + + return query + + @property + def metadata_columns(self): + if self.date: + cols.append(('date_param', 'VARCHAR')) + + return cols + + @property + def metadata_queries(self): + queries = [self.update_created_tz_query()] + if self.date: + queries.append(self.update_report_execution_date_query()) + + return queries + + + class RedshiftCopyCSVToTableFromS3(CommonMetaColumnsBehavior, redshift.S3CopyToTable): + "We have some business override here that would only add noise to the + example, so let's assume that this is only a shell." + pass + + + class UpdateTableA(RedshiftCopyCSVToTableFromS3): + date = luigi.Parameter() + table = 'tableA' + + def queries(): + return [query_content_for('/queries/deduplicate_dupes.sql')] + + + class UpdateTableB(RedshiftCopyCSVToTableFromS3): + date = luigi.Parameter() + table = 'tableB' """ @property def metadata_columns(self): From 70d7023180b1314aa125891a921032628fe62f85 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Thu, 2 Aug 2018 13:05:02 -0400 Subject: [PATCH 5/5] Raise ValueError on invalid metadata_columns for RDBMS If the count of metadata_columns is 0 and we're expecting to add them to the table, then we raise an error because that is an invalid flow. The contributor is required to have metadata_columns values if we want to add that column to the table. --- luigi/contrib/rdbms.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/luigi/contrib/rdbms.py b/luigi/contrib/rdbms.py index de661c9100..a955c87275 100644 --- a/luigi/contrib/rdbms.py +++ b/luigi/contrib/rdbms.py @@ -104,8 +104,8 @@ def _add_metadata_columns(self, connection): for column in self.metadata_columns: if len(column) == 0: - logger.info('Unable to infer column information from column {column} for {table}'.format(column=column, table=self.table)) - break + raise ValueError("_add_metadata_columns is unable to infer column information from column {column} for {table}".format(column=column, + table=self.table)) column_name = column[0] if not self._column_exists(cursor, column_name):
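Taken together, the series leaves us with the opt-in surface sketched below.
The task, cluster settings and queries are hypothetical; the hooks and the
empty-tuple `ValueError` are the behavior introduced above.

```python
import luigi
from luigi.contrib import redshift


class SalesReportToRedshift(redshift.S3CopyToTable):
    """Hypothetical end-to-end usage of the metadata column hooks."""
    date = luigi.DateParameter()

    host = 'examplecluster.abc123.us-east-1.redshift.amazonaws.com:5439'
    database = 'dev'
    user = 'etl'
    password = 'not-a-real-password'
    table = 'reports.sales'
    columns = [('sale_id', 'BIGINT'),
               ('amount', 'DECIMAL(12, 2)')]
    aws_access_key_id = 'key'
    aws_secret_access_key = 'secret'
    copy_options = "CSV IGNOREHEADER 1"

    def s3_load_path(self):
        return 's3://example-bucket/sales/{}.csv'.format(self.date)

    # Disabled by default; existing pipelines are unaffected unless they opt in.
    enable_metadata_columns = True

    @property
    def metadata_columns(self):
        # Each tuple needs at least (name, type); a third element is used as a
        # Redshift ENCODE value. An empty tuple now raises ValueError.
        return [('created_tz', 'TIMESTAMP'),
                ('report_date', 'DATE', 'runlength')]

    @property
    def metadata_queries(self):
        # Run after the COPY, via post_copy_metacolumns().
        return ["UPDATE {0} SET created_tz = CURRENT_TIMESTAMP "
                "WHERE created_tz IS NULL".format(self.table),
                "UPDATE {0} SET report_date = DATE '{1}' "
                "WHERE report_date IS NULL".format(self.table, self.date)]
```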