Skip to content

Configuration


Register every table that participates in sync. Registration order matters for dependency resolution.

registry := synchro.NewRegistry()

// A user-owned table: both push and pull are enabled, and rows are
// bucketed by the owning user.
registry.Register(&synchro.TableConfig{
	TableName:      "tasks",
	PushPolicy:     synchro.PushPolicyOwnerOnly,
	OwnerColumn:    "user_id",
	BucketByColumn: "user_id",
	BucketPrefix:   "user:",
})

// A child table: ownership is inherited through the parent chain, so no
// OwnerColumn is needed here. Dependencies hints clients to push tasks
// before comments.
registry.Register(&synchro.TableConfig{
	TableName:      "comments",
	PushPolicy:     synchro.PushPolicyOwnerOnly,
	ParentTable:    "tasks",
	ParentFKCol:    "task_id",
	Dependencies:   []string{"tasks"},
	BucketByColumn: "task_id",
	BucketPrefix:   "task:",
})

// A reference table: clients may pull it but never push to it.
registry.Register(&synchro.TableConfig{
	TableName:  "categories",
	PushPolicy: synchro.PushPolicyDisabled,
})

// Nullable owner: rows with a NULL user_id are emitted to the global
// bucket and readable by everyone — both behaviors are explicit opt-ins.
registry.Register(&synchro.TableConfig{
	TableName:            "tags",
	PushPolicy:           synchro.PushPolicyOwnerOnly,
	OwnerColumn:          "user_id",
	BucketByColumn:       "user_id",
	BucketPrefix:         "user:",
	GlobalWhenBucketNull: true,
	AllowGlobalRead:      true,
})
| Field | Required | Default | Description |
|---|---|---|---|
| TableName | Yes | — | Database table name |
| PushPolicy | No | inferred | `owner_only` or `disabled` |
| OwnerColumn | Conditional | `""` | Column holding user ID. Required for pushable tables without ParentTable. |
| ParentTable | No | `""` | Parent table name for child records |
| ParentFKCol | Conditional | `""` | FK column to parent. Required when ParentTable is set. |
| SyncColumns | No | nil | Column subset to include in pull responses. nil = all columns. |
| Dependencies | No | nil | Tables that must sync first (push ordering hint for clients) |
| IDColumn | No | `"id"` | Primary key column name |
| UpdatedAtColumn | No | `"updated_at"` | Timestamp column for conflict detection |
| DeletedAtColumn | No | `"deleted_at"` | Soft delete column |
| ProtectedColumns | No | nil | Additional columns clients cannot write (beyond defaults) |
| BucketByColumn | No | OwnerColumn | Fast-path bucket source column |
| BucketPrefix | No | `"user:"` | Prefix applied to BucketByColumn values |
| GlobalWhenBucketNull | No | false | Emits global when bucket source value is null/empty |
| AllowGlobalRead | No | false | Adds global-read RLS behavior for null-owner rows |
| BucketFunction | No | `""` | Optional SQL bucket resolver function override |

Registry.Validate() runs automatically on NewEngine() and checks:

  • ParentTable references a registered table.
  • ParentFKCol is set when ParentTable is set.
  • Parent chains terminate at a table with OwnerColumn (no orphaned chains).
  • No cycles in parent chains.
  • Pushable tables have either OwnerColumn or ParentTable.
  • ProtectedColumns does not redundantly list default protected columns.

engine, err := synchro.NewEngine(synchro.Config{
	DB:                 db, // *sql.DB
	Registry:           registry,
	Hooks:              hooks,           // optional
	ConflictResolver:   nil,             // defaults to LWWResolver
	Ownership:          nil,             // defaults to JoinResolver
	MinClientVersion:   "1.2.0",         // optional semver gate
	ClockSkewTolerance: 5 * time.Second, // optional LWW tolerance
	Logger:             slog.Default(),  // optional, defaults to slog.Default()
	// Optional: enables changelog compaction.
	Compactor: &synchro.CompactorConfig{
		StaleThreshold: 7 * 24 * time.Hour, // deactivate clients inactive for 7 days
		BatchSize:      10000,              // rows deleted per batch
	},
})

// Run compaction once, e.g. from a cron job...
result, err := engine.RunCompaction(ctx)

// ...or run it in the background on a fixed interval.
engine.StartCompaction(ctx, 1*time.Hour)

All hooks are optional. Set the ones you need.

hooks := synchro.Hooks{
	// Runs inside the push transaction after records have been applied.
	// Use it for transactional side effects such as search-index rebuilds.
	OnPushAccepted: func(ctx context.Context, tx *sql.Tx, accepted []synchro.AcceptedRecord) error {
		for _, record := range accepted {
			if record.TableName != "tasks" {
				continue
			}
			// Rebuild the search index for the modified task.
			if _, err := tx.ExecContext(ctx,
				"SELECT rebuild_task_search_index($1)", record.ID); err != nil {
				return err
			}
		}
		return nil
	},
	// Purely informational: fires when a conflict is detected. The
	// resolution cannot be changed from here.
	OnConflict: func(ctx context.Context, conflict synchro.Conflict, resolution synchro.Resolution) {
		slog.InfoContext(ctx, "sync conflict resolved",
			"table", conflict.Table,
			"record_id", conflict.RecordID,
			"winner", resolution.Winner)
	},
	// Fires after every successful pull.
	OnPullComplete: func(ctx context.Context, clientID string, checkpoint int64, count int) {
		slog.InfoContext(ctx, "pull complete",
			"client_id", clientID,
			"checkpoint", checkpoint,
			"count", count)
	},
	// Fires when a client reports a version below the minimum.
	OnSchemaIncompatible: func(ctx context.Context, clientID, clientVer, minVer string) {
		slog.WarnContext(ctx, "client version too old",
			"client_id", clientID,
			"client_version", clientVer,
			"min_version", minVer)
	},
	// Fires when a client hasn't synced recently. Return true to allow
	// the sync, false to reject it.
	OnStaleClient: func(ctx context.Context, clientID string, lastSync time.Time) bool {
		return time.Since(lastSync) < 30*24*time.Hour // allow up to 30 days
	},
	// Fires after each compaction run completes.
	OnCompaction: func(ctx context.Context, result synchro.CompactResult) {
		slog.InfoContext(ctx, "compaction complete",
			"deactivated", result.DeactivatedClients,
			"safe_seq", result.SafeSeq,
			"deleted", result.DeletedEntries)
	},
	// Fires when a client is too far behind the changelog and must
	// rebuild from a full snapshot.
	OnSnapshotRequired: func(ctx context.Context, clientID string, checkpoint, minSeq int64, reason string) {
		slog.WarnContext(ctx, "client requires snapshot rebuild",
			"client_id", clientID,
			"checkpoint", checkpoint,
			"min_seq", minSeq,
			"reason", reason)
	},
}

The default JoinResolver handles most cases. Implement OwnershipResolver for custom bucketing logic (e.g., sharing, group ownership).

// OwnershipResolver computes the bucket IDs a record belongs to.
// Implement it for custom bucketing logic (e.g., sharing, group ownership);
// the default JoinResolver handles the common parent-chain case.
type OwnershipResolver interface {
// ResolveOwner returns the bucket IDs for the given record. A record may
// belong to multiple buckets; data holds the record's column values.
ResolveOwner(ctx context.Context, db synchro.DB, table string, recordID string, data map[string]any) ([]string, error)
}

The returned slice contains bucket IDs. A record can belong to multiple buckets.

// SharingResolver decorates an inner OwnershipResolver so that shared
// records additionally fan out to the buckets of every user they are
// shared with (via the task_shares table).
type SharingResolver struct {
	inner synchro.OwnershipResolver // produces the standard owner buckets
	db    *sql.DB                   // used to look up share rows
}

// ResolveOwner returns the inner resolver's buckets plus, for "tasks"
// records, one "user:<id>" bucket per row in task_shares.
func (r *SharingResolver) ResolveOwner(ctx context.Context, db synchro.DB, table string, recordID string, data map[string]any) ([]string, error) {
	// Get the standard owner buckets first.
	buckets, err := r.inner.ResolveOwner(ctx, db, table, recordID, data)
	if err != nil {
		return nil, err
	}
	// Check if the record is shared with other users.
	if table == "tasks" {
		rows, err := r.db.QueryContext(ctx,
			"SELECT shared_with_user_id FROM task_shares WHERE task_id = $1",
			recordID)
		if err != nil {
			return nil, err
		}
		defer rows.Close()
		for rows.Next() {
			var sharedUserID string
			if err := rows.Scan(&sharedUserID); err != nil {
				return nil, err
			}
			buckets = append(buckets, "user:"+sharedUserID)
		}
		// Fix: the original ignored iteration errors, which would
		// silently truncate the bucket list on a mid-scan failure.
		if err := rows.Err(); err != nil {
			return nil, err
		}
	}
	return buckets, nil
}

// ConflictResolver decides whether the client's or the server's copy of a
// record wins when a push conflicts with newer server state.
type ConflictResolver interface {
// Resolve inspects the conflict and returns the winning side
// (Resolution{Winner: "client"} accepts the push; "server" rejects it).
Resolve(ctx context.Context, conflict synchro.Conflict) (synchro.Resolution, error)
}

The Conflict struct provides:

| Field | Type | Description |
|---|---|---|
| Table | string | Table name |
| RecordID | string | Record primary key |
| ClientID | string | Pushing client |
| UserID | string | Authenticated user |
| ClientData | json.RawMessage | Client's version of the record |
| ServerData | json.RawMessage | Server's current version |
| ClientTime | time.Time | Client's client_updated_at |
| ServerTime | time.Time | Server's updated_at |
| BaseVersion | *time.Time | Client's base_updated_at (optional, for optimistic concurrency) |

Return Resolution{Winner: "client"} to accept the push, or Resolution{Winner: "server"} to reject it.

// PerTableResolver routes conflicts to a table-specific resolver and
// falls back to a default resolver for tables without one.
type PerTableResolver struct {
	resolvers map[string]synchro.ConflictResolver // keyed by table name
	fallback  synchro.ConflictResolver            // used when no entry matches
}

// Resolve dispatches on the conflict's table name.
func (r *PerTableResolver) Resolve(ctx context.Context, c synchro.Conflict) (synchro.Resolution, error) {
	tableResolver, found := r.resolvers[c.Table]
	if !found {
		return r.fallback.Resolve(ctx, c)
	}
	return tableResolver.Resolve(ctx, c)
}

Synchro uses Config.Logger for structured logs. For OpenTelemetry correlation, configure your logger pipeline in the host application (for example via otelslog) and pass that logger into Config.Logger.

Pass a custom *slog.Logger to Config.Logger. If nil, slog.Default() is used.

logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
engine, _ := synchro.NewEngine(synchro.Config{
// ...
Logger: logger,
})

Synchro provides net/http handlers. Wire them into your router.

import "github.com/trainstar/synchro/handler"
// Wire the sync endpoints onto a standard-library mux using Go 1.22+
// method-pattern routing ("POST /path").
h := handler.New(engine)
mux := http.NewServeMux()
mux.HandleFunc("POST /sync/register", h.ServeRegister)
mux.HandleFunc("POST /sync/pull", h.ServePull)
mux.HandleFunc("POST /sync/push", h.ServePush)
mux.HandleFunc("POST /sync/snapshot", h.ServeSnapshot)
mux.HandleFunc("GET /sync/tables", h.ServeTableMeta)
mux.HandleFunc("GET /sync/schema", h.ServeSchema)
import "github.com/trainstar/synchro/handler"
// Echo: the plain net/http handlers are adapted with echo.WrapHandler.
h := handler.New(engine)
g := e.Group("/sync")
g.POST("/register", echo.WrapHandler(http.HandlerFunc(h.ServeRegister)))
g.POST("/pull", echo.WrapHandler(http.HandlerFunc(h.ServePull)))
g.POST("/push", echo.WrapHandler(http.HandlerFunc(h.ServePush)))
g.POST("/snapshot", echo.WrapHandler(http.HandlerFunc(h.ServeSnapshot)))
g.GET("/tables", echo.WrapHandler(http.HandlerFunc(h.ServeTableMeta)))
g.GET("/schema", echo.WrapHandler(http.HandlerFunc(h.ServeSchema)))

Or wrap directly in your Echo handler to use Echo’s context:

g.POST("/push", func(c echo.Context) error {
// Lift the authenticated user out of Echo's context and inject it into
// the request context where the Synchro handler expects it.
// NOTE(review): the type assertion panics if "user_id" is unset — an
// auth middleware should guarantee it is present first.
userID := c.Get("user_id").(string)
ctx := handler.WithUserID(c.Request().Context(), userID)
h.ServePush(c.Response(), c.Request().WithContext(ctx))
return nil
})
import "github.com/trainstar/synchro/handler"
// chi: the handlers satisfy http.HandlerFunc directly, no adapter needed.
h := handler.New(engine)
r := chi.NewRouter()
r.Route("/sync", func(r chi.Router) {
r.Post("/register", h.ServeRegister)
r.Post("/pull", h.ServePull)
r.Post("/push", h.ServePush)
r.Post("/snapshot", h.ServeSnapshot)
r.Get("/tables", h.ServeTableMeta)
r.Get("/schema", h.ServeSchema)
})

Synchro handlers read the user ID from context via handler.UserIDFromContext(ctx). You must inject it.

// Header-based injection: trust an upstream proxy to set X-User-ID.
wrapped := handler.UserIDMiddleware("X-User-ID", mux)

// JWT-based injection: extract the user ID during auth and place it on
// the request context so the Synchro handlers can read it.
func authMiddleware(next http.Handler) http.Handler {
	fn := func(w http.ResponseWriter, r *http.Request) {
		userID := extractUserIDFromJWT(r) // your auth logic
		ctx := handler.WithUserID(r.Context(), userID)
		next.ServeHTTP(w, r.WithContext(ctx))
	}
	return http.HandlerFunc(fn)
}

Optional middleware that rejects clients below a minimum version:

wrapped := handler.VersionCheckMiddleware("X-App-Version", "1.2.0", mux)

Synchro automatically returns 503 Service Unavailable with a Retry-After header when it detects transient errors (closed DB pool, connection timeout, network failure). The default retry interval is 5 seconds:

h := handler.New(engine, handler.WithDefaultRetryAfter(10)) // 10 seconds

For application-level backpressure (rate limiting, maintenance mode), use the middleware. The consuming app provides the policy:

// Example 1: rate limiting — respond 429 and ask the client to retry in 60s.
wrapped := handler.RetryAfterMiddleware(func(r *http.Request) (bool, int, int) {
if rateLimiter.IsExceeded(r) {
return true, 429, 60
}
return false, 0, 0
}, mux)
// Example 2: maintenance mode — respond 503 with a 5-minute Retry-After.
// (An alternative to Example 1; in real code use one or the other, as a
// second `wrapped :=` in the same scope would not compile.)
wrapped := handler.RetryAfterMiddleware(func(r *http.Request) (bool, int, int) {
if maintenanceMode {
return true, 503, 300
}
return false, 0, 0
}, mux)

Both the HTTP Retry-After header and a retry_after field in the JSON body are set, so clients can use either.


The WAL consumer runs as a long-lived goroutine alongside your server.

import "github.com/trainstar/synchro/wal"
consumer := wal.NewConsumer(wal.ConsumerConfig{
ConnString: "postgres://user:pass@host:5432/db?replication=database",
SlotName: "synchro_slot",
PublicationName: "synchro_pub",
Registry: registry,
Assigner: synchro.NewJoinResolverWithDB(registry, db), // implements BucketAssigner
ChangelogDB: db, // *sql.DB
Logger: logger,
StandbyTimeout: 10 * time.Second, // default
})
// Run in a goroutine — blocks until context is cancelled
go func() {
if err := consumer.Start(ctx); err != nil && ctx.Err() == nil {
log.Fatal("WAL consumer failed", "err", err)
}
}()
-- Create the publication for tables you want to sync
-- (must match ConsumerConfig.PublicationName above).
CREATE PUBLICATION synchro_pub FOR TABLE
tasks, comments, tags, categories;
-- Ensure wal_level = logical (requires restart)
-- ALTER SYSTEM writes postgresql.auto.conf; restart the server for the
-- new wal_level to take effect.
ALTER SYSTEM SET wal_level = 'logical';

Synchro provides migration SQL but does not run migrations itself. Integrate with your migration system.

import "github.com/trainstar/synchro/migrate"
// Get infrastructure table DDL
stmts := migrate.Migrations()
// Get RLS policies from your registry
rlsStmts := synchro.GenerateRLSPolicies(registry)
// Run through your migration system
for _, stmt := range append(stmts, rlsStmts...) {
_, err := db.Exec(stmt)
if err != nil {
log.Fatal(err)
}
}
| Table | Purpose |
|---|---|
| sync_changelog | Append-only changelog with BIGSERIAL seq, indexed by (bucket_id, seq) |
| sync_clients | Client registration, bucket subscriptions, checkpoint tracking |
| sync_wal_position | WAL consumer LSN tracking for crash recovery |
| sync_bucket_edges | Membership index for bucket delta assignment |
| sync_rule_failures | Resolver failures for operational debugging and replay tooling |
| sync_schema_manifest | Schema contract version/hash history |

GenerateRLSPolicies(registry) produces:

  • PushPolicyDisabled tables — read-only behavior (no write policies generated).
  • Tables with OwnerColumn — SELECT, INSERT, UPDATE, DELETE policies scoped to owner_col::text = current_setting('app.user_id', true).
  • Tables with AllowGlobalRead=true — SELECT additionally allows owner_col IS NULL.
  • Child tables — EXISTS subquery through the parent chain to verify ownership.