summaryrefslogtreecommitdiff
path: root/vendor/github.com/uptrace/bun/db.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/uptrace/bun/db.go')
-rw-r--r--vendor/github.com/uptrace/bun/db.go126
1 files changed, 8 insertions, 118 deletions
diff --git a/vendor/github.com/uptrace/bun/db.go b/vendor/github.com/uptrace/bun/db.go
index 067996d1c..9ac99a586 100644
--- a/vendor/github.com/uptrace/bun/db.go
+++ b/vendor/github.com/uptrace/bun/db.go
@@ -2,14 +2,13 @@ package bun
import (
"context"
- "crypto/rand"
+ cryptorand "crypto/rand"
"database/sql"
"encoding/hex"
"fmt"
"reflect"
"strings"
"sync/atomic"
- "time"
"github.com/uptrace/bun/dialect/feature"
"github.com/uptrace/bun/internal"
@@ -41,6 +40,12 @@ func WithDiscardUnknownColumns() DBOption {
}
}
+// ConnResolver enables routing queries to multiple databases.
+type ConnResolver interface {
+ ResolveConn(query Query) IConn
+ Close() error
+}
+
func WithConnResolver(resolver ConnResolver) DBOption {
return func(db *DB) {
db.resolver = resolver
@@ -633,7 +638,7 @@ func (tx Tx) Begin() (Tx, error) {
func (tx Tx) BeginTx(ctx context.Context, _ *sql.TxOptions) (Tx, error) {
// mssql savepoint names are limited to 32 characters
sp := make([]byte, 14)
- _, err := rand.Read(sp)
+ _, err := cryptorand.Read(sp)
if err != nil {
return Tx{}, err
}
@@ -739,121 +744,6 @@ func (tx Tx) NewDropColumn() *DropColumnQuery {
return NewDropColumnQuery(tx.db).Conn(tx)
}
-//------------------------------------------------------------------------------
-
func (db *DB) makeQueryBytes() []byte {
return internal.MakeQueryBytes()
}
-
-//------------------------------------------------------------------------------
-
-// ConnResolver enables routing queries to multiple databases.
-type ConnResolver interface {
- ResolveConn(query Query) IConn
- Close() error
-}
-
-// TODO:
-// - make monitoring interval configurable
-// - make ping timeout configutable
-// - allow adding read/write replicas for multi-master replication
-type ReadWriteConnResolver struct {
- replicas []*sql.DB // read-only replicas
- healthyReplicas atomic.Pointer[[]*sql.DB]
- nextReplica atomic.Int64
- closed atomic.Bool
-}
-
-func NewReadWriteConnResolver(opts ...ReadWriteConnResolverOption) *ReadWriteConnResolver {
- r := new(ReadWriteConnResolver)
-
- for _, opt := range opts {
- opt(r)
- }
-
- if len(r.replicas) > 0 {
- r.healthyReplicas.Store(&r.replicas)
- go r.monitor()
- }
-
- return r
-}
-
-type ReadWriteConnResolverOption func(r *ReadWriteConnResolver)
-
-func WithReadOnlyReplica(dbs ...*sql.DB) ReadWriteConnResolverOption {
- return func(r *ReadWriteConnResolver) {
- r.replicas = append(r.replicas, dbs...)
- }
-}
-
-func (r *ReadWriteConnResolver) Close() error {
- if r.closed.Swap(true) {
- return nil
- }
-
- var firstErr error
- for _, db := range r.replicas {
- if err := db.Close(); err != nil && firstErr == nil {
- firstErr = err
- }
- }
- return firstErr
-}
-
-// healthyReplica returns a random healthy replica.
-func (r *ReadWriteConnResolver) ResolveConn(query Query) IConn {
- if len(r.replicas) == 0 || !isReadOnlyQuery(query) {
- return nil
- }
-
- replicas := r.loadHealthyReplicas()
- if len(replicas) == 0 {
- return nil
- }
- if len(replicas) == 1 {
- return replicas[0]
- }
- i := r.nextReplica.Add(1)
- return replicas[int(i)%len(replicas)]
-}
-
-func isReadOnlyQuery(query Query) bool {
- sel, ok := query.(*SelectQuery)
- if !ok {
- return false
- }
- for _, el := range sel.with {
- if !isReadOnlyQuery(el.query) {
- return false
- }
- }
- return true
-}
-
-func (r *ReadWriteConnResolver) loadHealthyReplicas() []*sql.DB {
- if ptr := r.healthyReplicas.Load(); ptr != nil {
- return *ptr
- }
- return nil
-}
-
-func (r *ReadWriteConnResolver) monitor() {
- const interval = 5 * time.Second
- for !r.closed.Load() {
- healthy := make([]*sql.DB, 0, len(r.replicas))
-
- for _, replica := range r.replicas {
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- err := replica.PingContext(ctx)
- cancel()
-
- if err == nil {
- healthy = append(healthy, replica)
- }
- }
-
- r.healthyReplicas.Store(&healthy)
- time.Sleep(interval)
- }
-}