Switch to using libpq async API. by dimitri · Pull Request #488 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Switch to using libpq async API. #488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 76 additions & 28 deletions src/bin/pgcopydb/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,6 @@ pgsql_execute_with_params(PGSQL *pgsql, const char *sql, int paramCount,
const Oid *paramTypes, const char **paramValues,
void *context, ParsePostgresResultCB *parseFun)
{
PGresult *result = NULL;
PQExpBuffer debugParameters = NULL;

PGconn *connection = pgsql_open_connection(pgsql);
Expand Down Expand Up @@ -1510,20 +1509,48 @@ pgsql_execute_with_params(PGSQL *pgsql, const char *sql, int paramCount,
}
}

int sentQuery = 0;

if (paramCount == 0)
{
result = PQexec(connection, sql);
sentQuery = PQsendQuery(connection, sql);
}
else
{
result = PQexecParams(connection, sql,
paramCount, paramTypes, paramValues,
NULL, NULL, 0);
sentQuery = PQsendQueryParams(connection, sql,
paramCount, paramTypes, paramValues,
NULL, NULL, 0);
}

if (!is_response_ok(result))
bool done = false;
int errors = 0;

while (!done)
{
pgsql_execute_log_error(pgsql, result, sql, debugParameters, context);
if (asked_to_quit || asked_to_stop || asked_to_stop_fast)
{
log_error("Postgres query was interrupted: %s", sql);

destroyPQExpBuffer(debugParameters);
(void) pgsql_finish(pgsql);

return false;
}

/* this uses select() with a timeout: we're not busy looping */
if (!pgsql_fetch_results(pgsql, &done, context, parseFun))
{
++errors;
break;
}
}

/*
* 1 is returned if the command was successfully dispatched and 0 if not.
*/
if (sentQuery == 0 || errors > 0)
{
pgsql_execute_log_error(pgsql, NULL, sql, debugParameters, context);
destroyPQExpBuffer(debugParameters);

/*
Expand All @@ -1538,15 +1565,9 @@ pgsql_execute_with_params(PGSQL *pgsql, const char *sql, int paramCount,
return false;
}

if (parseFun != NULL)
{
(*parseFun)(context, result);
}

destroyPQExpBuffer(debugParameters);

PQclear(result);
clear_results(pgsql);

if (pgsql->connectionStatementType == PGSQL_CONNECTION_SINGLE_STATEMENT)
{
(void) pgsql_finish(pgsql);
Expand Down Expand Up @@ -1719,24 +1740,43 @@ pgsql_fetch_results(PGSQL *pgsql, bool *done,
return false;
}

/* Only collect the result when we know the server is ready for it */
/* Only collect the results when we know the server is ready for it */
if (PQisBusy(conn) == 0)
{
PGresult *result = PQgetResult(conn);
PGresult *result = NULL;

if (!is_response_ok(result))
/*
* When we got clearance that libpq did fetch the Postgres query result
* in its internal buffers, we process the result without checking for
* interrupts.
*
* The reason is that pgcopydb relies internally on signaling sibling
* processes to terminate at several places, including logical
* replication client and operating mode management. It's better for
* the code that we process the already available query result now and
* let the callers check for interrupts (asked_to_stop and friends).
*/
while ((result = PQgetResult(conn)) != NULL)
{
pgsql_execute_log_error(pgsql, result, NULL, NULL, context);
return false;
}
/* remember to check PQnotifies after each PQgetResult or PQexec */
(void) pgsql_handle_notifications(pgsql);

if (parseFun != NULL)
{
(*parseFun)(context, result);
if (!is_response_ok(result))
{
pgsql_execute_log_error(pgsql, result, NULL, NULL, context);
return false;
}

if (parseFun != NULL)
{
(*parseFun)(context, result);
}

*done = true;
PQclear(result);
}

*done = true;

PQclear(result);
clear_results(pgsql);
}
Expand Down Expand Up @@ -1896,11 +1936,16 @@ pgsql_execute_log_error(PGSQL *pgsql,
PQExpBuffer debugParameters,
void *context)
{
char *sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE);
char *sqlstate = NULL;

if (sqlstate)
if (result != NULL)
{
strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE);

if (sqlstate)
{
strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
}
}

char *endpoint =
Expand Down Expand Up @@ -1958,7 +2003,10 @@ pgsql_execute_log_error(PGSQL *pgsql,
pgsql->status = PG_CONNECTION_BAD;
}

PQclear(result);
if (result != NULL)
{
PQclear(result);
}
clear_results(pgsql);
}

Expand Down
0