Use libpq Single Row Mode when fetching catalogs. by dimitri · Pull Request #584 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
8000

Use libpq Single Row Mode when fetching catalogs. #584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion src/bin/pgcopydb/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,63 @@ catalog_add_attributes(DatabaseCatalog *catalog, SourceTable *table)
}


/*
 * catalog_add_s_table_part INSERTs a single SourceTableParts entry (the
 * table's current partition) into our internal catalogs database, in the
 * s_table_part table.
 */
bool
catalog_add_s_table_part(DatabaseCatalog *catalog, SourceTable *table)
{
	sqlite3 *db = catalog->db;

	if (db == NULL)
	{
		log_error("BUG: catalog_add_s_table_part: db is NULL");
		return false;
	}

	char *sql =
		"insert into s_table_part(oid, partnum, partcount, min, max, count)"
		"values($1, $2, $3, $4, $5, $6)";

	SQLiteQuery insertQuery = { 0 };

	if (!catalog_sql_prepare(db, sql, &insertQuery))
	{
		/* errors have already been logged */
		return false;
	}

	/* the partition metadata we insert lives on the table itself */
	SourceTableParts *partition = &(table->partition);

	BindParam bindings[] = {
		{ BIND_PARAMETER_TYPE_INT64, "oid", table->oid, NULL },
		{ BIND_PARAMETER_TYPE_INT64, "partnum", partition->partNumber, NULL },
		{ BIND_PARAMETER_TYPE_INT64, "partcount", partition->partCount, NULL },
		{ BIND_PARAMETER_TYPE_INT64, "min", partition->min, NULL },
		{ BIND_PARAMETER_TYPE_INT64, "max", partition->max, NULL },
		{ BIND_PARAMETER_TYPE_INT64, "count", partition->count, NULL },
	};

	int bindingCount = sizeof(bindings) / sizeof(bindings[0]);

	if (!catalog_sql_bind(&insertQuery, bindings, bindingCount))
	{
		/* errors have already been logged */
		return false;
	}

	/* execute the INSERT; it is not expected to return any row */
	if (!catalog_sql_execute_once(&insertQuery))
	{
		/* errors have already been logged */
		return false;
	}

	return true;
}


/*
* catalog_add_s_table_parts INSERTs a SourceTableParts array to our internal
* catalogs database (s_table_parts).
Expand Down Expand Up @@ -4369,7 +4426,7 @@ catalog_lookup_filter_by_rlname(DatabaseCatalog *catalog,


/*
* catalog_s_index_fetch fetches a SourceIndex entry from a SQLite ppStmt
* catalog_filter_fetch fetches a CatalogFilter entry from a SQLite ppStmt
* result set.
*/
bool
Expand Down
1 change: 1 addition & 0 deletions src/bin/pgcopydb/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ bool catalog_count_fetch(SQLiteQuery *query);
*/
bool catalog_add_s_table(DatabaseCatalog *catalog, SourceTable *table);
bool catalog_add_attributes(DatabaseCatalog *catalog, SourceTable *table);
bool catalog_add_s_table_part(DatabaseCatalog *catalog, SourceTable *table);
bool catalog_add_s_table_parts(DatabaseCatalog *catalog, SourceTable *table);

bool catalog_add_s_table_chksum(DatabaseCatalog *catalog,
Expand Down
3 changes: 3 additions & 0 deletions src/bin/pgcopydb/copydb_schema.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ copydb_fetch_schema_and_prepare_specs(CopyDataSpec *specs)
}
}

/* make sure we receive only one row at a time in-memory */
src->singleRowMode = true;

if (!copydb_fetch_source_schema(specs, src))
{
/* errors have already been logged */
Expand Down
64 changes: 49 additions & 15 deletions src/bin/pgcopydb/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -1536,14 +1536,20 @@ pgsql_execute_with_params(PGSQL *pgsql, const char *sql, int paramCount,
}

/*
* TODO
*
* Use PQsetSingleRowMode(connection) to switch to select single-row mode
* and fetch only one result at a time in memory. Most queries are already
* fine with the idea, thanks to inserting the value into our SQLite
* internal catalogs. Some query still expect PQntuples() to reflect the
* actual number of tuples returned by the query etc.
* and fetch only one result at a time in memory. Works with query result
* handlers that do not expect PQntuples() to reflect the actual number of
* tuples returned by the query etc.
*/
if (pgsql->singleRowMode)
{
if (PQsetSingleRowMode(connection) != 1)
{
log_error("Failed to select single-row mode: %s",
PQerrorMessage(connection));
return false;
}
}

bool done = false;
int errors = 0;
Expand Down Expand Up @@ -1768,6 +1774,8 @@ pgsql_fetch_results(PGSQL *pgsql, bool *done,
{
PGresult *result = NULL;

bool firstResult = true;

/*
* When we got clearance that libpq did fetch the Postgres query result
* in its internal buffers, we process the result without checking for
Expand All @@ -1790,12 +1798,36 @@ pgsql_fetch_results(PGSQL *pgsql, bool *done,
return false;
}

/*
* From Postgres docs:
*
* If the query returns any rows, they are returned as individual
* PGresult objects, which look like normal query results except
* for having status code PGRES_SINGLE_TUPLE instead of
* PGRES_TUPLES_OK. After the last row, or immediately if the query
* returns zero rows, a zero-row object with status PGRES_TUPLES_OK
* is returned; this is the signal that no more rows will arrive.
*/
if (parseFun != NULL)
{
(*parseFun)(context, result);
bool skipCallback =
!firstResult &&
pgsql->singleRowMode &&
PQntuples(result) == 0 &&
PQresultStatus(result) == PGRES_TUPLES_OK;

if (!skipCallback)
{
(*parseFun)(context, result);
}
}

PQclear(result);

if (firstResult)
{
firstResult = false;
}
}

*done = true;
Expand Down Expand Up @@ -5008,22 +5040,24 @@ pgsql_table_exists(PGSQL *pgsql,
const char *relname,
bool *exists)
{
SingleValueResultContext context = { { 0 }, PGSQL_RESULT_INT, false };
SingleValueResultContext context = { { 0 }, PGSQL_RESULT_BOOL, false };

char *existsQuery =
"select 1 "
" from pg_class c "
" join pg_namespace n on n.oid = c.relnamespace "
" where n.nspname = $1 "
" and c.relname = $2";
"select exists( "
" select 1 "
" from pg_class c "
" join pg_namespace n on n.oid = c.relnamespace "
" where n.nspname = $1 "
" and c.relname = $2"
" )";

int paramCount = 2;
const Oid paramTypes[2] = { TEXTOID, TEXTOID };
const char *paramValues[2] = { nspname, relname };

if (!pgsql_execute_with_params(pgsql, existsQuery,
paramCount, paramTypes, paramValues,
&context, &fetchedRows))
&context, &parseSingleValueResult))
{
log_error("Failed to check if \"%s\".\"%s\" exists", nspname, relname);
return false;
Expand All @@ -5039,7 +5073,7 @@ pgsql_table_exists(PGSQL *pgsql,
* If the exists query returns no rows, create our table:
* pgcopydb.pgcopydb_table_size
*/
*exists = context.intVal == 1;
*exists = context.boolVal;

return true;
}
Expand Down
1 change: 1 addition & 0 deletions src/bin/pgcopydb/pgsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ typedef struct PGSQL
bool notificationReceived;

bool logSQL;
bool singleRowMode;
} PGSQL;


Expand Down
44 changes: 5 additions & 39 deletions src/bin/pgcopydb/schema.c
Original file line number Diff line number Diff line change
Expand Up @@ -4677,8 +4677,6 @@ getTableArray(void *ctx, PGresult *result)
SourceTableArrayContext *context = (SourceTableArrayContext *) ctx;
int nTuples = PQntuples(result);

log_debug("getTableArray: %d", nTuples);

if (PQnfields(result) != 12)
{
log_error("Query returned %d columns, expected 12", PQnfields(result));
Expand Down Expand Up @@ -5002,8 +5000,6 @@ getSequenceArray(void *ctx, PGresult *result)
SourceSequenceArrayContext *context = (SourceSequenceArrayContext *) ctx;
int nTuples = PQntuples(result);

log_debug("getSequenceArray: %d", nTuples);

if (PQnfields(result) != 7)
{
log_error("Query returned %d columns, expected 7", PQnfields(result));
Expand Down Expand Up @@ -5175,8 +5171,6 @@ getIndexArray(void *ctx, PGresult *result)
SourceIndexArrayContext *context = (SourceIndexArrayContext *) ctx;
int nTuples = PQntuples(result);

log_debug("getIndexArray: %d", nTuples);

if (PQnfields(result) != 16)
{
log_error("Query returned %d columns, expected 16", PQnfields(result));
Expand Down Expand Up @@ -5684,32 +5678,14 @@ getPartitionList(void *ctx, PGresult *result)
return;
}

/* we're not supposed to re-cycle arrays here */
if (context->table->partsArray.array != NULL)
{
/* issue a warning but let's try anyway */
log_warn("BUG? context's partsArray is not null in getPartitionList");

free(context->table->partsArray.array);
context->table->partsArray.array = NULL;
context->table->partsArray.count = 0;
}

context->table->partsArray.count = nTuples;
context->table->partsArray.array =
(SourceTableParts *) calloc(nTuples, sizeof(SourceTableParts));

if (context->table->partsArray.array == NULL)
{
log_fatal(ALLOCATION_FAILED_ERROR);
return;
}

bool parsedOk = true;

for (int rowNumber = 0; rowNumber < nTuples; rowNumber++)
{
SourceTableParts *parts = &(table->partsArray.array[rowNumber]);
SourceTableParts *parts = &(table->partition);

/* make sure to clean-up the memory area we keep re-using */
bzero(parts, sizeof(SourceTableParts));

if (!parseCurrentPartition(result, rowNumber, parts))
{
Expand All @@ -5723,27 +5699,17 @@ getPartitionList(void *ctx, PGresult *result)
(long long) parts->min,
(long long) parts->max,
parts->partCount);
}

if (parsedOk)
{
if (context->catalog != NULL && context->catalog->db != NULL)
{
if (!catalog_add_s_table_parts(context->catalog, table))
if (!catalog_add_s_table_part(context->catalog, table))
{
/* errors have already been logged */
parsedOk = false;
}
}
}

if (!parsedOk)
{
free(context->table->partsArray.array);
context->table->partsArray.array = NULL;
context->table->partsArray.count = 0;
}

context->parsedOk = parsedOk;
}

Expand Down
0