Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ jobs:
- name: Wait for services to be healthy
working-directory: ./tests
run: |
echo "Waiting for sqld to be healthy..."
for i in $(seq 1 20); do
if curl -sf http://localhost:8090/health >/dev/null 2>&1; then
echo "sqld is healthy!"
break
fi
if [ $i -eq 20 ]; then
echo "sqld failed to become healthy"
docker compose logs sqld
exit 1
fi
echo "sqld attempt $i/20 - waiting 3s..."
sleep 3
done

echo "Waiting for API to be healthy..."
for i in $(seq 1 40); do
if docker compose exec api curl -sf http://localhost:8000/health >/dev/null 2>&1; then
Expand Down
4 changes: 4 additions & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/stretchr/testify v1.11.1
github.com/swaggo/swag v1.16.6
github.com/thedevsaddam/govalidator v1.9.10
github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885
github.com/uptrace/uptrace-go v1.43.0
github.com/xuri/excelize/v2 v2.10.1
go.opentelemetry.io/otel v1.43.0
Expand Down Expand Up @@ -93,11 +94,13 @@ require (
github.com/PuerkitoBio/goquery v1.12.0 // indirect
github.com/andybalholm/brotli v1.2.1 // indirect
github.com/andybalholm/cascadia v1.3.3 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/clipperhouse/displaywidth v0.11.0 // indirect
github.com/clipperhouse/uax29/v2 v2.7.0 // indirect
github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 // indirect
github.com/coder/websocket v1.8.12 // indirect
github.com/envoyproxy/go-control-plane/envoy v1.37.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.3.3 // indirect
github.com/fatih/color v1.19.0 // indirect
Expand Down Expand Up @@ -183,6 +186,7 @@ require (
go.uber.org/zap v1.28.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.50.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/mod v0.35.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eT
github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM=
github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/avast/retry-go/v5 v5.0.0 h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP7k=
github.com/avast/retry-go/v5 v5.0.0/go.mod h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
Expand All @@ -88,6 +90,8 @@ github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 h1:aBangftG7EVZoUb69Os
github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4=
github.com/cockroachdb/cockroach-go/v2 v2.4.3 h1:LJO3K3jC5WXvMePRQSJE1NsIGoFGcEx1LW83W6RAlhw=
github.com/cockroachdb/cockroach-go/v2 v2.4.3/go.mod h1:9U179XbCx4qFWtNhc7BiWLPfuyMVQ7qdAhfrwLz1vH0=
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down Expand Up @@ -320,6 +324,8 @@ github.com/thedevsaddam/govalidator v1.9.10 h1:m3dLRbSZ5Hts3VUWYe+vxLMG+FdyQuWOj
github.com/thedevsaddam/govalidator v1.9.10/go.mod h1:Ilx8u7cg5g3LXbSS943cx5kczyNuUn7LH/cK5MYuE90=
github.com/tiendc/go-deepcopy v1.7.2 h1:Ut2yYR7W9tWjTQitganoIue4UGxZwCcJy3orjrrIj44=
github.com/tiendc/go-deepcopy v1.7.2/go.mod h1:4bKjNC2r7boYOkD2IOuZpYjmlDdzjbpTRyCx+goBCJQ=
github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 h1:YssVXwM/9nUAjGNmUWdgvb05JVcsaBrDn5yr+MaJTn0=
github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885/go.mod h1:08inkKyguB6CGGssc/JzhmQWwBgFQBgjlYFjxjRh7nU=
github.com/uptrace/uptrace-go v1.43.0 h1:5QuCdyFJdWUEXx6Fr6sYfezdgO6n6lnkOvUTLlyQO7U=
github.com/uptrace/uptrace-go v1.43.0/go.mod h1:ehDTIdtBSolg4Z0CCvg1C8yR6VX1YFDqBcg2KmsXWn0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
Expand Down Expand Up @@ -410,6 +416,8 @@ golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ=
golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ=
golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
Expand Down
101 changes: 89 additions & 12 deletions api/pkg/di/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package di
import (
"context"
"crypto/tls"
"database/sql"
"fmt"
"net/http"
"os"
Expand Down Expand Up @@ -82,6 +83,7 @@ type Container struct {
projectID string
db *gorm.DB
dedicatedDB *gorm.DB
tursoDB *sql.DB
version string
app *fiber.App
eventDispatcher *services.EventDispatcher
Expand Down Expand Up @@ -295,6 +297,43 @@ func (container *Container) DedicatedDB() (db *gorm.DB) {
return container.dedicatedDB
}

// TursoDB returns the container's *sql.DB handle for the Turso/libSQL
// database, creating it on the first call and reusing it afterwards.
//
// The handle is built by repositories.NewTursoDB from the TURSO_DATABASE_URL
// and TURSO_AUTH_TOKEN environment variables. When creation fails, the error
// is handed to the logger's Fatal method.
func (container *Container) TursoDB() *sql.DB {
	if container.tursoDB == nil {
		container.logger.Debug("creating Turso *sql.DB connection")

		databaseURL := os.Getenv("TURSO_DATABASE_URL")
		authToken := os.Getenv("TURSO_AUTH_TOKEN")

		connection, err := repositories.NewTursoDB(databaseURL, authToken)
		if err != nil {
			container.logger.Fatal(err)
		}

		// Cache the handle so later calls skip the creation path above.
		container.tursoDB = connection
	}

	return container.tursoDB
}

// HedgingFailureCounter requests the "hedging.secondary.write.failures"
// Int64 counter from the global OTel meter provider, using the container's
// projectID as the meter name.
//
// The counter is requested from the meter on every call; nothing is cached on
// the container. If the meter cannot create the instrument, the wrapped error
// is handed to the logger's Fatal method.
func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter {
	const instrumentName = "hedging.secondary.write.failures"

	provider := otel.GetMeterProvider()
	meter := provider.Meter(container.projectID, otelMetric.WithInstrumentationVersion(otel.Version()))

	failures, err := meter.Int64Counter(
		instrumentName,
		otelMetric.WithUnit("1"),
		otelMetric.WithDescription("Number of failed secondary writes in hedging repositories"),
	)
	if err == nil {
		return failures
	}

	container.logger.Fatal(stacktrace.Propagate(err, "cannot create hedging failure counter"))
	return failures
}

// DBWithoutMigration creates an instance of gorm.DB if it has not been created already
func (container *Container) DBWithoutMigration() (db *gorm.DB) {
if container.db != nil {
Expand Down Expand Up @@ -889,12 +928,31 @@ func (container *Container) MessageThreadRepository() (repository repositories.M

// HeartbeatMonitorRepository builds a repositories.HeartbeatMonitorRepository
// whose storage backend is selected by the HEARTBEAT_DB_BACKEND environment
// variable:
//
//   - "turso":   libSQL repository on top of TursoDB.
//   - "hedging": hedging repository with the GORM repository as primary and
//     the libSQL repository as secondary.
//   - any other value (including unset): GORM repository on top of DedicatedDB.
func (container *Container) HeartbeatMonitorRepository() (repository repositories.HeartbeatMonitorRepository) {
	backend := os.Getenv("HEARTBEAT_DB_BACKEND")

	if backend == "turso" {
		container.logger.Debug("creating libSQL repositories.HeartbeatMonitorRepository")
		logger, tracer, db := container.Logger(), container.Tracer(), container.TursoDB()
		return repositories.NewLibsqlHeartbeatMonitorRepository(logger, tracer, db)
	}

	if backend == "hedging" {
		container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository")

		// The dependencies are resolved in the same order in which they are
		// handed to the hedging constructor.
		logger := container.Logger()
		tracer := container.Tracer()
		primary := repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB())
		secondary := repositories.NewLibsqlHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.TursoDB())
		failures := container.HedgingFailureCounter()

		return repositories.NewHedgingHeartbeatMonitorRepository(logger, tracer, primary, secondary, failures)
	}

	container.logger.Debug("creating GORM repositories.HeartbeatMonitorRepository")
	logger, tracer, db := container.Logger(), container.Tracer(), container.DedicatedDB()
	return repositories.NewGormHeartbeatMonitorRepository(logger, tracer, db)
}

// HeartbeatService creates a new instance of services.HeartbeatService
Expand Down Expand Up @@ -1708,12 +1766,31 @@ func (container *Container) RegisterSwaggerRoutes() {

// HeartbeatRepository builds a repositories.HeartbeatRepository whose storage
// backend is selected by the HEARTBEAT_DB_BACKEND environment variable:
//
//   - "turso":   libSQL repository on top of TursoDB.
//   - "hedging": hedging repository that is given a GORM repository, a libSQL
//     repository and the hedging failure counter, in that order.
//   - any other value (including unset): GORM repository on top of DedicatedDB.
func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository {
	backend := os.Getenv("HEARTBEAT_DB_BACKEND")

	if backend == "turso" {
		container.logger.Debug("creating libSQL repositories.HeartbeatRepository")
		logger, tracer, db := container.Logger(), container.Tracer(), container.TursoDB()
		return repositories.NewLibsqlHeartbeatRepository(logger, tracer, db)
	}

	if backend == "hedging" {
		container.logger.Debug("creating hedging repositories.HeartbeatRepository")

		// The dependencies are resolved in the same order in which they are
		// handed to the hedging constructor.
		logger := container.Logger()
		tracer := container.Tracer()
		gormRepository := repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB())
		libsqlRepository := repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB())
		failures := container.HedgingFailureCounter()

		return repositories.NewHedgingHeartbeatRepository(logger, tracer, gormRepository, libsqlRepository, failures)
	}

	container.logger.Debug("creating GORM repositories.HeartbeatRepository")
	logger, tracer, db := container.Logger(), container.Tracer(), container.DedicatedDB()
	return repositories.NewGormHeartbeatRepository(logger, tracer, db)
}

// UserRepository registers a new instance of repositories.UserRepository
Expand Down
128 changes: 128 additions & 0 deletions api/pkg/repositories/hedging_heartbeat_monitor_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package repositories

import (
"context"
"fmt"

"github.com/google/uuid"
otelMetric "go.opentelemetry.io/otel/metric"

"github.com/NdoleStudio/httpsms/pkg/entities"
"github.com/NdoleStudio/httpsms/pkg/telemetry"
"github.com/palantir/stacktrace"
)

// hedgingHeartbeatMonitorRepository writes to both primary and secondary repositories.
// Reads only hit primary. Secondary writes are fail-open: a secondary error is
// logged and counted but never returned to the caller.
type hedgingHeartbeatMonitorRepository struct {
	// logger reports failed secondary writes.
	logger telemetry.Logger
	// tracer starts a span around every write operation.
	tracer telemetry.Tracer
	// primary serves all reads and is written first; its errors are returned to the caller.
	primary HeartbeatMonitorRepository
	// secondary receives the same write after the primary succeeded.
	secondary HeartbeatMonitorRepository
	// failureCounter is incremented by one for every failed secondary write.
	failureCounter otelMetric.Int64Counter
}

// NewHedgingHeartbeatMonitorRepository wraps primary and secondary in a
// HeartbeatMonitorRepository that sends writes to both and reads to primary.
//
// failureCounter is incremented whenever a write to secondary fails. The
// logger is scoped with the concrete repository type name via WithService.
func NewHedgingHeartbeatMonitorRepository(
	logger telemetry.Logger,
	tracer telemetry.Tracer,
	primary HeartbeatMonitorRepository,
	secondary HeartbeatMonitorRepository,
	failureCounter otelMetric.Int64Counter,
) HeartbeatMonitorRepository {
	hedging := &hedgingHeartbeatMonitorRepository{
		tracer:         tracer,
		primary:        primary,
		secondary:      secondary,
		failureCounter: failureCounter,
	}

	// %T only prints the static type, so formatting the populated value yields
	// the same service name as formatting an empty one.
	hedging.logger = logger.WithService(fmt.Sprintf("%T", hedging))

	return hedging
}

// Store persists monitor in the primary repository and then mirrors the write
// to the secondary repository.
//
// A primary failure is returned unchanged and the secondary is not attempted.
// A secondary failure is fail-open: it is logged, counted on failureCounter
// and otherwise ignored, so the method still returns nil.
func (repository *hedgingHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error {
	ctx, span := repository.tracer.Start(ctx)
	defer span.End()

	if err := repository.primary.Store(ctx, monitor); err != nil {
		return err
	}

	if err := repository.secondary.Store(ctx, monitor); err != nil {
		// stacktrace.Propagate treats its message as a format string, so the
		// ID is passed as a format argument instead of being pre-rendered with
		// fmt.Sprintf and then formatted a second time.
		repository.logger.Error(stacktrace.Propagate(err, "hedging: secondary write failed for monitor [%s]", monitor.ID))
		repository.failureCounter.Add(ctx, 1)
	}

	return nil
}

// Load fetches the heartbeat monitor for userID and phoneNumber from the
// primary repository only; the secondary repository is not consulted.
func (repository *hedgingHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) {
	monitor, err := repository.primary.Load(ctx, userID, phoneNumber)
	return monitor, err
}

// Exists asks the primary repository only whether the monitor identified by
// userID and monitorID exists; the secondary repository is not consulted.
func (repository *hedgingHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) {
	found, err := repository.primary.Exists(ctx, userID, monitorID)
	return found, err
}

// UpdateQueueID sets queueID on the monitor identified by monitorID in the
// primary repository and then mirrors the update to the secondary repository.
//
// A primary failure is returned unchanged and the secondary is not attempted.
// A secondary failure is fail-open: it is logged, counted on failureCounter
// and otherwise ignored, so the method still returns nil.
func (repository *hedgingHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error {
	ctx, span := repository.tracer.Start(ctx)
	defer span.End()

	if err := repository.primary.UpdateQueueID(ctx, monitorID, queueID); err != nil {
		return err
	}

	if err := repository.secondary.UpdateQueueID(ctx, monitorID, queueID); err != nil {
		// stacktrace.Propagate treats its message as a format string, so the
		// ID is passed as a format argument instead of being pre-rendered with
		// fmt.Sprintf and then formatted a second time.
		repository.logger.Error(stacktrace.Propagate(err, "hedging: secondary UpdateQueueID failed for monitor [%s]", monitorID))
		repository.failureCounter.Add(ctx, 1)
	}

	return nil
}

// Delete removes the monitor for userID and phoneNumber from the primary
// repository and then mirrors the deletion to the secondary repository.
//
// A primary failure is returned unchanged and the secondary is not attempted.
// A secondary failure is fail-open: it is logged, counted on failureCounter
// and otherwise ignored, so the method still returns nil.
func (repository *hedgingHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error {
	ctx, span := repository.tracer.Start(ctx)
	defer span.End()

	if err := repository.primary.Delete(ctx, userID, phoneNumber); err != nil {
		return err
	}

	if err := repository.secondary.Delete(ctx, userID, phoneNumber); err != nil {
		// phoneNumber is an arbitrary string. stacktrace.Propagate treats its
		// message as a format string, so the value is passed as a format
		// argument; pre-rendering it with fmt.Sprintf would let a '%' in the
		// phone number be re-interpreted as a formatting verb.
		repository.logger.Error(stacktrace.Propagate(err, "hedging: secondary delete failed for monitor with owner [%s]", phoneNumber))
		repository.failureCounter.Add(ctx, 1)
	}

	return nil
}

// UpdatePhoneOnline sets the online flag of the monitor identified by userID
// and monitorID in the primary repository and then mirrors the update to the
// secondary repository.
//
// A primary failure is returned unchanged and the secondary is not attempted.
// A secondary failure is fail-open: it is logged, counted on failureCounter
// and otherwise ignored, so the method still returns nil.
func (repository *hedgingHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error {
	ctx, span := repository.tracer.Start(ctx)
	defer span.End()

	if err := repository.primary.UpdatePhoneOnline(ctx, userID, monitorID, online); err != nil {
		return err
	}

	if err := repository.secondary.UpdatePhoneOnline(ctx, userID, monitorID, online); err != nil {
		// stacktrace.Propagate treats its message as a format string, so the
		// ID is passed as a format argument instead of being pre-rendered with
		// fmt.Sprintf and then formatted a second time.
		repository.logger.Error(stacktrace.Propagate(err, "hedging: secondary UpdatePhoneOnline failed for monitor [%s]", monitorID))
		repository.failureCounter.Add(ctx, 1)
	}

	return nil
}

// DeleteAllForUser removes every monitor owned by userID from the primary
// repository and then mirrors the deletion to the secondary repository.
//
// A primary failure is returned unchanged and the secondary is not attempted.
// A secondary failure is fail-open: it is logged, counted on failureCounter
// and otherwise ignored, so the method still returns nil.
func (repository *hedgingHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
	ctx, span := repository.tracer.Start(ctx)
	defer span.End()

	if err := repository.primary.DeleteAllForUser(ctx, userID); err != nil {
		return err
	}

	if err := repository.secondary.DeleteAllForUser(ctx, userID); err != nil {
		// stacktrace.Propagate treats its message as a format string, so the
		// user ID is passed as a format argument instead of being pre-rendered
		// with fmt.Sprintf and then formatted a second time.
		repository.logger.Error(stacktrace.Propagate(err, "hedging: secondary delete all failed for user [%s]", userID))
		repository.failureCounter.Add(ctx, 1)
	}

	return nil
}
Loading
Loading