📚 Create Publication and Subscription

🔍 Introduction to logical replication:

Logical replication is a method of replicating data objects and their changes based on their replication identity (usually the primary key). It allows finer-grained control over data replication and security.

Usage scenarios for logical replication:
1. Merging data from multiple databases into a single database, for example for analysis in a data warehouse
2. Replicating data between PG databases of different major versions, since logical replication is not tied to a version
3. Sharing a subset of data among multiple databases

Advantages of logical replication:
1. It works across database versions
2. A publication can replicate insert, update, delete and truncate operations
3. A database can have multiple publications, as long as their names are unique
4. A publication can have multiple subscribers
5. All tables can be published at once, or only selected tables

Disadvantages of logical replication:
1. Logical replication only has global switches
2. Schema and table names must be the same on the publishing side and the subscription side
3. The corresponding schema and tables must be created on the subscription side first
4. Only DML operations are replicated, not DDL. To make a DDL change, execute the DDL on the subscription side first, and then on the publishing side
5. Replication is only possible from base table to base table; views, materialized views, foreign tables, etc. are not supported. Before PostgreSQL 13 a partitioned table had to be published partition by partition; from version 13 the partitioned table itself can be published, and publish_via_partition_root (used in section 4) publishes its changes under the root table

Limitations on logical publishing:
1. The source database user used for logical replication must have the replication attribute or be a superuser
2. Tables on the subscription side may have extra columns, but must contain every column of the published table
3. wal_level on the publishing database must be set to logical
4. max_logical_replication_workers and max_wal_senders must be large enough
5. The subscription side must be able to connect to the source database through the streaming replication protocol (pg_hba.conf)
6. To publish delete and update operations, the table must have a replica identity to identify old rows (pk, uk, full)
7. For tables without a primary/unique key and without replica identity full, only insert operations are allowed on the publishing side; delete and update operations cannot be performed
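
The wal_level prerequisite above could be checked before any publication is created. The following is only a minimal sketch, not part of the original procedure: the function names conf_value and check_wal_level are hypothetical, and the check reads a postgresql.conf-style file rather than the running server (SHOW wal_level; in psql reports the value actually in effect).

```shell
# conf_value FILE NAME (hypothetical helper)
# Print the value of the last uncommented "NAME = value" line in FILE.
# Assumes the "name = value" form; lines starting with "#" are ignored.
conf_value() {
    sed -n -E "s/^[[:space:]]*$2[[:space:]]*=[[:space:]]*'?([^'#[:space:]]+)'?.*/\1/p" "$1" | tail -n 1
}

# check_wal_level FILE (hypothetical helper)
# Succeed only if FILE sets wal_level to logical.
check_wal_level() {
    wal=$(conf_value "$1" wal_level)
    if [ "$wal" = "logical" ]; then
        echo "wal_level OK"
    else
        echo "wal_level is '${wal:-not set}', expected 'logical'"
        return 1
    fi
}
```

It could be called as check_wal_level /pgdata/Clustername/postgresql.conf, using the same path as the setup steps in section 1.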

Logical replication is built on two objects:

Publication

A node on which a publication is defined is called a publisher. A publication is a set of changes generated from a table or a group of tables, and can also be described as a change set or replication set.

Subscription

A node on which a subscription is defined is called a subscriber. A subscription defines the connection to another database and the set of publications it wants to subscribe to. Each subscription receives changes through a replication slot. The initial synchronization of pre-existing table data may require additional temporary replication slots. When a subscription is dropped and recreated, the synchronization information is lost, which means the data must be resynchronized.

🔍 Create publication syntax:

CREATE PUBLICATION name
    [ FOR TABLE [ ONLY ] table_name [ * ] [, ...]
      | FOR ALL TABLES ]
    [ WITH ( publication_parameter [= value] [, ... ] ) ]

Example:
Create a publication that publishes all changes in two tables:
CREATE PUBLICATION mypublication FOR TABLE users, departments;

Create a publication that publishes all changes in all tables:
CREATE PUBLICATION alltables FOR ALL TABLES;

Create a publication that only publishes INSERT operations in one table:
CREATE PUBLICATION insert_only FOR TABLE mydata WITH (publish = 'insert');

🔍 Modify publication syntax:

ALTER PUBLICATION name ADD TABLE [ ONLY ] table_name [ * ] [, ...]
ALTER PUBLICATION name SET TABLE [ ONLY ] table_name [ * ] [, ...]
ALTER PUBLICATION name DROP TABLE [ ONLY ] table_name [ * ] [, ...]
ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION name RENAME TO new_name

Example:
Change the publication to publish only deletes and updates:
ALTER PUBLICATION noinsert SET (publish = 'update, delete');

Add some tables to the publication:
ALTER PUBLICATION mypublication ADD TABLE users, departments;

🔍 Create subscription syntax:

CREATE SUBSCRIPTION subscription_name
    CONNECTION 'conninfo'
    PUBLICATION publication_name [, ...]
    [ WITH ( subscription_parameter [= value] [, ... ] ) ]

Example:
Create a subscription to a remote server that replicates the tables in the insert_only publication and does not start replicating until it is enabled later:
CREATE SUBSCRIPTION mysub CONNECTION 'host=192.168.1.50 port=5432 user=postgres password=****** dbname=foodb' PUBLICATION insert_only WITH (enabled = false);

🔍 Modify subscription syntax:

ALTER SUBSCRIPTION name CONNECTION 'conninfo'
ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] [ WITH ( set_publication_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name ENABLE
ALTER SUBSCRIPTION name DISABLE
ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER SUBSCRIPTION name RENAME TO new_name

1. Setup Publication & Subscription

  • **1.1 Modify configuration file on publication and subscription:
vim /pgdata/Clustername/postgresql.conf
max_worker_processes = 100  #Set the maximum number of background processes that the system can support
max_logical_replication_workers = 100  #Specify the maximum number of logical replication workers
max_replication_slots = 100  #Specify the maximum number of replication slots that the server can support
max_sync_workers_per_subscription = 25  #The maximum number of synchronization workers per subscription

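  • **1.2 Modify configuration file on publication (add the subscription user and IP):
vim /pgdata/Clustername/pg_hba.conf
# Note: the "replication" keyword only matches physical replication connections. A subscription connects
# to a named database, so pg_hba.conf also needs an entry whose database column is that database (or all).
host    replication     postgres        10.41.246.144/24        trust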

  • **1.3 Modify wal_level on publication
vim /pgdata/Clustername/postgresql.conf
wal_level = logical

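  • **1.4 Restart Cluster (wal_level, max_worker_processes, max_replication_slots and max_logical_replication_workers only take effect after a restart; a reload is enough for pg_hba.conf changes)
pg_ctl restart -D /pgdata/Clustername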
  • **1.5 Create Publication
create publication pub_fa131 for table fa131.tablename;

Note: In WZS, tables under the same schema should be published in the same publication so that publications are easy to tell apart.
pub_fa131: the publication name, following the pattern pub_{schema}
fa131: the schema being published
tablename: the table to publish (replicate)

  • **1.6 Create Subscription
create subscription sub_fa131 connection 'host=XXX dbname=MESP131 user=postgres password=****** port=5432' publication pub_fa131 WITH   (connect = true, enabled = true, create_slot = true, slot_name = sub_fa131, synchronous_commit = 'off');

sub_fa131: the subscription name, following the pattern sub_{schema}
host: IP address of the publishing side
dbname: database name on the publishing side
user/password: an account and password on the publishing side that support logical replication
port: port of the publishing side
pub_fa131: the publication on the publishing side to subscribe to
slot_name = sub_fa131: the replication slot name

  • **1.7 Add table to publication
ALTER PUBLICATION pub_fa131 ADD TABLE fa131.mesupnroute;
  • **1.8 Drop table from publication
ALTER PUBLICATION pub_fa131 DROP TABLE fa131.mesupnroute;
  • **1.9 Refresh subscription (execute on the subscription side after tables are added to or dropped from the publication)
Alter subscription sub_fa131 refresh publication;
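
The pub_{schema} / sub_{schema} naming convention from steps 1.5 and 1.6 lends itself to generating the statement instead of typing it. A minimal sketch, assuming a hypothetical helper named build_subscription_sql; the host, port and database values in the usage line are placeholders, and the password is left out, so append password=... to the connection string as in step 1.6.

```shell
# build_subscription_sql SCHEMA HOST PORT DBNAME USER (hypothetical helper)
# Print a CREATE SUBSCRIPTION statement that follows the sub_{schema} /
# pub_{schema} naming convention, with the slot named after the subscription.
build_subscription_sql() {
    schema=$1 host=$2 port=$3 dbname=$4 user=$5
    printf "CREATE SUBSCRIPTION sub_%s CONNECTION 'host=%s port=%s dbname=%s user=%s' PUBLICATION pub_%s WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_%s, synchronous_commit = 'off');\n" \
        "$schema" "$host" "$port" "$dbname" "$user" "$schema" "$schema"
}
```

For example, build_subscription_sql fa131 10.0.0.1 5432 MESP131 postgres prints a statement for sub_fa131 that can be reviewed before it is run in psql on the subscription side.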

2. Monitor script Setup

  • **2.1 monitor_rep.sh(On Publication)
vim /pgdata/wistron/script/MESP130/monitor_rep.sh
#!/bin/sh
source ~/.bash_profile
## Identifier of the DB to monitor ###
SID=MESP130
## Set the PGDATA path ###
pgdata=/pgdata/$SID
## Read the port number (line 4 of postmaster.pid) ###
port=`cat $pgdata/postmaster.pid|sed -n 4p`
## Folder where the script logs are stored ###
scriptfolder=/pgdata/wistron/script/$SID
now=`date "+%Y-%m-%d %H:%M:%S"`
gapcounts=0

psql -p $port -c "select slot_name,plugin,slot_type,database,active,xmin,catalog_xmin,pg_wal_lsn_diff(pg_current_wal_insert_lsn(),restart_lsn) restart_delay,to_timestamp(to_char((pg_stat_file(current_setting('data_directory') || '/pg_wal/' || pg_walfile_name(restart_lsn))).modification, 'YYYY-MM-DD HH24:MI:SS.MSOF'),'yyyy-mm-dd hh24:MI:SS'),round(cast(date_part('epoch', to_timestamp(now()::text,'yyyy-mm-dd hh24:MI:SS') - to_timestamp(to_char((pg_stat_file(current_setting('data_directory') || '/pg_wal/' || pg_walfile_name(restart_lsn))).modification, 'YYYY-MM-DD HH24:MI:SS.MSOF'),'yyyy-mm-dd hh24:MI:SS'))/60 as numeric),0) as \"gap(min)\" from pg_replication_slots;" > $scriptfolder/monitor_rep_"$port".log

if [ -s /pgdata/wistron/script/MESP130/monitor_rep_"$port".log ]
then
for a in `cat -n /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|grep logical|awk '{print $1}'`; do # loop over the logical slots
db=`cat /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|awk '{print $7}'|sed -n "$a"p`
slotname=`cat /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|awk '{print $1}'|sed -n "$a"p`
gapmin=`cat /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|awk '{print $19}'|sed -n "$a"p`
gapsize=`cat /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|awk '{print $14}'|sed -n "$a"p`

 if [ "${gapmin}" -gt 10 ];then
 gapcounts=$(expr $gapcounts + 1)
 fi

done
fi

if [ "${gapcounts}" -gt 0 ];then
/bin/echo -e "Replication Check Failed in $now ! DB port is $port" >> /pgdata/wistron/script/MESP130/monitor_rep_"$port"_status.log
/bin/mail -s "Replication Check Failed in $now ! DB port is $port" -r "SE_PG_Replication_Check" "mzl320.wzs.wistron@wistron.com" < /pgdata/wistron/script/MESP130/monitor_rep_"$port".log
else
echo -e "Replication Check Success in $now ! DB port is $port" >> /pgdata/wistron/script/MESP130/monitor_rep_"$port"_status.log
fi
crontab -l
############Replication monitor###############
*/15 * * * * sh /pgdata/wistron/script/MESP130/monitor_rep.sh
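
monitor_rep.sh picks values out of psql's aligned output by whitespace position ($7, $14, $19), and those positions shift whenever a column such as xmin is empty. One possible alternative, shown only as a sketch, is to have psql emit unaligned, pipe-separated rows (psql -At -F '|') and let awk split on the separator. The helper name count_lagging_slots and the four-column layout slot_name|slot_type|database|gap_min are assumptions for illustration, not the layout the script above produces.

```shell
# count_lagging_slots THRESHOLD_MINUTES (hypothetical helper)
# Reads lines of the form slot_name|slot_type|database|gap_min on stdin
# and prints how many logical slots have a gap above the threshold.
count_lagging_slots() {
    awk -F'|' -v limit="$1" '
        $2 == "logical" && ($4 + 0) > (limit + 0) { n++ }
        END { print n + 0 }
    '
}
```

Because each field is addressed by its separator, an empty column (for example the database of a physical slot) no longer moves the fields that follow it.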

  • **2.2 subscription_check.sh(On subscription)
vim /pgdata/wistron/script/REP130/subscription_check.sh
#!/bin/bash
now=`/usr/bin/date "+%Y-%m-%d %H:%M:%S"`
now2=`/usr/bin/date "+%Y-%m-%d"`
pgsoft=/usr/pgsql-13/bin
port=6434
pgdata=/pgdata/REP130
scriptfolder=/pgdata/wistron/script/REP130
$pgsoft/pg_isready -p $port > $scriptfolder/pg_response_"$port"_tmp_status.txt
pgstatus=`/usr/bin/cat $scriptfolder/pg_response_"$port"_tmp_status.txt|/usr/bin/awk '{print $3 " " $4}'`
currentlog=`/usr/bin/cat $pgdata/current_logfiles|/usr/bin/awk '{print $2}'`

###### Check the subscriptions only if the DB is accepting connections ######
if [[ "${pgstatus}" = "accepting connections" ]];then
$pgsoft/psql -p $port -A -F ' ' -t -c "select pg_stat_subscription.subname,datname,pid from pg_stat_subscription,pg_SUBSCRIPTION, pg_database where pg_stat_subscription.subid=pg_SUBSCRIPTION.oid and pg_database.oid=pg_SUBSCRIPTION.subdbid;" > $scriptfolder/sub_status_"$port".txt
###### Print the path of the subscription check result ######
echo "Subscription check result: $scriptfolder/sub_status_"$port".txt"

###### Loop over the subscriptions and check the replication status ######
for a in `/usr/bin/cat -n $scriptfolder/sub_status_"$port".txt|/usr/bin/awk '{print $1}'`; do # enter the loop
nowsubname=`/usr/bin/cat $scriptfolder/sub_status_"$port".txt |/usr/bin/sed -n "$a"p|/usr/bin/awk '{print $1}'` # current subscription name
nowsubnamedb=`/usr/bin/cat $scriptfolder/sub_status_"$port".txt |/usr/bin/sed -n "$a"p|/usr/bin/awk '{print $2}'` # current database name
dbname=$nowsubnamedb
nowsubnamepid=`/usr/bin/cat $scriptfolder/sub_status_"$port".txt |/usr/bin/sed -n "$a"p|/usr/bin/awk '{print $3}'` # current pid

###### Choose the mail recipients according to the database ######
if [[ "${dbname}" = "REP130" ]];then
mailpic="mzl320.wzs.wistron@wistron.com,mzl120.wzs.wistron@wistron.com,Joyly_Hua@wistron.com,Robert_Pan@wistron.com,Jean_Ouyang@wistron.com,Binge_Chen@wistron.com,junli2@wistronits.com"
else
mailpic="mzl320.wzs.wistron@wistron.com"
fi

###### Check whether a pid was obtained ######
 if [ -z "$nowsubnamepid" ];then
 /usr/bin/echo "$nowsubname is not responding on pid $nowsubnamepid , logfile path is $pgdata/$currentlog , Please Check !!! " >> $scriptfolder/sub_status_"$port"_list.txt ## record the check result
 echo "Subscription check history: $scriptfolder/sub_status_"$port"_list.txt"
 /usr/bin/cat $pgdata/$currentlog|/usr/bin/grep "$nowsubname" > $scriptfolder/tmpissuetable_"$port".txt ## collect the log lines that mention the failing subscription
 tmpissuepid=`tac $scriptfolder/tmpissuetable_"$port".txt|head -n 1|/usr/bin/awk '{printf $4}'` ## take field 4 ("[pid]") of the most recent such line
 tmpissuepid2=`/usr/bin/echo ${tmpissuepid#*[}` ## strip the leading "["
 issuepid=`/usr/bin/echo ${tmpissuepid2%]*}` ## strip the trailing "]"

 /usr/bin/cat $pgdata/$currentlog|/usr/bin/grep "$now2"|/usr/bin/grep -E "\[$issuepid\]" > $scriptfolder/issuetable_"$port".txt
 tmpmiss=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "missing some replicated columns"|/usr/bin/grep relation|wc -l` ## count of missing-column errors
 tmplong=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "value too long for type character"|wc -l` ## count of column-length mismatches
 tmpconstraint=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "unique constraint"|wc -l` ## count of unique-constraint violations
 tmpnull=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "null value in column"|wc -l` ## count of not-null violations
 tmpbigint=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "invalid input syntax for"|wc -l` ## count of column-type mismatches
 tmpidentity=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "does not have REPLICA IDENTITY FULL"|wc -l` ## count of missing replica identity errors
 notable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "does not exist"|wc -l` ## count of tables missing on the subscription side
 tmpoverflow=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "numeric field overflow"|wc -l` ## count of numeric precision mismatches
 tmpidentity2=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "publisher did not send replica identity column"|wc -l` ## count of replica identity columns not sent
 tmppartition=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "no partition of relation"|wc -l` ## count of missing partitions or relations

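 echo "PID of the possibly failing process: $issuepid, current log path: $pgdata/$currentlog"
 echo "Missing column count : $tmpmiss"
 echo "Column length mismatch count : $tmplong"
 echo "Unique constraint violation count : $tmpconstraint"
 echo "Not-null violation count : $tmpnull"
 echo "Column type mismatch count : $tmpbigint"
 echo "Missing replica identity count : $tmpidentity"
 echo "Table missing on subscription side count : $notable"
 echo "Numeric precision mismatch count : $tmpoverflow"
 echo "Replica identity column not sent count : $tmpidentity2"
 echo "Missing partition or relation count : $tmppartition"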

  if [ "$tmpmiss" -gt 0 ];then ### missing columns found: notify the recipients
  tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
  tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
  issuetable=`/usr/bin/echo ${tmptable%%\"*}`
  /usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic"  < $scriptfolder/issuetable_"$port".txt

  elif [ "$tmplong" -gt 0 ];then ### column length mismatch found: notify the recipients
  tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
  tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
  issuetable=`/usr/bin/echo ${tmptable%%\"*}`
  /usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic"  < $scriptfolder/issuetable_"$port".txt

  elif [ "$tmpconstraint" -gt 0 ];then ### unique constraint violation found: notify the recipients
  tmpconstraint=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "unique constraint"`
  tmpconstraint=`/usr/bin/echo ${tmpconstraint#*constraint \"}`
  constraint=`/usr/bin/echo ${tmpconstraint%\"*}`
  $pgsoft/psql -p $port -A -F ' ' -t -d $dbname -c "SELECT table_name FROM (SELECT c.connamespace::regnamespace::text as table_schema, c.conrelid::regclass::text as table_name, con.column_name, c.conname as constraint_name, pg_get_constraintdef(c.oid) FROM pg_constraint c JOIN pg_namespace ON pg_namespace.oid = c.connamespace JOIN pg_class ON c.conrelid = pg_class.oid LEFT JOIN information_schema.constraint_column_usage con ON c.conname = con.constraint_name AND pg_namespace.nspname = con.constraint_schema UNION ALL SELECT table_schema, table_name, column_name, NULL, 'NOT NULL' FROM information_schema.columns WHERE is_nullable = 'NO') all_constraints WHERE table_schema NOT IN ('pg_catalog', 'information_schema') and constraint_name='$constraint' limit 1;" > $scriptfolder/tmptable_"$port".txt
  issuetable=`/usr/bin/cat $scriptfolder/tmptable_"$port".txt`
  echo "$constraint"
  echo "/usr/bin/cat $scriptfolder/tmptable_"$port".txt"
  /usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic"  < $scriptfolder/issuetable_"$port".txt

  elif [ "$tmpnull" -gt 0 ];then ### not-null violation found: notify the recipients
  tmpnull=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "null value in column"`
  tmpnull=`/usr/bin/echo ${tmpnull#*null value in column \"}`
  null=`/usr/bin/echo ${tmpnull%%\"*}`
  $pgsoft/psql -p $port -A -F ' ' -t -d $dbname -c "select attrelid::regclass from pg_attribute where attname='$null' and attnotnull is   true and attrelid in (select srrelid from pg_subscription_rel where srsubid in (select subid from pg_stat_subscription where  subname='$nowsubname'));"  > $scriptfolder/tmptable_"$port".txt
  issuetable=`/usr/bin/cat $scriptfolder/tmptable_"$port".txt`
  /usr/bin/echo -e "\n\tPlease check the following list , maybe have null in table list" >> $scriptfolder/issuetable_"$port".txt
  /usr/bin/mail -s "Replication Check Failed ! DB name is $dbname" -r "SE_PG_Replication_Check" -a $scriptfolder/tmptable_"$port".txt   "$mailpic" < $scriptfolder/issuetable_"$port".txt

  elif [ "$tmpbigint" -gt 0 ];then ### column type mismatch found: notify the recipients
  tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
  tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
  issuetable=`/usr/bin/echo ${tmptable%%\"*}`
  /usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic"  < $scriptfolder/issuetable_"$port".txt

  elif [ "$tmpidentity" -gt 0 ];then ### missing replica identity found: notify the recipients
  tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
  tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
  issuetable=`/usr/bin/echo ${tmptable%%\"*}`
  /usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic"  < $scriptfolder/issuetable_"$port".txt

  elif [ "$notable" -gt 0 ];then ### table missing on the subscription side: notify the recipients
  tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
  tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
  issuetable=`/usr/bin/echo ${tmptable%%\"*}`
  /usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic"  < $scriptfolder/issuetable_"$port".txt

  elif [ "$tmpoverflow" -gt 0 ];then ### numeric precision mismatch found: notify the recipients
  tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
  tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
  issuetable=`/usr/bin/echo ${tmptable%%\"*}`
  /usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic"  < $scriptfolder/issuetable_"$port".txt

  elif [ "$tmpidentity2" -gt 0 ];then ### replica identity column not sent: notify the recipients
  tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
  tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
  issuetable=`/usr/bin/echo ${tmptable%%\"*}`
  /usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic"  < $scriptfolder/issuetable_"$port".txt

  elif [ "$tmppartition" -gt 0 ];then ### missing partition or relation found: notify the recipients
  tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
  tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
  issuetable=`/usr/bin/echo ${tmptable%%\"*}`
  /usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic"  < $scriptfolder/issuetable_"$port".txt

  else ### none of the errors defined above was found: send a generic notification
  /usr/bin/mail -s "Replication Check Failed ! Unknown , Please Check LOG $pgdata/$currentlog , DB name is $dbname" -r  "SE_PG_Replication_Check" "mzl320.wzs.wistron@wistron.com"  < $scriptfolder/sub_status_"$port".txt
  fi

 else
 /usr/bin/echo "$dbname logical replication $nowsubname is running on pid $nowsubnamepid , Check success in $now !!! " >> $scriptfolder/sub_status_"$port"_list.txt
 fi

done
/usr/bin/echo "--------------------------------------------------------------------------------" >> $scriptfolder/sub_status_"$port"_list.txt

else
/usr/bin/mail -s "DB response failed . Please check it , $now ! DB Port is $port" -r "SE_PG_Response_Check" "mzl320.wzs.wistron@wistron.com" < "$scriptfolder/pg_response_"$port"_tmp_status.txt"
fi
crontab -l
############Replication table check###############
*/10 * * * * sh /pgdata/wistron/script/REP130/subscription_check.sh
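
subscription_check.sh pulls the worker pid and the failing table name out of a log line with shell parameter expansion. The same idea can be isolated into two small functions, which makes it easier to try against a sample line. This is a sketch under assumptions: the function names are hypothetical, and the sample line in the usage note is illustrative, written for a log_line_prefix that puts the pid in square brackets as the script expects.

```shell
# extract_pid LOG_LINE (hypothetical helper)
# Print the text inside the first [...] of the line.
extract_pid() {
    rest=${1#*"["}          # drop everything up to and including the first "["
    pid=${rest%%"]"*}       # drop everything from the first "]" onward
    printf '%s\n' "$pid"
}

# extract_relation LOG_LINE (hypothetical helper)
# Print the name that is double-quoted right after the word "relation".
extract_relation() {
    rest=${1#*'relation "'}   # drop everything up to and including: relation "
    name=${rest%%'"'*}        # drop everything from the closing quote onward
    printf '%s\n' "$name"
}
```

Given a line such as 2024-05-01 10:15:02.123 CST [23817] ERROR:  logical replication target relation "fa130.meslotno" is missing some replicated columns, extract_pid returns the bracketed pid and extract_relation returns the schema-qualified table name. A line that contains no relation "..." part is not detected by this sketch and would need an extra check.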

3. Issue share

  • **3.1 A table without a primary key can receive inserts, but its rows cannot be updated or deleted. Please inform the user. If the table has no primary key, use the following command to use all columns of the table as the replica identity (execute on the publishing side)

ALTER TABLE fa130.meslotno REPLICA IDENTITY FULL;
  • **3.2 Data cannot be synchronized, and the PG database log on the subscription side reports a missing column. Please ask the SFCS/AP users to add the missing column on the subscription side

  • **3.3 Not enough processes

  • **3.4 Different constraints

  • **3.5 Locks

4. Recreate Subscription

  • **4.1 Save the CREATE PUBLICATION SQL (on the publication side; use pgAdmin to record it)
CREATE PUBLICATION pub_cim138
  FOR TABLE cim138.cimaltitem, cim138.cimapgroup, cim138.cimapimage, cim138.cimapini, cim138.cimapiniworkstation, cim138.cimapmapping, cim138.cimapplication, cim138.cimapuser, cim138.cimapversion, cim138.cimbgbase, cim138.cimbomlocation, cim138.cimcarrysystem, cim138.cimcategory, cim138.cimcategorymapping, cim138.cimcausestationstage, cim138.cimcompareitem, cim138.cimcomparesetting, cim138.cimconfiguration, cim138.cimconfigurationitem, cim138.cimcpninfo, cim138.cimcpnrohsapprove, cim138.cimcpnrule, cim138.cimcpnseq, cim138.cimcpnstatus, cim138.cimcsnrule, cim138.cimcustomererrorcode, cim138.cimcustomererrortype, cim138.cimcustomize, cim138.cimdepartment, cim138.cimderivecategory, cim138.cimderiveitem, cim138.cimdiybasedata, cim138.cimdiybasedatacrictl, cim138.cimdiybasedataeditctl, cim138.cimdynamicquery, cim138.cimdynamicquerydetail, cim138.cimerrorcode, cim138.cimesop, cim138.cimexceptionroute, cim138.cimexcludeitem, cim138.cimgateway, cim138.cimgreenfactor, cim138.cimgroup, cim138.cimid, cim138.cimidlist, cim138.cimidrange, cim138.cimidrangelist, cim138.cimidspecification, cim138.cimidvalue, cim138.cimlabeljobsetting, cim138.cimline, cim138.cimlogon, cim138.cimmaterial, cim138.cimmaterialgroup, cim138.cimmessage, cim138.cimmessagemapping, cim138.cimmodel, cim138.cimmodwnlog, cim138.cimmodwnlogdetail, cim138.cimmotype, cim138.cimoperation, cim138.cimpassword, cim138.cimportallayout, cim138.cimprofitcenter, cim138.cimreasoncode, cim138.cimreasoncodemapstage, cim138.cimreasontype, cim138.cimreportfilter, cim138.cimreporthead, cim138.cimroute, cim138.cimroutemap, cim138.cimsampling, cim138.cimsamplingdetail, cim138.cimsamplingrate, cim138.cimsngenlog, cim138.cimsngenlogdetail, cim138.cimsnrule, cim138.cimsnrulecarryinfo, cim138.cimspecifyitem, cim138.cimstage, cim138.cimupninfo, cim138.cimupnpackage, cim138.cimupntestroute, cim138.cimuser, cim138.cimuseraddinfo, cim138.cimuserapusehistory, cim138.cimusergrid, cim138.cimusergroup, cim138.cimusermenu, 
cim138.cimworkcentermap, cim138.cimworkstation, cim138.cimxraytype
  WITH (publish = 'insert, update, delete, truncate', publish_via_partition_root = true);

CREATE PUBLICATION pub_fa138
  FOR TABLE fa138.mesusn_qis, fa138.doausndefect, fa138.doausnrepair, fa138.doausnrepairitem, fa138.mescarton, fa138.mescartonitem, fa138.mescheckinout, fa138.meserpgmreasoncode, fa138.meserpmo, fa138.meserpmoitem, fa138.meserppalletitem, fa138.mesktloutevent, fa138.mesmocsn, fa138.mesmoidready, fa138.mesmoinfo, fa138.mesmopickinglist, fa138.mesmorework, fa138.mesmoreworklist, fa138.mesmousnvice, fa138.mesmoweightchange, fa138.mesoob_inspections, fa138.mesoob_modelupn, fa138.mesoob_usndefect, fa138.mesortusn, fa138.mespallet, fa138.mespalletitem, fa138.mesptlrack, fa138.messnrange, fa138.messubmoitem, fa138.messubusn, fa138.messubusnitem, fa138.mestemplocation, fa138.mestransactioncache, fa138.mestransaction_retest, fa138.mesupnweight, fa138.mesusnepc, fa138.mesusnid, fa138.mesusnroutechange, fa138.mesusnscrap, fa138.mesusn, fa138.mestransaction, fa138.mesusninfo, fa138.mesesddata, fa138.mescausestationtransactionnifi, fa138.sandra_checkovertimelinenifi, fa138.mescausestationtransaction, fa138.mesimtransaction, fa138.mesbuyauomaterial, fa138.mesbuyauomaterialdetail, fa138.auo_spl_chiphist, fa138.mesbuyauomaterialarrive, fa138.mesauooobdefectloc, fa138.mesauocelljudge, fa138.mesbuyauomaterialscrap, fa138.mesbuyauomaterialdefectplt, fa138.mesbuyauomaterialdefect, fa138.mesauofilelog, fa138.mesauofilelogdetail, fa138.mesauolabel, fa138.mesauolabelitem, fa138.mesauobrisamo, fa138.auo_ct_code_temp, fa138.auo_ct_code_old_temp, fa138.mesauoreportseq, fa138.auo_targetgrade, fa138.mesusnitem_auo, fa138.mesauojobresend, fa138.mescsotlabel, fa138.meslcmcsotheader, fa138.meslcmcsotdetail, fa138.meslcmfilelog, fa138.meslcmfilelogdetail, fa138.meslcmwoinfo, fa138.mesfilmmaterialbase, fa138.mesfilmweight, fa138.mesfilmcuttingfeedstock, fa138.mesfilmcuttingfeedstocklog, fa138.mesfilmcuttingoffline, fa138.mesfilmcutting, fa138.mesfilmcuttingonline, fa138.mesfilmmaterialusedrecord, fa138.mesusnoemctcode, fa138.customer_defectcode_mapping
  WITH (publish = 'insert, update, delete, truncate', publish_via_partition_root = true);

CREATE PUBLICATION pub_pcb138
  FOR TABLE pcb138.mesusn_qis, pcb138.dfimbdefectnum, pcb138.doausndefect, pcb138.doausnrepair, pcb138.doausnrepairitem, pcb138.mesadvshipment, pcb138.mescarton, pcb138.mescartoncache, pcb138.mescartonitem, pcb138.mescausestationtransaction, pcb138.mescheckinout, pcb138.mescleanmblog, pcb138.meserpgmreasoncode, pcb138.meserpmo, pcb138.meserpmoitem, pcb138.meserppalletitem, pcb138.mesmocsn, pcb138.mesmoidready, pcb138.mesmoinfo, pcb138.mesmopickinglist, pcb138.mesmorework, pcb138.mesmoreworklist, pcb138.mesmousnvice, pcb138.mesmoweightchange, pcb138.mespallet, pcb138.mespalletitem, pcb138.mesrepairlocrestrict, pcb138.messnrange, pcb138.messpilog, pcb138.messpilogsum, pcb138.messubmoitem, pcb138.mestemplocation, pcb138.mestransactioncache, pcb138.mesupnteststandardtime, pcb138.mesupntimerestrict, pcb138.mesusndefectloc, pcb138.mesusnforcestorein, pcb138.mesusnid, pcb138.mesusnpacked, pcb138.mesusnroutechange, pcb138.mesusnscrap, pcb138.sctstencil, pcb138.sctstencilmodel, pcb138.sctstenciltransfer, pcb138.sctstencil_upn, pcb138.mesusn, pcb138.mestransaction, pcb138.mesmo, pcb138.mesmoitem, pcb138.mesusnitem, pcb138.mesusninfo
  WITH (publish = 'insert, update, delete, truncate', publish_via_partition_root = true);

CREATE PUBLICATION pub_pp138
  FOR TABLE pp138.dpmfpyr, pp138.dpmoee, pp138.dpmoeechargehour, pp138.dpmoeedetail, pp138.dpmoeeissue, pp138.dpmoeeissuegroup, pp138.dpmoeelosshour, pp138.dpmpdtissues, pp138.dpmpdtissuesgroupdetail, pp138.dpmproductivity, pp138.dpmproductivitydetail, pp138.dpmuph, pp138.dpmupph, pp138.dpmupphn, pp138.dpmupphndetail, pp138.dpmyrissue, pp138.dpmyrusndetail, pp138.ppflinestopdetail, pp138.ppflinestophead, pp138.ppfstdtimehead, pp138.ppfstopcode, pp138.uppuhinfo, pp138.nppbasedata, pp138.nppgenlog, pp138.nppmanualmo, pp138.nppofflineratio, pp138.dpmfpyrbymodel, pp138.dpmfpyrperiodbase, pp138.dpmerrlog, pp138.dpmyrissuelog, pp138.dpmupphnbyupn, pp138.ppsmtstandardtimedetail, pp138.ppsmtstandardtimehead, pp138.ppfstdtimedetail, pp138.ppfparentchildset, pp138.mifpyrhourly, pp138.ppfshiftid, pp138.ppftustdtimedetail, pp138.syncidcontrol, pp138.ppctaclinestate, pp138.ppctacplcnode_temp, pp138.att_station_audit, pp138.dccesoprequest, pp138.att_station_of_post, pp138.att_station_post_skill, pp138.dccesop, pp138.epmrfiddata, pp138.att_employee_authority, pp138.att_station, pp138.att_skill, pp138.dccesopdetail, pp138.epmshiftnifi, pp138.epmstationskillnifi, pp138.epmstationempskillnifi, pp138.linekbdetail, pp138.epmusnmappingnifi
  WITH (publish = 'insert, update, delete, truncate', publish_via_partition_root = true);

  • **4.2 Check whether the subscribed tables have triggers whose firing mode is A or R (execute on the subscription side. A means ALWAYS and R means REPLICA. An A trigger fires no matter how the data is written, so evaluate whether the table can be truncated. An R trigger fires only when the data comes from logical replication, so the table data can be deleted directly)
select * from pg_trigger where tgrelid in (select srrelid from pg_subscription_rel) and tgenabled in ('A','R');
  • **4.3 Save the CREATE SUBSCRIPTION SQL (execute on the subscription side; use pgAdmin to record it. The copied SQL does not contain the password, so manually add 'password=XXX' to the connection string and change the option to 'create_slot = true')
CREATE SUBSCRIPTION sub_cim138
    CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
    PUBLICATION pub_cim138
    WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_cim138, synchronous_commit = 'off');

CREATE SUBSCRIPTION sub_fa138
    CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
    PUBLICATION pub_fa138
    WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_fa138, synchronous_commit = 'off');

CREATE SUBSCRIPTION sub_pcb138
    CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
    PUBLICATION pub_pcb138
    WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_pcb138, synchronous_commit = 'off');

CREATE SUBSCRIPTION sub_pp138
    CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
    PUBLICATION pub_pp138
    WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_pp138, synchronous_commit = 'off');

  • **4.4 Generate the TRUNCATE TABLE SQL and record it (execute on the subscription side, before the subscriptions are dropped, because dropping them removes their rows from pg_subscription_rel. If there are A or R triggers, please contact SFCS to confirm the tables before deleting the table data)
select 'truncate ' || srrelid::regclass || ';' from pg_subscription_rel;
  • **4.5 drop subscription(execute on subscription)
drop subscription sub_cim138 cascade;
drop subscription sub_fa138 cascade;
drop subscription sub_pcb138 cascade;
drop subscription sub_pp138 cascade;
  • **4.6 Execute the TRUNCATE TABLE SQL recorded in 4.4 (execute on the subscription side. If there are A or R triggers, please contact SFCS to confirm the tables before deleting the table data)
truncate table1;
truncate table2;
truncate table3;
truncate table...
  • **4.7 Recreate subscription (Execute on subscription)
CREATE SUBSCRIPTION sub_cim138
    CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
    PUBLICATION pub_cim138
    WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_cim138, synchronous_commit = 'off');

CREATE SUBSCRIPTION sub_fa138
    CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
    PUBLICATION pub_fa138
    WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_fa138, synchronous_commit = 'off');

CREATE SUBSCRIPTION sub_pcb138
    CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
    PUBLICATION pub_pcb138
    WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_pcb138, synchronous_commit = 'off');

CREATE SUBSCRIPTION sub_pp138
    CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
    PUBLICATION pub_pp138
    WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_pp138, synchronous_commit = 'off');
  • **4.8 Wait for data initialization (on the subscription side; usually check whether there are locks)

A replication slot named like sub_xxx_xxx_sync_xxx means that the table is still in its initial synchronization.
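
While waiting, the slot names can be filtered for that _sync_ marker to see which initial synchronizations are still pending. A minimal sketch, assuming the slot names arrive one per line (for example from a psql -At query on pg_replication_slots); the function names and the slot names in the usage note are hypothetical.

```shell
# is_sync_slot SLOT_NAME (hypothetical helper)
# Succeed if the name contains "_sync_", the marker of an initial table sync slot.
is_sync_slot() {
    case $1 in
        *_sync_*) return 0 ;;
        *)        return 1 ;;
    esac
}

# pending_sync_slots (hypothetical helper)
# Read slot names on stdin, one per line, and print only the sync slots.
pending_sync_slots() {
    while IFS= read -r name; do
        if is_sync_slot "$name"; then
            printf '%s\n' "$name"
        fi
    done
}
```

Feeding it the names sub_fa138 and sub_fa138_16400_sync_16385 prints only the second one; an empty result suggests that no initial synchronization slot is left.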