8000 Bug fixes for the transform process. by dimitri · Pull Request #114 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Bug fixes for the transform process. #114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/design.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ The process tree then looks like the following:

* pgcopydb stream receive

* pgcopydb stream transform
* pgcopydb stream transform

* pgcopydb stream catchup

Expand Down Expand Up @@ -151,9 +151,9 @@ Here is a description of the process tree:
- One process implements :ref:`pgcopydb_stream_receive` to fetch changes
in the JSON format and pre-fetch them in JSON files.

- As soon as JSON file is completed, then a new process is
opportunistically started to transform the JSON file into SQL, as if
by calling the command :ref:`pgcopydb_stream_transform`.
- As soon as a JSON file is completed, the pgcopydb stream transform
worker transforms the JSON file into SQL, as if by calling the command
:ref:`pgcopydb_stream_transform`.

- Another process implements :ref:`pgcopydb_stream_catchup` to apply SQL
changes to the target Postgres instance. This process loops over
Expand Down
8 changes: 4 additions & 4 deletions src/bin/pgcopydb/indexes.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ copydb_index_worker(CopyDataSpec *specs)

case QMSG_TYPE_INDEXOID:
{
if (!copydb_create_index_by_oid(specs, mesg.oid))
if (!copydb_create_index_by_oid(specs, mesg.data.oid))
{
++errors;
}
Expand Down Expand Up @@ -336,13 +336,13 @@ copydb_add_table_indexes(CopyDataSpec *specs, CopyTableDataSpec *tableSpecs)

QMessage mesg = {
.type = QMSG_TYPE_INDEXOID,
.oid = index->indexOid
.data.oid = index->indexOid
};

log_trace("Queueing index \"%s\".\"%s\" [%u] for table %s [%u]",
index->indexNamespace,
index->indexRelname,
mesg.oid,
mesg.data.oid,
tableSpecs->qname,
tableSpecs->sourceTable->oid);

Expand All @@ -369,7 +369,7 @@ copydb_index_workers_send_stop(CopyDataSpec *specs)
{
for (int i = 0; i < specs->indexJobs; i++)
{
QMessage stop = { .type = QMSG_TYPE_STOP, .oid = 0 };
QMessage stop = { .type = QMSG_TYPE_STOP, .data.oid = 0 };

log_debug("Send STOP message to CREATE INDEX queue %d",
specs->indexQueue.qId);
Expand Down
4 changes: 3 additions & 1 deletion src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ stream_apply_catchup(StreamSpecs *specs)
}
}

if (!stream_read_context(specs, &(context.system), &(context.WalSegSz)))
if (!stream_read_context(&(specs->paths),
&(context.system),
&(context.WalSegSz)))
{
log_error("Failed to read the streaming context information "
"from the source database, see above for details");
Expand Down
238 changes: 238 additions & 0 deletions src/bin/pgcopydb/ld_transform.c
F438
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,237 @@ static bool streamLogicalTransactionAppendStatement(LogicalTransaction *txn,
);


/*
 * stream_transform_start_worker forks a sub-process that transforms JSON
 * files into SQL files as needed, consuming requests from a queue.
 *
 * Returns false when fork(2) fails; on success the child pid is stored in
 * the private StreamContext.
 */
bool
stream_transform_start_worker(LogicalStreamContext *context)
{
	StreamContext *privateContext = (StreamContext *) context->private;

	/*
	 * Flush stdio channels just before fork, to avoid double-output
	 * problems.
	 */
	fflush(stdout);
	fflush(stderr);

	int forkedPid = fork();

	if (forkedPid < 0)
	{
		log_error("Failed to fork a stream transform worker process: %m");
		return false;
	}

	if (forkedPid == 0)
	{
		/* child process: run the transform worker loop, then exit */
		if (!stream_transform_worker(context))
		{
			/* errors have already been logged */
			exit(EXIT_CODE_INTERNAL_ERROR);
		}

		exit(EXIT_CODE_QUIT);
	}

	/* fork succeeded, in parent: remember the worker pid */
	privateContext->subprocess = forkedPid;

	return true;
}


/*
 * stream_transform_worker is a worker process that loops over messages
 * received from a queue; each message carries the first LSN of a WAL.json
 * file. When receiving such a message, the WAL.json file is transformed
 * into the matching WAL.sql file.
 *
 * Returns true only when a STOP message was received and every transform
 * succeeded; returns false on signal, queue error, or transform failure.
 */
bool
stream_transform_worker(LogicalStreamContext *context)
{
	StreamContext *privateContext = (StreamContext *) context->private;

	int errors = 0;
	bool stop = false;

	log_notice("Started Stream Transform worker %d [%d]", getpid(), getppid());

	while (!stop)
	{
		QMessage mesg = { 0 };

		/* terminate early when a shutdown signal has been received */
		if (asked_to_stop || asked_to_stop_fast || asked_to_quit)
		{
			return false;
		}

		if (!queue_receive(&(privateContext->transformQueue), &mesg))
		{
			/* errors have already been logged */
			break;
		}

		log_debug("stream_transform_worker received message type %ld",
				  mesg.type);

		switch (mesg.type)
		{
			case QMSG_TYPE_STOP:
			{
				stop = true;
				log_debug("Stop message received by stream transform worker");
				break;
			}

			case QMSG_TYPE_STREAM_TRANSFORM:
			{
				log_debug("stream_transform_worker received transform %X/%X",
						  LSN_FORMAT_ARGS(mesg.data.lsn));

				/* derive the WAL.json / WAL.sql file names from the LSN */
				if (!stream_compute_pathnames(context, mesg.data.lsn))
				{
					/* errors have already been logged, break from the loop */
					++errors;
					break;
				}

				if (!stream_transform_file(privateContext->walFileName,
										   privateContext->sqlFileName))
				{
					/* errors have already been logged, break from the loop */
					++errors;
					break;
				}
				break;
			}

			default:
			{
				/* fixed copy/paste: this is the transform queue, not vacuum */
				log_error("Received unknown message type %ld "
						  "on stream transform queue %d",
						  mesg.type,
						  privateContext->transformQueue.qId);
				break;
			}
		}
	}

	return stop == true && errors == 0;
}


/*
 * stream_compute_pathnames fills in the WAL.json and WAL.sql filenames
 * derived from the given LSN, which is expected to be the first LSN
 * processed in the file we need to find the name of.
 */
bool
stream_compute_pathnames(LogicalStreamContext *context, uint64_t lsn)
{
	StreamContext *privateContext = (StreamContext *) context->private;

	char walSegmentName[MAXPGPATH] = { 0 };

	/*
	 * The timeline and wal segment size are determined when connecting to the
	 * source database, and stored to local files at that time. When the Stream
	 * Transform Worker process is created, we don't have that information yet,
	 * so the first time we process an LSN from the queue we go and fetch the
	 * information from our local files.
	 */
	if (context->timeline == 0)
	{
		IdentifySystem system = { 0 };
		uint32_t WalSegSz = 0;

		if (!stream_read_context(&(privateContext->paths), &system, &WalSegSz))
		{
			log_error("Failed to read the streaming context information "
					  "from the source database, see above for details");
			return false;
		}

		context->timeline = system.timeline;
		context->WalSegSz = WalSegSz;
	}

	/* compute the WAL segment name that would host the current LSN */
	XLogSegNo segno;
	XLByteToSeg(lsn, segno, context->WalSegSz);
	XLogFileName(walSegmentName, context->timeline, segno, context->WalSegSz);

	sformat(privateContext->walFileName,
			sizeof(privateContext->walFileName),
			"%s/%s.json",
			privateContext->paths.dir,
			walSegmentName);

	sformat(privateContext->sqlFileName,
			sizeof(privateContext->sqlFileName),
			"%s/%s.sql",
			privateContext->paths.dir,
			walSegmentName);

	return true;
}


/*
 * stream_transform_add_file sends a message to the Stream Transform worker
 * queue, asking it to transform the WAL.json file whose first LSN is
 * firstLSN into its WAL.sql counterpart.
 */
bool
stream_transform_add_file(Queue *queue, uint64_t firstLSN)
{
	QMessage mesg = {
		.type = QMSG_TYPE_STREAM_TRANSFORM,
		.data.lsn = firstLSN
	};

	log_debug("stream_transform_add_file[%d]: %X/%X",
			  queue->qId,
			  LSN_FORMAT_ARGS(mesg.data.lsn));

	if (!queue_send(queue, &mesg))
	{
		/* errors have already been logged */
		return false;
	}

	return true;
}


/*
 * stream_transform_send_stop sends the STOP message to the Stream Transform
 * worker, asking it to exit its message loop cleanly.
 */
bool
stream_transform_send_stop(Queue *queue)
{
	QMessage stop = { .type = QMSG_TYPE_STOP };

	log_debug("Send STOP message to Transform Queue %d", queue->qId);

	if (!queue_send(queue, &stop))
	{
		/* errors have already been logged */
		return false;
	}

	return true;
}


/*
* stream_transform_file transforms a JSON formatted file as received from the
* wal2json logical decoding plugin into an SQL file ready for applying to the
Expand All @@ -52,6 +283,10 @@ stream_transform_file(char *jsonfilename, char *sqlfilename)
StreamContent content = { 0 };
long size = 0L;

log_notice("Transforming JSON file \"%s\" into SQL file \"%s\"",
jsonfilename,
sqlfilename);

strlcpy(content.filename, jsonfilename, sizeof(content.filename));

if (!read_file(content.filename, &(content.buffer), &size))
Expand Down Expand Up @@ -204,6 +439,9 @@ stream_transform_file(char *jsonfilename, char *sqlfilename)
return false;
}

log_info("Transformed JSON messages into SQL file \"%s\"",
sqlfilename);

return true;
}

Expand Down
5 changes: 2 additions & 3 deletions src/bin/pgcopydb/queue_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ queue_send(Queue *queue, QMessage *msg)
if (errStatus < 0)
{
log_error("Failed to send a message to queue %d "
"with type %ld and oid %u: %m",
"with type %ld: %m",
queue->qId,
msg->type,
msg->oid);
msg->type);
return false;
}

Expand Down
7 changes: 6 additions & 1 deletion src/bin/pgcopydb/queue_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ typedef enum
QMSG_TYPE_UNKNOWN = 0,
QMSG_TYPE_TABLEOID,
QMSG_TYPE_INDEXOID,
QMSG_TYPE_STREAM_TRANSFORM,
QMSG_TYPE_STOP
} QMessageType;

/*
 * QMessage is the message structure exchanged over a Queue. The payload is a
 * union so the same envelope can carry either an oid (table/index messages)
 * or an LSN (stream transform messages), selected by the type field.
 */
typedef struct QMessage
{
	long type;                  /* a QMessageType value; presumably long to
	                             * match the queue API's message-type field —
	                             * TODO confirm against queue_utils.c */
	union
	{
		uint32_t oid;           /* used by QMSG_TYPE_TABLEOID/INDEXOID */
		uint64_t lsn;           /* used by QMSG_TYPE_STREAM_TRANSFORM */
	} data;
} QMessage;

bool queue_create(Queue *queue);
Expand Down
Loading
0