From 2d6f9bb421990f2e708ef656e84bffadf0256718 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 27 Oct 2025 15:31:13 +0530
Subject: [PATCH v20251030 1/3] New worker for sequence synchronization during
 subscription management

This patch introduces sequence synchronization:
Sequences have 2 states:
   - INIT (needs [re]synchronizing)
   - READY (is already synchronized)

A new sequencesync worker is launched as needed to synchronize sequences.
It does the following:
    a) Retrieves remote values of sequences with pg_sequence_state() INIT.
    b) Logs a warning if the sequence parameters differ between the publisher and subscriber.
    c) Sets the local sequence values accordingly.
    d) Updates the local sequence state to READY.
    e) Repeats until all done; Commits synchronized sequences in batches of 100

Sequence synchronization occurs in 3 places:
1) CREATE SUBSCRIPTION
    - (The command syntax remains unchanged from PG18 to PG19.)
    - The subscriber retrieves sequences associated with publications.
    - Published sequences are added to pg_subscription_rel with INIT state.
    - Initiate the sequencesync worker (see above) to synchronize all
      sequences.

2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    - (The command syntax remains unchanged from PG18 to PG19.)
    - Dropped published sequences are removed from pg_subscription_rel.
    - Newly published sequences are added to pg_subscription_rel with INIT state.
    - Initiate the sequencesync worker (see above) to synchronize only
      newly added sequences.

3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES
    - (A new command introduced in PG19 by a prior patch.)
    - All sequences in pg_subscription_rel are reset to DATASYNC state.
    - Initiate the sequencesync worker (see above) to synchronize all
      sequences.
    - Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command,
      addition and removal of missing sequences will not be done in this
      case

Author: Vignesh C <vignesh21@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Discussion: https://www.postgresql.org/message-id/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
---
 src/backend/catalog/pg_subscription.c         |   2 +-
 src/backend/commands/sequence.c               |  18 +-
 src/backend/postmaster/bgworker.c             |   5 +-
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/launcher.c    |  51 +-
 src/backend/replication/logical/meson.build   |   1 +
 .../replication/logical/sequencesync.c        | 690 ++++++++++++++++++
 src/backend/replication/logical/syncutils.c   | 134 +++-
 src/backend/replication/logical/tablesync.c   |  73 +-
 src/backend/replication/logical/worker.c      |  69 +-
 src/backend/utils/misc/guc_parameters.dat     |   2 +-
 src/include/catalog/pg_proc.dat               |   2 +-
 src/include/catalog/pg_subscription_rel.h     |  11 +
 src/include/commands/sequence.h               |   1 +
 src/include/replication/logicalworker.h       |   3 +-
 src/include/replication/worker_internal.h     |  22 +-
 src/test/subscription/t/036_sequences.pl      | 180 ++++-
 src/tools/pgindent/typedefs.list              |   2 +
 18 files changed, 1153 insertions(+), 114 deletions(-)
 create mode 100644 src/backend/replication/logical/sequencesync.c

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 15b233a37d8..1945627ed88 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -354,7 +354,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 							  ObjectIdGetDatum(relid),
 							  ObjectIdGetDatum(subid));
 	if (!HeapTupleIsValid(tup))
-		elog(ERROR, "subscription table %u in subscription %u does not exist",
+		elog(ERROR, "subscription relation %u in subscription %u does not exist",
 			 relid, subid);
 
 	/* Update the tuple. */
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index c23dee5231c..8d671b7a29d 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -112,7 +112,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity,
 						bool *is_called,
 						bool *need_seq_rewrite,
 						List **owned_by);
-static void do_setval(Oid relid, int64 next, bool iscalled);
 static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity);
 
 
@@ -954,8 +953,8 @@ lastval(PG_FUNCTION_ARGS)
  * it is the only way to clear the is_called flag in an existing
  * sequence.
  */
-static void
-do_setval(Oid relid, int64 next, bool iscalled)
+void
+SetSequence(Oid relid, int64 next, bool iscalled)
 {
 	SeqTable	elm;
 	Relation	seqrel;
@@ -1056,7 +1055,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
 
 /*
  * Implement the 2 arg setval procedure.
- * See do_setval for discussion.
+ * See SetSequence for discussion.
  */
 Datum
 setval_oid(PG_FUNCTION_ARGS)
@@ -1064,14 +1063,14 @@ setval_oid(PG_FUNCTION_ARGS)
 	Oid			relid = PG_GETARG_OID(0);
 	int64		next = PG_GETARG_INT64(1);
 
-	do_setval(relid, next, true);
+	SetSequence(relid, next, true);
 
 	PG_RETURN_INT64(next);
 }
 
 /*
  * Implement the 3 arg setval procedure.
- * See do_setval for discussion.
+ * See SetSequence for discussion.
  */
 Datum
 setval3_oid(PG_FUNCTION_ARGS)
@@ -1080,7 +1079,7 @@ setval3_oid(PG_FUNCTION_ARGS)
 	int64		next = PG_GETARG_INT64(1);
 	bool		iscalled = PG_GETARG_BOOL(2);
 
-	do_setval(relid, next, iscalled);
+	SetSequence(relid, next, iscalled);
 
 	PG_RETURN_INT64(next);
 }
@@ -1797,8 +1796,9 @@ pg_sequence_parameters(PG_FUNCTION_ARGS)
 /*
  * Return the sequence tuple along with its page LSN.
  *
- * This is primarily intended for use by pg_dump to gather sequence data
- * without needing to individually query each sequence relation.
+ * This is primarily used by pg_dump to efficiently collect sequence data
+ * without querying each sequence individually, and is also leveraged by
+ * logical replication while synchronizing sequences.
  */
 Datum
 pg_get_sequence_data(PG_FUNCTION_ARGS)
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 1ad65c237c3..142a02eb5e9 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -131,7 +131,10 @@ static const struct
 		"ParallelApplyWorkerMain", ParallelApplyWorkerMain
 	},
 	{
-		"TablesyncWorkerMain", TablesyncWorkerMain
+		"TableSyncWorkerMain", TableSyncWorkerMain
+	},
+	{
+		"SequenceSyncWorkerMain", SequenceSyncWorkerMain
 	}
 };
 
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c62c8c67521..c719af1f8a9 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -26,6 +26,7 @@ OBJS = \
 	proto.o \
 	relation.o \
 	reorderbuffer.o \
+	sequencesync.o \
 	slotsync.o \
 	snapbuild.o \
 	syncutils.o \
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 95b5cae9a55..a3e84d369bd 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -248,9 +248,10 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
  * Walks the workers array and searches for one that matches given worker type,
  * subscription id, and relation id.
  *
- * For apply workers, the relid should be set to InvalidOid, as they manage
- * changes across all tables. For table sync workers, the relid should be set
- * to the OID of the relation being synchronized.
+ * For apply workers and sequencesync workers, the relid should be set to
+ * InvalidOid, as they manage changes across all tables. For tablesync
+ * workers, the relid should be set to the OID of the relation being
+ * synchronized.
  */
 LogicalRepWorker *
 logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
@@ -334,6 +335,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
 	int			nparallelapplyworkers;
 	TimestampTz now;
 	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+	bool		is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
 	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
 
 	/*----------
@@ -422,7 +424,8 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
+	if ((is_tablesync_worker || is_sequencesync_worker) &&
+		nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -511,8 +514,16 @@ retry:
 			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
 			break;
 
+		case WORKERTYPE_SEQUENCESYNC:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication sequencesync worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
+			break;
+
 		case WORKERTYPE_TABLESYNC:
-			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
 			snprintf(bgw.bgw_name, BGW_MAXLEN,
 					 "logical replication tablesync worker for subscription %u sync %u",
 					 subid,
@@ -848,6 +859,29 @@ logicalrep_launcher_onexit(int code, Datum arg)
 	LogicalRepCtx->launcher_pid = 0;
 }
 
+/*
+ * Reset the last_seqsync_start_time of the sequencesync worker in the
+ * subscription's apply worker.
+ *
+ * Note that this value is not stored in the sequencesync worker, because that
+ * has finished already and is about to exit.
+ */
+void
+logicalrep_reset_seqsync_start_time(void)
+{
+	LogicalRepWorker *worker;
+
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+	worker = logicalrep_worker_find(WORKERTYPE_APPLY,
+									MyLogicalRepWorker->subid, InvalidOid,
+									true);
+	if (worker)
+		worker->last_seqsync_start_time = 0;
+
+	LWLockRelease(LogicalRepWorkerLock);
+}
+
 /*
  * Cleanup function.
  *
@@ -896,7 +930,7 @@ logicalrep_sync_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (isTablesyncWorker(w) && w->subid == subid)
+		if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
 			res++;
 	}
 
@@ -1610,7 +1644,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		worker_pid = worker.proc->pid;
 
 		values[0] = ObjectIdGetDatum(worker.subid);
-		if (isTablesyncWorker(&worker))
+		if (isTableSyncWorker(&worker))
 			values[1] = ObjectIdGetDatum(worker.relid);
 		else
 			nulls[1] = true;
@@ -1650,6 +1684,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 			case WORKERTYPE_PARALLEL_APPLY:
 				values[9] = CStringGetTextDatum("parallel apply");
 				break;
+			case WORKERTYPE_SEQUENCESYNC:
+				values[9] = CStringGetTextDatum("sequence synchronization");
+				break;
 			case WORKERTYPE_TABLESYNC:
 				values[9] = CStringGetTextDatum("table synchronization");
 				break;
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 9283e996ef4..a2268d8361e 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -12,6 +12,7 @@ backend_sources += files(
   'proto.c',
   'relation.c',
   'reorderbuffer.c',
+  'sequencesync.c',
   'slotsync.c',
   'snapbuild.c',
   'syncutils.c',
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
new file mode 100644
index 00000000000..3d81d671560
--- /dev/null
+++ b/src/backend/replication/logical/sequencesync.c
@@ -0,0 +1,690 @@
+/*-------------------------------------------------------------------------
+ * sequencesync.c
+ *	  PostgreSQL logical replication: sequence synchronization
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/sequencesync.c
+ *
+ * NOTES
+ *	  This file contains code for sequence synchronization for
+ *	  logical replication.
+ *
+ * Sequences requiring synchronization are tracked in the pg_subscription_rel
+ * catalog.
+ *
+ * Sequences to be synchronized will be added with state INIT when either of
+ * the following commands is executed:
+ * CREATE SUBSCRIPTION
+ * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
+ *
+ * Executing the following command resets all sequences in the subscription to
+ * state INIT, triggering re-synchronization:
+ * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
+ *
+ * The apply worker periodically scans pg_subscription_rel for sequences in
+ * INIT state. When such sequences are found, it spawns a sequencesync worker
+ * to handle synchronization.
+ *
+ * A single sequencesync worker is responsible for synchronizing all sequences
+ * in INIT state in pg_subscription_rel. It begins by retrieving the list of
+ * sequences flagged for synchronization. These sequences are then processed
+ * in batches, allowing multiple entries to be synchronized within a single
+ * transaction. The worker fetches the current sequence values and page LSNs
+ * from the remote publisher, updates the corresponding sequences on the local
+ * subscriber, and finally marks each sequence as READY upon successful
+ * synchronization.
+ *
+ * Sequence state transitions follow this pattern:
+ *   INIT → READY
+ *
+ * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
+ * sequences are synchronized per transaction. The locks on the sequence
+ * relation will be periodically released at each transaction commit.
+ *
+ * XXX: We didn't choose launcher process to maintain the launch of sequencesync
+ * worker as it didn't have database connection to access the sequences from the
+ * pg_subscription_rel system catalog that need to be synchronized.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "catalog/pg_sequence.h"
+#include "catalog/pg_subscription_rel.h"
+#include "commands/sequence.h"
+#include "pgstat.h"
+#include "postmaster/interrupt.h"
+#include "replication/logicalworker.h"
+#include "replication/worker_internal.h"
+#include "utils/acl.h"
+#include "utils/fmgroids.h"
+#include "utils/guc.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
+#include "utils/syscache.h"
+#include "utils/usercontext.h"
+
+#define REMOTE_SEQ_COL_COUNT 10
+
+typedef enum CopySeqResult
+{
+	COPYSEQ_SUCCESS,
+	COPYSEQ_MISMATCH,
+	COPYSEQ_INSUFFICIENT_PERM,
+	COPYSEQ_SKIPPED
+} CopySeqResult;
+
+static List *seqinfos = NIL;
+
+/*
+ * Apply worker determines if sequence synchronization is needed.
+ *
+ * Start a sequencesync worker if one is not already running. The active
+ * sequencesync worker will handle all pending sequence synchronization. If any
+ * sequences remain unsynchronized after it exits, a new worker can be started
+ * in the next iteration.
+ */
+void
+ProcessSyncingSequencesForApply(void)
+{
+	LogicalRepWorker *sequencesync_worker;
+	int			nsyncworkers;
+	bool		has_pending_sequences;
+	bool		started_tx;
+
+	FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
+
+	if (started_tx)
+	{
+		CommitTransactionCommand();
+		pgstat_report_stat(true);
+	}
+
+	if (!has_pending_sequences)
+		return;
+
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+	/* Check if there is a sequencesync worker already running? */
+	sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
+												 MyLogicalRepWorker->subid,
+												 InvalidOid, true);
+	if (sequencesync_worker)
+	{
+		LWLockRelease(LogicalRepWorkerLock);
+		return;
+	}
+
+	/*
+	 * Count running sync workers for this subscription, while we have the
+	 * lock.
+	 */
+	nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+	LWLockRelease(LogicalRepWorkerLock);
+
+	launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
+					   &MyLogicalRepWorker->last_seqsync_start_time);
+}
+
+/*
+ * report_sequence_errors
+ *
+ * Report discrepancies in sequence data between the publisher and subscriber.
+ * It identifies sequences that do not have sufficient privileges, as well as
+ * sequences that exist on both sides but have mismatched values.
+ */
+static void
+report_sequence_errors(StringInfo insuffperm_seqs, StringInfo mismatched_seqs,
+					   StringInfo missing_seqs)
+{
+	StringInfo	detail = makeStringInfo();
+	StringInfo	hint = makeStringInfo();
+	bool		need_separator = false;
+
+	if (insuffperm_seqs->len)
+	{
+		appendStringInfo(detail, "Insufficient privileges on the sequence(s): (%s)",
+						 insuffperm_seqs->data);
+		appendStringInfoString(hint, "Grant UPDATE privilege on the sequence(s).");
+		need_separator = true;
+	}
+
+	if (mismatched_seqs->len)
+	{
+		appendStringInfo(detail, "%sMismatched or renamed sequence(s) on subscriber: (%s)",
+						 need_separator ? "; " : "", mismatched_seqs->data);
+		appendStringInfo(hint, "%sFor mismatched sequences, alter or re-create local sequences to match the publisher's defintion.",
+						 need_separator ? " " : "");
+		need_separator = true;
+	}
+
+	if (missing_seqs->len)
+	{
+		appendStringInfo(detail, "%sMissing sequence(s) on publisher: (%s)",
+						 need_separator ? "; " : "", missing_seqs->data);
+		appendStringInfo(hint, "%sFor missing sequences, remove them locally or run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to refresh the subscription.",
+						 need_separator ? " " : "");
+	}
+
+	ereport(ERROR,
+			errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			errmsg("logical replication sequence synchronization failed for subscription \"%s\"", MySubscription->name),
+			errdetail("%s.", detail->data),
+			errhint("%s", hint->data));
+}
+
+/*
+ * Appends a qualified sequence name to a StringInfo buffer. Optionally
+ * increments a counter if provided. Used to build comma-separated lists of
+ * sequences.
+ */
+static void
+append_sequence_name(StringInfo buf, const char *nspname, const char *seqname,
+					 int *count)
+{
+	if (buf->len > 0)
+		appendStringInfoString(buf, ", ");
+
+	appendStringInfo(buf, "\"%s.%s\"", nspname, seqname);
+
+	if (count)
+		(*count)++;
+}
+
+/*
+ * get_and_validate_seq_info
+ *
+ * Extracts remote sequence information from the tuple slot received from the
+ * publisher and validates it against the corresponding local sequence
+ * definition.
+ */
+static CopySeqResult
+get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
+						  LogicalRepSequenceInfo **seqinfo,
+						  XLogRecPtr *page_lsn, int64 *last_value,
+						  bool *is_called)
+{
+	bool		isnull;
+	int			col = 0;
+	int			seqidx;
+	Oid			remote_typid;
+	int64		remote_start;
+	int64		remote_increment;
+	int64		remote_min;
+	int64		remote_max;
+	bool		remote_cycle;
+	CopySeqResult result = COPYSEQ_SUCCESS;
+	HeapTuple	tup;
+	Form_pg_sequence local_seq;
+	LogicalRepSequenceInfo *seqinfo_local;
+
+	seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	*last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	*is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	*page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	/* Sanity check */
+	Assert(col == REMOTE_SEQ_COL_COUNT);
+
+	seqinfo_local = (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
+	seqinfo_local->found_on_pub = true;
+	*seqinfo = seqinfo_local;
+
+	*sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
+	if (!*sequence_rel)			/* Sequence was concurrently dropped? */
+		return COPYSEQ_SKIPPED;
+
+	tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
+	if (!HeapTupleIsValid(tup)) /* Sequence was concurrently dropped? */
+		return COPYSEQ_SKIPPED;
+
+	local_seq = (Form_pg_sequence) GETSTRUCT(tup);
+
+	/* Sequence parameters for remote/local are the same? */
+	if (local_seq->seqtypid != remote_typid ||
+		local_seq->seqstart != remote_start ||
+		local_seq->seqincrement != remote_increment ||
+		local_seq->seqmin != remote_min ||
+		local_seq->seqmax != remote_max ||
+		local_seq->seqcycle != remote_cycle)
+		result = COPYSEQ_MISMATCH;
+
+	/* Sequence was concurrently renamed? */
+	if (strcmp(seqinfo_local->nspname,
+			   get_namespace_name(get_rel_namespace(seqinfo_local->localrelid))) ||
+		strcmp(seqinfo_local->seqname,
+			   get_rel_name(seqinfo_local->localrelid)))
+		result = COPYSEQ_MISMATCH;
+
+	ReleaseSysCache(tup);
+	return result;
+}
+
+/*
+ * Apply remote sequence state to local sequence and mark it as
+ * synchronized (READY).
+ */
+static CopySeqResult
+copy_sequence(LogicalRepSequenceInfo *seqinfo, int64 last_value,
+			  bool is_called, XLogRecPtr page_lsn, Oid seqowner)
+{
+	UserContext ucxt;
+	AclResult	aclresult;
+	bool		run_as_owner = MySubscription->runasowner;
+	Oid			seqoid = seqinfo->localrelid;
+
+	/*
+	 * Make sure that the sequence is copied as sequence owner, unless the
+	 * user has opted out of that behaviour.
+	 */
+	if (!run_as_owner)
+		SwitchToUntrustedUser(seqowner, &ucxt);
+
+	aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
+
+	if (aclresult != ACLCHECK_OK)
+	{
+		if (!run_as_owner)
+			RestoreUserContext(&ucxt);
+
+		return COPYSEQ_INSUFFICIENT_PERM;
+	}
+
+	SetSequence(seqoid, last_value, is_called);
+
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
+	/*
+	 * Record the remote sequence's LSN in pg_subscription_rel and mark the
+	 * sequence as READY.
+	 */
+	UpdateSubscriptionRelState(MySubscription->oid, seqoid,
+							   SUBREL_STATE_READY, page_lsn, false);
+
+	return COPYSEQ_SUCCESS;
+}
+
+/*
+ * Copy existing data of sequences from the publisher.
+ */
+static void
+copy_sequences(WalReceiverConn *conn)
+{
+	int			cur_batch_base_index = 0;
+	StringInfo	mismatched_seqs = makeStringInfo();
+	StringInfo	missing_seqs = makeStringInfo();
+	StringInfo	insuffperm_seqs = makeStringInfo();
+	StringInfo	seqstr = makeStringInfo();
+	StringInfo	cmd = makeStringInfo();
+
+#define MAX_SEQUENCES_SYNC_PER_BATCH 100
+
+	ereport(LOG,
+			errmsg("logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
+				   MySubscription->name, list_length(seqinfos)));
+
+	while (cur_batch_base_index < list_length(seqinfos))
+	{
+		Oid			seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
+		BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
+		int			batch_size = 0;
+		int			batch_succeeded_count = 0;
+		int			batch_mismatched_count = 0;
+		int			batch_skipped_count = 0;
+		int			batch_insuffperm_count = 0;
+		int			batch_missing_count;
+		Relation	sequence_rel;
+
+		WalRcvExecResult *res;
+		TupleTableSlot *slot;
+
+		StartTransactionCommand();
+
+		for (int idx = cur_batch_base_index; idx < list_length(seqinfos); idx++)
+		{
+			LogicalRepSequenceInfo *seqinfo =
+				(LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
+
+			if (seqstr->len > 0)
+				appendStringInfoString(seqstr, ", ");
+
+			appendStringInfo(seqstr, "(\'%s\', \'%s\', %d)",
+							 seqinfo->nspname, seqinfo->seqname, idx);
+
+			if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
+				break;
+		}
+
+		/*
+		 * We deliberately avoid acquiring a local lock on the sequence before
+		 * querying the publisher to prevent potential distributed deadlocks
+		 * in bi-directional replication setups. For instance, a concurrent
+		 * ALTER SEQUENCE on one node might block this worker, while the
+		 * worker's own local lock simultaneously blocks a similar operation
+		 * on the other nod resulting in a circular wait that spans both nodes
+		 * and remains undetected.
+		 *
+		 * Note: Each entry in VALUES includes an index 'seqidx' that
+		 * represents the sequence's position in the local 'seqinfos' list.
+		 * This index is propagated to the query results and later used to
+		 * directly map the fetched publisher sequence rows back to their
+		 * corresponding local entries without relying on result order or name
+		 * matching.
+		 */
+		appendStringInfo(cmd,
+						 "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
+						 "       seq.seqstart, seq.seqincrement, seq.seqmin,\n"
+						 "       seq.seqmax, seq.seqcycle\n"
+						 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
+						 "JOIN pg_namespace n ON n.nspname = s.schname\n"
+						 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
+						 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
+						 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
+						 seqstr->data);
+
+		res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					errcode(ERRCODE_CONNECTION_FAILURE),
+					errmsg("could not fetch sequence information from the publisher: %s",
+						   res->err));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			int64		last_value;
+			bool		is_called;
+			XLogRecPtr	page_lsn;
+			CopySeqResult sync_status;
+			LogicalRepSequenceInfo *seqinfo;
+
+			CHECK_FOR_INTERRUPTS();
+
+			if (ConfigReloadPending)
+			{
+				ConfigReloadPending = false;
+				ProcessConfigFile(PGC_SIGHUP);
+			}
+
+			sync_status = get_and_validate_seq_info(slot, &sequence_rel,
+													&seqinfo, &page_lsn,
+													&last_value, &is_called);
+			if (sync_status == COPYSEQ_SUCCESS)
+				sync_status = copy_sequence(seqinfo, last_value, is_called,
+											page_lsn,
+											sequence_rel->rd_rel->relowner);
+
+			switch (sync_status)
+			{
+				case COPYSEQ_MISMATCH:
+					append_sequence_name(mismatched_seqs, seqinfo->nspname,
+										 seqinfo->seqname,
+										 &batch_mismatched_count);
+					break;
+				case COPYSEQ_INSUFFICIENT_PERM:
+					append_sequence_name(insuffperm_seqs, seqinfo->nspname,
+										 seqinfo->seqname,
+										 &batch_insuffperm_count);
+					break;
+				case COPYSEQ_SKIPPED:
+					ereport(LOG,
+							errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
+								   seqinfo->nspname, seqinfo->seqname));
+					batch_skipped_count++;
+					break;
+
+				case COPYSEQ_SUCCESS:
+					ereport(DEBUG1,
+							errmsg_internal("logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
+											MySubscription->name,
+											seqinfo->nspname,
+											seqinfo->seqname));
+					batch_succeeded_count++;
+					break;
+			}
+
+			if (sequence_rel)
+				table_close(sequence_rel, NoLock);
+		}
+
+		ExecDropSingleTupleTableSlot(slot);
+		walrcv_clear_result(res);
+		resetStringInfo(seqstr);
+		resetStringInfo(cmd);
+
+		batch_missing_count = batch_size - (batch_succeeded_count +
+											batch_mismatched_count +
+											batch_insuffperm_count +
+											batch_skipped_count);
+
+		ereport(LOG,
+				errmsg("logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d skipped, %d mismatched, %d insufficient permission, %d missing from publisher",
+					   MySubscription->name,
+					   (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
+					   batch_size, batch_succeeded_count, batch_skipped_count,
+					   batch_mismatched_count, batch_insuffperm_count,
+					   batch_missing_count));
+
+		/* Commit this batch, and prepare for next batch */
+		CommitTransactionCommand();
+
+		if (batch_missing_count)
+		{
+			for (int idx = cur_batch_base_index; idx < batch_size; idx++)
+			{
+				LogicalRepSequenceInfo *seqinfo =
+					(LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
+
+				/* If the sequence was not found on publisher, record it */
+				if (!seqinfo->found_on_pub)
+					append_sequence_name(missing_seqs, seqinfo->nspname,
+										 seqinfo->seqname, NULL);
+			}
+		}
+
+		/*
+		 * cur_batch_base_index is not incremented sequentially because some
+		 * sequences may be missing, and the number of fetched rows may not
+		 * match the batch size.
+		 */
+		cur_batch_base_index += batch_size;
+	}
+
+	/* Report permission issues, mismatches, or missing sequences */
+	if (insuffperm_seqs->len || mismatched_seqs->len || missing_seqs->len)
+		report_sequence_errors(insuffperm_seqs, mismatched_seqs, missing_seqs);
+
+	destroyStringInfo(missing_seqs);
+	destroyStringInfo(mismatched_seqs);
+	destroyStringInfo(insuffperm_seqs);
+}
+
+/*
+ * Determines which sequences require synchronization and initiates their
+ * synchronization process.
+ */
+static void
+LogicalRepSyncSequences(void)
+{
+	char	   *err;
+	bool		must_use_password;
+	Relation	rel;
+	HeapTuple	tup;
+	ScanKeyData skey[2];
+	SysScanDesc scan;
+	Oid			subid = MyLogicalRepWorker->subid;
+	StringInfoData app_name;
+
+	StartTransactionCommand();
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[0],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	ScanKeyInit(&skey[1],
+				Anum_pg_subscription_rel_srsubstate,
+				BTEqualStrategyNumber, F_CHAREQ,
+				CharGetDatum(SUBREL_STATE_INIT));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, 2, skey);
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel subrel;
+		char		relkind;
+		LogicalRepSequenceInfo *seq;
+		Relation	sequence_rel;
+		MemoryContext oldctx;
+
+		CHECK_FOR_INTERRUPTS();
+
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+		/* Skip if the relation is not a sequence */
+		relkind = get_rel_relkind(subrel->srrelid);
+		if (relkind != RELKIND_SEQUENCE)
+			continue;
+
+		sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
+
+		/* Skip if sequence was dropped concurrently */
+		if (!sequence_rel)
+			continue;
+
+		/*
+		 * Worker needs to process sequences across transaction boundary, so
+		 * allocate them under long-lived context.
+		 */
+		oldctx = MemoryContextSwitchTo(TopMemoryContext);
+
+		seq = palloc0_object(LogicalRepSequenceInfo);
+		seq->localrelid = subrel->srrelid;
+		seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
+		seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
+		seqinfos = lappend(seqinfos, seq);
+
+		MemoryContextSwitchTo(oldctx);
+
+		table_close(sequence_rel, NoLock);
+	}
+
+	/* Cleanup */
+	systable_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+
+	/*
+	 * Exit early if no catalog entries found, likely due to concurrent drops.
+	 */
+	if (!seqinfos)
+		return;
+
+	/* Is the use of a password mandatory? */
+	must_use_password = MySubscription->passwordrequired &&
+		!MySubscription->ownersuperuser;
+
+	initStringInfo(&app_name);
+	appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
+					 MySubscription->oid, GetSystemIdentifier());
+
+	/*
+	 * Establish the connection to the publisher for sequence synchronization.
+	 */
+	LogRepWorkerWalRcvConn =
+		walrcv_connect(MySubscription->conninfo, true, true,
+					   must_use_password,
+					   app_name.data, &err);
+	if (LogRepWorkerWalRcvConn == NULL)
+		ereport(ERROR,
+				errcode(ERRCODE_CONNECTION_FAILURE),
+				errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
+					   MySubscription->name, err));
+
+	pfree(app_name.data);
+
+	copy_sequences(LogRepWorkerWalRcvConn);
+}
+
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_sequence_sync()
+{
+	Assert(am_sequencesync_worker());
+
+	PG_TRY();
+	{
+		/* Call initial sync. */
+		LogicalRepSyncSequences();
+	}
+	PG_CATCH();
+	{
+		if (MySubscription->disableonerr)
+			DisableSubscriptionAndExit();
+		else
+		{
+			/*
+			 * Report the worker failed during sequence synchronization. Abort
+			 * the current transaction so that the stats message is sent in an
+			 * idle state.
+			 */
+			AbortOutOfAnyTransaction();
+			PG_RE_THROW();
+		}
+	}
+	PG_END_TRY();
+}
+
+/* Logical Replication sequencesync worker entry point */
+void
+SequenceSyncWorkerMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+
+	SetupApplyOrSyncWorker(worker_slot);
+
+	start_sequence_sync();
+
+	FinishSyncWorker();
+}
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
index ae8c9385916..c79a9c3baac 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -16,6 +16,7 @@
 
 #include "catalog/pg_subscription_rel.h"
 #include "pgstat.h"
+#include "replication/logicallauncher.h"
 #include "replication/worker_internal.h"
 #include "storage/ipc.h"
 #include "utils/lsyscache.h"
@@ -48,6 +49,8 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE
 pg_noreturn void
 FinishSyncWorker(void)
 {
+	Assert(am_sequencesync_worker() || am_tablesync_worker());
+
 	/*
 	 * Commit any outstanding transaction. This is the usual case, unless
 	 * there was nothing to do for the table.
@@ -62,15 +65,29 @@ FinishSyncWorker(void)
 	XLogFlush(GetXLogWriteRecPtr());
 
 	StartTransactionCommand();
-	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-					MySubscription->name,
-					get_rel_name(MyLogicalRepWorker->relid))));
+	if (am_sequencesync_worker())
+		ereport(LOG,
+				errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
+					   MySubscription->name));
+	else
+		ereport(LOG,
+				errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+					   MySubscription->name,
+					   get_rel_name(MyLogicalRepWorker->relid)));
+
 	CommitTransactionCommand();
 
-	/* Find the leader apply worker and signal it. */
-	logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
-							 InvalidOid);
+	if (am_sequencesync_worker())
+	{
+		/* Find the leader apply worker and reset last_seqsync_start_time. */
+		logicalrep_reset_seqsync_start_time();
+	}
+	else
+	{
+		/* Find the leader apply worker and signal it. */
+		logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
+								 InvalidOid);
+	}
 
 	/* Stop gracefully */
 	proc_exit(0);
@@ -86,7 +103,52 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
 }
 
 /*
- * Process possible state change(s) of relations that are being synchronized.
+ * Attempt to launch a sync worker (sequence or table) if there is a sync
+ * worker slot available and the retry interval has elapsed.
+ *
+ * wtype: sync worker type.
+ * nsyncworkers: Number of currently running sync workers for the subscription.
+ * relid:  InvalidOid for sequencesync worker, actual relid for tablesync
+ * worker.
+ * last_start_time: Pointer to the last start time of the worker.
+ */
+void
+launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
+				   TimestampTz *last_start_time)
+{
+	TimestampTz now;
+
+	Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
+		   (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
+
+	/* If there is a free sync worker slot, start a new sync worker */
+	if (nsyncworkers >= max_sync_workers_per_subscription)
+		return;
+
+	now = GetCurrentTimestamp();
+
+	if (!(*last_start_time) ||
+		TimestampDifferenceExceeds(*last_start_time, now,
+								   wal_retrieve_retry_interval))
+	{
+		/*
+		 * Set the last_start_time even if we fail to start the worker, so
+		 * that we won't retry until wal_retrieve_retry_interval has elapsed.
+		 */
+		*last_start_time = now;
+		(void) logicalrep_worker_launch(wtype,
+										MyLogicalRepWorker->dbid,
+										MySubscription->oid,
+										MySubscription->name,
+										MyLogicalRepWorker->userid,
+										relid, DSM_HANDLE_INVALID, false);
+	}
+}
+
+/*
+ * Process possible state change(s) of relations that are being synchronized
+ * and start new tablesync workers for the newly added tables. Also, start a
+ * new sequencesync worker for the newly added sequences.
  */
 void
 ProcessSyncingRelations(XLogRecPtr current_lsn)
@@ -108,6 +170,12 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
 
 		case WORKERTYPE_APPLY:
 			ProcessSyncingTablesForApply(current_lsn);
+			ProcessSyncingSequencesForApply();
+			break;
+
+		case WORKERTYPE_SEQUENCESYNC:
+			/* Should never happen. */
+			elog(ERROR, "Sequence synchronization worker not expected to process relations");
 			break;
 
 		case WORKERTYPE_UNKNOWN:
@@ -117,17 +185,29 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
 }
 
 /*
- * Common code to fetch the up-to-date sync state info into the static lists.
+ * Common code to fetch the up-to-date sync state info for tables and sequences.
  *
- * Returns true if subscription has 1 or more tables, else false.
+ * The pg_subscription_rel catalog is shared by tables and sequences. Changes
+ * to either sequences or tables can affect the validity of relation states, so
+ * we identify non-READY tables and non-READY sequences together to ensure
+ * consistency.
  *
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
+ * has_pending_subtables: true if the subscription has one or more tables that
+ * are not in READY state, otherwise false.
+ * has_pending_subsequences: true if the subscription has one or more sequences
+ * that are not in READY state, otherwise false.
  */
-bool
-FetchRelationStates(bool *started_tx)
+void
+FetchRelationStates(bool *has_pending_subtables,
+					bool *has_pending_subsequences,
+					bool *started_tx)
 {
+	/*
+	 * has_subtables and has_subsequences_non_ready are declared as static,
+	 * since the same value can be used until the system table is invalidated.
+	 */
 	static bool has_subtables = false;
+	static bool has_subsequences_non_ready = false;
 
 	*started_tx = false;
 
@@ -135,10 +215,10 @@ FetchRelationStates(bool *started_tx)
 	{
 		MemoryContext oldctx;
 		List	   *rstates;
-		ListCell   *lc;
 		SubscriptionRelState *rstate;
 
 		relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
+		has_subsequences_non_ready = false;
 
 		/* Clean the old lists. */
 		list_free_deep(table_states_not_ready);
@@ -150,17 +230,23 @@ FetchRelationStates(bool *started_tx)
 			*started_tx = true;
 		}
 
-		/* Fetch tables that are in non-ready state. */
-		rstates = GetSubscriptionRelations(MySubscription->oid, true, false,
+		/* Fetch tables and sequences that are in non-READY state. */
+		rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
 										   true);
 
 		/* Allocate the tracking info in a permanent memory context. */
 		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-		foreach(lc, rstates)
+		foreach_ptr(SubscriptionRelState, subrel, rstates)
 		{
-			rstate = palloc(sizeof(SubscriptionRelState));
-			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
-			table_states_not_ready = lappend(table_states_not_ready, rstate);
+			if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
+				has_subsequences_non_ready = true;
+			else
+			{
+				rstate = palloc(sizeof(SubscriptionRelState));
+				memcpy(rstate, subrel, sizeof(SubscriptionRelState));
+				table_states_not_ready = lappend(table_states_not_ready,
+												 rstate);
+			}
 		}
 		MemoryContextSwitchTo(oldctx);
 
@@ -185,5 +271,9 @@ FetchRelationStates(bool *started_tx)
 			relation_states_validity = SYNC_RELATIONS_STATE_VALID;
 	}
 
-	return has_subtables;
+	if (has_pending_subtables)
+		*has_pending_subtables = has_subtables;
+
+	if (has_pending_subsequences)
+		*has_pending_subsequences = has_subsequences_non_ready;
 }
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 58c98488d7b..c3b525e6557 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -374,14 +374,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
 	};
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
-	bool		started_tx = false;
+	bool		started_tx;
 	bool		should_exit = false;
 	Relation	rel = NULL;
 
 	Assert(!IsTransactionState());
 
 	/* We need up-to-date sync state info for subscription tables here. */
-	FetchRelationStates(&started_tx);
+	FetchRelationStates(NULL, NULL, &started_tx);
 
 	/*
 	 * Prepare a hash table for tracking last start times of workers, to avoid
@@ -415,6 +415,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
 	{
 		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
 
+		if (!started_tx)
+		{
+			StartTransactionCommand();
+			started_tx = true;
+		}
+
+		Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
+
 		if (rstate->state == SUBREL_STATE_SYNCDONE)
 		{
 			/*
@@ -428,11 +436,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
 
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
-				if (!started_tx)
-				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
 
 				/*
 				 * Remove the tablesync origin tracking if exists.
@@ -552,43 +555,19 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
 				 */
 				int			nsyncworkers =
 					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				struct tablesync_start_time_mapping *hentry;
+				bool		found;
 
 				/* Now safe to release the LWLock */
 				LWLockRelease(LogicalRepWorkerLock);
 
-				/*
-				 * If there are free sync worker slot(s), start a new sync
-				 * worker for the table.
-				 */
-				if (nsyncworkers < max_sync_workers_per_subscription)
-				{
-					TimestampTz now = GetCurrentTimestamp();
-					struct tablesync_start_time_mapping *hentry;
-					bool		found;
+				hentry = hash_search(last_start_times, &rstate->relid,
+									 HASH_ENTER, &found);
+				if (!found)
+					hentry->last_start_time = 0;
 
-					hentry = hash_search(last_start_times, &rstate->relid,
-										 HASH_ENTER, &found);
-
-					if (!found ||
-						TimestampDifferenceExceeds(hentry->last_start_time, now,
-												   wal_retrieve_retry_interval))
-					{
-						/*
-						 * Set the last_start_time even if we fail to start
-						 * the worker, so that we won't retry until
-						 * wal_retrieve_retry_interval has elapsed.
-						 */
-						hentry->last_start_time = now;
-						(void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
-														MyLogicalRepWorker->dbid,
-														MySubscription->oid,
-														MySubscription->name,
-														MyLogicalRepWorker->userid,
-														rstate->relid,
-														DSM_HANDLE_INVALID,
-														false);
-					}
-				}
+				launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
+								   rstate->relid, &hentry->last_start_time);
 			}
 		}
 	}
@@ -1596,7 +1575,7 @@ run_tablesync_worker()
 
 /* Logical Replication Tablesync worker entry point */
 void
-TablesyncWorkerMain(Datum main_arg)
+TableSyncWorkerMain(Datum main_arg)
 {
 	int			worker_slot = DatumGetInt32(main_arg);
 
@@ -1618,11 +1597,11 @@ TablesyncWorkerMain(Datum main_arg)
 bool
 AllTablesyncsReady(void)
 {
-	bool		started_tx = false;
-	bool		has_subrels = false;
+	bool		started_tx;
+	bool		has_tables;
 
 	/* We need up-to-date sync state info for subscription tables here. */
-	has_subrels = FetchRelationStates(&started_tx);
+	FetchRelationStates(&has_tables, NULL, &started_tx);
 
 	if (started_tx)
 	{
@@ -1634,7 +1613,7 @@ AllTablesyncsReady(void)
 	 * Return false when there are no tables in subscription or not all tables
 	 * are in ready state; true otherwise.
 	 */
-	return has_subrels && (table_states_not_ready == NIL);
+	return has_tables && (table_states_not_ready == NIL);
 }
 
 /*
@@ -1649,10 +1628,10 @@ bool
 HasSubscriptionTablesCached(void)
 {
 	bool		started_tx;
-	bool		has_subrels;
+	bool		has_tables;
 
 	/* We need up-to-date subscription tables info here */
-	has_subrels = FetchRelationStates(&started_tx);
+	FetchRelationStates(&has_tables, NULL, &started_tx);
 
 	if (started_tx)
 	{
@@ -1660,7 +1639,7 @@ HasSubscriptionTablesCached(void)
 		pgstat_report_stat(true);
 	}
 
-	return has_subrels;
+	return has_tables;
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7edd1c9cf06..8026a007ec3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -702,6 +702,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 					(rel->state == SUBREL_STATE_SYNCDONE &&
 					 rel->statelsn <= remote_final_lsn));
 
+		case WORKERTYPE_SEQUENCESYNC:
+			/* Should never happen. */
+			elog(ERROR, "Sequence synchronization worker not expected to apply changes");
+			break;
+
 		case WORKERTYPE_UNKNOWN:
 			/* Should never happen. */
 			elog(ERROR, "Unknown worker type");
@@ -1243,7 +1248,10 @@ apply_handle_commit(StringInfo s)
 
 	apply_handle_commit_internal(&commit_data);
 
-	/* Process any tables that are being synchronized in parallel. */
+	/*
+	 * Process any tables that are being synchronized in parallel, as well as
+	 * any newly added tables or sequences.
+	 */
 	ProcessSyncingRelations(commit_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
@@ -1365,7 +1373,10 @@ apply_handle_prepare(StringInfo s)
 
 	in_remote_transaction = false;
 
-	/* Process any tables that are being synchronized in parallel. */
+	/*
+	 * Process any tables that are being synchronized in parallel, as well as
+	 * any newly added tables or sequences.
+	 */
 	ProcessSyncingRelations(prepare_data.end_lsn);
 
 	/*
@@ -1421,7 +1432,10 @@ apply_handle_commit_prepared(StringInfo s)
 	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
 	in_remote_transaction = false;
 
-	/* Process any tables that are being synchronized in parallel. */
+	/*
+	 * Process any tables that are being synchronized in parallel, as well as
+	 * any newly added tables or sequences.
+	 */
 	ProcessSyncingRelations(prepare_data.end_lsn);
 
 	clear_subscription_skip_lsn(prepare_data.end_lsn);
@@ -1487,7 +1501,10 @@ apply_handle_rollback_prepared(StringInfo s)
 	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
 	in_remote_transaction = false;
 
-	/* Process any tables that are being synchronized in parallel. */
+	/*
+	 * Process any tables that are being synchronized in parallel, as well as
+	 * any newly added tables or sequences.
+	 */
 	ProcessSyncingRelations(rollback_data.rollback_end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
@@ -1622,7 +1639,10 @@ apply_handle_stream_prepare(StringInfo s)
 
 	pgstat_report_stat(false);
 
-	/* Process any tables that are being synchronized in parallel. */
+	/*
+	 * Process any tables that are being synchronized in parallel, as well as
+	 * any newly added tables or sequences.
+	 */
 	ProcessSyncingRelations(prepare_data.end_lsn);
 
 	/*
@@ -2465,7 +2485,10 @@ apply_handle_stream_commit(StringInfo s)
 			break;
 	}
 
-	/* Process any tables that are being synchronized in parallel. */
+	/*
+	 * Process any tables that are being synchronized in parallel, as well as
+	 * any newly added tables or sequences.
+	 */
 	ProcessSyncingRelations(commit_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
@@ -4137,7 +4160,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			AcceptInvalidationMessages();
 			maybe_reread_subscription();
 
-			/* Process any table synchronization changes. */
+			/*
+			 * Process any relations that are being synchronized in parallel
+			 * and any newly added tables or sequences.
+			 */
 			ProcessSyncingRelations(last_received);
 		}
 
@@ -5580,7 +5606,8 @@ start_apply(XLogRecPtr origin_startpos)
 			 * idle state.
 			 */
 			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+			pgstat_report_subscription_error(MySubscription->oid,
+											 !am_tablesync_worker());
 
 			PG_RE_THROW();
 		}
@@ -5700,8 +5727,8 @@ run_apply_worker()
 }
 
 /*
- * Common initialization for leader apply worker, parallel apply worker and
- * tablesync worker.
+ * Common initialization for leader apply worker, parallel apply worker,
+ * tablesync worker and sequencesync worker.
  *
  * Initialize the database connection, in-memory subscription and necessary
  * config options.
@@ -5812,6 +5839,10 @@ InitializeLogRepWorker(void)
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
 						get_rel_name(MyLogicalRepWorker->relid))));
+	else if (am_sequencesync_worker())
+		ereport(LOG,
+				(errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
+						MySubscription->name)));
 	else
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" has started",
@@ -5831,14 +5862,16 @@ replorigin_reset(int code, Datum arg)
 	replorigin_session_origin_timestamp = 0;
 }
 
-/* Common function to setup the leader apply or tablesync worker. */
+/*
+ * Common function to setup the leader apply, tablesync and sequencesync worker.
+ */
 void
 SetupApplyOrSyncWorker(int worker_slot)
 {
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
 
-	Assert(am_tablesync_worker() || am_leader_apply_worker());
+	Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker());
 
 	/* Setup signal handling */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -5921,9 +5954,15 @@ DisableSubscriptionAndExit(void)
 
 	RESUME_INTERRUPTS();
 
-	/* Report the worker failed during either table synchronization or apply */
-	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
-									 !am_tablesync_worker());
+	if (am_leader_apply_worker() || am_tablesync_worker())
+	{
+		/*
+		 * Report the worker failed during either table synchronization or
+		 * apply.
+		 */
+		pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+										 !am_tablesync_worker());
+	}
 
 	/* Disable the subscription */
 	StartTransactionCommand();
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index d6fc8333850..0b49b98da99 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1924,7 +1924,7 @@
 },
 
 { name => 'max_sync_workers_per_subscription', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_SUBSCRIBERS',
-  short_desc => 'Maximum number of table synchronization workers per subscription.',
+  short_desc => 'Maximum number of workers per subscription for synchronizing tables and sequences.',
   variable => 'max_sync_workers_per_subscription',
   boot_val => '2',
   min => '0',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..34b7fddb0e7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -3433,7 +3433,7 @@
   proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u',
   prorettype => 'int8', proargtypes => 'regclass',
   prosrc => 'pg_sequence_last_value' },
-{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump',
+{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump and sequence synchronization',
   proname => 'pg_get_sequence_data', provolatile => 'v', proparallel => 'u',
   prorettype => 'record', proargtypes => 'regclass',
   proallargtypes => '{regclass,int8,bool,pg_lsn}', proargmodes => '{i,o,o,o}',
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9f88498ecd3..034c4524932 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -82,6 +82,17 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
+/*
+ * Stores metadata about a sequence involved in logical replication.
+ */
+typedef struct LogicalRepSequenceInfo
+{
+	char	   *seqname;
+	char	   *nspname;
+	Oid			localrelid;
+	bool		found_on_pub;
+} LogicalRepSequenceInfo;
+
 extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
 									XLogRecPtr sublsn, bool retain_lock);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 9ac0b67683d..46b4d89dd6e 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -60,6 +60,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
 extern void SequenceChangePersistence(Oid relid, char newrelpersistence);
 extern void DeleteSequenceTuple(Oid relid);
 extern void ResetSequence(Oid seq_relid);
+extern void SetSequence(Oid relid, int64 next, bool is_called);
 extern void ResetSequenceCaches(void);
 
 extern void seq_redo(XLogReaderState *record);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 88912606e4d..56fa79b648e 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
-extern void TablesyncWorkerMain(Datum main_arg);
+extern void TableSyncWorkerMain(Datum main_arg);
+extern void SequenceSyncWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 extern bool IsLogicalParallelApplyWorker(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index e23fa9a4514..32ef365f4a6 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType
 {
 	WORKERTYPE_UNKNOWN = 0,
 	WORKERTYPE_TABLESYNC,
+	WORKERTYPE_SEQUENCESYNC,
 	WORKERTYPE_APPLY,
 	WORKERTYPE_PARALLEL_APPLY,
 } LogicalRepWorkerType;
@@ -106,6 +107,8 @@ typedef struct LogicalRepWorker
 	TimestampTz last_recv_time;
 	XLogRecPtr	reply_lsn;
 	TimestampTz reply_time;
+
+	TimestampTz last_seqsync_start_time;
 } LogicalRepWorker;
 
 /*
@@ -264,6 +267,8 @@ extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm,
 									 bool retain_dead_tuples);
+extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
+							   Oid relid, TimestampTz *last_start_time);
 extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
 								   Oid relid);
 extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
@@ -272,6 +277,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
+extern void logicalrep_reset_seqsync_start_time(void);
 
 extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 											   char *originname, Size szoriginname);
@@ -282,11 +288,13 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state);
 
 extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
 extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
+extern void ProcessSyncingSequencesForApply(void);
 
 pg_noreturn extern void FinishSyncWorker(void);
 extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
 extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
-extern bool FetchRelationStates(bool *started_tx);
+extern void FetchRelationStates(bool *has_pending_subtables,
+								bool *has_pending_sequences, bool *started_tx);
 
 extern void stream_start_internal(TransactionId xid, bool first_segment);
 extern void stream_stop_internal(TransactionId xid);
@@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 
 #define isParallelApplyWorker(worker) ((worker)->in_use && \
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->in_use && \
+#define isTableSyncWorker(worker) ((worker)->in_use && \
 								   (worker)->type == WORKERTYPE_TABLESYNC)
+#define isSequenceSyncWorker(worker) ((worker)->in_use && \
+									  (worker)->type == WORKERTYPE_SEQUENCESYNC)
 
 static inline bool
 am_tablesync_worker(void)
 {
-	return isTablesyncWorker(MyLogicalRepWorker);
+	return isTableSyncWorker(MyLogicalRepWorker);
+}
+
+static inline bool
+am_sequencesync_worker(void)
+{
+	return isSequenceSyncWorker(MyLogicalRepWorker);
 }
 
 static inline bool
diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl
index 557fc91c017..897f08877ed 100644
--- a/src/test/subscription/t/036_sequences.pl
+++ b/src/test/subscription/t/036_sequences.pl
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
-# This tests that sequences are registered to be synced to the subscriber
+# This tests that sequences are synced correctly to the subscriber
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -14,6 +14,7 @@ my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 # Avoid checkpoint during the test, otherwise, extra values will be fetched for
 # the sequences which will cause the test to fail randomly.
 $node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'checkpoint_timeout = 1h');
 $node_publisher->start;
 
 # Initialize subscriber node
@@ -28,7 +29,14 @@ my $ddl = qq(
 );
 $node_publisher->safe_psql('postgres', $ddl);
 
-# Setup the same structure on the subscriber
+# Setup the same structure on the subscriber, plus some extra sequences that
+# we'll create on the publisher later
+$ddl = qq(
+	CREATE TABLE regress_seq_test (v BIGINT);
+	CREATE SEQUENCE regress_s1;
+	CREATE SEQUENCE regress_s2;
+	CREATE SEQUENCE regress_s3;
+);
 $node_subscriber->safe_psql('postgres', $ddl);
 
 # Insert initial test data
@@ -46,10 +54,170 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub"
 );
 
-# Confirm sequences can be listed in pg_subscription_rel
-my $result = $node_subscriber->safe_psql('postgres',
-	"SELECT relname, srsubstate FROM pg_class, pg_subscription_rel WHERE oid = srrelid"
+# Wait for initial sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the initial data on subscriber
+my $result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '100|0|t', 'initial test data replicated');
+
+##########
+## ALTER SUBSCRIPTION ... REFRESH PUBLICATION should cause sync of new
+# sequences of the publisher, but changes to existing sequences should
+# not be synced.
+##########
+
+# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1'
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE SEQUENCE regress_s2;
+	INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100);
+
+	-- Existing sequence
+	INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100);
+));
+
+# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION;
+));
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '200|31|t', 'Check sequence value in the publisher');
+
+# Check - existing sequence ('regress_s1') is not synced
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '100|0|t', 'REFRESH PUBLICATION will not sync existing sequence');
+
+# Check - newly published sequence ('regress_s2') is synced
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s2;
+));
+is($result, '100|0|t',
+	'REFRESH PUBLICATION will sync newly published sequence');
+
+##########
+# Test: REFRESH SEQUENCES and REFRESH PUBLICATION (copy_data = off)
+#
+# 1. ALTER SUBSCRIPTION ... REFRESH SEQUENCES should re-synchronize all
+#    existing sequences, but not synchronize newly added ones.
+# 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION with (copy_data = off) should
+#    also not update sequence values for newly added sequences.
+##########
+
+# Create a new sequence 'regress_s3', and update the existing sequence
+# 'regress_s2'.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE SEQUENCE regress_s3;
+	INSERT INTO regress_seq_test SELECT nextval('regress_s3') FROM generate_series(1,100);
+
+	-- Existing sequence
+	INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100);
+));
+
+# 1. Do ALTER SUBSCRIPTION ... REFRESH SEQUENCES
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;
+));
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check - existing sequences ('regress_s1' and 'regress_s2') are synced
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '200|0|t', 'REFRESH SEQUENCES will sync existing sequences');
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s2;
+));
+is($result, '200|0|t', 'REFRESH SEQUENCES will sync existing sequences');
+
+# Check - newly published sequence ('regress_s3') is not synced
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s3;
+));
+is($result, '1|0|f',
+	'REFRESH SEQUENCES will not sync newly published sequence');
+
+# 2. Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data as false
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION WITH (copy_data = false);
+));
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check - newly published sequence ('regress_s3') is not synced when
+# (copy_data = off).
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s3;
+));
+is($result, '1|0|f',
+	'REFRESH PUBLICATION will not sync newly published sequence with copy_data as off'
+);
+
+##########
+# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should report an error when:
+# a) sequence definitions differ between the publisher and subscriber, or
+# b) a sequence is missing on the publisher.
+##########
+
+# Create a new sequence 'regress_s4' whose START value is not the same in the
+# publisher and subscriber.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE SEQUENCE regress_s4 START 1 INCREMENT 2;
+));
+
+$node_subscriber->safe_psql(
+	'postgres', qq(
+	CREATE SEQUENCE regress_s4 START 10 INCREMENT 2;
+));
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION
+$node_subscriber->safe_psql(
+	'postgres', "
+	ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION"
 );
-is($result, 'regress_s1|i', "Sequence can be in pg_subscription_rel catalog");
+
+# Verify that an error is logged for parameter differences on sequence
+# ('regress_s4').
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"\n.*DETAIL:.* Mismatched or renamed sequence\(s\) on subscriber: \("public.regress_s4"\)/,
+	$log_offset);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	DROP SEQUENCE regress_s4;
+));
+
+# Verify that an error is logged for the missing sequence ('regress_s4').
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"\n.*DETAIL:.* Missing sequence\(s\) on publisher: \("public.regress_s4"\)/,
+	$log_offset);
 
 done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ac2da4c98cf..735dd8317d6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -526,6 +526,7 @@ CopyMethod
 CopyMultiInsertBuffer
 CopyMultiInsertInfo
 CopyOnErrorChoice
+CopySeqResult
 CopySource
 CopyStmt
 CopyToRoutine
@@ -1628,6 +1629,7 @@ LogicalRepRelId
 LogicalRepRelMapEntry
 LogicalRepRelation
 LogicalRepRollbackPreparedTxnData
+LogicalRepSequenceInfo
 LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
-- 
2.43.0

