Use column names in COPY statements. by dimitri · Pull Request #290 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Use column names in COPY statements. #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions src/bin/pgcopydb/copydb.c
Original file line number Diff line number Diff line change
Expand Up @@ -826,21 +826,6 @@ copydb_init_table_specs(CopyTableDataSpec *tableSpecs,

strlcpy(tableSpecs->part.partKey, source->partKey, NAMEDATALEN);

/*
* Prepare the COPY command.
*
* The way schema_list_partitions prepares the boundaries is non
* overlapping, so we can use the BETWEEN operator to select our source
* rows in the COPY sub-query.
*/
sformat(tableSpecs->part.copyQuery, sizeof(tableSpecs->part.copyQuery),
"(SELECT * FROM %s"
" WHERE \"%s\" BETWEEN %lld AND %lld)",
tableSpecs->qname,
tableSpecs->part.partKey,
(long long) tableSpecs->part.min,
(long long) tableSpecs->part.max);

/* now compute the table-specific paths we are using in copydb */
if (!copydb_init_tablepaths_for_part(tableSpecs->cfPaths,
&(tableSpecs->tablePaths),
Expand Down
5 changes: 4 additions & 1 deletion src/bin/pgcopydb/copydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ typedef struct CopyTableDataPartSpec
int64_t max; /* AND partKey < max */

char partKey[NAMEDATALEN];
char copyQuery[BUFSIZE]; /* COPY (...) TO STDOUT */
} CopyTableDataPartSpec;


Expand Down Expand Up @@ -475,6 +474,10 @@ bool copydb_table_parts_are_all_done(CopyDataSpec *specs,
bool *allPartsDone,
bool *isBeingProcessed);

bool copydb_prepare_copy_query(CopyTableDataSpec *tableSpecs,
PQExpBuffer query,
bool source);

/* blobs.c */
bool copydb_start_blob_process(CopyDataSpec *specs);
bool copydb_copy_blobs(CopyDataSpec *specs);
Expand Down
192 changes: 180 additions & 12 deletions src/bin/pgcopydb/schema.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <time.h>
#include <unistd.h>

#include "parson.h"

#include "defaults.h"
#include "env_utils.h"
#include "file_utils.h"
Expand Down Expand Up @@ -125,6 +127,8 @@ static bool parseCurrentSourceTable(PGresult *result,
int rowNumber,
SourceTable *table);

static bool parseAttributesArray(SourceTable *table, JSON_Value *json);

static void getSequenceArray(void *ctx, PGresult *result);

static bool parseCurrentSourceSequence(PGresult *result,
Expand Down Expand Up @@ -729,11 +733,29 @@ struct FilteringQueries listSourceTablesSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(c.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')), "
" pkeys.attname as partkey"
" pkeys.attname as partkey, "
" attrs.js as attributes "

" from pg_catalog.pg_class c"
" join pg_catalog.pg_namespace n on c.relnamespace = n.oid"
" join pg_roles auth ON auth.oid = c.relowner"
" join lateral ( "
" with atts as "
" ("
" select attnum, atttypid, attname, "
" i.indrelid is not null as attisprimary "
" from pg_attribute a "
" left join pg_index i "
" on i.indrelid = a.attrelid "
" and a.attnum = ANY(i.indkey) "
" and i.indisprimary "
" where a.attrelid = c.oid "
" and a.attnum > 0 "
" order by attnum "
" ) "
" select json_agg(row_to_json(atts)) as js "
" from atts "
" ) as attrs on true"
" left join pgcopydb_table_size ts on ts.oid = c.oid"

/* find a copy partition key candidate */
Expand Down Expand Up @@ -786,11 +808,29 @@ struct FilteringQueries listSourceTablesSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(c.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')), "
" pkeys.attname as partkey"
" pkeys.attname as partkey, "
" attrs.js as attributes "

" from pg_catalog.pg_class c "
" join pg_catalog.pg_namespace n on c.relnamespace = n.oid "
" join pg_roles auth ON auth.oid = c.relowner"
" join lateral ( "
" with atts as "
" ("
" select attnum, atttypid, attname, "
" i.indrelid is not null as attisprimary "
" from pg_attribute a "
" left join pg_index i "
" on i.indrelid = a.attrelid "
" and a.attnum = ANY(i.indkey) "
" and i.indisprimary "
" where a.attrelid = c.oid "
" and a.attnum > 0 "
" order by attnum "
" ) "
" select json_agg(row_to_json(atts)) as js "
" from atts "
" ) as attrs on true"
" left join pgcopydb_table_size ts on ts.oid = c.oid"

/* include-only-table */
Expand Down Expand Up @@ -845,11 +885,29 @@ struct FilteringQueries listSourceTablesSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(c.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')), "
" pkeys.attname as partkey"
" pkeys.attname as partkey, "
" attrs.js as attributes "

" from pg_catalog.pg_class c "
" join pg_catalog.pg_namespace n on c.relnamespace = n.oid "
" join pg_roles auth ON auth.oid = c.relowner"
" join lateral ( "
" with atts as "
" ("
" select attnum, atttypid, attname, "
" i.indrelid is not null as attisprimary "
" from pg_attribute a "
" left join pg_index i "
" on i.indrelid = a.attrelid "
" and a.attnum = ANY(i.indkey) "
" and i.indisprimary "
" where a.attrelid = c.oid "
" and a.attnum > 0 "
" order by attnum "
" ) "
" select json_agg(row_to_json(atts)) as js "
" from atts "
" ) as attrs on true"
" left join pgcopydb_table_size ts on ts.oid = c.oid"

/* exclude-schema */
Expand Down Expand Up @@ -918,11 +976,29 @@ struct FilteringQueries listSourceTablesSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(c.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')), "
" pkeys.attname as partkey"
" pkeys.attname as partkey, "
" attrs.js as attributes "

" from pg_catalog.pg_class c "
" join pg_catalog.pg_namespace n on c.relnamespace = n.oid "
" join pg_roles auth ON auth.oid = c.relowner"
" join lateral ( "
" with atts as "
" ("
" select attnum, atttypid, attname, "
" i.indrelid is not null as attisprimary "
" from pg_attribute a "
" left join pg_index i "
" on i.indrelid = a.attrelid "
" and a.attnum = ANY(i.indkey) "
" and i.indisprimary "
" where a.attrelid = c.oid "
" and a.attnum > 0 "
" order by attnum "
" ) "
" select json_agg(row_to_json(atts)) as js "
" from atts "
" ) as attrs on true"
" left join pgcopydb_table_size ts on ts.oid = c.oid"

/* include-only-table */
Expand Down Expand Up @@ -980,11 +1056,29 @@ struct FilteringQueries listSourceTablesSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(c.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')), "
" pkeys.attname as partkey"
" pkeys.attname as partkey, "
" attrs.js as attributes "

" from pg_catalog.pg_class c "
" join pg_catalog.pg_namespace n on c.relnamespace = n.oid "
" join pg_roles auth ON auth.oid = c.relowner"
" join lateral ( "
" with atts as "
" ("
" select attnum, atttypid, attname, "
" i.indrelid is not null as attisprimary "
" from pg_attribute a "
" left join pg_index i "
" on i.indrelid = a.attrelid "
" and a.attnum = ANY(i.indkey) "
" and i.indisprimary "
" where a.attrelid = c.oid "
" and a.attnum > 0 "
" order by attnum "
" ) "
" select json_agg(row_to_json(atts)) as js "
" from atts "
" ) as attrs on true"
" left join pgcopydb_table_size ts on ts.oid = c.oid"

/* exclude-schema */
Expand Down Expand Up @@ -1120,7 +1214,8 @@ struct FilteringQueries listSourceTablesNoPKSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(r.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')),"
" NULL as partkey"
" NULL as partkey,"
" NULL as attributes"

" from pg_class r "
" join pg_namespace n ON n.oid = r.relnamespace "
Expand Down Expand Up @@ -1161,7 +1256,8 @@ struct FilteringQueries listSourceTablesNoPKSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(r.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')),"
" NULL as partkey"
" NULL as partkey,"
" NULL as attributes"

" from pg_class r "
" join pg_namespace n ON n.oid = r.relnamespace "
Expand Down Expand Up @@ -1207,7 +1303,8 @@ struct FilteringQueries listSourceTablesNoPKSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(r.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')),"
" NULL as partkey"
" NULL as partkey,"
" NULL as attributes"

" from pg_class r "
" join pg_namespace n ON n.oid = r.relnamespace "
Expand Down Expand Up @@ -1267,7 +1364,8 @@ struct FilteringQueries listSourceTablesNoPKSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(r.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')),"
" NULL as partkey"
" NULL as partkey,"
" NULL as attributes"

" from pg_class r "
" join pg_namespace n ON n.oid = r.relnamespace "
Expand Down Expand Up @@ -1316,7 +1414,8 @@ struct FilteringQueries listSourceTablesNoPKSQL[] = {
" regexp_replace(n.nspname, '[\\n\\r]', ' '), "
" regexp_replace(r.relname, '[\\n\\r]', ' '), "
" regexp_replace(auth.rolname, '[\\n\\r]', ' ')),"
" NULL as partkey"
" NULL as partkey,"
" NULL as attributes"

" from pg_class r "
" join pg_namespace n ON n.oid = r.relnamespace "
Expand Down Expand Up @@ -3641,9 +3740,9 @@ getTableArray(void *ctx, PGresult *result)

log_debug("getTableArray: %d", nTuples);

if (PQnfields(result) != 9)
if (PQnfields(result) != 10)
{
log_error("Query returned %d columns, expected 9", PQnfields(result));
log_error("Query returned %d columns, expected 10", PQnfields(result));
context->parsedOk = false;
return;
}
Expand Down Expand Up @@ -3820,12 +3919,81 @@ parseCurrentSourceTable(PGresult *result, int rowNumber, SourceTable *table)
}
}

/* 10. attributes */
if (PQgetisnull(result, rowNumber, 9))
{
/* the query didn't care to add the attributes, skip parsing them */
table->attributes.count = 0;
}
else
{
value = PQgetvalue(result, rowNumber, 9);

JSON_Value *json = json_parse_string(value);

if (!parseAttributesArray(table, json))
{
log_error("Failed to parse table \"%s\".\"%s\" attribute array: %s",
table->nspname,
table->relname,
value);
++errors;
}

json_value_free(json);
}

log_trace("parseCurrentSourceTable: %s.%s", table->nspname, table->relname);

return errors == 0;
}


/*
 * parseAttributesArray parses a JSON representation of table list of
 * attributes and allocates the table's attribute array.
 *
 * The expected input is a JSON array of objects, each with the keys "attnum",
 * "atttypid", "attname" and "attisprimary".
 *
 * Returns true on success, in which case table->attributes.count entries are
 * available in table->attributes.array (the array is NULL when the JSON array
 * is empty). Returns false when json is NULL or not a JSON array, when an
 * element is not an object carrying a string "attname", or when memory
 * allocation fails; in those cases table->attributes is left empty (or
 * untouched when json itself was rejected) and nothing is leaked.
 *
 * The caller keeps ownership of json and owns the allocated attribute array.
 */
static bool
parseAttributesArray(SourceTable *table, JSON_Value *json)
{
	if (json == NULL || json_type(json) != JSONArray)
	{
		return false;
	}

	JSON_Array *jsAttsArray = json_array(json);

	int count = (int) json_array_get_count(jsAttsArray);

	/* start from a well-defined empty state, only publish on success */
	table->attributes.count = 0;
	table->attributes.array = NULL;

	/*
	 * An empty JSON array is valid input. Skip the allocation: calloc(0, ...)
	 * is allowed to return NULL, which must not be reported as a failure.
	 */
	if (count == 0)
	{
		return true;
	}

	SourceTableAttribute *array =
		(SourceTableAttribute *) calloc(count, sizeof(SourceTableAttribute));

	if (array == NULL)
	{
		log_fatal(ALLOCATION_FAILED_ERROR);
		return false;
	}

	for (int i = 0; i < count; i++)
	{
		SourceTableAttribute *attr = &(array[i]);
		JSON_Object *jsAttr = json_array_get_object(jsAttsArray, i);

		/* json_array_get_object returns NULL when the element is no object */
		const char *attname =
			jsAttr == NULL ? NULL : json_object_get_string(jsAttr, "attname");

		if (attname == NULL)
		{
			log_error("Failed to parse attribute %d: "
					  "expected a JSON object with a string \"attname\"",
					  i);
			free(array);
			return false;
		}

		attr->attnum = json_object_get_number(jsAttr, "attnum");
		attr->atttypid = json_object_get_number(jsAttr, "atttypid");

		strlcpy(attr->attname, attname, sizeof(attr->attname));

		/*
		 * json_object_get_boolean returns -1 when the key is missing or not
		 * a boolean; compare against 1 so that -1 is not stored as true.
		 */
		attr->attisprimary =
			json_object_get_boolean(jsAttr, "attisprimary") == 1;
	}

	table->attributes.count = count;
	table->attributes.array = array;

	return true;
}


/*
* getSequenceArray loops over the SQL result for the sequence array query and
* allocates an array of tables then populates it with the query result.
Expand Down
16 changes: 16 additions & 0 deletions src/bin/pgcopydb/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ typedef struct SourceTablePartsArray
} SourceTablePartsArray;


/*
 * SourceTableAttribute describes a single attribute (column) of a source
 * table. Entries are filled in from the "attributes" JSON column of the table
 * listing queries, which select them from pg_attribute with attnum > 0.
 */
typedef struct SourceTableAttribute
{
int attnum; /* JSON key "attnum" */
uint32_t atttypid; /* JSON key "atttypid" */
char attname[NAMEDATALEN]; /* JSON key "attname", truncated to fit */
bool attisprimary; /* JSON key "attisprimary": the query sets it when a
					* primary key index (indisprimary) lists the column */
} SourceTableAttribute;

/*
 * SourceTableAttributeArray is a counted, heap-allocated array of a table's
 * attributes. A count of zero means no attributes were loaded for the table,
 * which is what the parser sets when the query returns NULL attributes.
 */
typedef struct SourceTableAttributeArray
{
int count; /* number of entries in array */
SourceTableAttribute *array; /* malloc'ed area */
} SourceTableAttributeArray;

/* forward declaration */
struct SourceIndexList;

Expand All @@ -142,6 +156,8 @@ typedef struct SourceTable
char partKey[NAMEDATALEN];
SourceTablePartsArray partsArray;

SourceTableAttributeArray attributes;

struct SourceIndexList *firstIndex;
struct SourceIndexList *lastIndex;

Expand Down
Loading
0