Fix pgcopydb list progress command. by dimitri · Pull Request #610 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Fix pgcopydb list progress command. #610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 83 additions & 2 deletions src/bin/pgcopydb/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -6791,13 +6791,14 @@ catalog_iter_s_table_in_copy_init(SourceTableIterator *iter)
" select t.oid, qname, nspname, relname, amname, restore_list_name, "
" relpages, reltuples, t.bytes, t.bytes_pretty, "
" exclude_data, part_key, "
" coalesce(part.partcount, 0), s.partnum, 0 as min, 0 as max, "
" part.partcount, s.partnum, part.min, part.max, "
" c.srcrowcount, c.srcsum, c.dstrowcount, c.dstsum, "
" sum(s.duration), sum(s.bytes) "

" from process p "
" join s_table t on p.tableoid = t.oid "
" join summary s on s.pid = t.pid and s.tableoid = p.tableoid "
" join summary s on s.pid = p.pid "
" and s.tableoid = p.tableoid "

" left join s_table_part part "
" on part.oid = p.tableoid "
Expand Down Expand Up @@ -6932,6 +6933,86 @@ catalog_iter_s_index_in_progress_init(SourceIndexIterator *iter)
}


/*
 * catalog_count_summary_done runs a single SQLite query against the given
 * catalog and stores, in the caller-provided CatalogProgressCount:
 *
 *  - the number of tables for which the count of summary rows having a
 *    done_time_epoch equals the expected part count (coalesce'd to 1 when
 *    s_table_part has no matching row),
 *
 *  - the number of summary rows that have both an indexoid and a
 *    done_time_epoch.
 *
 * Returns false when the catalog has no open database handle, or when
 * preparing or executing the query fails; returns true otherwise.
 */
bool
catalog_count_summary_done(DatabaseCatalog *catalog,
						   CatalogProgressCount *count)
{
	sqlite3 *catalogDB = catalog->db;

	if (catalogDB == NULL)
	{
		log_error("BUG: catalog_count_summary_done: db is NULL");
		return false;
	}

	/*
	 * Two scalar sub-queries, returned as the two columns of a single row:
	 * tblcount first, idxcount second.
	 *
	 * NOTE(review): if s_table_part holds one row per partition, the left
	 * join on oid alone would repeat each summary row partcount times and
	 * inflate partdone -- confirm against the s_table_part schema.
	 */
	char *countDoneSQL =
		"select "
		" ("
		" with pdone as "
		" ("
		" select tableoid, "
		" count(s.partnum) as partdone, "
		" coalesce(p.partcount, 1) as partcount "
		" from summary s "
		" join s_table t on t.oid = s.tableoid "
		" left join s_table_part p on p.oid = t.oid "
		" where tableoid is not null "
		" and done_time_epoch is not null "
		" group by tableoid"
		" ) "
		" select count(tableoid) from pdone where partdone = partcount"
		" ) as tblcount,"
		" ("
		" select count(indexoid) "
		" from summary "
		" where indexoid is not null and done_time_epoch is not null"
		" ) as idxcount";

	/* the fetch callback writes its result through query.context */
	SQLiteQuery query = { 0 };

	query.context = count;
	query.fetchFunction = &catalog_count_summary_done_fetch;

	/*
	 * Prepare, then execute the query, which returns exactly one row. The
	 * short-circuit keeps the original ordering: execution is attempted only
	 * when the prepare step succeeded. Errors have already been logged by the
	 * failing step.
	 */
	return catalog_sql_prepare(catalogDB, countDoneSQL, &query) &&
		   catalog_sql_execute_once(&query);
}


/*
 * catalog_count_summary_done_fetch is the fetch callback for the query
 * prepared in catalog_count_summary_done. It reads the first two columns of
 * the current result row as 64-bit integers and stores them in the
 * CatalogProgressCount that query->context points to: column 0 goes to the
 * table counter, column 1 to the index counter.
 *
 * Always returns true.
 */
bool
catalog_count_summary_done_fetch(SQLiteQuery *query)
{
	CatalogProgressCount *progressCount =
		(CatalogProgressCount *) query->context;

	uint64_t tablesDone = sqlite3_column_int64(query->ppStmt, 0);
	uint64_t indexesDone = sqlite3_column_int64(query->ppStmt, 1);

	/* overwrite the whole memory area: nothing from a previous use survives */
	*progressCount = (CatalogProgressCount) {
		.table = tablesDone,
		.index = indexesDone
	};

	return true;
}


/*
* catalog_sql_prepare prepares a SQLite query for our internal catalogs.
*/
Expand Down
14 changes: 13 additions & 1 deletion src/bin/pgcopydb/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ bool catalog_iter_s_depend_finish(SourceDependIterator *iter);
bool catalog_s_depend_fetch(SQLiteQuery *query);

/*
* Processes
* Processes, progress, summary
*/
typedef struct ProcessInfo
{
Expand All @@ -508,6 +508,7 @@ typedef struct ProcessInfo
uint32_t indexOid;
} ProcessInfo;


bool catalog_upsert_process_info(DatabaseCatalog *catalog, ProcessInfo *ps);
bool catalog_delete_process(DatabaseCatalog *catalog, pid_t pid);

Expand All @@ -524,6 +525,17 @@ bool catalog_iter_s_index_in_progress(DatabaseCatalog *catalog,
bool catalog_iter_s_index_in_progress_init(SourceIndexIterator *iter);


/*
 * CatalogProgressCount holds the two counters computed by
 * catalog_count_summary_done from the summary table: how many tables and how
 * many indexes have already been processed.
 */
typedef struct CatalogProgressCount
{
/* number of tables counted as done */
uint64_t table;
/* number of indexes counted as done */
uint64_t index;
} CatalogProgressCount;

/*
 * catalog_count_summary_done fills in the given CatalogProgressCount from the
 * catalog, and returns false on failure.
 */
bool catalog_count_summary_done(DatabaseCatalog *catalog,
CatalogProgressCount *count);
/*
 * catalog_count_summary_done_fetch is the SQLiteQuery fetch function used by
 * catalog_count_summary_done.
 */
bool catalog_count_summary_done_fetch(SQLiteQuery *query);


/*
* Internal tooling for catalogs management
*/
Expand Down
12 changes: 10 additions & 2 deletions src/bin/pgcopydb/progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,16 @@ copydb_update_progress(CopyDataSpec *copySpecs, CopyProgress *progress)
progress->tableCount,
progress->indexCount);

CatalogProgressCount done = { 0 };

if (!catalog_count_summary_done(sourceDB, &done))
{
log_error("Failed to count tables and indexes done in our catalogs");
return false;
}

/* count table in progress, table done */
progress->tableDoneCount = 0;
progress->tableDoneCount = done.table;
progress->tableInProgress.count = 0;

/* we can't have more table in progress than tableJobs */
Expand Down Expand Up @@ -597,7 +605,7 @@ copydb_update_progress(CopyDataSpec *copySpecs, CopyProgress *progress)
}

/* count index in progress, index done */
progress->indexDoneCount = 0;
progress->indexDoneCount = done.index;
progress->indexInProgress.count = 0;

/* we can't have more index in progress than indexJobs */
Expand Down
0