Refactor our internal representation for Logical Messages. by dimitri · Pull Request #198 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Refactor our internal representation for Logical Messages. #198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions src/bin/pgcopydb/ld_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,41 @@ typedef struct LogicalTransactionArray
} LogicalTransactionArray;


/*
* The logical decoding client produces messages that can be one of:
*
* - part of a transaction (BEGIN/COMMIT, then INSERT/UPDATE/DELETE/TRUNCATE)
* - a keepalive message
* - a pgcopydb constructed SWITCH WAL message
*
* The keepalive and switch wal messages could also appear within a
* transaction.
*/
typedef struct LogicalMessage
{
/* NOTE(review): presumably true when the payload is command.tx -- confirm */
bool isTransaction;
/*
 * NOTE(review): presumably identifies the kind of message, and therefore
 * which member of the union below is meaningful -- confirm against the
 * StreamAction definition and the code that fills this struct.
 */
StreamAction action;

/*
 * Message payload. The members of a union share the same storage, so only
 * one of them holds valid data at any given time.
 */
union command
{
/* payload for a transaction */
LogicalTransaction tx;
/* payload for a SWITCH WAL message */
LogicalMessageSwitchWAL switchwal;
/* payload for a keepalive message */
LogicalMessageKeepalive keepalive;
} command;
} LogicalMessage;


/*
 * An array of LogicalMessage entries together with an element count.
 */
typedef struct LogicalMessageArray
{
/* NOTE(review): presumably the number of entries in array -- confirm */
int count;
/*
 * NOTE(review): which code allocates and frees this area is not visible
 * here -- confirm ownership with the callers.
 */
LogicalMessage *array; /* malloc'ed area */
} LogicalMessageArray;


/*
* StreamSpecs is the streaming specifications used by the client-side of the
* logical decoding implementation, where we keep track of progress etc.
*/
typedef struct StreamSpecs
{
CDCPaths paths;
Expand Down Expand Up @@ -388,9 +423,10 @@ bool stream_compute_pathnames(uint32_t WalSegSz,
bool stream_transform_stream(FILE * in, FILE *out);
bool stream_transform_line(void *ctx, const char *line, bool *stop);
bool stream_transform_message(char *message,
LogicalTransaction *currentTx,
LogicalMessage *currentMsg,
bool *commit);
bool stream_transform_file(char *jsonfilename, char *sqlfilename);
bool stream_write_message(FILE *out, LogicalMessage *msg);
bool stream_write_transaction(FILE *out, LogicalTransaction *tx);
bool stream_write_begin(FILE *out, LogicalTransaction *tx);
bool stream_write_commit(FILE *out, LogicalTransaction *tx);
Expand All @@ -402,14 +438,15 @@ bool stream_write_update(FILE *out, LogicalMessageUpdate *update);
bool stream_write_delete(FILE * out, LogicalMessageDelete *delete);
bool stream_write_value(FILE *out, LogicalMessageValue *value);

bool parseMessage(LogicalTransaction *txn,
bool parseMessage(LogicalMessage *mesg,
LogicalMessageMetadata *metadata,
char *message,
JSON_Value *json);

bool streamLogicalTransactionAppendStatement(LogicalTransaction *txn,
LogicalTransactionStatement *stmt);

void FreeLogicalMessage(LogicalMessage *msg);
void FreeLogicalTransaction(LogicalTransaction *tx);
void FreeLogicalMessageTupleArray(LogicalMessageTupleArray *tupleArray);

Expand Down
Loading
0