Add metadata columns to the RDBMS contrib by thisiscab · Pull Request #2440 · spotify/luigi · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add metadata columns to the RDBMS contrib #2440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions luigi/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ def run(self):
self.init_copy(connection)
self.copy(cursor, tmp_file)
self.post_copy(connection)
if self.enable_metadata_columns:
self.post_copy_metacolumns(cursor)
except psycopg2.ProgrammingError as e:
if e.pgcode == psycopg2.errorcodes.UNDEFINED_TABLE and attempt == 0:
# if first attempt fails with "relation not found", try creating table
Expand Down
123 changes: 122 additions & 1 deletion luigi/contrib/rdbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,125 @@
logger = logging.getLogger('luigi-interface')


class CopyToTable(luigi.task.MixinNaiveBulkComplete, luigi.Task):
class _MetadataColumnsMixin(object):
"""Provide an additional behavior that adds columns and values to tables

This mixin is used to provide an additional behavior that allow a task to
add generic metadata columns to every table created for both PSQL and
Redshift.

Example:

This is a use-case example of how this mixin could come handy and how
to use it.

.. code:: python

class CommonMetaColumnsBehavior(object):
def update_report_execution_date_query(self):
query = "UPDATE {0} " \
"SET date_param = DATE '{1}' " \
"WHERE date_param IS NULL".format(self.table, self.date)

return query

@property
def metadata_columns(self):
if self.date:
cols.append(('date_param', 'VARCHAR'))

return cols

@property
def metadata_queries(self):
queries = [self.update_created_tz_query()]
if self.date:
queries.append(self.update_report_execution_date_query())

return queries


class RedshiftCopyCSVToTableFromS3(CommonMetaColumnsBehavior, redshift.S3CopyToTable):
"We have some business override here that would only add noise to the
example, so let's assume that this is only a shell."
pass


class UpdateTableA(RedshiftCopyCSVToTableFromS3):
date = luigi.Parameter()
table = 'tableA'

def queries():
return [query_content_for('/queries/deduplicate_dupes.sql')]


class UpdateTableB(RedshiftCopyCSVToTableFromS3):
date = luigi.Parameter()
table = 'tableB'
"""
@property
def metadata_columns(self):
"""Returns the default metadata columns.

Those columns are columns that we want each tables to have by default.
"""
return []

@property
def metadata_queries(self):
return []

@property
def enable_metadata_columns(self):
return False

def _add_metadata_columns(self, connection):
cursor = connection.cursor()

for column in self.metadata_columns:
if len(column) == 0:
raise ValueError("_add_metadata_columns is unable to infer column information from column {column} for {table}".format(column=column,
table=self.table))

column_name = column[0]
if not self._column_exists(cursor, column_name):
logger.info('Adding missing metadata column {column} to {table}'.format(column=column, table=self.table))
self._add_column_to_table(cursor, column)

def _column_exists(self, cursor, column_name):
if '.' in self.table:
schema, table = se 10000 lf.table.split('.')
query = "SELECT 1 AS column_exists " \
"FROM information_schema.columns " \
"WHERE table_schema = LOWER('{0}') AND table_name = LOWER('{1}') AND column_name = LOWER('{2}') LIMIT 1;".format(schema, table, column_name)
else:
query = "SELECT 1 AS column_exists " \
"FROM information_schema.columns " \
"WHERE table_name = LOWER('{0}') AND column_name = LOWER('{1}') LIMIT 1;".format(self.table, column_name)

cursor.execute(query)
result = cursor.fetchone()
return bool(result)

def _add_column_to_table(self, cursor, column):
if len(column) == 1:
raise ValueError("_add_column_to_table() column type not specified for {column}".format(column=column[0]))
elif len(column) == 2:
query = "ALTER TABLE {table} ADD COLUMN {column};".format(table=self.table, column=' '.join(column))
elif len(column) == 3:
query = "ALTER TABLE {table} ADD COLUMN {column} ENCODE {encoding};".format(table=self.table, column=' '.join(column[0:2]), encoding=column[2])
else:
raise ValueError("_add_column_to_table() found no matching behavior for {column}".format(column=column))

cursor.execute(query)

def post_copy_metacolumns(self, cursor):
logger.info('Executing post copy metadata queries')
for query in self.metadata_queries:
cursor.execute(query)


class CopyToTable(luigi.task.MixinNaiveBulkComplete, _MetadataColumnsMixin, luigi.Task):
"""
An abstract task for inserting a data set into RDBMS.

Expand Down Expand Up @@ -120,6 +238,9 @@ def init_copy(self, connection):
if hasattr(self, "clear_table"):
raise Exception("The clear_table attribute has been removed. Override init_copy instead!")

if self.enable_metadata_columns:
self._add_metadata_columns(connection.cursor())

def post_copy(self, connection):
"""
Override to perform custom queries.
Expand Down
14 changes: 14 additions & 0 deletions luigi/contrib/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ def run(self):
self.copy(cursor, path)
self.post_copy(cursor)

if self.enable_metadata_columns:
self.post_copy_metacolumns(cursor)

# update marker table
output.touch(connection)
connection.commit()
Expand Down Expand Up @@ -472,6 +475,9 @@ def init_copy(self, connection):
logger.info("Creating table %s", self.table)
self.create_table(connection)

if self.enable_metadata_columns:
self._add_metadata_columns(connection)

if self.do_truncate_table:
logger.info("Truncating table %s", self.table)
self.truncate_table(connection)
Expand All @@ -488,6 +494,14 @@ def post_copy(self, cursor):
for query in self.queries:
cursor.execute(query)

def post_copy_metacolumns(self, cursor):
    """
    Performs post-copy to fill metadata columns.

    Executes every query of ``self.metadata_queries`` on ``cursor``, in order.
    This is the name ``run`` invokes when ``enable_metadata_columns`` is set.
    """
    logger.info('Executing post copy metadata queries')
    for query in self.metadata_queries:
        cursor.execute(query)

# The method was first published with a typo (``post_copy_metacolums``), a name
# that ``run`` never calls. Keep it as an alias so existing callers still work.
post_copy_metacolums = post_copy_metacolumns


class S3CopyJSONToTable(S3CopyToTable, _CredentialsMixin):
"""
Expand Down
52 changes: 52 additions & 0 deletions test/contrib/postgres_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,55 @@ def test_bulk_complete(self, mock_connect):
'DummyPostgresQuery_2015_01_06_f91a47ec40',
])
self.assertFalse(task.complete())


@attr('postgres')
class TestCopyToTableWithMetaColumns(unittest.TestCase):
    """Check that postgres.CopyToTable.run honours ``enable_metadata_columns``."""

    @mock.patch("luigi.contrib.postgres.CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True)
    @mock.patch("luigi.contrib.postgres.CopyToTable._add_metadata_columns")
    @mock.patch("luigi.contrib.postgres.CopyToTable.post_copy_metacolumns")
    @mock.patch("luigi.contrib.postgres.CopyToTable.rows", return_value=['row1', 'row2'])
    @mock.patch("luigi.contrib.postgres.PostgresTarget")
    @mock.patch('psycopg2.connect')
    def test_copy_with_metadata_columns_enabled(self,
                                                mock_connect,
                                                mock_postgres_target,
                                                mock_rows,
                                                mock_add_columns,
                                                mock_update_columns,
                                                mock_metadata_columns_enabled):
        """With the flag on, run() both adds and fills the metadata columns."""
        task = DummyPostgresImporter(date=datetime.datetime(1991, 3, 24))

        mock_cursor = MockPostgresCursor([task.task_id])
        mock_connect.return_value.cursor.return_value = mock_cursor

        task.run()

        self.assertTrue(mock_add_columns.called)
        self.assertTrue(mock_update_columns.called)

    @mock.patch("luigi.contrib.postgres.CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=False)
    @mock.patch("luigi.contrib.postgres.CopyToTable._add_metadata_columns")
    @mock.patch("luigi.contrib.postgres.CopyToTable.post_copy_metacolumns")
    @mock.patch("luigi.contrib.postgres.CopyToTable.rows", return_value=['row1', 'row2'])
    @mock.patch("luigi.contrib.postgres.PostgresTarget")
    @mock.patch('psycopg2.connect')
    def test_copy_with_metadata_columns_disabled(self,
                                                 mock_connect,
                                                 mock_postgres_target,
                                                 mock_rows,
                                                 mock_add_columns,
                                                 mock_update_columns,
                                                 mock_metadata_columns_enabled):
        """With the flag off, run() touches neither metadata hook."""
        task = DummyPostgresImporter(date=datetime.datetime(1991, 3, 24))

        mock_cursor = MockPostgresCursor([task.task_id])
        mock_connect.return_value.cursor.return_value = mock_cursor

        task.run()

        self.assertFalse(mock_add_columns.called)
        self.assertFalse(mock_update_columns.called)
Loading
0