8000 Problem: Finding replay_lsn with lsn tracking is 7.5 times slower by arajkumar · Pull Request #731 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Problem: Finding replay_lsn with lsn tracking is 7.5 times slower #731

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/bin/pgcopydb/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ static char *sourceDBcreateDDLs[] = {
"create table sentinel("
" id integer primary key check (id = 1), "
" startpos pg_lsn, endpos pg_lsn, apply bool, "
" write_lsn pg_lsn, flush_lsn pg_lsn, replay_lsn pg_lsn)",

"create table lsn_tracking(source pg_lsn, target pg_lsn)"
" write_lsn pg_lsn, flush_lsn pg_lsn, replay_lsn pg_lsn)"
};


Expand Down
123 changes: 10 additions & 113 deletions src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "copydb.h"
#include "env_utils.h"
#include "ld_stream.h"
#include "lsn_tracking.h"
#include "lock_utils.h"
#include "log.h"
#include "parsing_utils.h"
Expand Down Expand Up @@ -194,13 +193,6 @@ stream_apply_setup(StreamSpecs *specs, StreamApplyContext *context)
return false;
}

/* read-in the previous lsn tracking file, if it exists */
if (!lsn_tracking_read(context))
{
log_error("Failed to read LSN tracking file");
return false;
}

/* wait until the sentinel enables the apply process */
if (!stream_apply_wait_for_sentinel(specs, context))
{
Expand Down Expand Up @@ -393,13 +385,6 @@ stream_apply_wait_for_sentinel(StreamSpecs *specs, StreamApplyContext *context)
bool
stream_apply_sync_sentinel(StreamApplyContext *context, bool findDurableLSN)
{
/* now is a good time to write the LSN tracking to disk */
if (!lsn_tracking_write(context->sourceDB, context->lsnTrackingList))
{
/* errors have already been logged */
return false;
}

uint64_t durableLSN = InvalidXLogRecPtr;

/*
Expand Down Expand Up @@ -867,17 +852,6 @@ stream_apply_sql(StreamApplyContext *context,
return true;
}

/*
* An idle source producing only KEEPALIVE should move the
* replay_lsn forward.
*/
if (!stream_apply_track_insert_lsn(context, metadata->lsn))
{
log_error("Failed to track target LSN position, "
"see above for details");
return false;
}

break;
}

Expand Down Expand Up @@ -1053,13 +1027,6 @@ stream_apply_sql(StreamApplyContext *context,
break;
}

if (!stream_apply_track_insert_lsn(context, metadata->lsn))
{
log_error("Failed to track target LSN position, "
"see above for details");
return false;
}

break;
}

Expand Down Expand Up @@ -1655,99 +1622,29 @@ parseSQLAction(const char *query, LogicalMessageMetadata *metadata)
}


/*
* stream_apply_track_insert_lsn tracks the current pg_current_wal_insert_lsn()
* location on the target system right after a COMMIT; of a transaction that
* was assigned sourceLSN on the source system.
*/
bool
stream_apply_track_insert_lsn(StreamApplyContext *context, uint64_t sourceLSN)
{
LSNTracking *lsn_tracking = (LSNTracking *) calloc(1, sizeof(LSNTracking));

if (lsn_tracking == NULL)
{
log_error(ALLOCATION_FAILED_ERROR);
return false;
}

lsn_tracking->sourceLSN = sourceLSN;

if (!pgsql_current_wal_insert_lsn(&(context->controlPgConn),
&(lsn_tracking->insertLSN)))
{
/* errors have already been logged */
return false;
}

log_debug("stream_apply_track_insert_lsn: %X/%X :: %X/%X",
LSN_FORMAT_ARGS(sourceLSN),
LSN_FORMAT_ARGS(lsn_tracking->insertLSN));

/* update the linked list */
lsn_tracking->previous = context->lsnTrackingList;
context->lsnTrackingList = lsn_tracking;

return true;
}


/*
* stream_apply_find_durable_lsn fetches the LSN for the current durable
* location on the target system, and finds the greatest sourceLSN with an
* associated insertLSN that's before the current (durable) write location.
* location on the target system using pg_replication_origin_progress.
*/
bool
stream_apply_find_durable_lsn(StreamApplyContext *context, uint64_t *durableLSN)
{
uint64_t flushLSN = InvalidXLogRecPtr;

if (!stream_fetch_current_lsn(&flushLSN,
context->connStrings->target_pguri,
PGSQL_CONN_SOURCE))
{
log_error("Failed to retrieve current WAL positions, "
"see above for details");
return false;
}

bool found = false;
LSNTracking *current = context->lsnTrackingList;

for (; current != NULL; current = current->previous)
{
if (current->insertLSN <= flushLSN)
{
found = true;
*durableLSN = current->sourceLSN;
break;
}
}
bool flush = true;

if (!found)
if (!pgsql_replication_origin_progress(&(context->controlPgConn),
context->origin,
flush,
&flushLSN))
{
*durableLSN = InvalidXLogRecPtr;

log_debug("Failed to find a durable source LSN for target LSN %X/%X",
LSN_FORMAT_ARGS(flushLSN));

/* errors have already been logged */
log_error("Failed to retrieve origin progress, "
"see above for details");
return false;
}

log_debug("stream_apply_find_durable_lsn(%X/%X): %X/%X :: %X/%X",
LSN_FORMAT_ARGS(flushLSN),
LSN_FORMAT_ARGS(current->sourceLSN),
LSN_FORMAT_ARGS(current->insertLSN));

/* clean-up the lsn tracking list */
LSNTracking *tail = current->previous;
current->previous = NULL;

while (tail != NULL)
{
LSNTracking *previous = tail->previous;
tail = previous;
}
*durableLSN = flushLSN;

return true;
}
Expand Down
5 changes: 0 additions & 5 deletions src/bin/pgcopydb/ld_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -679,14 +679,9 @@ bool computeSQLFileName(StreamApplyContext *context);

bool parseSQLAction(const char *query, LogicalMessageMetadata *metadata);

bool stream_apply_track_insert_lsn(StreamApplyContext *context,
uint64_t sourceLSN);

bool stream_apply_find_durable_lsn(StreamApplyContext *context,
uint64_t *durableLSN);

bool stream_apply_write_lsn_tracking(StreamApplyContext *context);
bool stream_apply_read_lsn_tracking(StreamApplyContext *context);

/* ld_replay */
bool stream_replay(StreamSpecs *specs);
Expand Down
Loading
0