Publication and Subscription Introduction¶
Introduction logical replication:¶
Logical replication is a method of replicating data objects and their changes based on the replication identifier (usually the primary key) of the data object. Logical replication allows for finer grained control over data replication and security
Logical replication Usage scenarios: 1、Merge data from multiple databases into a single database for data analysis in a data warehouse 2、Logical replication does not rely on version restrictions and can perform data replication between PG databases of different major versions 3、Sharing partial data among multiple databases
The advantages of logical replication: 1、Can cross database versions 2、When publishing, insert can be supported delete、update、truncate 3、A database can have multiple publications, ensuring that the publications do not have duplicate names 4、A publication allows multiple subscribers 5、Allow all tables to be published at once or only partial tables to be published
The disadvantages of logical replication: Logical replication only has global switches The table name and schema for subscription publishing must be the same Need to create corresponding schema and table on the subscription end first Only supports replication DML operations, not DDL operations. If you want to make DDL changes, you need to first execute DDL on the > subscription side, and then execute DDL on the publishing side Copying can only be from base table to base table, and does not support views, materialized views, external tables, etc. If the table > is a partitioned table, it needs to be replicated based on partitions
Limitations on Logical Publishing: The source database user for logical replication must have the replication or superuser role The subscription side can have extra columns, but all columns on the publishing side must have The prerequisite for logical replication is to set the wal_level of the publishing database to logical At the same time, ensure that there are sufficient max_logical_replication_workers and max_wal_senders Ensure that the subscription segment can be connected to the source database through the stream replication protocol (pg_hba.conf) When publishing with delete and update operations, the table must set replica identity to identify old rows (pk, uk, full) For tables without primary/unique keys in logical synchronization, only insert operations are allowed on the publishing side, and > delete and update operations cannot be performed
There are two models for logical replication:
Publication
Nodes that have published are called publishers,Publishing is a collection of changes generated from a table or a set of tables, and can also be described as changing or copying a collection
Subscription
Nodes that have subscriptions are called subscriber,A subscription will define a connection to another database and the collection of publications it wants to subscribe to,Each subscription will receive changes through a replication slot.The initial data synchronization process of pre-existing table data may require additional temporary replication slots,When a subscription is deleted and rebuilt, synchronization information will be lost. This means that the data must be resynchronized
🔍 Create publication syntax:¶
CREATE PUBLICATION name
[ FOR TABLE [ ONLY ] table_name [ * ] [, ...]
| FOR ALL TABLES ]
[ WITH ( publication_parameter [= value] [, ... ] ) ]
exapmle: Create a publication to publish all changes in two tables: CREATE PUBLICATION mypublication FOR TABLE users, departments;
Create a publication to publish all changes in all tables: CREATE PUBLICATION all tables FOR ALL TABLES;
Create a publication that only publishes the POST operation in one table: CREATE PUBLICATION insert_only FOR TABLE mydata WITH (publish = 'insert');
🔍 Modify publication syntax:¶
ALTER PUBLICATION name ADD TABLE [ ONLY ] table_name [ * ] [, ...]
ALTER PUBLICATION name SET TABLE [ ONLY ] table_name [ * ] [, ...]
ALTER PUBLICATION name DROP TABLE [ ONLY ] table_name [ * ] [, ...]
ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION name RENAME TO new_name
exapmle: Change the publication to publish only deletes and updates: ALTER PUBLICATION noinsert SET (publish = 'update, delete');
Add some tables to the publication: ALTER PUBLICATION mypublication ADD TABLE users, departments;
🔍 Create subscription syntax:¶
CREATE SUBSCRIPTION subscription_name
CONNECTION 'conninfo'
PUBLICATION publication_name [, ...]
[ WITH ( subscription_parameter [= value] [, ... ] ) ]
exapmle: Create a subscription to a remote server, replicate the tables in insert_only publication, and do not start replication until replication is enabled later: CREATE SUBSCRIPTION mysub CONNECTION 'host=192.168.1.50 port=5432 user=postgres password=****** dbname=foodb' PUBLICATION insert_only WITH (enabled = false);
🔍 Modify subscription syntax:¶
ALTER SUBSCRIPTION name CONNECTION 'conninfo'
ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] [ WITH ( set_publication_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name ENABLE
ALTER SUBSCRIPTION name DISABLE
ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER SUBSCRIPTION name RENAME TO new_name
1. Setup Publication & Subscription¶
- **1.1 Modify configuration file on publication and subscription:
vim /pgdata/Clustername/postgresql.conf
max_worker_processes = 100 #Set the maximum number of background processes that the system can support
max_logical_replication_workers = 100 #Specify the maximum number of logical replication workers
max_replication_slots = 100 #Specify the maximum number of replication slots that the server can support
max_sync_workers_per_subscription = 25 #The maximum number of synchronization workers per subscription
- **1.2 Modify configuration file on publication(Add Subscription User&IP):
- **1.3 Modify wal_level on publication
- **1.4 Reload Cluster
- **1.5 Create Publication
Note: Tables that need to be published under the same schema in WZS should be created under the same publication to distinguish them pub_fa131: Name the publication as pub_ {schema} fa131: Name the schema for publishing tablename: For tables that need to be published (copied)
- **1.6 Create Subscription
create subscription sub_fa131 connection 'host=XXX dbname=MESP131 user=postgres password=****** port=5432' publication pub_fa131 WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_fa131, synchronous_commit = 'off');
sub_fa131:Name the subscription as sub_ {schema} host:For the IP address of the publishing end dbname:Name the DB for the publishing end user/password:To support logical replication of accounts/passwords port:For the port of the publishing end pub_fa131:Specify which publication is associated with the publishing end sub_fa131:Slot name
- **1.7 Add table to publication
- **1.8 drop Table from publication
- **1.9 Refresh subscription
2. Monitor script Setup¶
- **2.1 monitor_rep.sh(On Publication)
vim /pgdata/wistron/script/MESP130/monitor_rep.sh
#!/bin/sh
ource ~/.bash_profile
##设置需要备份DB的标识###
ID=MESP130
##设置PGDATA环境变量###
gdata=/pgdata/$SID
##设置端口号###
ort=`cat $pgdata/postmaster.pid|sed -n 4p`
##设置脚本日志存放路径###
criptfolder=/pgdata/wistron/script/$SID
ow=`date "+%Y-%m-%d %H:%M:%S"`
apcounts=0
sql -p $port -c "select slot_name,plugin,slot_type,database,active,xmin,catalog_xmin,pg_wal_lsn_diff(pg_current_wal_insert_lsn(), estart_lsn) restart_delay,to_timestamp(to_char((pg_stat_file(current_setting('data_directory') || '/pg_wal/' || pg_walfile_name restart_lsn))).modification, 'YYYY-MM-DD HH24:MI:SS.MSOF'),'yyyy-mm-dd hh24:MI:SS'),round(cast(date_part('epoch', to_timestamp(now() :text,'yyyy-mm-dd hh24:MI:SS') - to_timestamp(to_char((pg_stat_file(current_setting('data_directory') || '/pg_wal/' || pg_walfile_name restart_lsn))).modification, 'YYYY-MM-DD HH24:MI:SS.MSOF'),'yyyy-mm-dd hh24:MI:SS'))/60 as numeric),0) as \"gap(min)\" from g_replication_slots;" > $scriptfolder/monitor_rep_"$port".log
f [ -s /pgdata/wistron/script/MESP130/monitor_rep_"$port".log ]
hen
for a in `cat -n /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|grep logical|awk '{print $1}'`; do #进入循环
db=`cat /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|awk '{print $7}'|sed -n "$a"p`
slotname=`cat /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|awk '{print $1}'|sed -n "$a"p`
gapmin=`cat /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|awk '{print $19}'|sed -n "$a"p`
gapsize=`cat /pgdata/wistron/script/MESP130/monitor_rep_"$port".log|awk '{print $14}'|sed -n "$a"p`
if [ "${gapmin}" -gt 10 ];then
gapcounts=$(expr $gapcounts + 1)
fi
done
fi
if [ "${gapcounts}" -gt 0 ];then
/bin/echo -e "Replication Check Failed in $now ! DB port is $port" >> /pgdata/wistron/script/MESP130/monitor_rep_"$port"_status.log
/bin/mail -s "Replication Check Failed in $now ! DB port is $port" -r "SE_PG_Replication_Check" "mzl320.wzs.wistron@wistron.com" < / pgdata/wistron/script/MESP130/monitor_rep_"$port".log
else
echo -e "Replication Check Success in $now ! DB port is $port" >> /pgdata/wistron/script/MESP130/monitor_rep_"$port"_status.log
fi
crontab -l
############Replication monitor###############
*/15 * * * * sh /pgdata/wistron/script/MESP130/monitor_rep.sh
- **2.2 subscription_check.sh(On subscription)
vim /pgdata/wistron/script/REP130/subscription_check.sh
#!/bin/bash
now=`/usr/bin/date "+%Y-%m-%d %H:%M:%S"`
now2=`/usr/bin/date "+%Y-%m-%d"`
pgsoft=/usr/pgsql-13/bin
port=6434
pgdata=/pgdata/REP130
scriptfolder=/pgdata/wistron/script/REP130
$pgsoft/pg_isready -p $port > $scriptfolder/pg_response_"$port"_tmp_status.txt
pgstatus=`/usr/bin/cat $scriptfolder/pg_response_"$port"_tmp_status.txt|/usr/bin/awk '{print $3 " " $4}'`
currentlog=`/usr/bin/cat $pgdata/current_logfiles|/usr/bin/awk '{print $2}'`
######根据DB状态检查订阅检查记录######
if [[ "${pgstatus}" = "accepting connections" ]];then
$pgsoft/psql -p $port -A -F ' ' -t -c "select pg_stat_subscription.subname,datname,pid from pg_stat_subscription,pg_SUBSCRIPTION, pg_database where pg_stat_subscription.subid=pg_SUBSCRIPTION.oid and pg_database.oid=pg_SUBSCRIPTION.subdbid;" > $scriptfolder/ sub_status_"$port".txt
######输出订阅检查记录日志######
echo "订阅检查结果记录: $scriptfolder/sub_status_"$port".txt"
######循环判断复制槽状态######
for a in `/usr/bin/cat -n $scriptfolder/sub_status_"$port".txt|/usr/bin/awk '{print $1}'`; do #进入循环
nowsubname=`/usr/bin/cat $scriptfolder/sub_status_"$port".txt |/usr/bin/sed -n "$a"p|/usr/bin/awk '{print $1}'` #获取当前订阅名称
nowsubnamedb=`/usr/bin/cat $scriptfolder/sub_status_"$port".txt |/usr/bin/sed -n "$a"p|/usr/bin/awk '{print $2}'` #获取当前数据库名称
dbname=$nowsubnamedb
nowsubnamepid=`/usr/bin/cat $scriptfolder/sub_status_"$port".txt |/usr/bin/sed -n "$a"p|/usr/bin/awk '{print $3}'` #获取当前pid
######根据数据库指定通知邮件人######
if [[ "${dbname}" = "REP130" ]];then
mailpic="mzl320.wzs.wistron@wistron.com,mzl120.wzs.wistron@wistron.com,Joyly_Hua@wistron.com,Robert_Pan@wistron.com,Jean_Ouyang@wistron. com,Binge_Chen@wistron.com,junli2@wistronits.com"
else
mailpic="mzl320.wzs.wistron@wistron.com"
fi
######判断是否获取到pid号######
if [ -z "$nowsubnamepid" ];then
/usr/bin/echo "$nowsubname is not response on pid $nowsubnamepid , logfile path is $pgdata/$currentlog , Please Check !!! " >> $scriptfolder/sub_status_"$port"_list.txt ##记录检查记录
echo "订阅检查结果历史记录: $scriptfolder/sub_status_"$port"_list.txt"
/usr/bin/cat $pgdata/$currentlog|/usr/bin/grep "$nowsubname" > $scriptfolder/tmpissuetable_"$port".txt ##获取日志中问题订阅出现的的最后一 次pid
tmpissuepid=`tac $scriptfolder/tmpissuetable_"$port".txt|head -n 1|/usr/bin/awk '{printf $4}'` ##截取pid
tmpissuepid2=`/usr/bin/echo ${tmpissuepid#*[}` ##截取pid
issuepid=`/usr/bin/echo ${tmpissuepid2%]*}` ##截取pid
/usr/bin/cat $pgdata/$currentlog|/usr/bin/grep "$now2"|/usr/bin/grep -E "\[$issuepid\]" > $scriptfolder/issuetable_"$port".txt
tmpmiss=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "missing some replicated columns"|/usr/bin/grep relation|wc -l` ##输出缺失列计数
tmplong=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "value too long for type character"|wc -l` ##输出栏位长度不匹配计数
tmpconstraint=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "unique constraint"|wc -l` ##输出违反唯一约束计数
tmpnull=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "null value in column"|wc -l` ##输出违反空值计数
tmpbigint=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "invalid input syntax for"| wc -l` ##输出字段类型不匹配计数
tmpidentity=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "does not have REPLICA IDENTITY FULL"|wc -l` ##输出缺失复制标识计数
notable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "does not exist"|wc -l` ##输 出订阅未建表计数
tmpoverflow=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "numeric field overflow"| wc -l` ##输出精度不匹配计数
tmpidentity2=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "publisher did not send replica identity column"|wc -l` ##输出缺失复制标识计数
tmppartition=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "no partition of relation"|wc -l` ##输出缺失分区或关系计数
echo "可能存在问题的pid进程号: $issuepid,当前日志路径: $pgdata/$currentlog"
echo "缺失列计数 : $tmpmiss"
echo "栏位长度不匹配计数 : $tmplong"
echo "违反唯一约束计数 : $tmpconstraint"
echo "违反空值计数 : $tmpnull"
echo "字段类型不匹配计数 : $tmpbigint"
echo "缺失复制标识计数 : $tmpidentity"
echo "订阅未建表计数 : $notable"
echo "订阅精度未匹配计数 : $tmpoverflow"
echo "缺失复制标识计数 : $tmpidentity2"
echo "缺失分区或关系计数 : $tmppartition"
if [ "$tmpmiss" -gt 0 ];then ###存在缺失列计数进入判断并告知收件人
tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
issuetable=`/usr/bin/echo ${tmptable%%\"*}`
/usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic" < $scriptfolder/issuetable_"$port".txt
elif [ "$tmplong" -gt 0 ];then ###存在栏位长度不匹配计数进入判断并告知收件人
tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
issuetable=`/usr/bin/echo ${tmptable%%\"*}`
/usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic" < $scriptfolder/issuetable_"$port".txt
elif [ "$tmpconstraint" -gt 0 ];then ###存在违反唯一约束计数进入判断并告知收件人
tmpconstraint=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "unique constraint"`
tmpconstraint=`/usr/bin/echo ${tmpconstraint#*constraint \"}`
constraint=`/usr/bin/echo ${tmpconstraint%\"*}`
$pgsoft/psql -p $port -A -F ' ' -t -d $dbname -c "SELECT table_name FROM (SELECT c.connamespace::regnamespace::text as table_schema, c.conrelid::regclass::text as table_name, con.column_name, c.conname as constraint_name, pg_get_constraintdef(c.oid) FROM pg_constraint c JOIN pg_namespace ON pg_namespace.oid = c.connamespace JOIN pg_class ON c.conrelid = pg_class.oid LEFT JOIN information_schema.constraint_column_usage con ON c.conname = con.constraint_name AND pg_namespace.nspname = con.constraint_schema UNION ALL SELECT table_schema, table_name, column_name, NULL, 'NOT NULL' FROM information_schema.columns WHERE is_nullable = 'NO') all_constraints WHERE table_schema NOT IN ('pg_/usr/bin/catalog', 'information_schema') and constraint_name='$constraint' limit 1;" > $scriptfolder/tmptable_"$port".txt
issuetable=`/usr/bin/cat $scriptfolder/tmptable_"$port".txt`
echo "$constraint"
echo "/usr/bin/cat $scriptfolder/tmptable_"$port".txt"
/usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic" < $scriptfolder/issuetable_"$port".txt
elif [ "$tmpnull" -gt 0 ];then ###存在违反空值计数进入判断并告知收件人
tmpnull=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep -E "\[$issuepid\]"|/usr/bin/grep "null value in column"`
tmpnull=`/usr/bin/echo ${tmpnull#*null value in column \"}`
null=`/usr/bin/echo ${tmpnull%%\"*}`
$pgsoft/psql -p $port -A -F ' ' -t -d $dbname -c "select attrelid::regclass from pg_attribute where attname='$null' and attnotnull is true and attrelid in (select srrelid from pg_subscription_rel where srsubid in (select subid from pg_stat_subscription where subname='$nowsubname'));" > $scriptfolder/tmptable_"$port".txt
issuetable=`/usr/bin/cat $scriptfolder/tmptable_"$port".txt`
/usr/bin/echo -e "\n\tPlease check the following list , maybe have null in table list" >> $scriptfolder/issuetable_"$port".txt
/usr/bin/mail -s "Replication Check Failed ! DB name is $dbname" -r "SE_PG_Replication_Check" -a $scriptfolder/tmptable_"$port".txt "$mailpic" < $scriptfolder/issuetable_"$port".txt
elif [ "$tmpbigint" -gt 0 ];then ###存在字段类型不匹配计数进入判断并告知收件人
tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
issuetable=`/usr/bin/echo ${tmptable%%\"*}`
/usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic" < $scriptfolder/issuetable_"$port".txt
elif [ "$tmpidentity" -gt 0 ];then ###存在缺失复制标识计数进入判断并告知收件人
tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
issuetable=`/usr/bin/echo ${tmptable%%\"*}`
/usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic" < $scriptfolder/issuetable_"$port".txt
elif [ "$notable" -gt 0 ];then ###存在缺失复制表计数进入判断并告知收件人
tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
issuetable=`/usr/bin/echo ${tmptable%%\"*}`
/usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic" < $scriptfolder/issuetable_"$port".txt
elif [ "$tmpoverflow" -gt 0 ];then ###存在精度未匹配计数进入判断并告知收件人
tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
issuetable=`/usr/bin/echo ${tmptable%%\"*}`
/usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic" < $scriptfolder/issuetable_"$port".txt
elif [ "$tmpidentity2" -gt 0 ];then ###存在缺失复制标识计数进入判断并告知收件人
tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
issuetable=`/usr/bin/echo ${tmptable%%\"*}`
/usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic" < $scriptfolder/issuetable_"$port".txt
elif [ "$tmppartition" -gt 0 ];then ###存在缺失分区或关系计数进入判断并告知收件人
tmptable=`/usr/bin/cat $scriptfolder/issuetable_"$port".txt|/usr/bin/grep "$issuepid"|/usr/bin/grep relation`
tmptable=`/usr/bin/echo ${tmptable#*relation \"}`
issuetable=`/usr/bin/echo ${tmptable%%\"*}`
/usr/bin/mail -s "Replication Check Failed ! table name is $issuetable , DB name is $dbname" -r "SE_PG_Replication_Check" "$mailpic" < $scriptfolder/issuetable_"$port".txt
else ###未发现上述定义错误发送邮件通知
/usr/bin/mail -s "Replication Check Failed ! Unknown , Please Check LOG $pgdata/$currentlog , DB name is $dbname" -r "SE_PG_Replication_Check" "mzl320.wzs.wistron@wistron.com" < $scriptfolder/sub_status_"$port".txt
fi
else
/usr/bin/echo "$dbname logical replication $nowsubname is runing on pid $nowsubnamepid , Check success in $now !!! " >> $scriptfolder/ sub_status_"$port"_list.txt
fi
done
/usr/bin/echo "--------------------------------------------------------------------------------" >> $scriptfolder/ sub_status_"$port"_list.txt
else
/usr/bin/mail -s "DB response failed . Please check it , $now ! DB Port is $port" -r "SE_PG_Response_Check" "mzl320.wzs.wistron@wistron. com" < "$scriptfolder/pg_response_"$port"_tmp_status.txt"
fi
crontab -l
############Replication table check###############
*/10 * * * * sh /pgdata/wistron/script/REP130/subscription_check.sh
3. Issue share¶
- **3.1 Without a primary key constraint, data can be inserted and cannot be deleted or modified. Please inform the User. If there is no primary key, use the following command to use all columns of the table as identifiers (executed on the publishing side)
- **3.2 Data cannot be synchronized. According to the PG database log on the subscription end, Column is missing. Please ask SFCS/AP Users to fill in the missing column on the subscription end
- **3.3 Processes not enough
- **3.4 Different constraints
- **3.4 Have Lock
4. Recreate Subscription¶
- **4.1 Keep create publication SQL(Publication,Use pgadmin to record)
CREATE PUBLICATION pub_cim138
FOR TABLE cim138.cimaltitem, cim138.cimapgroup, cim138.cimapimage, cim138.cimapini, cim138.cimapiniworkstation, cim138.cimapmapping, cim138.cimapplication, cim138.cimapuser, cim138.cimapversion, cim138.cimbgbase, cim138.cimbomlocation, cim138.cimcarrysystem, cim138.cimcategory, cim138.cimcategorymapping, cim138.cimcausestationstage, cim138.cimcompareitem, cim138.cimcomparesetting, cim138.cimconfiguration, cim138.cimconfigurationitem, cim138.cimcpninfo, cim138.cimcpnrohsapprove, cim138.cimcpnrule, cim138.cimcpnseq, cim138.cimcpnstatus, cim138.cimcsnrule, cim138.cimcustomererrorcode, cim138.cimcustomererrortype, cim138.cimcustomize, cim138.cimdepartment, cim138.cimderivecategory, cim138.cimderiveitem, cim138.cimdiybasedata, cim138.cimdiybasedatacrictl, cim138.cimdiybasedataeditctl, cim138.cimdynamicquery, cim138.cimdynamicquerydetail, cim138.cimerrorcode, cim138.cimesop, cim138.cimexceptionroute, cim138.cimexcludeitem, cim138.cimgateway, cim138.cimgreenfactor, cim138.cimgroup, cim138.cimid, cim138.cimidlist, cim138.cimidrange, cim138.cimidrangelist, cim138.cimidspecification, cim138.cimidvalue, cim138.cimlabeljobsetting, cim138.cimline, cim138.cimlogon, cim138.cimmaterial, cim138.cimmaterialgroup, cim138.cimmessage, cim138.cimmessagemapping, cim138.cimmodel, cim138.cimmodwnlog, cim138.cimmodwnlogdetail, cim138.cimmotype, cim138.cimoperation, cim138.cimpassword, cim138.cimportallayout, cim138.cimprofitcenter, cim138.cimreasoncode, cim138.cimreasoncodemapstage, cim138.cimreasontype, cim138.cimreportfilter, cim138.cimreporthead, cim138.cimroute, cim138.cimroutemap, cim138.cimsampling, cim138.cimsamplingdetail, cim138.cimsamplingrate, cim138.cimsngenlog, cim138.cimsngenlogdetail, cim138.cimsnrule, cim138.cimsnrulecarryinfo, cim138.cimspecifyitem, cim138.cimstage, cim138.cimupninfo, cim138.cimupnpackage, cim138.cimupntestroute, cim138.cimuser, cim138.cimuseraddinfo, cim138.cimuserapusehistory, cim138.cimusergrid, cim138.cimusergroup, cim138.cimusermenu, cim138.cimworkcentermap, cim138.cimworkstation, cim138.cimxraytype
WITH (publish = 'insert, update, delete, truncate', publish_via_partition_root = true);
CREATE PUBLICATION pub_fa138
FOR TABLE fa138.mesusn_qis, fa138.doausndefect, fa138.doausnrepair, fa138.doausnrepairitem, fa138.mescarton, fa138.mescartonitem, fa138.mescheckinout, fa138.meserpgmreasoncode, fa138.meserpmo, fa138.meserpmoitem, fa138.meserppalletitem, fa138.mesktloutevent, fa138.mesmocsn, fa138.mesmoidready, fa138.mesmoinfo, fa138.mesmopickinglist, fa138.mesmorework, fa138.mesmoreworklist, fa138.mesmousnvice, fa138.mesmoweightchange, fa138.mesoob_inspections, fa138.mesoob_modelupn, fa138.mesoob_usndefect, fa138.mesortusn, fa138.mespallet, fa138.mespalletitem, fa138.mesptlrack, fa138.messnrange, fa138.messubmoitem, fa138.messubusn, fa138.messubusnitem, fa138.mestemplocation, fa138.mestransactioncache, fa138.mestransaction_retest, fa138.mesupnweight, fa138.mesusnepc, fa138.mesusnid, fa138.mesusnroutechange, fa138.mesusnscrap, fa138.mesusn, fa138.mestransaction, fa138.mesusninfo, fa138.mesesddata, fa138.mescausestationtransactionnifi, fa138.sandra_checkovertimelinenifi, fa138.mescausestationtransaction, fa138.mesimtransaction, fa138.mesbuyauomaterial, fa138.mesbuyauomaterialdetail, fa138.auo_spl_chiphist, fa138.mesbuyauomaterialarrive, fa138.mesauooobdefectloc, fa138.mesauocelljudge, fa138.mesbuyauomaterialscrap, fa138.mesbuyauomaterialdefectplt, fa138.mesbuyauomaterialdefect, fa138.mesauofilelog, fa138.mesauofilelogdetail, fa138.mesauolabel, fa138.mesauolabelitem, fa138.mesauobrisamo, fa138.auo_ct_code_temp, fa138.auo_ct_code_old_temp, fa138.mesauoreportseq, fa138.auo_targetgrade, fa138.mesusnitem_auo, fa138.mesauojobresend, fa138.mescsotlabel, fa138.meslcmcsotheader, fa138.meslcmcsotdetail, fa138.meslcmfilelog, fa138.meslcmfilelogdetail, fa138.meslcmwoinfo, fa138.mesfilmmaterialbase, fa138.mesfilmweight, fa138.mesfilmcuttingfeedstock, fa138.mesfilmcuttingfeedstocklog, fa138.mesfilmcuttingoffline, fa138.mesfilmcutting, fa138.mesfilmcuttingonline, fa138.mesfilmmaterialusedrecord, fa138.mesusnoemctcode, fa138.customer_defectcode_mapping
WITH (publish = 'insert, update, delete, truncate', publish_via_partition_root = true);
CREATE PUBLICATION pub_pcb138
FOR TABLE pcb138.mesusn_qis, pcb138.dfimbdefectnum, pcb138.doausndefect, pcb138.doausnrepair, pcb138.doausnrepairitem, pcb138.mesadvshipment, pcb138.mescarton, pcb138.mescartoncache, pcb138.mescartonitem, pcb138.mescausestationtransaction, pcb138.mescheckinout, pcb138.mescleanmblog, pcb138.meserpgmreasoncode, pcb138.meserpmo, pcb138.meserpmoitem, pcb138.meserppalletitem, pcb138.mesmocsn, pcb138.mesmoidready, pcb138.mesmoinfo, pcb138.mesmopickinglist, pcb138.mesmorework, pcb138.mesmoreworklist, pcb138.mesmousnvice, pcb138.mesmoweightchange, pcb138.mespallet, pcb138.mespalletitem, pcb138.mesrepairlocrestrict, pcb138.messnrange, pcb138.messpilog, pcb138.messpilogsum, pcb138.messubmoitem, pcb138.mestemplocation, pcb138.mestransactioncache, pcb138.mesupnteststandardtime, pcb138.mesupntimerestrict, pcb138.mesusndefectloc, pcb138.mesusnforcestorein, pcb138.mesusnid, pcb138.mesusnpacked, pcb138.mesusnroutechange, pcb138.mesusnscrap, pcb138.sctstencil, pcb138.sctstencilmodel, pcb138.sctstenciltransfer, pcb138.sctstencil_upn, pcb138.mesusn, pcb138.mestransaction, pcb138.mesmo, pcb138.mesmoitem, pcb138.mesusnitem, pcb138.mesusninfo
WITH (publish = 'insert, update, delete, truncate', publish_via_partition_root = true);
CREATE PUBLICATION pub_pp138
FOR TABLE pp138.dpmfpyr, pp138.dpmoee, pp138.dpmoeechargehour, pp138.dpmoeedetail, pp138.dpmoeeissue, pp138.dpmoeeissuegroup, pp138.dpmoeelosshour, pp138.dpmpdtissues, pp138.dpmpdtissuesgroupdetail, pp138.dpmproductivity, pp138.dpmproductivitydetail, pp138.dpmuph, pp138.dpmupph, pp138.dpmupphn, pp138.dpmupphndetail, pp138.dpmyrissue, pp138.dpmyrusndetail, pp138.ppflinestopdetail, pp138.ppflinestophead, pp138.ppfstdtimehead, pp138.ppfstopcode, pp138.uppuhinfo, pp138.nppbasedata, pp138.nppgenlog, pp138.nppmanualmo, pp138.nppofflineratio, pp138.dpmfpyrbymodel, pp138.dpmfpyrperiodbase, pp138.dpmerrlog, pp138.dpmyrissuelog, pp138.dpmupphnbyupn, pp138.ppsmtstandardtimedetail, pp138.ppsmtstandardtimehead, pp138.ppfstdtimedetail, pp138.ppfparentchildset, pp138.mifpyrhourly, pp138.ppfshiftid, pp138.ppftustdtimedetail, pp138.syncidcontrol, pp138.ppctaclinestate, pp138.ppctacplcnode_temp, pp138.att_station_audit, pp138.dccesoprequest, pp138.att_station_of_post, pp138.att_station_post_skill, pp138.dccesop, pp138.epmrfiddata, pp138.att_employee_authority, pp138.att_station, pp138.att_skill, pp138.dccesopdetail, pp138.epmshiftnifi, pp138.epmstationskillnifi, pp138.epmstationempskillnifi, pp138.linekbdetail, pp138.epmusnmappingnifi
WITH (publish = 'insert, update, delete, truncate', publish_via_partition_root = true);
- **4.2 Check if there are any related triggers in the subscription table and if the trigger activation mode is A or R(execute on subscripton,A represents always, R represents Replica; Regardless of how the data trigger is inserted for representative A, it will still trigger, and it is necessary to evaluate whether it can be truncated; For R, it will only be triggered when the data source is logical replication, and can be directly deleted)
select * from pg_trigger where tgrelid in (select srrelid from pg_subscription_rel) and tgenabled in ('A','R');
- **4.3 Keep create subscription SQL(execute on subscription,Use pgadmin to record,Because the copied file does not contain a password, it is necessary to manually add the password field 'password=XXX' in the options, and to rewrite 'creat_stlot=true')
CREATE SUBSCRIPTION sub_cim138
CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
PUBLICATION pub_cim138
WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_cim138, synchronous_commit = 'off');
CREATE SUBSCRIPTION sub_fa138
CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
PUBLICATION pub_fa138
WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_fa138, synchronous_commit = 'off');
CREATE SUBSCRIPTION sub_pcb138
CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
PUBLICATION pub_pcb138
WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_pcb138, synchronous_commit = 'off');
CREATE SUBSCRIPTION sub_pp138
CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
PUBLICATION pub_pp138
WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_pp138, synchronous_commit = 'off');
- **4.4 SELETE Truncate Table SQL And record (Execute on subscriptio,Remember to execute on the subscription end. If there are triggers A and R, please contact SFCS to confirm the table and delete the table data)
- **4.5 drop subscription(execute on subscription)
drop subscription sub_cim138 cascade;
drop subscription sub_fa138 cascade;
drop subscription sub_pcb138 cascade;
drop subscription sub_pp138 cascade;
- **4.6 Execute truncate Table SQK (Execute on subscriptio,Remember to execute on the subscription end. If there are triggers A and R, please contact SFCS to confirm the table and delete the table data)
- **4.7 Recreate subscription (Execute on subscription)
CREATE SUBSCRIPTION sub_cim138
CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
PUBLICATION pub_cim138
WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_cim138, synchronous_commit = 'off');
CREATE SUBSCRIPTION sub_fa138
CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
PUBLICATION pub_fa138
WITH (connect = true, enabled = true, create_slot = false, slot_name = sub_fa138, synchronous_commit = 'off');
CREATE SUBSCRIPTION sub_pcb138
CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
PUBLICATION pub_pcb138
WITH (connect = true, enabled = true, create_slot = false, slot_name = sub_pcb138, synchronous_commit = 'off');
CREATE SUBSCRIPTION sub_pp138
CONNECTION 'host=10.55.240.134 port=6435 user=postgres dbname=MESP138 password=W2$XXX'
PUBLICATION pub_pp138
WITH (connect = true, enabled = true, create_slot = true, slot_name = sub_pp138, synchronous_commit = 'off');
- **4.8 Waiting for data initialization (subscription end, usually check if there is a lock)
sub_xxx_xxx_sync_xxx mean is initial





















