diff --git a/go.mod b/go.mod index 699d99d56..2d687dd35 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/feeds v1.2.0 github.com/gorilla/websocket v1.5.3 - github.com/jackc/pgx/v5 v5.7.2 + github.com/jackc/pgx/v5 v5.7.3 github.com/k3a/html2text v1.2.1 github.com/microcosm-cc/bluemonday v1.0.27 github.com/miekg/dns v1.1.63 diff --git a/go.sum b/go.sum index 3ca4813b2..ab230d9d9 100644 --- a/go.sum +++ b/go.sum @@ -258,8 +258,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= -github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= +github.com/jackc/pgx/v5 v5.7.3 h1:PO1wNKj/bTAwxSJnO1Z4Ai8j4magtqg2SLNjEDzcXQo= +github.com/jackc/pgx/v5 v5.7.3/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= diff --git a/vendor/github.com/jackc/pgx/v5/CHANGELOG.md b/vendor/github.com/jackc/pgx/v5/CHANGELOG.md index 6470088b2..75578db0a 100644 --- a/vendor/github.com/jackc/pgx/v5/CHANGELOG.md +++ b/vendor/github.com/jackc/pgx/v5/CHANGELOG.md @@ -1,3 +1,16 @@ +# 5.7.3 (March 21, 2025) + +* Expose EmptyAcquireWaitTime in pgxpool.Stat (vamshiaruru32) +* Improve SQL sanitizer performance (ninedraft) +* Fix Scan confusion with json(b), sql.Scanner, and automatic dereferencing (moukoublen, felix-roehrich) +* Fix Values() for xml type always returning nil instead of []byte +* Add ability to send Flush message in pipeline mode (zenkovev) +* Fix pgtype.Timestamp's JSON behavior to match PostgreSQL (pconstantinou) +* Better error messages when scanning structs (logicbomb) +* Fix handling of error on batch write (bonnefoa) +* Match libpq's connection fallback behavior more closely (felix-roehrich) +* Add MinIdleConns to pgxpool (djahandarie) + # 5.7.2 (December 21, 2024) * Fix prepared statement already exists on batch prepare failure @@ -9,6 +22,7 @@ * Implement pgtype.UUID.String() (Konstantin Grachev) * Switch from ExecParams to Exec in ValidateConnectTargetSessionAttrs functions (Alexander Rumyantsev) * Update golang.org/x/crypto +* Fix json(b) columns prefer sql.Scanner interface like database/sql (Ludovico Russo) # 5.7.1 (September 10, 2024) diff --git a/vendor/github.com/jackc/pgx/v5/README.md b/vendor/github.com/jackc/pgx/v5/README.md index bbeb1336c..9da49d866 100644 --- a/vendor/github.com/jackc/pgx/v5/README.md +++ b/vendor/github.com/jackc/pgx/v5/README.md @@ -92,7 +92,7 @@ See the presentation at Golang Estonia, [PGX Top to Bottom](https://www.youtube. ## Supported Go and PostgreSQL Versions -pgx supports the same versions of Go and PostgreSQL that are supported by their respective teams. For [Go](https://golang.org/doc/devel/release.html#policy) that is the two most recent major releases and for [PostgreSQL](https://www.postgresql.org/support/versioning/) the major releases in the last 5 years. This means pgx supports Go 1.21 and higher and PostgreSQL 12 and higher. pgx also is tested against the latest version of [CockroachDB](https://www.cockroachlabs.com/product/). +pgx supports the same versions of Go and PostgreSQL that are supported by their respective teams. For [Go](https://golang.org/doc/devel/release.html#policy) that is the two most recent major releases and for [PostgreSQL](https://www.postgresql.org/support/versioning/) the major releases in the last 5 years. This means pgx supports Go 1.22 and higher and PostgreSQL 13 and higher. pgx also is tested against the latest version of [CockroachDB](https://www.cockroachlabs.com/product/). ## Version Policy @@ -172,3 +172,15 @@ Supports, structs, maps, slices and custom mapping functions. ### [github.com/z0ne-dev/mgx](https://github.com/z0ne-dev/mgx) Code first migration library for native pgx (no database/sql abstraction). + +### [github.com/amirsalarsafaei/sqlc-pgx-monitoring](https://github.com/amirsalarsafaei/sqlc-pgx-monitoring) + +A database monitoring/metrics library for pgx and sqlc. Trace, log and monitor your sqlc query performance using OpenTelemetry. + +### [https://github.com/nikolayk812/pgx-outbox](https://github.com/nikolayk812/pgx-outbox) + +Simple Golang implementation for transactional outbox pattern for PostgreSQL using jackc/pgx driver. + +### [https://github.com/Arlandaren/pgxWrappy](https://github.com/Arlandaren/pgxWrappy) + +Simplifies working with the pgx library, providing convenient scanning of nested structures. diff --git a/vendor/github.com/jackc/pgx/v5/Rakefile b/vendor/github.com/jackc/pgx/v5/Rakefile index d957573e9..3e3aa5030 100644 --- a/vendor/github.com/jackc/pgx/v5/Rakefile +++ b/vendor/github.com/jackc/pgx/v5/Rakefile @@ -2,7 +2,7 @@ require "erb" rule '.go' => '.go.erb' do |task| erb = ERB.new(File.read(task.source)) - File.write(task.name, "// Do not edit. Generated from #{task.source}\n" + erb.result(binding)) + File.write(task.name, "// Code generated from #{task.source}. DO NOT EDIT.\n\n" + erb.result(binding)) sh "goimports", "-w", task.name end diff --git a/vendor/github.com/jackc/pgx/v5/conn.go b/vendor/github.com/jackc/pgx/v5/conn.go index ed6a3a09e..93e2e7182 100644 --- a/vendor/github.com/jackc/pgx/v5/conn.go +++ b/vendor/github.com/jackc/pgx/v5/conn.go @@ -420,7 +420,7 @@ func (c *Conn) IsClosed() bool { return c.pgConn.IsClosed() } -func (c *Conn) die(err error) { +func (c *Conn) die() { if c.IsClosed() { return } @@ -588,14 +588,6 @@ func (c *Conn) execPrepared(ctx context.Context, sd *pgconn.StatementDescription return result.CommandTag, result.Err } -type unknownArgumentTypeQueryExecModeExecError struct { - arg any -} - -func (e *unknownArgumentTypeQueryExecModeExecError) Error() string { - return fmt.Sprintf("cannot use unregistered type %T as query argument in QueryExecModeExec", e.arg) -} - func (c *Conn) execSQLParams(ctx context.Context, sql string, args []any) (pgconn.CommandTag, error) { err := c.eqb.Build(c.typeMap, nil, args) if err != nil { @@ -661,11 +653,12 @@ const ( // should implement pgtype.Int64Valuer. QueryExecModeExec - // Use the simple protocol. Assume the PostgreSQL query parameter types based on the Go type of the arguments. Queries - // are executed in a single round trip. Type mappings can be registered with pgtype.Map.RegisterDefaultPgType. Queries - // will be rejected that have arguments that are unregistered or ambiguous. e.g. A map[string]string may have the - // PostgreSQL type json or hstore. Modes that know the PostgreSQL type can use a map[string]string directly as an - // argument. This mode cannot. + // Use the simple protocol. Assume the PostgreSQL query parameter types based on the Go type of the arguments. This is + // especially significant for []byte values. []byte values are encoded as PostgreSQL bytea. string must be used + // instead for text type values including json and jsonb. Type mappings can be registered with + // pgtype.Map.RegisterDefaultPgType. Queries will be rejected that have arguments that are unregistered or ambiguous. + // e.g. A map[string]string may have the PostgreSQL type json or hstore. Modes that know the PostgreSQL type can use a + // map[string]string directly as an argument. This mode cannot. Queries are executed in a single round trip. // // QueryExecModeSimpleProtocol should have the user application visible behavior as QueryExecModeExec. This includes // the warning regarding differences in text format and binary format encoding with user defined types. There may be diff --git a/vendor/github.com/jackc/pgx/v5/derived_types.go b/vendor/github.com/jackc/pgx/v5/derived_types.go index 22ab069cf..72c0a2423 100644 --- a/vendor/github.com/jackc/pgx/v5/derived_types.go +++ b/vendor/github.com/jackc/pgx/v5/derived_types.go @@ -161,7 +161,7 @@ type derivedTypeInfo struct { // The result of this call can be passed into RegisterTypes to complete the process. func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Type, error) { m := c.TypeMap() - if typeNames == nil || len(typeNames) == 0 { + if len(typeNames) == 0 { return nil, fmt.Errorf("No type names were supplied.") } @@ -169,13 +169,7 @@ func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Typ // the SQL not support recent structures such as multirange serverVersion, _ := serverVersion(c) sql := buildLoadDerivedTypesSQL(serverVersion, typeNames) - var rows Rows - var err error - if typeNames == nil { - rows, err = c.Query(ctx, sql, QueryExecModeSimpleProtocol) - } else { - rows, err = c.Query(ctx, sql, QueryExecModeSimpleProtocol, typeNames) - } + rows, err := c.Query(ctx, sql, QueryExecModeSimpleProtocol, typeNames) if err != nil { return nil, fmt.Errorf("While generating load types query: %w", err) } @@ -232,15 +226,15 @@ func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Typ default: return nil, fmt.Errorf("Unknown typtype %q was found while registering %q", ti.Typtype, ti.TypeName) } - if type_ != nil { - m.RegisterType(type_) - if ti.NspName != "" { - nspType := &pgtype.Type{Name: ti.NspName + "." + type_.Name, OID: type_.OID, Codec: type_.Codec} - m.RegisterType(nspType) - result = append(result, nspType) - } - result = append(result, type_) + + // the type_ is imposible to be null + m.RegisterType(type_) + if ti.NspName != "" { + nspType := &pgtype.Type{Name: ti.NspName + "." + type_.Name, OID: type_.OID, Codec: type_.Codec} + m.RegisterType(nspType) + result = append(result, nspType) } + result = append(result, type_) } return result, nil } diff --git a/vendor/github.com/jackc/pgx/v5/internal/sanitize/benchmmark.sh b/vendor/github.com/jackc/pgx/v5/internal/sanitize/benchmmark.sh new file mode 100644 index 000000000..ec0f7b03a --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/internal/sanitize/benchmmark.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + +current_branch=$(git rev-parse --abbrev-ref HEAD) +if [ "$current_branch" == "HEAD" ]; then + current_branch=$(git rev-parse HEAD) +fi + +restore_branch() { + echo "Restoring original branch/commit: $current_branch" + git checkout "$current_branch" +} +trap restore_branch EXIT + +# Check if there are uncommitted changes +if ! git diff --quiet || ! git diff --cached --quiet; then + echo "There are uncommitted changes. Please commit or stash them before running this script." + exit 1 +fi + +# Ensure that at least one commit argument is passed +if [ "$#" -lt 1 ]; then + echo "Usage: $0 ... " + exit 1 +fi + +commits=("$@") +benchmarks_dir=benchmarks + +if ! mkdir -p "${benchmarks_dir}"; then + echo "Unable to create dir for benchmarks data" + exit 1 +fi + +# Benchmark results +bench_files=() + +# Run benchmark for each listed commit +for i in "${!commits[@]}"; do + commit="${commits[i]}" + git checkout "$commit" || { + echo "Failed to checkout $commit" + exit 1 + } + + # Sanitized commmit message + commit_message=$(git log -1 --pretty=format:"%s" | tr -c '[:alnum:]-_' '_') + + # Benchmark data will go there + bench_file="${benchmarks_dir}/${i}_${commit_message}.bench" + + if ! go test -bench=. -count=10 >"$bench_file"; then + echo "Benchmarking failed for commit $commit" + exit 1 + fi + + bench_files+=("$bench_file") +done + +# go install golang.org/x/perf/cmd/benchstat[@latest] +benchstat "${bench_files[@]}" diff --git a/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go b/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go index df58c4484..b516817cb 100644 --- a/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go +++ b/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go @@ -4,8 +4,10 @@ import ( "bytes" "encoding/hex" "fmt" + "slices" "strconv" "strings" + "sync" "time" "unicode/utf8" ) @@ -24,18 +26,33 @@ type Query struct { // https://github.com/jackc/pgx/issues/1380 const replacementcharacterwidth = 3 +const maxBufSize = 16384 // 16 Ki + +var bufPool = &pool[*bytes.Buffer]{ + new: func() *bytes.Buffer { + return &bytes.Buffer{} + }, + reset: func(b *bytes.Buffer) bool { + n := b.Len() + b.Reset() + return n < maxBufSize + }, +} + +var null = []byte("null") + func (q *Query) Sanitize(args ...any) (string, error) { argUse := make([]bool, len(args)) - buf := &bytes.Buffer{} + buf := bufPool.get() + defer bufPool.put(buf) for _, part := range q.Parts { - var str string switch part := part.(type) { case string: - str = part + buf.WriteString(part) case int: argIdx := part - 1 - + var p []byte if argIdx < 0 { return "", fmt.Errorf("first sql argument must be > 0") } @@ -43,34 +60,41 @@ func (q *Query) Sanitize(args ...any) (string, error) { if argIdx >= len(args) { return "", fmt.Errorf("insufficient arguments") } + + // Prevent SQL injection via Line Comment Creation + // https://github.com/jackc/pgx/security/advisories/GHSA-m7wr-2xf7-cm9p + buf.WriteByte(' ') + arg := args[argIdx] switch arg := arg.(type) { case nil: - str = "null" + p = null case int64: - str = strconv.FormatInt(arg, 10) + p = strconv.AppendInt(buf.AvailableBuffer(), arg, 10) case float64: - str = strconv.FormatFloat(arg, 'f', -1, 64) + p = strconv.AppendFloat(buf.AvailableBuffer(), arg, 'f', -1, 64) case bool: - str = strconv.FormatBool(arg) + p = strconv.AppendBool(buf.AvailableBuffer(), arg) case []byte: - str = QuoteBytes(arg) + p = QuoteBytes(buf.AvailableBuffer(), arg) case string: - str = QuoteString(arg) + p = QuoteString(buf.AvailableBuffer(), arg) case time.Time: - str = arg.Truncate(time.Microsecond).Format("'2006-01-02 15:04:05.999999999Z07:00:00'") + p = arg.Truncate(time.Microsecond). + AppendFormat(buf.AvailableBuffer(), "'2006-01-02 15:04:05.999999999Z07:00:00'") default: return "", fmt.Errorf("invalid arg type: %T", arg) } argUse[argIdx] = true + buf.Write(p) + // Prevent SQL injection via Line Comment Creation // https://github.com/jackc/pgx/security/advisories/GHSA-m7wr-2xf7-cm9p - str = " " + str + " " + buf.WriteByte(' ') default: return "", fmt.Errorf("invalid Part type: %T", part) } - buf.WriteString(str) } for i, used := range argUse { @@ -82,26 +106,99 @@ func (q *Query) Sanitize(args ...any) (string, error) { } func NewQuery(sql string) (*Query, error) { - l := &sqlLexer{ - src: sql, - stateFn: rawState, + query := &Query{} + query.init(sql) + + return query, nil +} + +var sqlLexerPool = &pool[*sqlLexer]{ + new: func() *sqlLexer { + return &sqlLexer{} + }, + reset: func(sl *sqlLexer) bool { + *sl = sqlLexer{} + return true + }, +} + +func (q *Query) init(sql string) { + parts := q.Parts[:0] + if parts == nil { + // dirty, but fast heuristic to preallocate for ~90% usecases + n := strings.Count(sql, "$") + strings.Count(sql, "--") + 1 + parts = make([]Part, 0, n) } + l := sqlLexerPool.get() + defer sqlLexerPool.put(l) + + l.src = sql + l.stateFn = rawState + l.parts = parts + for l.stateFn != nil { l.stateFn = l.stateFn(l) } - query := &Query{Parts: l.parts} - - return query, nil + q.Parts = l.parts } -func QuoteString(str string) string { - return "'" + strings.ReplaceAll(str, "'", "''") + "'" +func QuoteString(dst []byte, str string) []byte { + const quote = '\'' + + // Preallocate space for the worst case scenario + dst = slices.Grow(dst, len(str)*2+2) + + // Add opening quote + dst = append(dst, quote) + + // Iterate through the string without allocating + for i := 0; i < len(str); i++ { + if str[i] == quote { + dst = append(dst, quote, quote) + } else { + dst = append(dst, str[i]) + } + } + + // Add closing quote + dst = append(dst, quote) + + return dst } -func QuoteBytes(buf []byte) string { - return `'\x` + hex.EncodeToString(buf) + "'" +func QuoteBytes(dst, buf []byte) []byte { + if len(buf) == 0 { + return append(dst, `'\x'`...) + } + + // Calculate required length + requiredLen := 3 + hex.EncodedLen(len(buf)) + 1 + + // Ensure dst has enough capacity + if cap(dst)-len(dst) < requiredLen { + newDst := make([]byte, len(dst), len(dst)+requiredLen) + copy(newDst, dst) + dst = newDst + } + + // Record original length and extend slice + origLen := len(dst) + dst = dst[:origLen+requiredLen] + + // Add prefix + dst[origLen] = '\'' + dst[origLen+1] = '\\' + dst[origLen+2] = 'x' + + // Encode bytes directly into dst + hex.Encode(dst[origLen+3:len(dst)-1], buf) + + // Add suffix + dst[len(dst)-1] = '\'' + + return dst } type sqlLexer struct { @@ -319,13 +416,45 @@ func multilineCommentState(l *sqlLexer) stateFn { } } +var queryPool = &pool[*Query]{ + new: func() *Query { + return &Query{} + }, + reset: func(q *Query) bool { + n := len(q.Parts) + q.Parts = q.Parts[:0] + return n < 64 // drop too large queries + }, +} + // SanitizeSQL replaces placeholder values with args. It quotes and escapes args // as necessary. This function is only safe when standard_conforming_strings is // on. func SanitizeSQL(sql string, args ...any) (string, error) { - query, err := NewQuery(sql) - if err != nil { - return "", err - } + query := queryPool.get() + query.init(sql) + defer queryPool.put(query) + return query.Sanitize(args...) } + +type pool[E any] struct { + p sync.Pool + new func() E + reset func(E) bool +} + +func (pool *pool[E]) get() E { + v, ok := pool.p.Get().(E) + if !ok { + v = pool.new() + } + + return v +} + +func (p *pool[E]) put(v E) { + if p.reset(v) { + p.p.Put(v) + } +} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go index 7efb522a4..14966aa49 100644 --- a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go +++ b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go @@ -1,6 +1,7 @@ package pgconn import ( + "container/list" "context" "crypto/md5" "crypto/tls" @@ -267,12 +268,15 @@ func connectPreferred(ctx context.Context, config *Config, connectOneConfigs []* var pgErr *PgError if errors.As(err, &pgErr) { - const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password - const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings - const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist - const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege + // pgx will try next host even if libpq does not in certain cases (see #2246) + // consider change for the next major version + + const ERRCODE_INVALID_PASSWORD = "28P01" + const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist + const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege + + // auth failed due to invalid password, db does not exist or user has no permission if pgErr.Code == ERRCODE_INVALID_PASSWORD || - pgErr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION && c.tlsConfig != nil || pgErr.Code == ERRCODE_INVALID_CATALOG_NAME || pgErr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE { return nil, allErrors @@ -1408,9 +1412,8 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co // MultiResultReader is a reader for a command that could return multiple results such as Exec or ExecBatch. type MultiResultReader struct { - pgConn *PgConn - ctx context.Context - pipeline *Pipeline + pgConn *PgConn + ctx context.Context rr *ResultReader @@ -1443,12 +1446,8 @@ func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) switch msg := msg.(type) { case *pgproto3.ReadyForQuery: mrr.closed = true - if mrr.pipeline != nil { - mrr.pipeline.expectedReadyForQueryCount-- - } else { - mrr.pgConn.contextWatcher.Unwatch() - mrr.pgConn.unlock() - } + mrr.pgConn.contextWatcher.Unwatch() + mrr.pgConn.unlock() case *pgproto3.ErrorResponse: mrr.err = ErrorResponseToPgError(msg) } @@ -1672,7 +1671,11 @@ func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error case *pgproto3.EmptyQueryResponse: rr.concludeCommand(CommandTag{}, nil) case *pgproto3.ErrorResponse: - rr.concludeCommand(CommandTag{}, ErrorResponseToPgError(msg)) + pgErr := ErrorResponseToPgError(msg) + if rr.pipeline != nil { + rr.pipeline.state.HandleError(pgErr) + } + rr.concludeCommand(CommandTag{}, pgErr) } return msg, nil @@ -1773,9 +1776,10 @@ func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultR batch.buf, batch.err = (&pgproto3.Sync{}).Encode(batch.buf) if batch.err != nil { + pgConn.contextWatcher.Unwatch() + multiResult.err = normalizeTimeoutError(multiResult.ctx, batch.err) multiResult.closed = true - multiResult.err = batch.err - pgConn.unlock() + pgConn.asyncClose() return multiResult } @@ -1783,9 +1787,10 @@ func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultR defer pgConn.exitPotentialWriteReadDeadlock() _, err := pgConn.conn.Write(batch.buf) if err != nil { + pgConn.contextWatcher.Unwatch() + multiResult.err = normalizeTimeoutError(multiResult.ctx, err) multiResult.closed = true - multiResult.err = err - pgConn.unlock() + pgConn.asyncClose() return multiResult } @@ -1999,9 +2004,7 @@ type Pipeline struct { conn *PgConn ctx context.Context - expectedReadyForQueryCount int - pendingSync bool - + state pipelineState err error closed bool } @@ -2012,6 +2015,122 @@ type PipelineSync struct{} // CloseComplete is returned by GetResults when a CloseComplete message is received. type CloseComplete struct{} +type pipelineRequestType int + +const ( + pipelineNil pipelineRequestType = iota + pipelinePrepare + pipelineQueryParams + pipelineQueryPrepared + pipelineDeallocate + pipelineSyncRequest + pipelineFlushRequest +) + +type pipelineRequestEvent struct { + RequestType pipelineRequestType + WasSentToServer bool + BeforeFlushOrSync bool +} + +type pipelineState struct { + requestEventQueue list.List + lastRequestType pipelineRequestType + pgErr *PgError + expectedReadyForQueryCount int +} + +func (s *pipelineState) Init() { + s.requestEventQueue.Init() + s.lastRequestType = pipelineNil +} + +func (s *pipelineState) RegisterSendingToServer() { + for elem := s.requestEventQueue.Back(); elem != nil; elem = elem.Prev() { + val := elem.Value.(pipelineRequestEvent) + if val.WasSentToServer { + return + } + val.WasSentToServer = true + elem.Value = val + } +} + +func (s *pipelineState) registerFlushingBufferOnServer() { + for elem := s.requestEventQueue.Back(); elem != nil; elem = elem.Prev() { + val := elem.Value.(pipelineRequestEvent) + if val.BeforeFlushOrSync { + return + } + val.BeforeFlushOrSync = true + elem.Value = val + } +} + +func (s *pipelineState) PushBackRequestType(req pipelineRequestType) { + if req == pipelineNil { + return + } + + if req != pipelineFlushRequest { + s.requestEventQueue.PushBack(pipelineRequestEvent{RequestType: req}) + } + if req == pipelineFlushRequest || req == pipelineSyncRequest { + s.registerFlushingBufferOnServer() + } + s.lastRequestType = req + + if req == pipelineSyncRequest { + s.expectedReadyForQueryCount++ + } +} + +func (s *pipelineState) ExtractFrontRequestType() pipelineRequestType { + for { + elem := s.requestEventQueue.Front() + if elem == nil { + return pipelineNil + } + val := elem.Value.(pipelineRequestEvent) + if !(val.WasSentToServer && val.BeforeFlushOrSync) { + return pipelineNil + } + + s.requestEventQueue.Remove(elem) + if val.RequestType == pipelineSyncRequest { + s.pgErr = nil + } + if s.pgErr == nil { + return val.RequestType + } + } +} + +func (s *pipelineState) HandleError(err *PgError) { + s.pgErr = err +} + +func (s *pipelineState) HandleReadyForQuery() { + s.expectedReadyForQueryCount-- +} + +func (s *pipelineState) PendingSync() bool { + var notPendingSync bool + + if elem := s.requestEventQueue.Back(); elem != nil { + val := elem.Value.(pipelineRequestEvent) + notPendingSync = (val.RequestType == pipelineSyncRequest) && val.WasSentToServer + } else { + notPendingSync = (s.lastRequestType == pipelineSyncRequest) || (s.lastRequestType == pipelineNil) + } + + return !notPendingSync +} + +func (s *pipelineState) ExpectedReadyForQuery() int { + return s.expectedReadyForQueryCount +} + // StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent // to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection // to normal mode. While in pipeline mode, no methods that communicate with the server may be called except @@ -2020,16 +2139,21 @@ type CloseComplete struct{} // Prefer ExecBatch when only sending one group of queries at once. func (pgConn *PgConn) StartPipeline(ctx context.Context) *Pipeline { if err := pgConn.lock(); err != nil { - return &Pipeline{ + pipeline := &Pipeline{ closed: true, err: err, } + pipeline.state.Init() + + return pipeline } pgConn.pipeline = Pipeline{ conn: pgConn, ctx: ctx, } + pgConn.pipeline.state.Init() + pipeline := &pgConn.pipeline if ctx != context.Background() { @@ -2052,10 +2176,10 @@ func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) { if p.closed { return } - p.pendingSync = true p.conn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}) p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name}) + p.state.PushBackRequestType(pipelinePrepare) } // SendDeallocate deallocates a prepared statement. @@ -2063,9 +2187,9 @@ func (p *Pipeline) SendDeallocate(name string) { if p.closed { return } - p.pendingSync = true p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name}) + p.state.PushBackRequestType(pipelineDeallocate) } // SendQueryParams is the pipeline version of *PgConn.QueryParams. @@ -2073,12 +2197,12 @@ func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs [ if p.closed { return } - p.pendingSync = true p.conn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}) p.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) p.conn.frontend.SendExecute(&pgproto3.Execute{}) + p.state.PushBackRequestType(pipelineQueryParams) } // SendQueryPrepared is the pipeline version of *PgConn.QueryPrepared. @@ -2086,11 +2210,42 @@ func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, para if p.closed { return } - p.pendingSync = true p.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) p.conn.frontend.SendExecute(&pgproto3.Execute{}) + p.state.PushBackRequestType(pipelineQueryPrepared) +} + +// SendFlushRequest sends a request for the server to flush its output buffer. +// +// The server flushes its output buffer automatically as a result of Sync being called, +// or on any request when not in pipeline mode; this function is useful to cause the server +// to flush its output buffer in pipeline mode without establishing a synchronization point. +// Note that the request is not itself flushed to the server automatically; use Flush if +// necessary. This copies the behavior of libpq PQsendFlushRequest. +func (p *Pipeline) SendFlushRequest() { + if p.closed { + return + } + + p.conn.frontend.Send(&pgproto3.Flush{}) + p.state.PushBackRequestType(pipelineFlushRequest) +} + +// SendPipelineSync marks a synchronization point in a pipeline by sending a sync message +// without flushing the send buffer. This serves as the delimiter of an implicit +// transaction and an error recovery point. +// +// Note that the request is not itself flushed to the server automatically; use Flush if +// necessary. This copies the behavior of libpq PQsendPipelineSync. +func (p *Pipeline) SendPipelineSync() { + if p.closed { + return + } + + p.conn.frontend.SendSync(&pgproto3.Sync{}) + p.state.PushBackRequestType(pipelineSyncRequest) } // Flush flushes the queued requests without establishing a synchronization point. @@ -2115,28 +2270,14 @@ func (p *Pipeline) Flush() error { return err } + p.state.RegisterSendingToServer() return nil } // Sync establishes a synchronization point and flushes the queued requests. func (p *Pipeline) Sync() error { - if p.closed { - if p.err != nil { - return p.err - } - return errors.New("pipeline closed") - } - - p.conn.frontend.SendSync(&pgproto3.Sync{}) - err := p.Flush() - if err != nil { - return err - } - - p.pendingSync = false - p.expectedReadyForQueryCount++ - - return nil + p.SendPipelineSync() + return p.Flush() } // GetResults gets the next results. If results are present, results may be a *ResultReader, *StatementDescription, or @@ -2150,7 +2291,7 @@ func (p *Pipeline) GetResults() (results any, err error) { return nil, errors.New("pipeline closed") } - if p.expectedReadyForQueryCount == 0 { + if p.state.ExtractFrontRequestType() == pipelineNil { return nil, nil } @@ -2195,13 +2336,13 @@ func (p *Pipeline) getResults() (results any, err error) { case *pgproto3.CloseComplete: return &CloseComplete{}, nil case *pgproto3.ReadyForQuery: - p.expectedReadyForQueryCount-- + p.state.HandleReadyForQuery() return &PipelineSync{}, nil case *pgproto3.ErrorResponse: pgErr := ErrorResponseToPgError(msg) + p.state.HandleError(pgErr) return nil, pgErr } - } } @@ -2231,6 +2372,7 @@ func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) { // These should never happen here. But don't take chances that could lead to a deadlock. case *pgproto3.ErrorResponse: pgErr := ErrorResponseToPgError(msg) + p.state.HandleError(pgErr) return nil, pgErr case *pgproto3.CommandComplete: p.conn.asyncClose() @@ -2250,7 +2392,7 @@ func (p *Pipeline) Close() error { p.closed = true - if p.pendingSync { + if p.state.PendingSync() { p.conn.asyncClose() p.err = errors.New("pipeline has unsynced requests") p.conn.contextWatcher.Unwatch() @@ -2259,7 +2401,7 @@ func (p *Pipeline) Close() error { return p.err } - for p.expectedReadyForQueryCount > 0 { + for p.state.ExpectedReadyForQuery() > 0 { _, err := p.getResults() if err != nil { p.err = err diff --git a/vendor/github.com/jackc/pgx/v5/pgproto3/password_message.go b/vendor/github.com/jackc/pgx/v5/pgproto3/password_message.go index d820d3275..67b78515d 100644 --- a/vendor/github.com/jackc/pgx/v5/pgproto3/password_message.go +++ b/vendor/github.com/jackc/pgx/v5/pgproto3/password_message.go @@ -12,7 +12,7 @@ type PasswordMessage struct { // Frontend identifies this message as sendable by a PostgreSQL frontend. func (*PasswordMessage) Frontend() {} -// Frontend identifies this message as an authentication response. +// InitialResponse identifies this message as an authentication response. func (*PasswordMessage) InitialResponse() {} // Decode decodes src into dst. src must contain the complete message with the exception of the initial 1 byte message diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/doc.go b/vendor/github.com/jackc/pgx/v5/pgtype/doc.go index 7687ea8fe..83dfc5de5 100644 --- a/vendor/github.com/jackc/pgx/v5/pgtype/doc.go +++ b/vendor/github.com/jackc/pgx/v5/pgtype/doc.go @@ -53,8 +53,8 @@ similar fashion to database/sql. The second is to use a pointer to a pointer. return err } -When using nullable pgtype types as parameters for queries, one has to remember -to explicitly set their Valid field to true, otherwise the parameter's value will be NULL. +When using nullable pgtype types as parameters for queries, one has to remember to explicitly set their Valid field to +true, otherwise the parameter's value will be NULL. JSON Support @@ -159,11 +159,16 @@ example_child_records_test.go for an example. Overview of Scanning Implementation -The first step is to use the OID to lookup the correct Codec. If the OID is unavailable, Map will try to find the OID -from previous calls of Map.RegisterDefaultPgType. The Map will call the Codec's PlanScan method to get a plan for -scanning into the Go value. A Codec will support scanning into one or more Go types. Oftentime these Go types are -interfaces rather than explicit types. For example, PointCodec can use any Go type that implements the PointScanner and -PointValuer interfaces. +The first step is to use the OID to lookup the correct Codec. The Map will call the Codec's PlanScan method to get a +plan for scanning into the Go value. A Codec will support scanning into one or more Go types. Oftentime these Go types +are interfaces rather than explicit types. For example, PointCodec can use any Go type that implements the PointScanner +and PointValuer interfaces. + +If a Go value is not supported directly by a Codec then Map will try see if it is a sql.Scanner. If is then that +interface will be used to scan the value. Most sql.Scanners require the input to be in the text format (e.g. UUIDs and +numeric). However, pgx will typically have received the value in the binary format. In this case the binary value will be +parsed, reencoded as text, and then passed to the sql.Scanner. This may incur additional overhead for query results with +a large number of affected values. If a Go value is not supported directly by a Codec then Map will try wrapping it with additional logic and try again. For example, Int8Codec does not support scanning into a renamed type (e.g. type myInt64 int64). But Map will detect that diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/int.go b/vendor/github.com/jackc/pgx/v5/pgtype/int.go index 90a20a26a..7a2f8cb24 100644 --- a/vendor/github.com/jackc/pgx/v5/pgtype/int.go +++ b/vendor/github.com/jackc/pgx/v5/pgtype/int.go @@ -1,4 +1,5 @@ -// Do not edit. Generated from pgtype/int.go.erb +// Code generated from pgtype/int.go.erb. DO NOT EDIT. + package pgtype import ( diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/json.go b/vendor/github.com/jackc/pgx/v5/pgtype/json.go index 48b9f9771..6f7ebb51f 100644 --- a/vendor/github.com/jackc/pgx/v5/pgtype/json.go +++ b/vendor/github.com/jackc/pgx/v5/pgtype/json.go @@ -71,6 +71,27 @@ func (c *JSONCodec) PlanEncode(m *Map, oid uint32, format int16, value any) Enco } } +// JSON needs its on scan plan for pointers to handle 'null'::json(b). +// Consider making pointerPointerScanPlan more flexible in the future. +type jsonPointerScanPlan struct { + next ScanPlan +} + +func (p jsonPointerScanPlan) Scan(src []byte, dst any) error { + el := reflect.ValueOf(dst).Elem() + if src == nil || string(src) == "null" { + el.SetZero() + return nil + } + + el.Set(reflect.New(el.Type().Elem())) + if p.next != nil { + return p.next.Scan(src, el.Interface()) + } + + return nil +} + type encodePlanJSONCodecEitherFormatString struct{} func (encodePlanJSONCodecEitherFormatString) Encode(value any, buf []byte) (newBuf []byte, err error) { @@ -117,58 +138,36 @@ func (e *encodePlanJSONCodecEitherFormatMarshal) Encode(value any, buf []byte) ( return buf, nil } -func (c *JSONCodec) PlanScan(m *Map, oid uint32, format int16, target any) ScanPlan { - switch target.(type) { - case *string: - return scanPlanAnyToString{} - - case **string: - // This is to fix **string scanning. It seems wrong to special case **string, but it's not clear what a better - // solution would be. - // - // https://github.com/jackc/pgx/issues/1470 -- **string - // https://github.com/jackc/pgx/issues/1691 -- ** anything else - - if wrapperPlan, nextDst, ok := TryPointerPointerScanPlan(target); ok { - if nextPlan := m.planScan(oid, format, nextDst, 0); nextPlan != nil { - if _, failed := nextPlan.(*scanPlanFail); !failed { - wrapperPlan.SetNext(nextPlan) - return wrapperPlan - } - } - } - - case *[]byte: - return scanPlanJSONToByteSlice{} - case BytesScanner: - return scanPlanBinaryBytesToBytesScanner{} - - } - - // Cannot rely on sql.Scanner being handled later because scanPlanJSONToJSONUnmarshal will take precedence. - // - // https://github.com/jackc/pgx/issues/1418 - if isSQLScanner(target) { - return &scanPlanSQLScanner{formatCode: format} - } - - return &scanPlanJSONToJSONUnmarshal{ - unmarshal: c.Unmarshal, - } +func (c *JSONCodec) PlanScan(m *Map, oid uint32, formatCode int16, target any) ScanPlan { + return c.planScan(m, oid, formatCode, target, 0) } -// we need to check if the target is a pointer to a sql.Scanner (or any of the pointer ref tree implements a sql.Scanner). -// -// https://github.com/jackc/pgx/issues/2146 -func isSQLScanner(v any) bool { - val := reflect.ValueOf(v) - for val.Kind() == reflect.Ptr { - if _, ok := val.Interface().(sql.Scanner); ok { - return true - } - val = val.Elem() +// JSON cannot fallback to pointerPointerScanPlan because of 'null'::json(b), +// so we need to duplicate the logic here. +func (c *JSONCodec) planScan(m *Map, oid uint32, formatCode int16, target any, depth int) ScanPlan { + if depth > 8 { + return &scanPlanFail{m: m, oid: oid, formatCode: formatCode} + } + + switch target.(type) { + case *string: + return &scanPlanAnyToString{} + case *[]byte: + return &scanPlanJSONToByteSlice{} + case BytesScanner: + return &scanPlanBinaryBytesToBytesScanner{} + case sql.Scanner: + return &scanPlanSQLScanner{formatCode: formatCode} + } + + rv := reflect.ValueOf(target) + if rv.Kind() == reflect.Pointer && rv.Elem().Kind() == reflect.Pointer { + var plan jsonPointerScanPlan + plan.next = c.planScan(m, oid, formatCode, rv.Elem().Interface(), depth+1) + return plan + } else { + return &scanPlanJSONToJSONUnmarshal{unmarshal: c.Unmarshal} } - return false } type scanPlanAnyToString struct{} @@ -198,7 +197,7 @@ type scanPlanJSONToJSONUnmarshal struct { } func (s *scanPlanJSONToJSONUnmarshal) Scan(src []byte, dst any) error { - if src == nil { + if src == nil || string(src) == "null" { dstValue := reflect.ValueOf(dst) if dstValue.Kind() == reflect.Ptr { el := dstValue.Elem() @@ -212,7 +211,12 @@ func (s *scanPlanJSONToJSONUnmarshal) Scan(src []byte, dst any) error { return fmt.Errorf("cannot scan NULL into %T", dst) } - elem := reflect.ValueOf(dst).Elem() + v := reflect.ValueOf(dst) + if v.Kind() != reflect.Pointer || v.IsNil() { + return fmt.Errorf("cannot scan into non-pointer or nil destinations %T", dst) + } + + elem := v.Elem() elem.Set(reflect.Zero(elem.Type())) return s.unmarshal(src, dst) diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/pgtype.go b/vendor/github.com/jackc/pgx/v5/pgtype/pgtype.go index f9d43edd7..a1083161c 100644 --- a/vendor/github.com/jackc/pgx/v5/pgtype/pgtype.go +++ b/vendor/github.com/jackc/pgx/v5/pgtype/pgtype.go @@ -396,11 +396,7 @@ type scanPlanSQLScanner struct { } func (plan *scanPlanSQLScanner) Scan(src []byte, dst any) error { - scanner := getSQLScanner(dst) - - if scanner == nil { - return fmt.Errorf("cannot scan into %T", dst) - } + scanner := dst.(sql.Scanner) if src == nil { // This is necessary because interface value []byte:nil does not equal nil:nil for the binary format path and the @@ -413,21 +409,6 @@ func (plan *scanPlanSQLScanner) Scan(src []byte, dst any) error { } } -// we don't know if the target is a sql.Scanner or a pointer on a sql.Scanner, so we need to check recursively -func getSQLScanner(target any) sql.Scanner { - val := reflect.ValueOf(target) - for val.Kind() == reflect.Ptr { - if _, ok := val.Interface().(sql.Scanner); ok { - if val.IsNil() { - val.Set(reflect.New(val.Type().Elem())) - } - return val.Interface().(sql.Scanner) - } - val = val.Elem() - } - return nil -} - type scanPlanString struct{} func (scanPlanString) Scan(src []byte, dst any) error { diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/pgtype_default.go b/vendor/github.com/jackc/pgx/v5/pgtype/pgtype_default.go index 9496cb974..8cb512fa5 100644 --- a/vendor/github.com/jackc/pgx/v5/pgtype/pgtype_default.go +++ b/vendor/github.com/jackc/pgx/v5/pgtype/pgtype_default.go @@ -91,7 +91,25 @@ func initDefaultMap() { defaultMap.RegisterType(&Type{Name: "varchar", OID: VarcharOID, Codec: TextCodec{}}) defaultMap.RegisterType(&Type{Name: "xid", OID: XIDOID, Codec: Uint32Codec{}}) defaultMap.RegisterType(&Type{Name: "xid8", OID: XID8OID, Codec: Uint64Codec{}}) - defaultMap.RegisterType(&Type{Name: "xml", OID: XMLOID, Codec: &XMLCodec{Marshal: xml.Marshal, Unmarshal: xml.Unmarshal}}) + defaultMap.RegisterType(&Type{Name: "xml", OID: XMLOID, Codec: &XMLCodec{ + Marshal: xml.Marshal, + // xml.Unmarshal does not support unmarshalling into *any. However, XMLCodec.DecodeValue calls Unmarshal with a + // *any. Wrap xml.Marshal with a function that copies the data into a new byte slice in this case. Not implementing + // directly in XMLCodec.DecodeValue to allow for the unlikely possibility that someone uses an alternative XML + // unmarshaler that does support unmarshalling into *any. + // + // https://github.com/jackc/pgx/issues/2227 + // https://github.com/jackc/pgx/pull/2228 + Unmarshal: func(data []byte, v any) error { + if v, ok := v.(*any); ok { + dstBuf := make([]byte, len(data)) + copy(dstBuf, data) + *v = dstBuf + return nil + } + return xml.Unmarshal(data, v) + }, + }}) // Range types defaultMap.RegisterType(&Type{Name: "daterange", OID: DaterangeOID, Codec: &RangeCodec{ElementType: defaultMap.oidToType[DateOID]}}) diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/timestamp.go b/vendor/github.com/jackc/pgx/v5/pgtype/timestamp.go index ff2739d6b..c31f2ac53 100644 --- a/vendor/github.com/jackc/pgx/v5/pgtype/timestamp.go +++ b/vendor/github.com/jackc/pgx/v5/pgtype/timestamp.go @@ -12,6 +12,7 @@ import ( ) const pgTimestampFormat = "2006-01-02 15:04:05.999999999" +const jsonISO8601 = "2006-01-02T15:04:05.999999999" type TimestampScanner interface { ScanTimestamp(v Timestamp) error @@ -76,7 +77,7 @@ func (ts Timestamp) MarshalJSON() ([]byte, error) { switch ts.InfinityModifier { case Finite: - s = ts.Time.Format(time.RFC3339Nano) + s = ts.Time.Format(jsonISO8601) case Infinity: s = "infinity" case NegativeInfinity: @@ -104,15 +105,23 @@ func (ts *Timestamp) UnmarshalJSON(b []byte) error { case "-infinity": *ts = Timestamp{Valid: true, InfinityModifier: -Infinity} default: - // PostgreSQL uses ISO 8601 wihout timezone for to_json function and casting from a string to timestampt - tim, err := time.Parse(time.RFC3339Nano, *s+"Z") - if err != nil { - return err + // Parse time with or without timezonr + tss := *s + // PostgreSQL uses ISO 8601 without timezone for to_json function and casting from a string to timestampt + tim, err := time.Parse(time.RFC3339Nano, tss) + if err == nil { + *ts = Timestamp{Time: tim, Valid: true} + return nil } - - *ts = Timestamp{Time: tim, Valid: true} + tim, err = time.ParseInLocation(jsonISO8601, tss, time.UTC) + if err == nil { + *ts = Timestamp{Time: tim, Valid: true} + return nil + } + ts.Valid = false + return fmt.Errorf("cannot unmarshal %s to timestamp with layout %s or %s (%w)", + *s, time.RFC3339Nano, jsonISO8601, err) } - return nil } diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go b/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go index 270b7617a..e22ed289a 100644 --- a/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go +++ b/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go @@ -17,6 +17,7 @@ import ( var defaultMaxConns = int32(4) var defaultMinConns = int32(0) +var defaultMinIdleConns = int32(0) var defaultMaxConnLifetime = time.Hour var defaultMaxConnIdleTime = time.Minute * 30 var defaultHealthCheckPeriod = time.Minute @@ -87,6 +88,7 @@ type Pool struct { afterRelease func(*pgx.Conn) bool beforeClose func(*pgx.Conn) minConns int32 + minIdleConns int32 maxConns int32 maxConnLifetime time.Duration maxConnLifetimeJitter time.Duration @@ -144,6 +146,13 @@ type Config struct { // to create new connections. MinConns int32 + // MinIdleConns is the minimum number of idle connections in the pool. You can increase this to ensure that + // there are always idle connections available. This can help reduce tail latencies during request processing, + // as you can avoid the latency of establishing a new connection while handling requests. It is superior + // to MinConns for this purpose. + // Similar to MinConns, the pool might temporarily dip below MinIdleConns after connection closes. + MinIdleConns int32 + // HealthCheckPeriod is the duration between checks of the health of idle connections. HealthCheckPeriod time.Duration @@ -189,6 +198,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) { afterRelease: config.AfterRelease, beforeClose: config.BeforeClose, minConns: config.MinConns, + minIdleConns: config.MinIdleConns, maxConns: config.MaxConns, maxConnLifetime: config.MaxConnLifetime, maxConnLifetimeJitter: config.MaxConnLifetimeJitter, @@ -271,7 +281,8 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) { } go func() { - p.createIdleResources(ctx, int(p.minConns)) + targetIdleResources := max(int(p.minConns), int(p.minIdleConns)) + p.createIdleResources(ctx, targetIdleResources) p.backgroundHealthCheck() }() @@ -334,6 +345,17 @@ func ParseConfig(connString string) (*Config, error) { config.MinConns = defaultMinConns } + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_idle_conns"]; ok { + delete(connConfig.Config.RuntimeParams, "pool_min_idle_conns") + n, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return nil, fmt.Errorf("cannot parse pool_min_idle_conns: %w", err) + } + config.MinIdleConns = int32(n) + } else { + config.MinIdleConns = defaultMinIdleConns + } + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok { delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime") d, err := time.ParseDuration(s) @@ -472,7 +494,9 @@ func (p *Pool) checkMinConns() error { // TotalConns can include ones that are being destroyed but we should have // sleep(500ms) around all of the destroys to help prevent that from throwing // off this check - toCreate := p.minConns - p.Stat().TotalConns() + + // Create the number of connections needed to get to both minConns and minIdleConns + toCreate := max(p.minConns-p.Stat().TotalConns(), p.minIdleConns-p.Stat().IdleConns()) if toCreate > 0 { return p.createIdleResources(context.Background(), int(toCreate)) } diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go b/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go index cfa0c4c56..e02b6ac39 100644 --- a/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go +++ b/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go @@ -82,3 +82,10 @@ func (s *Stat) MaxLifetimeDestroyCount() int64 { func (s *Stat) MaxIdleDestroyCount() int64 { return s.idleDestroyCount } + +// EmptyAcquireWaitTime returns the cumulative time waited for successful acquires +// from the pool for a resource to be released or constructed because the pool was +// empty. +func (s *Stat) EmptyAcquireWaitTime() time.Duration { + return s.s.EmptyAcquireWaitTime() +} diff --git a/vendor/github.com/jackc/pgx/v5/rows.go b/vendor/github.com/jackc/pgx/v5/rows.go index f23625d4c..f6f26f479 100644 --- a/vendor/github.com/jackc/pgx/v5/rows.go +++ b/vendor/github.com/jackc/pgx/v5/rows.go @@ -272,7 +272,7 @@ func (rows *baseRows) Scan(dest ...any) error { err := rows.scanPlans[i].Scan(values[i], dst) if err != nil { - err = ScanArgError{ColumnIndex: i, Err: err} + err = ScanArgError{ColumnIndex: i, FieldName: fieldDescriptions[i].Name, Err: err} rows.fatal(err) return err } @@ -334,11 +334,16 @@ func (rows *baseRows) Conn() *Conn { type ScanArgError struct { ColumnIndex int + FieldName string Err error } func (e ScanArgError) Error() string { - return fmt.Sprintf("can't scan into dest[%d]: %v", e.ColumnIndex, e.Err) + if e.FieldName == "?column?" { // Don't include the fieldname if it's unknown + return fmt.Sprintf("can't scan into dest[%d]: %v", e.ColumnIndex, e.Err) + } + + return fmt.Sprintf("can't scan into dest[%d] (col: %s): %v", e.ColumnIndex, e.FieldName, e.Err) } func (e ScanArgError) Unwrap() error { @@ -366,7 +371,7 @@ func ScanRow(typeMap *pgtype.Map, fieldDescriptions []pgconn.FieldDescription, v err := typeMap.Scan(fieldDescriptions[i].DataTypeOID, fieldDescriptions[i].Format, values[i], d) if err != nil { - return ScanArgError{ColumnIndex: i, Err: err} + return ScanArgError{ColumnIndex: i, FieldName: fieldDescriptions[i].Name, Err: err} } } @@ -468,6 +473,8 @@ func CollectOneRow[T any](rows Rows, fn RowToFunc[T]) (T, error) { return value, err } + // The defer rows.Close() won't have executed yet. If the query returned more than one row, rows would still be open. + // rows.Close() must be called before rows.Err() so we explicitly call it here. rows.Close() return value, rows.Err() } diff --git a/vendor/github.com/jackc/pgx/v5/tx.go b/vendor/github.com/jackc/pgx/v5/tx.go index 168d7ba6c..571e5e00f 100644 --- a/vendor/github.com/jackc/pgx/v5/tx.go +++ b/vendor/github.com/jackc/pgx/v5/tx.go @@ -3,7 +3,6 @@ package pgx import ( "context" "errors" - "fmt" "strconv" "strings" @@ -103,7 +102,7 @@ func (c *Conn) BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error) { if err != nil { // begin should never fail unless there is an underlying connection issue or // a context timeout. In either case, the connection is possibly broken. - c.die(errors.New("failed to begin transaction")) + c.die() return nil, err } @@ -216,7 +215,7 @@ func (tx *dbTx) Rollback(ctx context.Context) error { tx.closed = true if err != nil { // A rollback failure leaves the connection in an undefined state - tx.conn.die(fmt.Errorf("rollback failed: %w", err)) + tx.conn.die() return err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 2b1469c62..5aeaa2824 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -587,7 +587,7 @@ github.com/jackc/pgpassfile # github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 ## explicit; go 1.14 github.com/jackc/pgservicefile -# github.com/jackc/pgx/v5 v5.7.2 +# github.com/jackc/pgx/v5 v5.7.3 ## explicit; go 1.21 github.com/jackc/pgx/v5 github.com/jackc/pgx/v5/internal/iobufpool