8000 Fix issue with missing data from transactions with BEGIN LSN less than consistent_point. by shubhamdhama · Pull Request #295 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Fix issue with missing data from transactions with BEGIN LSN less than consistent_point. #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,27 +496,44 @@ stream_apply_sql(StreamApplyContext *context,

case STREAM_ACTION_BEGIN:
{
if (metadata->lsn == InvalidXLogRecPtr ||
metadata->txnCommitLSN == InvalidXLogRecPtr ||
IS_EMPTY_STRING_BUFFER(metadata->timestamp))
{
log_fatal("Failed to parse BEGIN message: %s", sql);
return false;
}

/* did we reach the starting LSN positions now? */
if (!context->reachedStartPos)
{
/*
* compare previousLSN with COMMIT LSN to safely include
* complete transactions while skipping already applied changes.
*
* this is particularly useful at the beginning where BEGIN LSN
* of some transactions could be less than `consistent_point`,
* but COMMIT LSN of those transactions is guaranteed to be
* greater.
*
* in case of interruption and this is the first transaction to
* be applied, previousLSN should be equal to the last
* transaction's COMMIT LSN or the LSN of non-transaction
* action. Therefore, this condition will still hold true.
*/

context->reachedStartPos =
context->previousLSN < metadata->lsn;
context->previousLSN < metadata->txnCommitLSN;
}

log_trace("BEGIN %lld LSN %X/%X @%s, previous LSN %X/%X %s",
log_trace("BEGIN %lld LSN %X/%X @%s, previous LSN %X/%X, COMMIT LSN %X/%X %s",
(long long) metadata->xid,
LSN_FORMAT_ARGS(metadata->lsn),
metadata->timestamp,
LSN_FORMAT_ARGS(context->previousLSN),
LSN_FORMAT_ARGS(metadata->txnCommitLSN),
context->reachedStartPos ? "" : "[skipping]");

if (metadata->lsn == InvalidXLogRecPtr ||
IS_EMPTY_STRING_BUFFER(metadata->timestamp))
{
log_fatal("Failed to parse BEGIN message: %s", sql);
return false;
}

/*
* Check if we reached the endpos LSN already.
*/
Expand All @@ -543,6 +560,20 @@ stream_apply_sql(StreamApplyContext *context,
return false;
}

break;
}

case STREAM_ACTION_COMMIT:
{
if (!context->reachedStartPos)
{
return true;
}

/*
* update replication progress with metadata->lsn, that is,
* transaction COMMIT LSN
*/
char lsn[PG_LSN_MAXLENGTH] = { 0 };

sformat(lsn, sizeof(lsn), "%X/%X",
Expand All @@ -556,16 +587,6 @@ stream_apply_sql(StreamApplyContext *context,
return false;
}

break;
}

case STREAM_ACTION_COMMIT:
{
if (!context->reachedStartPos)
{
return true;
}

log_trace("COMMIT %lld LSN %X/%X",
(long long) metadata->xid,
LSN_FORMAT_ARGS(metadata->lsn));
Expand Down Expand Up @@ -917,7 +938,7 @@ parseSQLAction(const char *query, LogicalMessageMetadata *metadata)
else if (query == commit)
{
metadata->action = STREAM_ACTION_COMMIT;
message = commit + strlen(OUTPUT_BEGIN);
message = commit + strlen(OUTPUT_COMMIT);
}
else if (query == switchwal)
{
Expand Down
14 changes: 14 additions & 0 deletions src/bin/pgcopydb/ld_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,20 @@ parseMessageMetadata(LogicalMessageMetadata *metadata,
}
}

if (json_object_has_value(jsobj, "commit_lsn"))
{
char *txnCommitLSN = (char *) json_object_get_string(jsobj, "commit_lsn");

if (txnCommitLSN != NULL)
{
if (!parseLSN(txnCommitLSN, &(metadata->txnCommitLSN)))
{
log_error("Failed to parse LSN \"%s\"", txnCommitLSN);
return false;
}
}
}

if (!skipAction &&
metadata->lsn == InvalidXLogRecPtr &&
(metadata->action == STREAM_ACTION_BEGIN ||
Expand Down
1 change: 1 addition & 0 deletions src/bin/pgcopydb/ld_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ typedef struct LogicalMessageMetadata
StreamAction action;
uint32_t xid;
uint64_t lsn;
uint64_t txnCommitLSN; /* COMMIT LSN of the transaction */
char timestamp[PG_MAX_TIMESTAMP];

/* our own internal decision making */
Expand Down
8 changes: 4 additions & 4 deletions src/bin/pgcopydb/ld_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -1521,19 +1521,19 @@ stream_write_transaction(FILE *out, LogicalTransaction *txn)


/*
* stream_write_switchwal writes a SWITCH statement to the already open out
* stream.
* stream_write_begin writes a BEGIN statement to the already open out stream.
*/
bool
stream_write_begin(FILE *out, LogicalTransaction *txn)
{
int ret =
fformat(out,
"%s{\"xid\":%lld,\"lsn\":\"%X/%X\",\"timestamp\":\"%s\"}\n",
"%s{\"xid\":%lld,\"lsn\":\"%X/%X\",\"timestamp\":\"%s\",\"commit_lsn\":\"%X/%X\"}\n",
OUTPUT_BEGIN,
(long long) txn->xid,
LSN_FORMAT_ARGS(txn->beginLSN),
txn->timestamp);
txn->timestamp,
LSN_FORMAT_ARGS(txn->commitLSN));

return ret != -1;
}
Expand Down
1 change: 1 addition & 0 deletions tests/follow-data-only/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ FROM pagila

WORKDIR /usr/src/pgcopydb
COPY ./copydb.sh copydb.sh
COPY ./run-background-traffic.sh run-background-traffic.sh
COPY ./dml.sql dml.sql
COPY ./ddl.sql ddl.sql

Expand Down
13 changes: 12 additions & 1 deletion tests/follow-data-only/copydb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,21 @@ psql -d ${PGCOPYDB_TARGET_PGURI} -f /usr/src/pgcopydb/ddl.sql
# insert a first batch of 10 rows (1..10)
psql -v a=1 -v b=10 -d ${PGCOPYDB_SOURCE_PGURI} -f /usr/src/pgcopydb/dml.sql

# take a snapshot while concurrent activity is happening; it would be hard to
# sync concurrent activity in the inject service, so use a background job instead
bash ./run-background-traffic.sh &
BACKGROUND_TRAFFIC_PID=$!

# wait a few seconds to allow the snapshot to happen in the middle of the traffic
sleep 2

# grab a snapshot on the source database
coproc ( pgcopydb snapshot --follow --plugin wal2json --notice )

sleep 1
sleep 2

# stop the background traffic
kill -TERM ${BACKGROUND_TRAFFIC_PID}

# check the replication slot file contents
cat /var/lib/postgres/.local/share/pgcopydb/slot
Expand Down
14 changes: 14 additions & 0 deletions tests/follow-data-only/run-background-traffic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#! /bin/bash

set -e

# Generate background write traffic against the source database. The loop
# never terminates on its own: the script runs until it is killed.
#
# Each iteration starts one psql client in the background that inserts a
# variable number of rows, between 1 and 100, into table_a. Because the client
# runs in the background the loop does not wait for the insert to complete;
# each insert is expected to finish quickly, which avoids excessive
# connections.
while true; do
    # The connection URI is quoted so that characters such as `?` or `*`
    # (e.g. "?sslmode=require") are not subject to word splitting or pathname
    # expansion. floor() keeps the row count within 1..100: a bare ::int cast
    # rounds to nearest and could yield 101.
    psql -d "${PGCOPYDB_SOURCE_PGURI}" \
         -c "INSERT INTO table_a(some_field) SELECT generate_series(1, floor(random() * 100 + 1)::int);" &

    # To control the rate, sleep 0.1 seconds between each insert operation.
    sleep 0.1
done
0