49.6. 逻辑解码输出插件#
可以在 PostgreSQL 源代码树的contrib/test_decoding
子目录中找到一个示例输出插件。
49.6.1. 初始化函数#
通过动态加载一个共享库来加载一个输出插件,该共享库的名称作为库的基本名称。正常的库搜索路径用于定位库。为了提供必需的输出插件回调并指示该库实际上是一个输出插件,它需要提供一个名为_PG_output_plugin_init
的函数。此函数传递了一个结构,需要用各个操作的回调函数指针填充该结构。
typedef struct OutputPluginCallbacks
{
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterPrepareCB filter_prepare_cb;
LogicalDecodeBeginPrepareCB begin_prepare_cb;
LogicalDecodePrepareCB prepare_cb;
LogicalDecodeCommitPreparedCB commit_prepared_cb;
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
必需begin_cb
、change_cb
和commit_cb
回调,而startup_cb
、truncate_cb
、message_cb
、filter_by_origin_cb
和shutdown_cb
是可选的。如果未设置truncate_cb
但要解码TRUNCATE
,则该操作将被忽略。
输出插件还可以定义函数来支持正在进行的大型事务的流式处理。必需stream_start_cb
、stream_stop_cb
、stream_abort_cb
、stream_commit_cb
和stream_change_cb
,而stream_message_cb
和stream_truncate_cb
是可选的。如果输出插件还支持两阶段提交,则stream_prepare_cb
也是必需的。
输出插件还可以定义函数来支持两阶段提交,这允许在PREPARE TRANSACTION
上解码操作。必需begin_prepare_cb
、prepare_cb
、commit_prepared_cb
和rollback_prepared_cb
回调,而filter_prepare_cb
是可选的。如果输出插件还支持正在进行的大型事务的流式处理,则stream_prepare_cb
也是必需的。
49.6.2. 功能#
为了解码、格式化和输出更改,输出插件可以使用后端的正常基础设施的大部分,包括调用输出函数。只要仅访问由initdb
在pg_catalog
架构中创建的关系或使用以下内容标记为用户提供的目录表,就可以允许只读访问关系:
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
请注意,必须仅通过systable_*
扫描 API 访问输出插件中的用户目录表或常规系统目录表。通过heap_*
扫描 API 访问将出错。此外,禁止任何导致事务 ID 分配的操作。其中包括写入表、执行 DDL 更改和调用pg_current_xact_id()
。
49.6.3. 输出模式#
输出插件回调可以以几乎任意格式将数据传递给使用者。对于某些用例(如通过 SQL 查看更改),以可以包含任意数据的数据类型(例如,bytea
)返回数据很麻烦。如果输出插件仅以服务器编码输出文本数据,则它可以通过在启动回调中将OutputPluginOptions.output_type
设置为OUTPUT_PLUGIN_TEXTUAL_OUTPUT
而不是OUTPUT_PLUGIN_BINARY_OUTPUT
来声明这一点。在这种情况下,所有数据都必须采用服务器编码,以便text
数据项可以包含它。这在启用断言的版本中进行检查。
49.6.4. 输出插件回调#
输出插件会通过它需要提供的各种回调来获取有关正在发生的更改的通知。
并发事务按提交顺序解码,并且仅在begin
和commit
回调之间解码属于特定事务的更改。显式或隐式回滚的事务永远不会被解码。成功的保存点将折叠到包含它们的交易中,按它们在该交易中执行的顺序折叠。如果使用PREPARE TRANSACTION
为两阶段提交准备事务,并且提供了解码它们所需的输出插件回调,那么该事务也将被解码。当前正在解码的已准备事务可能会通过ROLLBACK PREPARED
命令并发中止。在这种情况下,此事务的逻辑解码也将被中止。一旦检测到中止并调用prepare_cb
回调,此类事务的所有更改都将被跳过。因此,即使在并发中止的情况下,也会向输出插件提供足够的信息,以便它在解码ROLLBACK PREPARED
后对其进行正确处理。
注意
只有已经安全刷新到磁盘的事务才会被解码。当synchronous_commit
设置为off
时,这可能会导致COMMIT
不会在紧随其后的pg_logical_slot_get_changes()
中立即被解码。
49.6.4.1. 启动回调#
每当创建复制槽或要求其流式传输更改时,都会调用可选的startup_cb
回调,而不管准备好输出的更改数量如何。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init);
当创建复制槽时,is_init
参数将为 true,否则为 false。*options
*指向输出插件可以设置的选项结构
typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
} OutputPluginOptions;
output_type
必须设置为OUTPUT_PLUGIN_TEXTUAL_OUTPUT
或OUTPUT_PLUGIN_BINARY_OUTPUT
。另请参见第 49.6.3 节。如果receive_rewrites
为 true,则输出插件还将针对在某些 DDL 操作期间堆重写所做的更改进行调用。这些更改对于处理 DDL 复制的插件很重要,但它们需要特殊处理。
启动回调应验证ctx->output_plugin_options
中存在的选项。如果输出插件需要有状态,它可以使用ctx->output_plugin_private
来存储它。
49.6.4.2. 关闭回调#
每当先前处于活动状态的复制槽不再使用时,就会调用可选的shutdown_cb
回调,并且可以用来释放输出插件的私有资源。该槽不一定正在删除,只是流正在停止。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
49.6.4.3. 事务开始回调#
每当已提交事务的开始已解码时,就会调用必需的begin_cb
回调。已中止的事务及其内容永远不会被解码。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
*txn
*参数包含有关事务的元信息,例如已提交的时间戳及其 XID。
49.6.4.4. 事务结束回调#
每当事务提交已解码时,就会调用必需的commit_cb
回调。如果存在任何已修改的行,则在之前将调用所有已修改行的change_cb
回调。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
49.6.4.5. 更改回调#
对于事务中的每个单独行修改,都会调用必需的change_cb
回调,它可能是INSERT
、UPDATE
或DELETE
。即使原始命令同时修改了几行,也会针对每行单独调用回调。change_cb
回调可以访问系统或用户目录表,以帮助输出行修改详细信息的过程。在解码已准备(但尚未提交)的事务或解码未提交的事务的情况下,此更改回调也可能由于此事务本身同时回滚而出错。在这种情况下,此已中止事务的逻辑解码将正常停止。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
*ctx
和txn
参数与begin_cb
和commit_cb
回调的内容相同,但此外关系描述符relation
指向行所属的关系,并且描述行修改的结构change
*被传递进来。
注意
只能使用逻辑解码提取未记录(请参阅UNLOGGED
)且不是临时(请参阅TEMPORARY
或TEMP
)的用户定义表中的更改。
49.6.4.6. 截断回调#
可选的truncate_cb
回调针对TRUNCATE
命令调用。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
参数类似于change_cb
回调。但是,因为通过外键连接的表上的TRUNCATE
操作需要一起执行,所以此回调接收关系数组,而不仅仅是单个关系。有关详细信息,请参阅TRUNCATE语句的描述。
49.6.4.7. 来源筛选回调#
可选的filter_by_origin_cb
回调被调用以确定从*origin_id
*重放的数据是否与输出插件相关。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
对于其他回调,*ctx
*参数具有相同的内容。没有信息,但来源可用。要发出信号表明在传入节点上发生的更改不相关,请返回 true,导致它们被过滤掉;否则返回 false。其他回调将不会针对已过滤掉的交易和更改调用。
在实现级联或多向复制解决方案时,这非常有用。按来源筛选可以防止在这样的设置中来回复制相同的更改。虽然交易和更改也携带有关来源的信息,但通过此回调进行筛选明显更有效。
49.6.4.8. 通用消息回调#
每当解码逻辑解码消息时,都会调用可选的message_cb
回调。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
*txn
参数包含有关交易的元信息,例如已提交的时间戳及其 XID。但是,请注意,当消息是非事务性的并且尚未在记录消息的交易中分配 XID 时,它可以为 NULL。lsn
具有消息的 WAL 位置。transactional
指出消息是否作为事务性消息发送。与更改回调类似,在解码已准备(但尚未提交)的交易或解码未提交的交易的情况下,此消息回调也可能由于此交易本身同时回滚而出错。在这种情况下,此中止交易的逻辑解码将正常停止。prefix
是任意以 null 结尾的前缀,可用于识别当前插件的有趣消息。最后,message
参数保存大小为message_size
*的实际消息。
应格外小心,以确保输出插件认为有趣的前缀是唯一的。使用扩展名或输出插件本身的名称通常是一个不错的选择。
49.6.4.9. 准备筛选回调#
可选的filter_prepare_cb
回调被调用来确定当前两阶段提交事务的一部分数据是否应该在准备阶段考虑进行解码,还是稍后作为常规单阶段事务在COMMIT PREPARED
时间进行解码。要表示应该跳过解码,请返回true
;否则返回false
。当未定义回调时,假定为false
(即无筛选,所有使用两阶段提交的事务也分两阶段进行解码)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
对于其他回调,*ctx
参数具有相同的内容。参数xid
和gid
*提供了识别事务的两种不同方式。后来的COMMIT PREPARED
或ROLLBACK PREPARED
携带这两个标识符,为输出插件提供了选择要使用什么内容的机会。
可以对每个事务多次调用回调进行解码,并且每次调用时都必须为给定的*xid
和gid
*对提供相同的静态答案。
49.6.4.10. 事务开始准备回调#
每当已解码准备事务的开始时,都会调用必需的begin_prepare_cb
回调。*txn
参数的一部分gid
*字段可用于此回调中,以检查插件是否已收到此PREPARE
,在这种情况下,它可以出错或跳过事务的其余更改。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
49.6.4.11. 事务准备回调#
每当已解码已准备进行两阶段提交的事务时,都会调用必需的prepare_cb
回调。如果存在任何已修改的行,则在此之前将调用所有已修改行的change_cb
回调。*txn
参数的一部分gid
*字段可用于此回调中。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
49.6.4.12. 事务提交准备回调#
每当已解码事务COMMIT PREPARED
时,都会调用必需的commit_prepared_cb
回调。*txn
参数的一部分gid
*字段可用于此回调中。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
49.6.4.13. 事务回滚已准备的回调#
每当解码事务ROLLBACK PREPARED
时,都会调用必需的rollback_prepared_cb
回调。此回调中可以使用*txn
参数中包含的gid
字段。可以使用参数prepare_end_lsn
和prepare_time
来检查插件是否已收到此PREPARE TRANSACTION
,如果是,则可以应用回滚,否则可以跳过回滚操作。仅gid
*不够,因为下游节点可能具有具有相同标识符的已准备事务。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
49.6.4.14. 流启动回调#
在打开正在进行的事务的流式更改块时,将调用必需的stream_start_cb
回调。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
49.6.4.15. 流停止回调#
在关闭正在进行的事务的流式更改块时,将调用必需的stream_stop_cb
回调。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
49.6.4.16. 流中止回调#
调用必需的stream_abort_cb
回调以中止先前流式传输的事务。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
49.6.4.17. 流准备回调#
调用stream_prepare_cb
回调以准备先前流式传输的事务作为两阶段提交的一部分。当输出插件同时支持大型正在进行事务的流式传输和两阶段提交时,需要此回调。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
49.6.4.18. 流提交回调#
调用必需的stream_commit_cb
回调以提交先前流式传输的事务。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
49.6.4.19. 流更改回调#
在流式更改块中发送更改(由stream_start_cb
和stream_stop_cb
调用分隔)时,将调用必需的stream_change_cb
回调。不显示实际更改,因为事务可能在稍后的时间点中止,并且我们不会为中止的事务解码更改。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
49.6.4.20. 流消息回调#
在流式更改块中发送通用消息(由stream_start_cb
和stream_stop_cb
调用分隔)时,将调用可选的stream_message_cb
回调。不显示事务消息的消息内容,因为事务可能在稍后的时间点中止,并且我们不会为中止的事务解码更改。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
49.6.4.21. 流截断回调#
可选的stream_truncate_cb
回调在流式更改块中的TRUNCATE
命令中调用(由stream_start_cb
和stream_stop_cb
调用分隔)。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
参数类似于stream_change_cb
回调。但是,由于需要一起执行通过外键连接的表上的TRUNCATE
操作,此回调接收关系数组,而不仅仅是单个关系。有关详细信息,请参阅TRUNCATE语句的描述。
49.6.5. 用于生成输出的函数#
为了实际生成输出,输出插件可以在begin_cb
、commit_cb
或change_cb
回调中将数据写入StringInfo
输出缓冲区ctx->out
中。在写入输出缓冲区之前,必须调用OutputPluginPrepareWrite(ctx, last_write)
,在完成写入缓冲区后,必须调用OutputPluginWrite(ctx, last_write)
来执行写入。*last_write
*参数指示特定写入是否为回调的最后写入。
以下示例展示了如何向输出插件的使用者输出数据
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);