feat(api): add Turso/libSQL backend for heartbeat repositories#886
feat(api): add Turso/libSQL backend for heartbeat repositories#886AchoArnold wants to merge 8 commits into
Conversation
Co-authored-by: Copilot <[email protected]>
Add alternative HeartbeatRepository and HeartbeatMonitorRepository implementations using libSQL (Turso) via database/sql. Switchable via HEARTBEAT_DB_BACKEND=turso env var. Requires TURSO_DATABASE_URL and TURSO_AUTH_TOKEN when enabled. Co-authored-by: Copilot <[email protected]>
Also update design spec to reference correct package (libsql-client-go, not go-libsql). Co-authored-by: Copilot <[email protected]>
Not up to standards ⛔🔴 Issues
|
| Category | Results |
|---|---|
| BestPractice | 1 minor |
| Comprehensibility | 1 minor |
| Security | 8 critical |
| CodeStyle | 48 minor |
🟢 Metrics 88 complexity · 40 duplication
Metric Results Complexity 88 Duplication 40
NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.
Greptile SummaryThis PR adds Turso/libSQL as an optional backend for
Confidence Score: 3/5Safe to enable for read-heavy workloads only after fixing the silent partial-result bug in Index; the PostgreSQL path is completely unaffected. The Index method can return a truncated heartbeat list with no error when the remote Turso connection drops mid-iteration, because rows.Err() is never consulted. Callers would silently see fewer results than actually exist. Additionally, malformed UUIDs in the database would produce zero-value IDs in returned entities instead of surfaced errors, making corruption invisible. Both issues are confined to the new libSQL code path. api/pkg/repositories/libsql_heartbeat_repository.go and api/pkg/repositories/libsql_heartbeat_monitor_repository.go need the rows.Err() check and uuid.Parse error handling fixed before the Turso backend is enabled in production. Important Files Changed
Sequence DiagramsequenceDiagram
participant App
participant Container
participant LibsqlRepo as libsqlHeartbeatRepository
participant TursoDB as Turso (libSQL)
participant GormRepo as gormHeartbeatRepository
participant PostgreSQL
App->>Container: HeartbeatRepository()
alt "HEARTBEAT_DB_BACKEND == turso"
Container->>Container: TursoDB() (lazy init)
Container->>TursoDB: sql.Open + Ping + CREATE TABLE IF NOT EXISTS
Container-->>App: libsqlHeartbeatRepository
App->>LibsqlRepo: Store / Index / Last / DeleteAllForUser
LibsqlRepo->>TursoDB: raw SQL over HTTPS
TursoDB-->>LibsqlRepo: result rows
LibsqlRepo-->>App: entities
else default
Container-->>App: gormHeartbeatRepository
App->>GormRepo: Store / Index / Last / DeleteAllForUser
GormRepo->>PostgreSQL: GORM query
PostgreSQL-->>GormRepo: result
GormRepo-->>App: entities
end
|
| heartbeats := make([]entities.Heartbeat, 0) | ||
| for rows.Next() { | ||
| heartbeat, scanErr := scanHeartbeat(rows) | ||
| if scanErr != nil { | ||
| msg := fmt.Sprintf("cannot scan heartbeat row for owner [%s]", owner) | ||
| return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(scanErr, msg)) | ||
| } | ||
| heartbeats = append(heartbeats, *heartbeat) | ||
| } | ||
|
|
||
| return &heartbeats, nil |
There was a problem hiding this comment.
rows.Err() is never checked after the loop. When the libSQL remote driver encounters a network error or timeout mid-iteration, rows.Next() returns false and the loop exits normally — but the error is silently lost and the caller receives a partial slice with no indication of failure.
| heartbeats := make([]entities.Heartbeat, 0) | |
| for rows.Next() { | |
| heartbeat, scanErr := scanHeartbeat(rows) | |
| if scanErr != nil { | |
| msg := fmt.Sprintf("cannot scan heartbeat row for owner [%s]", owner) | |
| return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(scanErr, msg)) | |
| } | |
| heartbeats = append(heartbeats, *heartbeat) | |
| } | |
| return &heartbeats, nil | |
| heartbeats := make([]entities.Heartbeat, 0) | |
| for rows.Next() { | |
| heartbeat, scanErr := scanHeartbeat(rows) | |
| if scanErr != nil { | |
| msg := fmt.Sprintf("cannot scan heartbeat row for owner [%s]", owner) | |
| return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(scanErr, msg)) | |
| } | |
| heartbeats = append(heartbeats, *heartbeat) | |
| } | |
| if rowsErr := rows.Err(); rowsErr != nil { | |
| msg := fmt.Sprintf("error iterating heartbeat rows for owner [%s]", owner) | |
| return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(rowsErr, msg)) | |
| } | |
| return &heartbeats, nil |
| func scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) { | ||
| heartbeat := new(entities.Heartbeat) | ||
| var id string | ||
| var charging int | ||
| var userID string | ||
| err := rows.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| heartbeat.ID, _ = uuid.Parse(id) | ||
| heartbeat.Charging = charging != 0 | ||
| heartbeat.UserID = entities.UserID(userID) | ||
| return heartbeat, nil | ||
| } | ||
|
|
||
| func scanHeartbeatRow(row *sql.Row) (*entities.Heartbeat, error) { | ||
| heartbeat := new(entities.Heartbeat) | ||
| var id string | ||
| var charging int | ||
| var userID string | ||
| err := row.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| heartbeat.ID, _ = uuid.Parse(id) | ||
| heartbeat.Charging = charging != 0 | ||
| heartbeat.UserID = entities.UserID(userID) | ||
| return heartbeat, nil | ||
| } |
| func scanHeartbeatMonitorRow(row *sql.Row) (*entities.HeartbeatMonitor, error) { | ||
| monitor := new(entities.HeartbeatMonitor) | ||
| var id, phoneID, userID string | ||
| var phoneOnline int | ||
| err := row.Scan(&id, &phoneID, &userID, &monitor.QueueID, &monitor.Owner, &phoneOnline, &monitor.CreatedAt, &monitor.UpdatedAt) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| monitor.ID, _ = uuid.Parse(id) | ||
| monitor.PhoneID, _ = uuid.Parse(phoneID) | ||
| monitor.UserID = entities.UserID(userID) | ||
| monitor.PhoneOnline = phoneOnline != 0 | ||
| return monitor, nil | ||
| } |
Co-authored-by: Copilot <[email protected]>
Add composite repositories that write to GORM (primary) and Turso (secondary) with fail-open semantics. Secondary failures are logged and counted via OTel metric. Activated via HEARTBEAT_DB_BACKEND=hedging. Co-authored-by: Copilot <[email protected]>
Add sqld (libSQL server) to test docker-compose. Integration test stores a heartbeat via the hedging repository and reads it back from both PostgreSQL (primary) and Turso/libSQL (secondary) to verify dual-write. Gated by TEST_DATABASE_URL and TEST_TURSO_DATABASE_URL environment variables. Co-authored-by: Copilot <[email protected]>
Wait for sqld health before running tests. Set TEST_DATABASE_URL and TEST_TURSO_DATABASE_URL env vars pointing to docker compose services. Co-authored-by: Copilot <[email protected]>
Store a heartbeat via POST /v1/heartbeats and read it back via the Index endpoint. The API is configured with HEARTBEAT_DB_BACKEND=hedging so it dual-writes to both PostgreSQL and Turso/sqld. The test only interacts with the HTTP API, no implementation details exposed. - Add sqld dependency to API service in docker-compose - Add HEARTBEAT_DB_BACKEND, TURSO_DATABASE_URL to .env.test - Remove repo-level integration test in favor of black-box test - Keep sqld health wait in CI workflow Co-authored-by: Copilot <[email protected]>
Summary
Add alternative HeartbeatRepository and HeartbeatMonitorRepository implementations using libSQL (Turso) via \database/sql, switchable via environment variable.
Changes
Configuration
When \HEARTBEAT_DB_BACKEND\ is unset or any value other than \ urso, the existing PostgreSQL/GORM path remains unchanged.
Design
See \docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md\ for the full spec.