8000 Refactor the transform process management. by dimitri · Pull Request #194 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Refactor the transform process management. #194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 17 additions & 60 deletions src/bin/pgcopydb/cli_clone_follow.c
< 8000 td class="blob-code blob-code-context js-file-line">
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ CommandLine fork_command =
cli_copy_db_getopts,
cli_clone);

/* pgcopydb copy db is an alias for pgcopydb clone */
CommandLine copy__db_command =
make_command(
"copy-db",
"Clone an entire database from source to target",
" --source ... --target ... [ --table-jobs ... --index-jobs ... ] ",
PGCOPYDB_CLONE_GETOPTS_HELP,
cli_copy_db_getopts,
cli_clone);


CommandLine follow_command =
make_command(
Expand Down Expand Up @@ -100,7 +110,6 @@ static bool start_follow_process(CopyDataSpec *copySpecs,
static bool cli_clone_follow_wait_subprocess(const char *name, pid_t pid);

static bool cloneDB(CopyDataSpec *copySpecs);
static bool followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);


/*
Expand All @@ -127,7 +136,7 @@ cli_clone(int argc, char **argv)
copyDBoptions.slotName,
copyDBoptions.origin,
copyDBoptions.endpos,
STREAM_MODE_PREFETCH,
STREAM_MODE_CATCHUP,
copyDBoptions.stdIn,
copyDBoptions.stdOut))
{
Expand Down Expand Up @@ -337,6 +346,9 @@ cli_clone(int argc, char **argv)
}
}

/* make sure all sub-processes are now finished */
success = success && copydb_wait_for_subprocesses();

if (!success)
{
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down Expand Up @@ -364,7 +376,7 @@ cli_follow(int argc, char **argv)
copyDBoptions.slotName,
copyDBoptions.origin,
copyDBoptions.endpos,
STREAM_MODE_PREFETCH,
STREAM_MODE_CATCHUP,
copyDBoptions.stdIn,
copyDBoptions.stdOut))
{
Expand Down Expand Up @@ -414,7 +426,7 @@ start_clone_process(CopyDataSpec *copySpecs, pid_t *pid)
case 0:
{
/* child process runs the command */
log_debug("Starting the clone sub-process");
log_notice("Starting the clone sub-process");

if (!cloneDB(copySpecs))
{
Expand Down Expand Up @@ -608,7 +620,7 @@ start_follow_process(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs,
case 0:
{
/* child process runs the command */
log_debug("Starting the follow sub-process");
log_notice("Starting the follow sub-process");

if (!followDB(copySpecs, streamSpecs))
{
Expand All @@ -631,61 +643,6 @@ start_follow_process(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs,
}

/*
* followDB implements a logical decoding client for streaming changes from the
* source database into the target database. The stream_setup_databases() must
* have been called already so that the replication slot using wal2json is
* ready, the pgcopydb.sentinel table exists, and the target database
* replication origin has been created too.
*/
static bool
followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
{
/*
* Remove the possibly still existing stream context files from
* previous round of operations (--resume, etc). We want to make sure
* that the catchup process reads the files created on this connection.
*/
if (!stream_cleanup_context(streamSpecs))
{
/* errors have already been logged */
return false;
}

pid_t prefetch = -1;
pid_t catchup = -1;

if (!follow_start_prefetch(streamSpecs, &prefetch))
{
/* errors have already been logged */
return false;
}

/*
* Second, start the catchup process.
*/
if (!follow_start_catchup(streamSpecs, &catchup))
{
/* errors have already been logged */
return false;
}

/*
* Finally wait until the process are finished.
*
* This happens when the sentinel endpos is set, typically using the
* command: pgcopydb stream sentinel set endpos --current.
*/
if (!follow_wait_subprocesses(streamSpecs, prefetch, catchup))
{
/* errors have already been logged */
return false;
}

return true;
}


/*
* cli_clone_follow_wait_subprocesses waits until both sub-processes are
< 10000 /td> * finished.
Expand Down
25 changes: 0 additions & 25 deletions src/bin/pgcopydb/cli_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,6 @@ static void cli_copy_indexes(int argc, char **argv);
static void cli_copy_constraints(int argc, char **argv);
static void cli_copy_blobs(int argc, char **argv);

/* pgcopydb copy db is an alias for pgcopydb clone */
CommandLine copy__db_command =
make_command(
"copy-db",
"Copy an entire database from source to target",
" --source ... --target ... [ --table-jobs ... --index-jobs ... ] ",
" --source Postgres URI to the source database\n"
" --target Postgres URI to the target database\n"
" --dir Work directory to use\n"
" --table-jobs Number of concurrent COPY jobs to run\n"
" --index-jobs Number of concurrent CREATE INDEX jobs to run\n"
" --drop-if-exists On the target database, clean-up from a previous run first\n"
" --roles Also copy roles found on source to target\n"
" --no-owner Do not set ownership of objects to match the original database\n"
" --no-acl Prevent restoration of access privileges (grant/revoke commands).\n"
" --no-comments Do not output commands to restore comments\n"
" --skip-large-objects Skip copying large objects (blobs)\n"
" --filters <filename> Use the filters defined in <filename>\n"
" --restart Allow restarting when temp files exist already\n"
" --resume Allow resuming operations after a failure\n"
" --not-consistent Allow taking a new snapshot on the source database\n"
" --snapshot Use snapshot obtained with pg_export_snapshot\n",
cli_copy_db_getopts,
cli_clone);

static CommandLine copy_db_command =
make_command(
"db",
Expand Down
30 changes: 27 additions & 3 deletions src/bin/pgcopydb/cli_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1107,9 +1107,33 @@ stream_start_in_mode(LogicalStreamMode mode)
exit(EXIT_CODE_INTERNAL_ERROR);
}

if (!startLogicalStreaming(&specs))
switch (specs.mode)
{
/* errors have already been logged */
exit(EXIT_CODE_SOURCE);
case STREAM_MODE_RECEIVE:
{
if (!startLogicalStreaming(&specs))
{
/* errors have already been logged */
exit(EXIT_CODE_SOURCE);
}
break;
}

case STREAM_MODE_PREFETCH:
{
if (!followDB(&copySpecs, &specs))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
}
break;
}

default:
{
log_fatal("BUG: stream_start_in_mode called in mode %d", mode);
exit(EXIT_CODE_INTERNAL_ERROR);
break;
}
}
}
4 changes: 2 additions & 2 deletions src/bin/pgcopydb/copydb.c
Original file line number Diff line number Diff line change
Expand Up @@ -740,14 +740,14 @@ copydb_init_specs(CopyDataSpec *specs,
specs->section == DATA_SECTION_TABLE_DATA)
{
/* create the VACUUM process queue */
if (!queue_create(&(specs->vacuumQueue)))
if (!queue_create(&(specs->vacuumQueue), "vacuum"))
{
log_error("Failed to create the VACUUM process qu 613A eue");
return false;
}

/* create the CREATE INDEX process queue */
if (!queue_create(&(specs->indexQueue)))
if (!queue_create(&(specs->indexQueue), "create index"))
{
log_error("Failed to create the INDEX process queue");
return false;
Expand Down
Loading
0