31.2. 订阅#
订阅是逻辑复制的下游端。定义订阅的节点称为订阅者。订阅定义了与另一个数据库的连接,以及它想要订阅的一组发布(一个或多个)。
订阅者数据库的行为与任何其他 PostgreSQL 实例相同,并且可以通过定义自己的发布来用作其他数据库的发布者。
如果需要,订阅者节点可以有多个订阅。可以在单个发布者-订阅者对之间定义多个订阅,在这种情况下,必须注意确保订阅的发布对象不重叠。
每个订阅都将通过一个复制槽接收更改(请参见第 27.2.6 节)。可能需要额外的复制槽来对已存在表数据的初始数据进行同步,并且这些数据将在数据同步结束时删除。
逻辑复制订阅可以成为同步复制的备用(参见第 27.2.8 节)。备用名称默认为订阅名称。可以在订阅的连接信息中将备用名称指定为application_name
。
如果当前用户是超级用户,则pg_dump
会转储订阅。否则,会写入警告并跳过订阅,因为非超级用户无法从pg_subscription
目录中读取所有订阅信息。
使用CREATE SUBSCRIPTION
添加订阅,并且可以使用ALTER SUBSCRIPTION
命令随时停止/恢复订阅,并使用DROP SUBSCRIPTION
删除订阅。
删除并重新创建订阅时,同步信息会丢失。这意味着之后必须重新同步数据。
不会复制架构定义,已发布的表必须存在于订阅者上。只有常规表可以成为复制的目标。例如,无法复制到视图。
表在发布者和订阅者之间使用完全限定的表名进行匹配。不支持复制到订阅者上名称不同的表。
表中的列也按名称进行匹配。订阅者表中列的顺序无需与发布者表中列的顺序匹配。列的数据类型无需匹配,只要数据的文本表示形式可以转换为目标类型即可。例如,可以从类型为integer
的列复制到类型为bigint
的列。目标表还可以有已发布表未提供的其他列。任何此类列都将填充目标表定义中指定的默认值。但是,二进制格式的逻辑复制有更多限制。有关详细信息,请参阅CREATE SUBSCRIPTION
的binary
选项。
31.2.1. 复制槽管理#
如前所述,每个(活动的)订阅都会从远程(发布)端的复制槽接收更改。
其他表同步槽通常是临时的,在内部创建用于执行初始表同步,并在不再需要时自动删除。这些表同步槽具有生成的名称:“pg_%u_sync_%u_%llu
”(参数:订阅*oid
、表relid
、系统标识符sysid
*)
通常,在使用CREATE SUBSCRIPTION
创建订阅时会自动创建远程复制槽,并在使用DROP SUBSCRIPTION
删除订阅时自动删除远程复制槽。但是,在某些情况下,单独操作订阅和底层复制槽可能很有用或很有必要。以下是一些场景
创建订阅时,复制槽已存在。在这种情况下,可以使用
create_slot = false
选项创建订阅,以关联到现有槽。创建订阅时,无法访问远程主机或其状态不明确。在这种情况下,可以使用
connect = false
选项创建订阅。然后,根本不会联系远程主机。这是 pg_dump 所使用的。然后必须在激活订阅之前手动创建远程复制槽。删除订阅时,应保留复制槽。当订阅者数据库被移动到不同的主机并从那里激活时,这可能很有用。在这种情况下,在尝试删除订阅之前,使用
ALTER SUBSCRIPTION
从订阅中分离槽。删除订阅时,无法访问远程主机。在这种情况下,在尝试删除订阅之前,使用
ALTER SUBSCRIPTION
从订阅中分离槽。如果远程数据库实例不再存在,则无需执行进一步的操作。但是,如果远程数据库实例只是无法访问,则应手动删除复制槽(以及任何仍剩余的表同步槽);否则,它/它们将继续保留 WAL,并最终可能导致磁盘空间用尽。应仔细调查此类情况。
31.2.2. 示例:设置逻辑复制#
在发布者上创建一些测试表。
test_pub=# CREATE TABLE t1(a int, b text, PRIMARY KEY(a));
CREATE TABLE
test_pub=# CREATE TABLE t2(c int, d text, PRIMARY KEY(c));
CREATE TABLE
test_pub=# CREATE TABLE t3(e int, f text, PRIMARY KEY(e));
CREATE TABLE
在订阅者上创建相同的表。
test_sub=# CREATE TABLE t1(a int, b text, PRIMARY KEY(a));
CREATE TABLE
test_sub=# CREATE TABLE t2(c int, d text, PRIMARY KEY(c));
CREATE TABLE
test_sub=# CREATE TABLE t3(e int, f text, PRIMARY KEY(e));
CREATE TABLE
在发布者端向表中插入数据。
test_pub=# INSERT INTO t1 VALUES (1, 'one'), (2, 'two'), (3, 'three');
INSERT 0 3
test_pub=# INSERT INTO t2 VALUES (1, 'A'), (2, 'B'), (3, 'C');
INSERT 0 3
test_pub=# INSERT INTO t3 VALUES (1, 'i'), (2, 'ii'), (3, 'iii');
INSERT 0 3
为表创建发布。发布pub2
和pub3a
不允许某些publish
操作。发布pub3b
有一个行过滤器(参见第 31.3 节)。
test_pub=# CREATE PUBLICATION pub1 FOR TABLE t1;
CREATE PUBLICATION
test_pub=# CREATE PUBLICATION pub2 FOR TABLE t2 WITH (publish = 'truncate');
CREATE PUBLICATION
test_pub=# CREATE PUBLICATION pub3a FOR TABLE t3 WITH (publish = 'truncate');
CREATE PUBLICATION
test_pub=# CREATE PUBLICATION pub3b FOR TABLE t3 WHERE (e > 5);
CREATE PUBLICATION
为发布创建订阅。订阅sub3
同时订阅pub3a
和pub3b
。默认情况下,所有订阅都将复制初始数据。
test_sub=# CREATE SUBSCRIPTION sub1
test_sub-# CONNECTION 'host=localhost dbname=test_pub application_name=sub1'
test_sub-# PUBLICATION pub1;
CREATE SUBSCRIPTION
test_sub=# CREATE SUBSCRIPTION sub2
test_sub-# CONNECTION 'host=localhost dbname=test_pub application_name=sub2'
test_sub-# PUBLICATION pub2;
CREATE SUBSCRIPTION
test_sub=# CREATE SUBSCRIPTION sub3
test_sub-# CONNECTION 'host=localhost dbname=test_pub application_name=sub3'
test_sub-# PUBLICATION pub3a, pub3b;
CREATE SUBSCRIPTION
请注意,无论发布的publish
操作如何,都会复制初始表数据。
test_sub=# SELECT * FROM t1;
a | b
---+-------
1 | one
2 | two
3 | three
(3 rows)
test_sub=# SELECT * FROM t2;
c | d
---+---
1 | A
2 | B
3 | C
(3 rows)
此外,由于初始数据复制会忽略publish
操作,并且由于发布pub3a
没有行过滤器,这意味着复制的表t3
包含所有行,即使它们与发布pub3b
的行过滤器不匹配。
test_sub=# SELECT * FROM t3;
e | f
---+-----
1 | i
2 | ii
3 | iii
(3 rows)
在发布者端向表中插入更多数据。
test_pub=# INSERT INTO t1 VALUES (4, 'four'), (5, 'five'), (6, 'six');
INSERT 0 3
test_pub=# INSERT INTO t2 VALUES (4, 'D'), (5, 'E'), (6, 'F');
INSERT 0 3
test_pub=# INSERT INTO t3 VALUES (4, 'iv'), (5, 'v'), (6, 'vi');
INSERT 0 3
现在发布者端数据如下所示
test_pub=# SELECT * FROM t1;
a | b
---+-------
1 | one
2 | two
3 | three
4 | four
5 | five
6 | six
(6 rows)
test_pub=# SELECT * FROM t2;
c | d
---+---
1 | A
2 | B
3 | C
4 | D
5 | E
6 | F
(6 rows)
test_pub=# SELECT * FROM t3;
e | f
---+-----
1 | i
2 | ii
3 | iii
4 | iv
5 | v
6 | vi
(6 rows)
请注意,在正常复制期间,将使用适当的publish
操作。这意味着发布pub2
和pub3a
不会复制INSERT
。此外,发布pub3b
仅会复制与pub3b
的行过滤器匹配的数据。现在订阅者端数据如下所示
test_sub=# SELECT * FROM t1;
a | b
---+-------
1 | one
2 | two
3 | three
4 | four
5 | five
6 | six
(6 rows)
test_sub=# SELECT * FROM t2;
c | d
---+---
1 | A
2 | B
3 | C
(3 rows)
test_sub=# SELECT * FROM t3;
e | f
---+-----
1 | i
2 | ii
3 | iii
6 | vi
(4 rows)
31.2.3. 示例:延迟复制槽创建#
在某些情况下(例如第 31.2.1 节),如果未自动创建远程复制槽,则用户必须在激活订阅之前手动创建它。以下示例中显示了创建槽和激活订阅的步骤。这些示例指定了内置逻辑复制使用的标准逻辑解码输出插件 (pgoutput
)。
首先,为示例创建一个发布。
test_pub=# CREATE PUBLICATION pub1 FOR ALL TABLES;
CREATE PUBLICATION
示例 1:订阅中connect = false
创建订阅。
test_sub=# CREATE SUBSCRIPTION sub1 test_sub-# CONNECTION 'host=localhost dbname=test_pub' test_sub-# PUBLICATION pub1 test_sub-# WITH (connect=false); WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. CREATE SUBSCRIPTION
在发布者上,手动创建一个槽。由于在
CREATE SUBSCRIPTION
期间未指定名称,因此要创建的槽的名称与订阅名称相同,例如“sub1”。test_pub=# SELECT * FROM pg_create_logical_replication_slot('sub1', 'pgoutput'); slot_name | lsn -----------+----------- sub1 | 0/19404D0 (1 row)
在订阅者上,完成订阅激活。此后,
pub1
的表将开始复制。test_sub=# ALTER SUBSCRIPTION sub1 ENABLE; ALTER SUBSCRIPTION test_sub=# ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; ALTER SUBSCRIPTION
示例 2:订阅中connect = false
,但还指定了slot_name
选项。
创建订阅。
test_sub=# CREATE SUBSCRIPTION sub1 test_sub-# CONNECTION 'host=localhost dbname=test_pub' test_sub-# PUBLICATION pub1 test_sub-# WITH (connect=false, slot_name='myslot'); WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. CREATE SUBSCRIPTION
在发布者上,使用在
CREATE SUBSCRIPTION
期间指定的相同名称手动创建一个槽,例如“myslot”。test_pub=# SELECT * FROM pg_create_logical_replication_slot('myslot', 'pgoutput'); slot_name | lsn -----------+----------- myslot | 0/19059A0 (1 row)
在订阅者上,剩余的订阅激活步骤与之前相同。
test_sub=# ALTER SUBSCRIPTION sub1 ENABLE; ALTER SUBSCRIPTION test_sub=# ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; ALTER SUBSCRIPTION
示例 3:订阅指定slot_name = NONE
创建订阅。当
slot_name = NONE
时,还需要enabled = false
和create_slot = false
。test_sub=# CREATE SUBSCRIPTION sub1 test_sub-# CONNECTION 'host=localhost dbname=test_pub' test_sub-# PUBLICATION pub1 test_sub-# WITH (slot_name=NONE, enabled=false, create_slot=false); CREATE SUBSCRIPTION
在发布者上,使用任意名称手动创建一个槽,例如“myslot”。
test_pub=# SELECT * FROM pg_create_logical_replication_slot('myslot', 'pgoutput'); slot_name | lsn -----------+----------- myslot | 0/1905930 (1 row)
在订阅者上,将订阅与刚刚创建的槽名称关联。
test_sub=# ALTER SUBSCRIPTION sub1 SET (slot_name='myslot'); ALTER SUBSCRIPTION
其余订阅激活步骤与之前相同。
test_sub=# ALTER SUBSCRIPTION sub1 ENABLE; ALTER SUBSCRIPTION test_sub=# ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; ALTER SUBSCRIPTION