some small tweaks
This commit is contained in:
parent
2fe7258d9d
commit
e92b36e765
|
@ -131,12 +131,12 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) {
|
||||||
|
|
||||||
switch t {
|
switch t {
|
||||||
case "postgres":
|
case "postgres":
|
||||||
db, err = pgConn(ctx, state)
|
db, err = pgConn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
case "sqlite":
|
case "sqlite":
|
||||||
db, err = sqliteConn(ctx, state)
|
db, err = sqliteConn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -293,7 +293,7 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) {
|
||||||
return ps, nil
|
return ps, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func pgConn(ctx context.Context, state *state.State) (*bun.DB, error) {
|
func pgConn(ctx context.Context) (*bun.DB, error) {
|
||||||
opts, err := deriveBunDBPGOptions() //nolint:contextcheck
|
opts, err := deriveBunDBPGOptions() //nolint:contextcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not create bundb postgres options: %w", err)
|
return nil, fmt.Errorf("could not create bundb postgres options: %w", err)
|
||||||
|
@ -324,7 +324,7 @@ func pgConn(ctx context.Context, state *state.State) (*bun.DB, error) {
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sqliteConn(ctx context.Context, state *state.State) (*bun.DB, error) {
|
func sqliteConn(ctx context.Context) (*bun.DB, error) {
|
||||||
// validate db address has actually been set
|
// validate db address has actually been set
|
||||||
address := config.GetDbAddress()
|
address := config.GetDbAddress()
|
||||||
if address == "" {
|
if address == "" {
|
||||||
|
@ -352,7 +352,7 @@ func sqliteConn(ctx context.Context, state *state.State) (*bun.DB, error) {
|
||||||
|
|
||||||
// ping to check the db is there and listening
|
// ping to check the db is there and listening
|
||||||
if err := db.PingContext(ctx); err != nil {
|
if err := db.PingContext(ctx); err != nil {
|
||||||
err = processSQLiteError(err) // this adds error code information
|
err = processSQLiteError(err) // adds error code
|
||||||
return nil, fmt.Errorf("sqlite ping: %w", err)
|
return nil, fmt.Errorf("sqlite ping: %w", err)
|
||||||
}
|
}
|
||||||
log.Infof(ctx, "connected to SQLITE database with address %s", address)
|
log.Infof(ctx, "connected to SQLITE database with address %s", address)
|
||||||
|
@ -515,7 +515,7 @@ func buildSQLiteAddress(addr string) string {
|
||||||
|
|
||||||
// use immediate transaction lock mode to fail quickly if tx can't lock
|
// use immediate transaction lock mode to fail quickly if tx can't lock
|
||||||
// see https://pkg.go.dev/modernc.org/sqlite#Driver.Open
|
// see https://pkg.go.dev/modernc.org/sqlite#Driver.Open
|
||||||
prefs.Add("_txlock", "immediate")
|
// prefs.Add("_txlock", "immediate")
|
||||||
|
|
||||||
if addr == ":memory:" {
|
if addr == ":memory:" {
|
||||||
log.Warn(nil, "using sqlite in-memory mode; all data will be deleted when gts shuts down; this mode should only be used for debugging or running tests")
|
log.Warn(nil, "using sqlite in-memory mode; all data will be deleted when gts shuts down; this mode should only be used for debugging or running tests")
|
||||||
|
@ -526,14 +526,8 @@ func buildSQLiteAddress(addr string) string {
|
||||||
|
|
||||||
// in-mem-specific preferences
|
// in-mem-specific preferences
|
||||||
// (shared cache so that tests don't fail)
|
// (shared cache so that tests don't fail)
|
||||||
prefs.Add("mode", "memory")
|
// prefs.Add("mode", "memory")
|
||||||
prefs.Add("cache", "shared")
|
// prefs.Add("cache", "shared")
|
||||||
}
|
|
||||||
|
|
||||||
if dur := config.GetDbSqliteBusyTimeout(); dur > 0 {
|
|
||||||
// Set the user provided SQLite busy timeout
|
|
||||||
// NOTE: MUST BE SET BEFORE THE JOURNAL MODE.
|
|
||||||
prefs.Add("_pragma", fmt.Sprintf("busy_timeout(%d)", dur.Milliseconds()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if mode := config.GetDbSqliteJournalMode(); mode != "" {
|
if mode := config.GetDbSqliteJournalMode(); mode != "" {
|
||||||
|
|
|
@ -42,18 +42,8 @@ var (
|
||||||
// global SQLite3 driver instance.
|
// global SQLite3 driver instance.
|
||||||
sqliteDriver = &sqlite.Driver{
|
sqliteDriver = &sqlite.Driver{
|
||||||
Init: func(c *sqlite3.Conn) error {
|
Init: func(c *sqlite3.Conn) error {
|
||||||
return c.BusyHandler(func(ctx context.Context, i int) (retry bool) {
|
// unset an busy handler.
|
||||||
backoff := 2 * time.Millisecond * (1 << (2*i + 1))
|
return c.BusyHandler(nil)
|
||||||
if backoff > 5*time.Minute {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return false
|
|
||||||
case <-time.After(backoff):
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
})
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,9 +258,6 @@ func (c *SQLiteConn) QueryContext(ctx context.Context, query string, args []driv
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if rows != nil {
|
|
||||||
_ = rows.Close()
|
|
||||||
}
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &SQLiteTmpStmtRows{
|
return &SQLiteTmpStmtRows{
|
||||||
|
@ -286,7 +273,7 @@ func (c *SQLiteConn) Close() (err error) {
|
||||||
// see: https://www.sqlite.org/pragma.html#pragma_optimize
|
// see: https://www.sqlite.org/pragma.html#pragma_optimize
|
||||||
const onClose = "PRAGMA analysis_limit=1000; PRAGMA optimize;"
|
const onClose = "PRAGMA analysis_limit=1000; PRAGMA optimize;"
|
||||||
if r, ok := c.ConnIface.(sqlite3.DriverConn); ok {
|
if r, ok := c.ConnIface.(sqlite3.DriverConn); ok {
|
||||||
_ = r.Raw().Exec(onClose) // perform ASAP
|
_ = r.Raw().Exec(onClose)
|
||||||
_ = r.Raw().Close()
|
_ = r.Raw().Close()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -295,6 +282,7 @@ func (c *SQLiteConn) Close() (err error) {
|
||||||
type SQLiteTx struct{ driver.Tx }
|
type SQLiteTx struct{ driver.Tx }
|
||||||
|
|
||||||
func (tx *SQLiteTx) Commit() (err error) {
|
func (tx *SQLiteTx) Commit() (err error) {
|
||||||
|
// use background ctx as this commit MUST happen.
|
||||||
return retryOnBusy(context.Background(), func() error {
|
return retryOnBusy(context.Background(), func() error {
|
||||||
err = tx.Tx.Commit()
|
err = tx.Tx.Commit()
|
||||||
err = processSQLiteError(err)
|
err = processSQLiteError(err)
|
||||||
|
@ -303,6 +291,7 @@ func (tx *SQLiteTx) Commit() (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *SQLiteTx) Rollback() (err error) {
|
func (tx *SQLiteTx) Rollback() (err error) {
|
||||||
|
// use background ctx as this rollback MUST happen.
|
||||||
return retryOnBusy(context.Background(), func() error {
|
return retryOnBusy(context.Background(), func() error {
|
||||||
err = tx.Tx.Rollback()
|
err = tx.Tx.Rollback()
|
||||||
err = processSQLiteError(err)
|
err = processSQLiteError(err)
|
||||||
|
@ -345,6 +334,7 @@ func (stmt *SQLiteStmt) QueryContext(ctx context.Context, args []driver.NamedVal
|
||||||
}
|
}
|
||||||
|
|
||||||
func (stmt *SQLiteStmt) Close() (err error) {
|
func (stmt *SQLiteStmt) Close() (err error) {
|
||||||
|
// use background ctx as this stmt MUST be closed.
|
||||||
err = retryOnBusy(context.Background(), func() error {
|
err = retryOnBusy(context.Background(), func() error {
|
||||||
err = stmt.StmtIface.Close()
|
err = stmt.StmtIface.Close()
|
||||||
err = processSQLiteError(err)
|
err = processSQLiteError(err)
|
||||||
|
@ -368,6 +358,7 @@ func (r *SQLiteRows) Next(dest []driver.Value) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SQLiteRows) Close() (err error) {
|
func (r *SQLiteRows) Close() (err error) {
|
||||||
|
// use background ctx as these rows MUST be closed.
|
||||||
err = retryOnBusy(context.Background(), func() error {
|
err = retryOnBusy(context.Background(), func() error {
|
||||||
err = r.RowsIface.Close()
|
err = r.RowsIface.Close()
|
||||||
err = processSQLiteError(err)
|
err = processSQLiteError(err)
|
||||||
|
|
|
@ -32,11 +32,6 @@ var errBusy = errors.New("busy")
|
||||||
|
|
||||||
// processPostgresError processes an error, replacing any postgres specific errors with our own error type
|
// processPostgresError processes an error, replacing any postgres specific errors with our own error type
|
||||||
func processPostgresError(err error) error {
|
func processPostgresError(err error) error {
|
||||||
// Catch nil errs.
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempt to cast as postgres
|
// Attempt to cast as postgres
|
||||||
pgErr, ok := err.(*pgconn.PgError)
|
pgErr, ok := err.(*pgconn.PgError)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -50,7 +45,9 @@ func processPostgresError(err error) error {
|
||||||
return db.ErrAlreadyExists
|
return db.ErrAlreadyExists
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
// Wrap the returned error with the code and
|
||||||
|
// extended code for easier debugging later.
|
||||||
|
return fmt.Errorf("%w (code=%s)", err, pgErr.Code)
|
||||||
}
|
}
|
||||||
|
|
||||||
func processSQLiteError(err error) error {
|
func processSQLiteError(err error) error {
|
||||||
|
|
Loading…
Reference in New Issue