Configuration
Table Registration
Register every table that participates in sync. Registration order matters for dependency resolution.
```go
registry := synchro.NewRegistry()

// User-owned table: push and pull enabled
registry.Register(&synchro.TableConfig{
	TableName:      "tasks",
	PushPolicy:     synchro.PushPolicyOwnerOnly,
	OwnerColumn:    "user_id",
	BucketByColumn: "user_id",
	BucketPrefix:   "user:",
})

// Child table: inherits ownership through the parent chain
registry.Register(&synchro.TableConfig{
	TableName:      "comments",
	PushPolicy:     synchro.PushPolicyOwnerOnly,
	ParentTable:    "tasks",
	ParentFKCol:    "task_id",
	Dependencies:   []string{"tasks"},
	BucketByColumn: "task_id",
	BucketPrefix:   "task:",
})

// Reference table: pull-only
registry.Register(&synchro.TableConfig{
	TableName:  "categories",
	PushPolicy: synchro.PushPolicyDisabled,
})

// Nullable owner + global-read behavior is explicit
registry.Register(&synchro.TableConfig{
	TableName:            "tags",
	PushPolicy:           synchro.PushPolicyOwnerOnly,
	OwnerColumn:          "user_id",
	BucketByColumn:       "user_id",
	BucketPrefix:         "user:",
	GlobalWhenBucketNull: true,
	AllowGlobalRead:      true,
})
```

TableConfig Fields
| Field | Required | Default | Description |
|---|---|---|---|
| `TableName` | Yes | — | Database table name |
| `PushPolicy` | No | inferred | `owner_only` or `disabled` |
| `OwnerColumn` | Conditional | `""` | Column holding the user ID. Required for pushable tables without `ParentTable`. |
| `ParentTable` | No | `""` | Parent table name for child records |
| `ParentFKCol` | Conditional | `""` | FK column to the parent. Required when `ParentTable` is set. |
| `SyncColumns` | No | `nil` | Column subset to include in pull responses. `nil` = all columns. |
| `Dependencies` | No | `nil` | Tables that must sync first (push-ordering hint for clients) |
| `IDColumn` | No | `"id"` | Primary key column name |
| `UpdatedAtColumn` | No | `"updated_at"` | Timestamp column for conflict detection |
| `DeletedAtColumn` | No | `"deleted_at"` | Soft-delete column |
| `ProtectedColumns` | No | `nil` | Additional columns clients cannot write (beyond the defaults) |
| `BucketByColumn` | No | `OwnerColumn` | Fast-path bucket source column |
| `BucketPrefix` | No | `"user:"` | Prefix applied to `BucketByColumn` values |
| `GlobalWhenBucketNull` | No | `false` | Emits a global change when the bucket source value is null/empty |
| `AllowGlobalRead` | No | `false` | Adds global-read RLS behavior for null-owner rows |
| `BucketFunction` | No | `""` | Optional SQL bucket-resolver function override |
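To make the bucket fields concrete, here is an illustrative sketch (not synchro's actual code) of how a bucket ID could be derived from `BucketPrefix`, the `BucketByColumn` value, and `GlobalWhenBucketNull`; the function name and the `"global"` sentinel are assumptions for illustration:

```go
package main

import "fmt"

// bucketFor sketches the documented bucket rules: prefix + column value
// normally; when the source value is null/empty and globalWhenNull is
// set, the change is emitted as global instead of being dropped.
func bucketFor(prefix, value string, globalWhenNull bool) (bucket string, ok bool) {
	if value == "" {
		if globalWhenNull {
			return "global", true
		}
		return "", false // no bucket: change is not fanned out
	}
	return prefix + value, true
}

func main() {
	b, _ := bucketFor("user:", "42", false)
	fmt.Println(b) // user:42
	b, _ = bucketFor("user:", "", true)
	fmt.Println(b) // global
}
```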
Validation Rules
`Registry.Validate()` runs automatically on `NewEngine()` and checks:

- `ParentTable` references a registered table.
- `ParentFKCol` is set when `ParentTable` is set.
- Parent chains terminate at a table with `OwnerColumn` (no orphaned chains).
- No cycles in parent chains.
- Pushable tables have either `OwnerColumn` or `ParentTable`.
- `ProtectedColumns` does not redundantly list default protected columns.
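The parent-chain rules can be pictured with a small sketch (illustrative, not synchro's implementation): walk `ParentTable` links and require that every chain ends at a table with an `OwnerColumn`, visiting no table twice and no unregistered table:

```go
package main

import "fmt"

// tableInfo is a minimal stand-in for the registry entries the checks need.
type tableInfo struct {
	Parent   string // ParentTable, "" if none
	HasOwner bool   // OwnerColumn set
}

// chainOK walks parent links from table and reports whether the chain
// terminates at a table with an owner column, with no cycles and no
// references to unregistered tables.
func chainOK(reg map[string]tableInfo, table string) bool {
	seen := map[string]bool{}
	for {
		info, registered := reg[table]
		if !registered || seen[table] {
			return false // unregistered parent or cycle
		}
		seen[table] = true
		if info.Parent == "" {
			return info.HasOwner // chain must end at an owned table
		}
		table = info.Parent
	}
}

func main() {
	reg := map[string]tableInfo{
		"tasks":    {HasOwner: true},
		"comments": {Parent: "tasks"},
	}
	fmt.Println(chainOK(reg, "comments")) // true
}
```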
Engine Setup
```go
engine, err := synchro.NewEngine(synchro.Config{
	DB:                 db, // *sql.DB
	Registry:           registry,
	Hooks:              hooks,           // optional
	ConflictResolver:   nil,             // defaults to LWWResolver
	Ownership:          nil,             // defaults to JoinResolver
	MinClientVersion:   "1.2.0",         // optional semver gate
	ClockSkewTolerance: 5 * time.Second, // optional LWW tolerance
	Logger:             slog.Default(),  // optional, defaults to slog.Default()
	Compactor: &synchro.CompactorConfig{ // optional, enables changelog compaction
		StaleThreshold: 7 * 24 * time.Hour, // deactivate clients inactive for 7 days
		BatchSize:      10000,              // rows deleted per batch
	},
})

// Manual compaction (e.g., from a cron job)
result, err := engine.RunCompaction(ctx)

// Or start background compaction on an interval
engine.StartCompaction(ctx, 1*time.Hour)
```

All hooks are optional. Set the ones you need.
```go
hooks := synchro.Hooks{
	// Called within the push transaction after records are applied.
	// Use for side effects like rebuilding search indexes.
	OnPushAccepted: func(ctx context.Context, tx *sql.Tx, accepted []synchro.AcceptedRecord) error {
		for _, rec := range accepted {
			if rec.TableName == "tasks" {
				// Rebuild search index for the modified task
				if _, err := tx.ExecContext(ctx, "SELECT rebuild_task_search_index($1)", rec.ID); err != nil {
					return err
				}
			}
		}
		return nil
	},

	// Informational callback when a conflict is detected.
	// Cannot change the resolution.
	OnConflict: func(ctx context.Context, conflict synchro.Conflict, resolution synchro.Resolution) {
		slog.InfoContext(ctx, "sync conflict resolved",
			"table", conflict.Table,
			"record_id", conflict.RecordID,
			"winner", resolution.Winner)
	},

	// Called after a successful pull.
	OnPullComplete: func(ctx context.Context, clientID string, checkpoint int64, count int) {
		slog.InfoContext(ctx, "pull complete",
			"client_id", clientID,
			"checkpoint", checkpoint,
			"count", count)
	},

	// Called when a client version is below minimum.
	OnSchemaIncompatible: func(ctx context.Context, clientID, clientVer, minVer string) {
		slog.WarnContext(ctx, "client version too old",
			"client_id", clientID,
			"client_version", clientVer,
			"min_version", minVer)
	},

	// Called when a client hasn't synced recently.
	// Return true to allow, false to reject.
	OnStaleClient: func(ctx context.Context, clientID string, lastSync time.Time) bool {
		return time.Since(lastSync) < 30*24*time.Hour // allow up to 30 days
	},

	// Called after a compaction run completes.
	OnCompaction: func(ctx context.Context, result synchro.CompactResult) {
		slog.InfoContext(ctx, "compaction complete",
			"deactivated", result.DeactivatedClients,
			"safe_seq", result.SafeSeq,
			"deleted", result.DeletedEntries)
	},

	// Called when a client must rebuild from a full snapshot.
	OnSnapshotRequired: func(ctx context.Context, clientID string, checkpoint, minSeq int64, reason string) {
		slog.WarnContext(ctx, "client requires snapshot rebuild",
			"client_id", clientID,
			"checkpoint", checkpoint,
			"min_seq", minSeq,
			"reason", reason)
	},
}
```

Custom OwnershipResolver
The default `JoinResolver` handles most cases. Implement `OwnershipResolver` for custom bucketing logic (e.g., sharing or group ownership).
```go
type OwnershipResolver interface {
	ResolveOwner(ctx context.Context, db synchro.DB, table string, recordID string, data map[string]any) ([]string, error)
}
```

The returned slice contains bucket IDs. A record can belong to multiple buckets.
Example: Sharing Resolver
```go
type SharingResolver struct {
	inner synchro.OwnershipResolver
	db    *sql.DB
}

func (r *SharingResolver) ResolveOwner(ctx context.Context, db synchro.DB, table string, recordID string, data map[string]any) ([]string, error) {
	// Get the standard owner buckets
	buckets, err := r.inner.ResolveOwner(ctx, db, table, recordID, data)
	if err != nil {
		return nil, err
	}

	// Check whether the record is shared with other users
	if table == "tasks" {
		rows, err := r.db.QueryContext(ctx,
			"SELECT shared_with_user_id FROM task_shares WHERE task_id = $1", recordID)
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		for rows.Next() {
			var sharedUserID string
			if err := rows.Scan(&sharedUserID); err != nil {
				return nil, err
			}
			buckets = append(buckets, fmt.Sprintf("user:%s", sharedUserID))
		}
		if err := rows.Err(); err != nil {
			return nil, err
		}
	}

	return buckets, nil
}
```

Custom ConflictResolver
```go
type ConflictResolver interface {
	Resolve(ctx context.Context, conflict synchro.Conflict) (synchro.Resolution, error)
}
```

The Conflict struct provides:
| Field | Type | Description |
|---|---|---|
| `Table` | `string` | Table name |
| `RecordID` | `string` | Record primary key |
| `ClientID` | `string` | Pushing client |
| `UserID` | `string` | Authenticated user |
| `ClientData` | `json.RawMessage` | Client's version of the record |
| `ServerData` | `json.RawMessage` | Server's current version |
| `ClientTime` | `time.Time` | Client's `client_updated_at` |
| `ServerTime` | `time.Time` | Server's `updated_at` |
| `BaseVersion` | `*time.Time` | Client's `base_updated_at` (optional, for optimistic concurrency) |
Return `Resolution{Winner: "client"}` to accept the push, or `Resolution{Winner: "server"}` to reject it.
Example: Per-Table Resolver
```go
type PerTableResolver struct {
	resolvers map[string]synchro.ConflictResolver
	fallback  synchro.ConflictResolver
}

func (r *PerTableResolver) Resolve(ctx context.Context, c synchro.Conflict) (synchro.Resolution, error) {
	if resolver, ok := r.resolvers[c.Table]; ok {
		return resolver.Resolve(ctx, c)
	}
	return r.fallback.Resolve(ctx, c)
}
```

Telemetry Integration
Synchro uses `Config.Logger` for structured logs. For OpenTelemetry correlation, configure your logger pipeline in the host application (for example via otelslog) and pass that logger into `Config.Logger`.
Structured Logging
Pass a custom *slog.Logger to `Config.Logger`. If nil, `slog.Default()` is used.
```go
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
	Level: slog.LevelInfo,
}))

engine, _ := synchro.NewEngine(synchro.Config{
	// ...
	Logger: logger,
})
```

HTTP Handler Wiring
Synchro provides net/http handlers. Wire them into your router.
Standard Library (net/http)
```go
import "github.com/trainstar/synchro/handler"

h := handler.New(engine)

mux := http.NewServeMux()
mux.HandleFunc("POST /sync/register", h.ServeRegister)
mux.HandleFunc("POST /sync/pull", h.ServePull)
mux.HandleFunc("POST /sync/push", h.ServePush)
mux.HandleFunc("POST /sync/snapshot", h.ServeSnapshot)
mux.HandleFunc("GET /sync/tables", h.ServeTableMeta)
mux.HandleFunc("GET /sync/schema", h.ServeSchema)
```

Echo

```go
import "github.com/trainstar/synchro/handler"

h := handler.New(engine)

g := e.Group("/sync")
g.POST("/register", echo.WrapHandler(http.HandlerFunc(h.ServeRegister)))
g.POST("/pull", echo.WrapHandler(http.HandlerFunc(h.ServePull)))
g.POST("/push", echo.WrapHandler(http.HandlerFunc(h.ServePush)))
g.POST("/snapshot", echo.WrapHandler(http.HandlerFunc(h.ServeSnapshot)))
g.GET("/tables", echo.WrapHandler(http.HandlerFunc(h.ServeTableMeta)))
g.GET("/schema", echo.WrapHandler(http.HandlerFunc(h.ServeSchema)))
```

Or wrap directly in your Echo handler to use Echo's context:

```go
g.POST("/push", func(c echo.Context) error {
	userID := c.Get("user_id").(string)
	ctx := handler.WithUserID(c.Request().Context(), userID)
	h.ServePush(c.Response(), c.Request().WithContext(ctx))
	return nil
})
```

Chi

```go
import "github.com/trainstar/synchro/handler"

h := handler.New(engine)

r := chi.NewRouter()
r.Route("/sync", func(r chi.Router) {
	r.Post("/register", h.ServeRegister)
	r.Post("/pull", h.ServePull)
	r.Post("/push", h.ServePush)
	r.Post("/snapshot", h.ServeSnapshot)
	r.Get("/tables", h.ServeTableMeta)
	r.Get("/schema", h.ServeSchema)
})
```

User ID Injection
Synchro handlers read the user ID from context via `handler.UserIDFromContext(ctx)`. You must inject it.
Built-in header middleware (development)
```go
wrapped := handler.UserIDMiddleware("X-User-ID", mux)
```

Custom auth middleware (production)
```go
func authMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		userID := extractUserIDFromJWT(r) // your auth logic
		ctx := handler.WithUserID(r.Context(), userID)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}
```

Version Check Middleware
Optional middleware that rejects clients below a minimum version:
```go
wrapped := handler.VersionCheckMiddleware("X-App-Version", "1.2.0", mux)
```
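The comparison behind such a gate can be sketched as a numeric, dot-separated version check (illustrative only; synchro's actual parsing, including any pre-release handling, is not specified here):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// versionBelow reports whether client < min, comparing dot-separated
// numeric components left to right. Missing components count as 0.
func versionBelow(client, min string) bool {
	c, m := strings.Split(client, "."), strings.Split(min, ".")
	for i := 0; i < len(c) || i < len(m); i++ {
		var cv, mv int
		if i < len(c) {
			cv, _ = strconv.Atoi(c[i])
		}
		if i < len(m) {
			mv, _ = strconv.Atoi(m[i])
		}
		if cv != mv {
			return cv < mv
		}
	}
	return false
}

func main() {
	fmt.Println(versionBelow("1.1.9", "1.2.0")) // true
	fmt.Println(versionBelow("1.2.0", "1.2.0")) // false
}
```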
Retry-After

Synchro automatically returns 503 Service Unavailable with a Retry-After header when it detects transient errors (a closed DB pool, a connection timeout, a network failure). The default retry interval is 5 seconds:
```go
h := handler.New(engine, handler.WithDefaultRetryAfter(10)) // 10 seconds
```

For application-level backpressure (rate limiting, maintenance mode), use the middleware. The consuming app provides the policy:
Rate limiting (429)
```go
wrapped := handler.RetryAfterMiddleware(func(r *http.Request) (bool, int, int) {
	if rateLimiter.IsExceeded(r) {
		return true, 429, 60
	}
	return false, 0, 0
}, mux)
```

Maintenance mode (503)
```go
wrapped := handler.RetryAfterMiddleware(func(r *http.Request) (bool, int, int) {
	if maintenanceMode {
		return true, 503, 300
	}
	return false, 0, 0
}, mux)
```

Both the HTTP Retry-After header and a retry_after field in the JSON body are set, so clients can use either.
WAL Consumer Setup
The WAL consumer runs as a long-lived goroutine alongside your server.
```go
import "github.com/trainstar/synchro/wal"

consumer := wal.NewConsumer(wal.ConsumerConfig{
	ConnString:      "postgres://user:pass@host:5432/db?replication=database",
	SlotName:        "synchro_slot",
	PublicationName: "synchro_pub",
	Registry:        registry,
	Assigner:        synchro.NewJoinResolverWithDB(registry, db), // implements BucketAssigner
	ChangelogDB:     db, // *sql.DB
	Logger:          logger,
	StandbyTimeout:  10 * time.Second, // default
})

// Run in a goroutine; blocks until the context is cancelled
go func() {
	if err := consumer.Start(ctx); err != nil && ctx.Err() == nil {
		log.Fatalf("WAL consumer failed: %v", err)
	}
}()
```

PostgreSQL Prerequisites
```sql
-- Create the publication for the tables you want to sync
CREATE PUBLICATION synchro_pub FOR TABLE tasks, comments, tags, categories;

-- Ensure wal_level = logical (requires a restart)
ALTER SYSTEM SET wal_level = 'logical';
```

Migration Setup
Synchro provides migration SQL but does not run migrations itself. Integrate with your migration system.
```go
import "github.com/trainstar/synchro/migrate"

// Get infrastructure table DDL
stmts := migrate.Migrations()

// Get RLS policies from your registry
rlsStmts := synchro.GenerateRLSPolicies(registry)

// Run through your migration system
for _, stmt := range append(stmts, rlsStmts...) {
	if _, err := db.Exec(stmt); err != nil {
		log.Fatal(err)
	}
}
```

Infrastructure Tables Created
Section titled “Infrastructure Tables Created”| Table | Purpose |
|---|---|
| `sync_changelog` | Append-only changelog with a `BIGSERIAL` seq, indexed by `(bucket_id, seq)` |
| `sync_clients` | Client registration, bucket subscriptions, checkpoint tracking |
| `sync_wal_position` | WAL consumer LSN tracking for crash recovery |
| `sync_bucket_edges` | Membership index for bucket delta assignment |
| `sync_rule_failures` | Resolver failures for operational debugging and replay tooling |
| `sync_schema_manifest` | Schema contract version/hash history |
RLS Policies Generated
`GenerateRLSPolicies(registry)` produces:
- `PushPolicyDisabled` tables — read-only behavior (no write policies generated).
- Tables with `OwnerColumn` — `SELECT`, `INSERT`, `UPDATE`, `DELETE` policies scoped to `owner_col::text = current_setting('app.user_id', true)`.
- Tables with `AllowGlobalRead=true` — `SELECT` additionally allows `owner_col IS NULL`.
- Child tables — `EXISTS` subquery through the parent chain to verify ownership.
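These policies read the user from the `app.user_id` setting, so the application must set it on each connection or transaction before queries run. A sketch using PostgreSQL's `set_config` (the GUC name comes from the policy expression above; the user ID value is illustrative):

```sql
BEGIN;
-- Scope app.user_id to this transaction (third argument = local)
SELECT set_config('app.user_id', '42', true);
SELECT * FROM tasks;  -- RLS now restricts rows to user 42
COMMIT;
```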