diff options
Diffstat (limited to 'vendor/github.com/uptrace/bun/db.go')
| -rw-r--r-- | vendor/github.com/uptrace/bun/db.go | 126 |
1 files changed, 8 insertions, 118 deletions
diff --git a/vendor/github.com/uptrace/bun/db.go b/vendor/github.com/uptrace/bun/db.go index 067996d1c..9ac99a586 100644 --- a/vendor/github.com/uptrace/bun/db.go +++ b/vendor/github.com/uptrace/bun/db.go @@ -2,14 +2,13 @@ package bun import ( "context" - "crypto/rand" + cryptorand "crypto/rand" "database/sql" "encoding/hex" "fmt" "reflect" "strings" "sync/atomic" - "time" "github.com/uptrace/bun/dialect/feature" "github.com/uptrace/bun/internal" @@ -41,6 +40,12 @@ func WithDiscardUnknownColumns() DBOption { } } +// ConnResolver enables routing queries to multiple databases. +type ConnResolver interface { + ResolveConn(query Query) IConn + Close() error +} + func WithConnResolver(resolver ConnResolver) DBOption { return func(db *DB) { db.resolver = resolver @@ -633,7 +638,7 @@ func (tx Tx) Begin() (Tx, error) { func (tx Tx) BeginTx(ctx context.Context, _ *sql.TxOptions) (Tx, error) { // mssql savepoint names are limited to 32 characters sp := make([]byte, 14) - _, err := rand.Read(sp) + _, err := cryptorand.Read(sp) if err != nil { return Tx{}, err } @@ -739,121 +744,6 @@ func (tx Tx) NewDropColumn() *DropColumnQuery { return NewDropColumnQuery(tx.db).Conn(tx) } -//------------------------------------------------------------------------------ - func (db *DB) makeQueryBytes() []byte { return internal.MakeQueryBytes() } - -//------------------------------------------------------------------------------ - -// ConnResolver enables routing queries to multiple databases. -type ConnResolver interface { - ResolveConn(query Query) IConn - Close() error -} - -// TODO: -// - make monitoring interval configurable -// - make ping timeout configutable -// - allow adding read/write replicas for multi-master replication -type ReadWriteConnResolver struct { - replicas []*sql.DB // read-only replicas - healthyReplicas atomic.Pointer[[]*sql.DB] - nextReplica atomic.Int64 - closed atomic.Bool -} - -func NewReadWriteConnResolver(opts ...ReadWriteConnResolverOption) *ReadWriteConnResolver { - r := new(ReadWriteConnResolver) - - for _, opt := range opts { - opt(r) - } - - if len(r.replicas) > 0 { - r.healthyReplicas.Store(&r.replicas) - go r.monitor() - } - - return r -} - -type ReadWriteConnResolverOption func(r *ReadWriteConnResolver) - -func WithReadOnlyReplica(dbs ...*sql.DB) ReadWriteConnResolverOption { - return func(r *ReadWriteConnResolver) { - r.replicas = append(r.replicas, dbs...) - } -} - -func (r *ReadWriteConnResolver) Close() error { - if r.closed.Swap(true) { - return nil - } - - var firstErr error - for _, db := range r.replicas { - if err := db.Close(); err != nil && firstErr == nil { - firstErr = err - } - } - return firstErr -} - -// healthyReplica returns a random healthy replica. -func (r *ReadWriteConnResolver) ResolveConn(query Query) IConn { - if len(r.replicas) == 0 || !isReadOnlyQuery(query) { - return nil - } - - replicas := r.loadHealthyReplicas() - if len(replicas) == 0 { - return nil - } - if len(replicas) == 1 { - return replicas[0] - } - i := r.nextReplica.Add(1) - return replicas[int(i)%len(replicas)] -} - -func isReadOnlyQuery(query Query) bool { - sel, ok := query.(*SelectQuery) - if !ok { - return false - } - for _, el := range sel.with { - if !isReadOnlyQuery(el.query) { - return false - } - } - return true -} - -func (r *ReadWriteConnResolver) loadHealthyReplicas() []*sql.DB { - if ptr := r.healthyReplicas.Load(); ptr != nil { - return *ptr - } - return nil -} - -func (r *ReadWriteConnResolver) monitor() { - const interval = 5 * time.Second - for !r.closed.Load() { - healthy := make([]*sql.DB, 0, len(r.replicas)) - - for _, replica := range r.replicas { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - err := replica.PingContext(ctx) - cancel() - - if err == nil { - healthy = append(healthy, replica) - } - } - - r.healthyReplicas.Store(&healthy) - time.Sleep(interval) - } -} |
