Review and fix connection management for sentinel async updates. by dimitri · Pull Request #273 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
Review and fix connection management for sentinel async updates. #273
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,24 +314,14 @@ stream_apply_send_sync_sentinel(StreamApplyContext *context)
{
PGSQL *src = &(context->src);

if (src->connectionStatementType != PGSQL_CONNECTION_MULTI_STATEMENT)
{
log_error("BUG: stream_apply_send_sync_sentinel called "
"in SINGLE statement mode");
return false;
}

if (context->sentinelQueryInProgress)
{
log_error("BUG: stream_apply_send_sync_sentinel already in progress");
return false;
}

if (!pgsql_init(src, context->source_pguri, PGSQL_CONN_SOURCE))
{
/* errors have already been logged */
return false;
}
/* we're going to keep the connection around */
context->src.connectionStatementType = PGSQL_CONNECTION_MULTI_STATEMENT;

/* limit the amount of logging of the apply process */
src->logSQL = true;
Expand Down Expand Up @@ -371,9 +361,13 @@ stream_apply_fetch_sync_sentinel(StreamApplyContext *context)
{
context->sentinelQueryInProgress = false;

/* also disconnect between async queries */
(void) pgsql_finish(&(context->src));

context->apply = sentinel.apply;
context->endpos = sentinel.endpos;
context->startpos = sentinel.startpos;
context->replay_lsn = sentinel.replay_lsn;
}

return true;
Expand Down
48 changes: 41 additions & 7 deletions src/bin/pgcopydb/ld_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ stream_apply_replay(StreamSpecs *specs)
LSN_FORMAT_ARGS(context->previousLSN));
}

/*
* The stream_replay_line read_from_stream callback is going to send async
* queries to the source server to maintain the sentinel tables. Initialize
* our connection info now.
*/
PGSQL *src = &(context->src);

if (!pgsql_init(src, context->source_pguri, PGSQL_CONN_SOURCE))
{
/* errors have already been logged */
return false;
}

ReadFromStreamContext readerContext = {
.callback = stream_replay_line,
.ctx = &ctx
Expand All @@ -115,6 +128,22 @@ stream_apply_replay(StreamSpecs *specs)
return false;
}

/*
* When we are done reading our input stream and applying changes, we might
* still have a sentinel query in flight. Make sure to terminate it now.
*/
while (context->sentinelQueryInProgress)
{
if (!stream_apply_fetch_sync_sentinel(context))
{
/* errors have already been logged */
return false;
}

/* sleep 100ms between retries */
pg_usleep(100 * 1000);
}

/* we might still have to disconnect now */
(void) pgsql_finish(&(context->pgsql));

Expand All @@ -126,9 +155,18 @@ stream_apply_replay(StreamSpecs *specs)
return false;
}

log_notice("Replayed %lld messages up to replay_lsn %X/%X",
(long long) readerContext.lineno,
LSN_FORMAT_ARGS(context->replay_lsn));
if (context->endpos != InvalidXLogRecPtr &&
context->endpos <= context->replay_lsn)
{
log_info("Replayed reached endpos %X/%X at replay_lsn %X/%X, stopping",
LSN_FORMAT_ARGS(context->endpos),
LSN_FORMAT_ARGS(context->replay_lsn));
}
else
{
log_info("Replayed up to replay_lsn %X/%X, stopping",
LSN_FORMAT_ARGS(context->replay_lsn));
}

return true;
}
Expand Down Expand Up @@ -180,10 +218,6 @@ stream_replay_line(void *ctx, const char *line, bool *stop)
/* rate limit to 1 update per second */
else if (1 < (now - context->sentinelSyncTime))
{
/* we're going to keep the connection around */
context->src.connectionStatementType =
PGSQL_CONNECTION_MULTI_STATEMENT;

if (!stream_apply_send_sync_sentinel(context))
{
/* errors have already been logged */
Expand Down
6 changes: 3 additions & 3 deletions src/bin/pgcopydb/ld_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,9 @@ startLogicalStreaming(StreamSpecs *specs)

if (cleanExit)
{
log_info("Streaming is now finished after processing %lld message%s",
(long long) privateContext.counters.total,
privateContext.counters.total > 0 ? "s" : "");
log_info("Streamed up to write_lsn %X/%X, flush_lsn %X/%X, stopping",
LSN_FORMAT_ARGS(context.tracking->written_lsn),
LSN_FORMAT_ARGS(context.tracking->flushed_lsn));
}
else if (!(asked_to_stop || asked_to_stop_fast || asked_to_quit))
{
Expand Down
0