[chore]: Bump github.com/jackc/pgx/v5 from 5.7.2 to 5.7.3 (#3935)

Bumps [github.com/jackc/pgx/v5](https://github.com/jackc/pgx) from 5.7.2 to 5.7.3.
- [Changelog](https://github.com/jackc/pgx/blob/master/CHANGELOG.md)
- [Commits](https://github.com/jackc/pgx/compare/v5.7.2...v5.7.3)

---
updated-dependencies:
- dependency-name: github.com/jackc/pgx/v5
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Author: dependabot[bot]
Committed by: GitHub
Date: 2025-03-24 10:50:09 +00:00
Parent: a2caa5fdbb
Commit: 18c8f85a30

22 changed files with 608 additions and 209 deletions

go.mod

@@ -47,7 +47,7 @@ require (
 	github.com/google/uuid v1.6.0
 	github.com/gorilla/feeds v1.2.0
 	github.com/gorilla/websocket v1.5.3
-	github.com/jackc/pgx/v5 v5.7.2
+	github.com/jackc/pgx/v5 v5.7.3
 	github.com/k3a/html2text v1.2.1
 	github.com/microcosm-cc/bluemonday v1.0.27
 	github.com/miekg/dns v1.1.63

go.sum (generated)

@@ -258,8 +258,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
 github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
 github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
 github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
-github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
-github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
+github.com/jackc/pgx/v5 v5.7.3 h1:PO1wNKj/bTAwxSJnO1Z4Ai8j4magtqg2SLNjEDzcXQo=
+github.com/jackc/pgx/v5 v5.7.3/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
 github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
 github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
 github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=


@@ -1,3 +1,16 @@
+# 5.7.3 (March 21, 2025)
+
+* Expose EmptyAcquireWaitTime in pgxpool.Stat (vamshiaruru32)
+* Improve SQL sanitizer performance (ninedraft)
+* Fix Scan confusion with json(b), sql.Scanner, and automatic dereferencing (moukoublen, felix-roehrich)
+* Fix Values() for xml type always returning nil instead of []byte
+* Add ability to send Flush message in pipeline mode (zenkovev)
+* Fix pgtype.Timestamp's JSON behavior to match PostgreSQL (pconstantinou)
+* Better error messages when scanning structs (logicbomb)
+* Fix handling of error on batch write (bonnefoa)
+* Match libpq's connection fallback behavior more closely (felix-roehrich)
+* Add MinIdleConns to pgxpool (djahandarie)
+
 # 5.7.2 (December 21, 2024)

 * Fix prepared statement already exists on batch prepare failure
@@ -9,6 +22,7 @@
 * Implement pgtype.UUID.String() (Konstantin Grachev)
 * Switch from ExecParams to Exec in ValidateConnectTargetSessionAttrs functions (Alexander Rumyantsev)
 * Update golang.org/x/crypto
+* Fix json(b) columns prefer sql.Scanner interface like database/sql (Ludovico Russo)

 # 5.7.1 (September 10, 2024)


@@ -92,7 +92,7 @@ See the presentation at Golang Estonia, [PGX Top to Bottom](https://www.youtube.

 ## Supported Go and PostgreSQL Versions

-pgx supports the same versions of Go and PostgreSQL that are supported by their respective teams. For [Go](https://golang.org/doc/devel/release.html#policy) that is the two most recent major releases and for [PostgreSQL](https://www.postgresql.org/support/versioning/) the major releases in the last 5 years. This means pgx supports Go 1.21 and higher and PostgreSQL 12 and higher. pgx also is tested against the latest version of [CockroachDB](https://www.cockroachlabs.com/product/).
+pgx supports the same versions of Go and PostgreSQL that are supported by their respective teams. For [Go](https://golang.org/doc/devel/release.html#policy) that is the two most recent major releases and for [PostgreSQL](https://www.postgresql.org/support/versioning/) the major releases in the last 5 years. This means pgx supports Go 1.22 and higher and PostgreSQL 13 and higher. pgx also is tested against the latest version of [CockroachDB](https://www.cockroachlabs.com/product/).

 ## Version Policy
@@ -172,3 +172,15 @@ Supports, structs, maps, slices and custom mapping functions.
 ### [github.com/z0ne-dev/mgx](https://github.com/z0ne-dev/mgx)

 Code first migration library for native pgx (no database/sql abstraction).
+
+### [github.com/amirsalarsafaei/sqlc-pgx-monitoring](https://github.com/amirsalarsafaei/sqlc-pgx-monitoring)
+
+A database monitoring/metrics library for pgx and sqlc. Trace, log and monitor your sqlc query performance using OpenTelemetry.
+
+### [https://github.com/nikolayk812/pgx-outbox](https://github.com/nikolayk812/pgx-outbox)
+
+Simple Golang implementation for transactional outbox pattern for PostgreSQL using jackc/pgx driver.
+
+### [https://github.com/Arlandaren/pgxWrappy](https://github.com/Arlandaren/pgxWrappy)
+
+Simplifies working with the pgx library, providing convenient scanning of nested structures.


@@ -2,7 +2,7 @@ require "erb"

 rule '.go' => '.go.erb' do |task|
   erb = ERB.new(File.read(task.source))
-  File.write(task.name, "// Do not edit. Generated from #{task.source}\n" + erb.result(binding))
+  File.write(task.name, "// Code generated from #{task.source}. DO NOT EDIT.\n\n" + erb.result(binding))
   sh "goimports", "-w", task.name
 end


@@ -420,7 +420,7 @@ func (c *Conn) IsClosed() bool {
 	return c.pgConn.IsClosed()
 }

-func (c *Conn) die(err error) {
+func (c *Conn) die() {
 	if c.IsClosed() {
 		return
 	}
@@ -588,14 +588,6 @@ func (c *Conn) execPrepared(ctx context.Context, sd *pgconn.StatementDescription
 	return result.CommandTag, result.Err
 }

-type unknownArgumentTypeQueryExecModeExecError struct {
-	arg any
-}
-
-func (e *unknownArgumentTypeQueryExecModeExecError) Error() string {
-	return fmt.Sprintf("cannot use unregistered type %T as query argument in QueryExecModeExec", e.arg)
-}
-
 func (c *Conn) execSQLParams(ctx context.Context, sql string, args []any) (pgconn.CommandTag, error) {
 	err := c.eqb.Build(c.typeMap, nil, args)
 	if err != nil {
@@ -661,11 +653,12 @@ const (
 	// should implement pgtype.Int64Valuer.
 	QueryExecModeExec

-	// Use the simple protocol. Assume the PostgreSQL query parameter types based on the Go type of the arguments. Queries
-	// are executed in a single round trip. Type mappings can be registered with pgtype.Map.RegisterDefaultPgType. Queries
-	// will be rejected that have arguments that are unregistered or ambiguous. e.g. A map[string]string may have the
-	// PostgreSQL type json or hstore. Modes that know the PostgreSQL type can use a map[string]string directly as an
-	// argument. This mode cannot.
+	// Use the simple protocol. Assume the PostgreSQL query parameter types based on the Go type of the arguments. This is
+	// especially significant for []byte values. []byte values are encoded as PostgreSQL bytea. string must be used
+	// instead for text type values including json and jsonb. Type mappings can be registered with
+	// pgtype.Map.RegisterDefaultPgType. Queries will be rejected that have arguments that are unregistered or ambiguous.
+	// e.g. A map[string]string may have the PostgreSQL type json or hstore. Modes that know the PostgreSQL type can use a
+	// map[string]string directly as an argument. This mode cannot. Queries are executed in a single round trip.
 	//
 	// QueryExecModeSimpleProtocol should have the user application visible behavior as QueryExecModeExec. This includes
 	// the warning regarding differences in text format and binary format encoding with user defined types. There may be
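The rewritten comment is easier to act on with a concrete call. A minimal sketch of overriding the execution mode for a single query (the table and column names here are illustrative, not from this commit):

// queryViaSimpleProtocol forces the simple protocol for one query. Note the
// comment's warning: in this mode a []byte argument would be sent as bytea,
// so JSON should be passed as a string.
func queryViaSimpleProtocol(ctx context.Context, conn *pgx.Conn, id int64) error {
	rows, err := conn.Query(ctx, "select data from events where id = $1",
		pgx.QueryExecModeSimpleProtocol, id)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var data string
		if err := rows.Scan(&data); err != nil {
			return err
		}
		_ = data // use the row
	}
	return rows.Err()
}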


@@ -161,7 +161,7 @@ type derivedTypeInfo struct {
 // The result of this call can be passed into RegisterTypes to complete the process.
 func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Type, error) {
 	m := c.TypeMap()
-	if typeNames == nil || len(typeNames) == 0 {
+	if len(typeNames) == 0 {
 		return nil, fmt.Errorf("No type names were supplied.")
 	}
@@ -169,13 +169,7 @@ func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Typ
 	// the SQL not support recent structures such as multirange
 	serverVersion, _ := serverVersion(c)
 	sql := buildLoadDerivedTypesSQL(serverVersion, typeNames)
-	var rows Rows
-	var err error
-	if typeNames == nil {
-		rows, err = c.Query(ctx, sql, QueryExecModeSimpleProtocol)
-	} else {
-		rows, err = c.Query(ctx, sql, QueryExecModeSimpleProtocol, typeNames)
-	}
+	rows, err := c.Query(ctx, sql, QueryExecModeSimpleProtocol, typeNames)
 	if err != nil {
 		return nil, fmt.Errorf("While generating load types query: %w", err)
 	}
@@ -232,7 +226,8 @@ func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Typ
 		default:
 			return nil, fmt.Errorf("Unknown typtype %q was found while registering %q", ti.Typtype, ti.TypeName)
 		}
-		if type_ != nil {
+
+		// the type_ is impossible to be null
 		m.RegisterType(type_)
 		if ti.NspName != "" {
 			nspType := &pgtype.Type{Name: ti.NspName + "." + type_.Name, OID: type_.OID, Codec: type_.Codec}
@@ -241,7 +236,6 @@ func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Typ
 		}
 		result = append(result, type_)
-		}
 	}

 	return result, nil
 }
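The calling pattern the doc comment above refers to, as a short sketch (the type name "mytype" is hypothetical; error handling abbreviated):

types, err := conn.LoadTypes(ctx, []string{"mytype"})
if err != nil {
	return err
}
// Register the loaded types on the connection's type map, per the doc comment.
conn.TypeMap().RegisterTypes(types)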


@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+
+current_branch=$(git rev-parse --abbrev-ref HEAD)
+if [ "$current_branch" == "HEAD" ]; then
+    current_branch=$(git rev-parse HEAD)
+fi
+
+restore_branch() {
+    echo "Restoring original branch/commit: $current_branch"
+    git checkout "$current_branch"
+}
+trap restore_branch EXIT
+
+# Check if there are uncommitted changes
+if ! git diff --quiet || ! git diff --cached --quiet; then
+    echo "There are uncommitted changes. Please commit or stash them before running this script."
+    exit 1
+fi
+
+# Ensure that at least one commit argument is passed
+if [ "$#" -lt 1 ]; then
+    echo "Usage: $0 <commit1> <commit2> ... <commitN>"
+    exit 1
+fi
+
+commits=("$@")
+benchmarks_dir=benchmarks
+
+if ! mkdir -p "${benchmarks_dir}"; then
+    echo "Unable to create dir for benchmarks data"
+    exit 1
+fi
+
+# Benchmark results
+bench_files=()
+
+# Run benchmark for each listed commit
+for i in "${!commits[@]}"; do
+    commit="${commits[i]}"
+    git checkout "$commit" || {
+        echo "Failed to checkout $commit"
+        exit 1
+    }
+
+    # Sanitized commit message
+    commit_message=$(git log -1 --pretty=format:"%s" | tr -c '[:alnum:]-_' '_')
+
+    # Benchmark data will go there
+    bench_file="${benchmarks_dir}/${i}_${commit_message}.bench"
+
+    if ! go test -bench=. -count=10 >"$bench_file"; then
+        echo "Benchmarking failed for commit $commit"
+        exit 1
+    fi
+
+    bench_files+=("$bench_file")
+done
+
+# go install golang.org/x/perf/cmd/benchstat[@latest]
+benchstat "${bench_files[@]}"


@@ -4,8 +4,10 @@ import (
 	"bytes"
 	"encoding/hex"
 	"fmt"
+	"slices"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 	"unicode/utf8"
 )
@@ -24,18 +26,33 @@ type Query struct {
 // https://github.com/jackc/pgx/issues/1380
 const replacementcharacterwidth = 3

+const maxBufSize = 16384 // 16 Ki
+
+var bufPool = &pool[*bytes.Buffer]{
+	new: func() *bytes.Buffer {
+		return &bytes.Buffer{}
+	},
+	reset: func(b *bytes.Buffer) bool {
+		n := b.Len()
+		b.Reset()
+		return n < maxBufSize
+	},
+}
+
+var null = []byte("null")
+
 func (q *Query) Sanitize(args ...any) (string, error) {
 	argUse := make([]bool, len(args))
-	buf := &bytes.Buffer{}
+	buf := bufPool.get()
+	defer bufPool.put(buf)

 	for _, part := range q.Parts {
-		var str string
 		switch part := part.(type) {
 		case string:
-			str = part
+			buf.WriteString(part)
 		case int:
 			argIdx := part - 1
+			var p []byte

 			if argIdx < 0 {
 				return "", fmt.Errorf("first sql argument must be > 0")
 			}
@@ -43,34 +60,41 @@ func (q *Query) Sanitize(args ...any) (string, error) {
 			if argIdx >= len(args) {
 				return "", fmt.Errorf("insufficient arguments")
 			}

+			// Prevent SQL injection via Line Comment Creation
+			// https://github.com/jackc/pgx/security/advisories/GHSA-m7wr-2xf7-cm9p
+			buf.WriteByte(' ')
+
 			arg := args[argIdx]
 			switch arg := arg.(type) {
 			case nil:
-				str = "null"
+				p = null
 			case int64:
-				str = strconv.FormatInt(arg, 10)
+				p = strconv.AppendInt(buf.AvailableBuffer(), arg, 10)
 			case float64:
-				str = strconv.FormatFloat(arg, 'f', -1, 64)
+				p = strconv.AppendFloat(buf.AvailableBuffer(), arg, 'f', -1, 64)
 			case bool:
-				str = strconv.FormatBool(arg)
+				p = strconv.AppendBool(buf.AvailableBuffer(), arg)
 			case []byte:
-				str = QuoteBytes(arg)
+				p = QuoteBytes(buf.AvailableBuffer(), arg)
 			case string:
-				str = QuoteString(arg)
+				p = QuoteString(buf.AvailableBuffer(), arg)
 			case time.Time:
-				str = arg.Truncate(time.Microsecond).Format("'2006-01-02 15:04:05.999999999Z07:00:00'")
+				p = arg.Truncate(time.Microsecond).
+					AppendFormat(buf.AvailableBuffer(), "'2006-01-02 15:04:05.999999999Z07:00:00'")
 			default:
 				return "", fmt.Errorf("invalid arg type: %T", arg)
 			}
 			argUse[argIdx] = true

+			buf.Write(p)
+
 			// Prevent SQL injection via Line Comment Creation
 			// https://github.com/jackc/pgx/security/advisories/GHSA-m7wr-2xf7-cm9p
-			str = " " + str + " "
+			buf.WriteByte(' ')
 		default:
 			return "", fmt.Errorf("invalid Part type: %T", part)
 		}
-		buf.WriteString(str)
 	}

 	for i, used := range argUse {
@@ -82,26 +106,99 @@ func (q *Query) Sanitize(args ...any) (string, error) {
 }

 func NewQuery(sql string) (*Query, error) {
-	l := &sqlLexer{
-		src:     sql,
-		stateFn: rawState,
-	}
+	query := &Query{}
+	query.init(sql)

-	for l.stateFn != nil {
-		l.stateFn = l.stateFn(l)
-	}
-
-	query := &Query{Parts: l.parts}
-
 	return query, nil
 }

+var sqlLexerPool = &pool[*sqlLexer]{
+	new: func() *sqlLexer {
+		return &sqlLexer{}
+	},
+	reset: func(sl *sqlLexer) bool {
+		*sl = sqlLexer{}
+		return true
+	},
+}
+
+func (q *Query) init(sql string) {
+	parts := q.Parts[:0]
+	if parts == nil {
+		// dirty, but fast heuristic to preallocate for ~90% usecases
+		n := strings.Count(sql, "$") + strings.Count(sql, "--") + 1
+		parts = make([]Part, 0, n)
+	}
+
+	l := sqlLexerPool.get()
+	defer sqlLexerPool.put(l)
+
+	l.src = sql
+	l.stateFn = rawState
+	l.parts = parts
+
+	for l.stateFn != nil {
+		l.stateFn = l.stateFn(l)
+	}
+
+	q.Parts = l.parts
+}
+
-func QuoteString(str string) string {
-	return "'" + strings.ReplaceAll(str, "'", "''") + "'"
-}
+func QuoteString(dst []byte, str string) []byte {
+	const quote = '\''
+
+	// Preallocate space for the worst case scenario
+	dst = slices.Grow(dst, len(str)*2+2)
+
+	// Add opening quote
+	dst = append(dst, quote)
+
+	// Iterate through the string without allocating
+	for i := 0; i < len(str); i++ {
+		if str[i] == quote {
+			dst = append(dst, quote, quote)
+		} else {
+			dst = append(dst, str[i])
+		}
+	}
+
+	// Add closing quote
+	dst = append(dst, quote)
+
+	return dst
+}

-func QuoteBytes(buf []byte) string {
-	return `'\x` + hex.EncodeToString(buf) + "'"
-}
+func QuoteBytes(dst, buf []byte) []byte {
+	if len(buf) == 0 {
+		return append(dst, `'\x'`...)
+	}
+
+	// Calculate required length
+	requiredLen := 3 + hex.EncodedLen(len(buf)) + 1
+
+	// Ensure dst has enough capacity
+	if cap(dst)-len(dst) < requiredLen {
+		newDst := make([]byte, len(dst), len(dst)+requiredLen)
+		copy(newDst, dst)
+		dst = newDst
+	}
+
+	// Record original length and extend slice
+	origLen := len(dst)
+	dst = dst[:origLen+requiredLen]
+
+	// Add prefix
+	dst[origLen] = '\''
+	dst[origLen+1] = '\\'
+	dst[origLen+2] = 'x'
+
+	// Encode bytes directly into dst
+	hex.Encode(dst[origLen+3:len(dst)-1], buf)
+
+	// Add suffix
+	dst[len(dst)-1] = '\''
+
+	return dst
+}

 type sqlLexer struct {
@@ -319,13 +416,45 @@ func multilineCommentState(l *sqlLexer) stateFn {
 	}
 }

+var queryPool = &pool[*Query]{
+	new: func() *Query {
+		return &Query{}
+	},
+	reset: func(q *Query) bool {
+		n := len(q.Parts)
+		q.Parts = q.Parts[:0]
+		return n < 64 // drop too large queries
+	},
+}
+
 // SanitizeSQL replaces placeholder values with args. It quotes and escapes args
 // as necessary. This function is only safe when standard_conforming_strings is
 // on.
 func SanitizeSQL(sql string, args ...any) (string, error) {
-	query, err := NewQuery(sql)
-	if err != nil {
-		return "", err
-	}
+	query := queryPool.get()
+	query.init(sql)
+	defer queryPool.put(query)
+
 	return query.Sanitize(args...)
 }
+
+type pool[E any] struct {
+	p     sync.Pool
+	new   func() E
+	reset func(E) bool
+}
+
+func (pool *pool[E]) get() E {
+	v, ok := pool.p.Get().(E)
+	if !ok {
+		v = pool.new()
+	}
+
+	return v
+}
+
+func (p *pool[E]) put(v E) {
+	if p.reset(v) {
+		p.p.Put(v)
+	}
+}
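A sketch of what the pooled sanitizer produces; the package is internal to pgx, so this is only callable from pgx's own code and tests (the query text is illustrative):

out, _ := sanitize.SanitizeSQL("select * from users where name = $1", "O'Brien")
fmt.Println(out)
// Output: select * from users where name =  'O''Brien' 
// The space written before and after each substituted argument is the
// line-comment-injection guard (GHSA-m7wr-2xf7-cm9p) carried over from 5.7.2.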


@@ -1,6 +1,7 @@
 package pgconn

 import (
+	"container/list"
 	"context"
 	"crypto/md5"
 	"crypto/tls"
@@ -267,12 +268,15 @@ func connectPreferred(ctx context.Context, config *Config, connectOneConfigs []*
 		var pgErr *PgError
 		if errors.As(err, &pgErr) {
-			const ERRCODE_INVALID_PASSWORD = "28P01"                    // wrong password
-			const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings
+			// pgx will try next host even if libpq does not in certain cases (see #2246)
+			// consider change for the next major version
+			const ERRCODE_INVALID_PASSWORD = "28P01"
+			const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings
 			const ERRCODE_INVALID_CATALOG_NAME = "3D000"                // db does not exist
 			const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501"              // missing connect privilege
+
+			// auth failed due to invalid password, db does not exist or user has no permission
 			if pgErr.Code == ERRCODE_INVALID_PASSWORD ||
+				pgErr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION && c.tlsConfig != nil ||
 				pgErr.Code == ERRCODE_INVALID_CATALOG_NAME ||
 				pgErr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE {
 				return nil, allErrors
@@ -1410,7 +1414,6 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
 type MultiResultReader struct {
 	pgConn *PgConn
 	ctx    context.Context
-	pipeline *Pipeline

 	rr *ResultReader
@@ -1443,12 +1446,8 @@ func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error)
 	switch msg := msg.(type) {
 	case *pgproto3.ReadyForQuery:
 		mrr.closed = true
-		if mrr.pipeline != nil {
-			mrr.pipeline.expectedReadyForQueryCount--
-		} else {
-			mrr.pgConn.contextWatcher.Unwatch()
-			mrr.pgConn.unlock()
-		}
+		mrr.pgConn.contextWatcher.Unwatch()
+		mrr.pgConn.unlock()
 	case *pgproto3.ErrorResponse:
 		mrr.err = ErrorResponseToPgError(msg)
 	}
@@ -1672,7 +1671,11 @@ func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error
 	case *pgproto3.EmptyQueryResponse:
 		rr.concludeCommand(CommandTag{}, nil)
 	case *pgproto3.ErrorResponse:
-		rr.concludeCommand(CommandTag{}, ErrorResponseToPgError(msg))
+		pgErr := ErrorResponseToPgError(msg)
+		if rr.pipeline != nil {
+			rr.pipeline.state.HandleError(pgErr)
+		}
+		rr.concludeCommand(CommandTag{}, pgErr)
 	}

 	return msg, nil
@@ -1773,9 +1776,10 @@ func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultR
 	batch.buf, batch.err = (&pgproto3.Sync{}).Encode(batch.buf)
 	if batch.err != nil {
-		pgConn.contextWatcher.Unwatch()
+		multiResult.err = normalizeTimeoutError(multiResult.ctx, batch.err)
 		multiResult.closed = true
-		multiResult.err = batch.err
-		pgConn.unlock()
+		pgConn.asyncClose()
 		return multiResult
 	}
@@ -1783,9 +1787,10 @@ func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultR
 	defer pgConn.exitPotentialWriteReadDeadlock()
 	_, err := pgConn.conn.Write(batch.buf)
 	if err != nil {
-		pgConn.contextWatcher.Unwatch()
+		multiResult.err = normalizeTimeoutError(multiResult.ctx, err)
 		multiResult.closed = true
-		multiResult.err = err
-		pgConn.unlock()
+		pgConn.asyncClose()
 		return multiResult
 	}
@@ -1999,9 +2004,7 @@ type Pipeline struct {
 	conn *PgConn
 	ctx  context.Context

-	expectedReadyForQueryCount int
-	pendingSync                bool
+	state pipelineState

 	err    error
 	closed bool
 }
@@ -2012,6 +2015,122 @@ type PipelineSync struct{}
 // CloseComplete is returned by GetResults when a CloseComplete message is received.
 type CloseComplete struct{}

+type pipelineRequestType int
+
+const (
+	pipelineNil pipelineRequestType = iota
+	pipelinePrepare
+	pipelineQueryParams
+	pipelineQueryPrepared
+	pipelineDeallocate
+	pipelineSyncRequest
+	pipelineFlushRequest
+)
+
+type pipelineRequestEvent struct {
+	RequestType       pipelineRequestType
+	WasSentToServer   bool
+	BeforeFlushOrSync bool
+}
+
+type pipelineState struct {
+	requestEventQueue          list.List
+	lastRequestType            pipelineRequestType
+	pgErr                      *PgError
+	expectedReadyForQueryCount int
+}
+
+func (s *pipelineState) Init() {
+	s.requestEventQueue.Init()
+	s.lastRequestType = pipelineNil
+}
+
+func (s *pipelineState) RegisterSendingToServer() {
+	for elem := s.requestEventQueue.Back(); elem != nil; elem = elem.Prev() {
+		val := elem.Value.(pipelineRequestEvent)
+		if val.WasSentToServer {
+			return
+		}
+		val.WasSentToServer = true
+		elem.Value = val
+	}
+}
+
+func (s *pipelineState) registerFlushingBufferOnServer() {
+	for elem := s.requestEventQueue.Back(); elem != nil; elem = elem.Prev() {
+		val := elem.Value.(pipelineRequestEvent)
+		if val.BeforeFlushOrSync {
+			return
+		}
+		val.BeforeFlushOrSync = true
+		elem.Value = val
+	}
+}
+
+func (s *pipelineState) PushBackRequestType(req pipelineRequestType) {
+	if req == pipelineNil {
+		return
+	}
+
+	if req != pipelineFlushRequest {
+		s.requestEventQueue.PushBack(pipelineRequestEvent{RequestType: req})
+	}
+	if req == pipelineFlushRequest || req == pipelineSyncRequest {
+		s.registerFlushingBufferOnServer()
+	}
+	s.lastRequestType = req
+
+	if req == pipelineSyncRequest {
+		s.expectedReadyForQueryCount++
+	}
+}
+
+func (s *pipelineState) ExtractFrontRequestType() pipelineRequestType {
+	for {
+		elem := s.requestEventQueue.Front()
+		if elem == nil {
+			return pipelineNil
+		}
+		val := elem.Value.(pipelineRequestEvent)
+		if !(val.WasSentToServer && val.BeforeFlushOrSync) {
+			return pipelineNil
+		}
+
+		s.requestEventQueue.Remove(elem)
+		if val.RequestType == pipelineSyncRequest {
+			s.pgErr = nil
+		}
+		if s.pgErr == nil {
+			return val.RequestType
+		}
+	}
+}
+
+func (s *pipelineState) HandleError(err *PgError) {
+	s.pgErr = err
+}
+
+func (s *pipelineState) HandleReadyForQuery() {
+	s.expectedReadyForQueryCount--
+}
+
+func (s *pipelineState) PendingSync() bool {
+	var notPendingSync bool
+
+	if elem := s.requestEventQueue.Back(); elem != nil {
+		val := elem.Value.(pipelineRequestEvent)
+		notPendingSync = (val.RequestType == pipelineSyncRequest) && val.WasSentToServer
+	} else {
+		notPendingSync = (s.lastRequestType == pipelineSyncRequest) || (s.lastRequestType == pipelineNil)
+	}
+
+	return !notPendingSync
+}
+
+func (s *pipelineState) ExpectedReadyForQuery() int {
+	return s.expectedReadyForQueryCount
+}
+
 // StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent
 // to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection
 // to normal mode. While in pipeline mode, no methods that communicate with the server may be called except
@@ -2020,16 +2139,21 @@ type CloseComplete struct{}
 // Prefer ExecBatch when only sending one group of queries at once.
 func (pgConn *PgConn) StartPipeline(ctx context.Context) *Pipeline {
 	if err := pgConn.lock(); err != nil {
-		return &Pipeline{
+		pipeline := &Pipeline{
 			closed: true,
 			err:    err,
 		}
+		pipeline.state.Init()
+
+		return pipeline
 	}

 	pgConn.pipeline = Pipeline{
 		conn: pgConn,
 		ctx:  ctx,
 	}
+	pgConn.pipeline.state.Init()
+
 	pipeline := &pgConn.pipeline

 	if ctx != context.Background() {
@@ -2052,10 +2176,10 @@ func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) {
 	if p.closed {
 		return
 	}
-	p.pendingSync = true

 	p.conn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs})
 	p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name})
+	p.state.PushBackRequestType(pipelinePrepare)
 }

 // SendDeallocate deallocates a prepared statement.
@@ -2063,9 +2187,9 @@ func (p *Pipeline) SendDeallocate(name string) {
 	if p.closed {
 		return
 	}
-	p.pendingSync = true

 	p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name})
+	p.state.PushBackRequestType(pipelineDeallocate)
 }

 // SendQueryParams is the pipeline version of *PgConn.QueryParams.
@@ -2073,12 +2197,12 @@ func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs [
 	if p.closed {
 		return
 	}
-	p.pendingSync = true

 	p.conn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs})
 	p.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
 	p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
 	p.conn.frontend.SendExecute(&pgproto3.Execute{})
+	p.state.PushBackRequestType(pipelineQueryParams)
 }

 // SendQueryPrepared is the pipeline version of *PgConn.QueryPrepared.
@@ -2086,11 +2210,42 @@ func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, para
 	if p.closed {
 		return
 	}
-	p.pendingSync = true

 	p.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
 	p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
 	p.conn.frontend.SendExecute(&pgproto3.Execute{})
+	p.state.PushBackRequestType(pipelineQueryPrepared)
+}
+
+// SendFlushRequest sends a request for the server to flush its output buffer.
+//
+// The server flushes its output buffer automatically as a result of Sync being called,
+// or on any request when not in pipeline mode; this function is useful to cause the server
+// to flush its output buffer in pipeline mode without establishing a synchronization point.
+// Note that the request is not itself flushed to the server automatically; use Flush if
+// necessary. This copies the behavior of libpq PQsendFlushRequest.
+func (p *Pipeline) SendFlushRequest() {
+	if p.closed {
+		return
+	}
+
+	p.conn.frontend.Send(&pgproto3.Flush{})
+	p.state.PushBackRequestType(pipelineFlushRequest)
+}
+
+// SendPipelineSync marks a synchronization point in a pipeline by sending a sync message
+// without flushing the send buffer. This serves as the delimiter of an implicit
+// transaction and an error recovery point.
+//
+// Note that the request is not itself flushed to the server automatically; use Flush if
+// necessary. This copies the behavior of libpq PQsendPipelineSync.
+func (p *Pipeline) SendPipelineSync() {
+	if p.closed {
+		return
+	}
+
+	p.conn.frontend.SendSync(&pgproto3.Sync{})
+	p.state.PushBackRequestType(pipelineSyncRequest)
 }
 // Flush flushes the queued requests without establishing a synchronization point.
@@ -2115,28 +2270,14 @@ func (p *Pipeline) Flush() error {
 		return err
 	}

+	p.state.RegisterSendingToServer()
 	return nil
 }

 // Sync establishes a synchronization point and flushes the queued requests.
 func (p *Pipeline) Sync() error {
-	if p.closed {
-		if p.err != nil {
-			return p.err
-		}
-		return errors.New("pipeline closed")
-	}
-
-	p.conn.frontend.SendSync(&pgproto3.Sync{})
-	err := p.Flush()
-	if err != nil {
-		return err
-	}
-
-	p.pendingSync = false
-	p.expectedReadyForQueryCount++
-
-	return nil
+	p.SendPipelineSync()
+	return p.Flush()
 }

 // GetResults gets the next results. If results are present, results may be a *ResultReader, *StatementDescription, or
@@ -2150,7 +2291,7 @@ func (p *Pipeline) GetResults() (results any, err error) {
 		return nil, errors.New("pipeline closed")
 	}

-	if p.expectedReadyForQueryCount == 0 {
+	if p.state.ExtractFrontRequestType() == pipelineNil {
 		return nil, nil
 	}
@@ -2195,13 +2336,13 @@ func (p *Pipeline) getResults() (results any, err error) {
 		case *pgproto3.CloseComplete:
 			return &CloseComplete{}, nil
 		case *pgproto3.ReadyForQuery:
-			p.expectedReadyForQueryCount--
+			p.state.HandleReadyForQuery()
 			return &PipelineSync{}, nil
 		case *pgproto3.ErrorResponse:
 			pgErr := ErrorResponseToPgError(msg)
+			p.state.HandleError(pgErr)
 			return nil, pgErr
 		}
 	}
 }
@@ -2231,6 +2372,7 @@ func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) {
 		// These should never happen here. But don't take chances that could lead to a deadlock.
 		case *pgproto3.ErrorResponse:
 			pgErr := ErrorResponseToPgError(msg)
+			p.state.HandleError(pgErr)
 			return nil, pgErr
 		case *pgproto3.CommandComplete:
 			p.conn.asyncClose()
@@ -2250,7 +2392,7 @@ func (p *Pipeline) Close() error {
 	p.closed = true

-	if p.pendingSync {
+	if p.state.PendingSync() {
 		p.conn.asyncClose()
 		p.err = errors.New("pipeline has unsynced requests")
 		p.conn.contextWatcher.Unwatch()
@@ -2259,7 +2401,7 @@ func (p *Pipeline) Close() error {
 		return p.err
 	}

-	for p.expectedReadyForQueryCount > 0 {
+	for p.state.ExpectedReadyForQuery() > 0 {
 		_, err := p.getResults()
 		if err != nil {
 			p.err = err
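Taken together, the new state machine is what lets results be read between sync points. A minimal sketch, assuming an established *pgconn.PgConn (this usage is not part of the commit itself):

func pipelineSketch(ctx context.Context, conn *pgconn.PgConn) error {
	p := conn.StartPipeline(ctx)
	defer p.Close()

	p.SendQueryParams("select 1", nil, nil, nil, nil)
	p.SendFlushRequest() // queue a Flush message; no sync point is created
	if err := p.Flush(); err != nil { // write the queued messages to the socket
		return err
	}

	// Results for the flushed query are readable before any Sync.
	for {
		res, err := p.GetResults()
		if err != nil {
			return err
		}
		if res == nil {
			break
		}
		if rr, ok := res.(*pgconn.ResultReader); ok {
			if _, err := rr.Close(); err != nil {
				return err
			}
		}
	}

	return p.Sync() // per this diff, SendPipelineSync followed by Flush
}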


@@ -12,7 +12,7 @@ type PasswordMessage struct {
 // Frontend identifies this message as sendable by a PostgreSQL frontend.
 func (*PasswordMessage) Frontend() {}

-// Frontend identifies this message as an authentication response.
+// InitialResponse identifies this message as an authentication response.
 func (*PasswordMessage) InitialResponse() {}

 // Decode decodes src into dst. src must contain the complete message with the exception of the initial 1 byte message


@@ -53,8 +53,8 @@ similar fashion to database/sql. The second is to use a pointer to a pointer.
 		return err
 	}

-When using nullable pgtype types as parameters for queries, one has to remember
-to explicitly set their Valid field to true, otherwise the parameter's value will be NULL.
+When using nullable pgtype types as parameters for queries, one has to remember to explicitly set their Valid field to
+true, otherwise the parameter's value will be NULL.

 JSON Support
@@ -159,11 +159,16 @@ example_child_records_test.go for an example.
 Overview of Scanning Implementation

-The first step is to use the OID to lookup the correct Codec. If the OID is unavailable, Map will try to find the OID
-from previous calls of Map.RegisterDefaultPgType. The Map will call the Codec's PlanScan method to get a plan for
-scanning into the Go value. A Codec will support scanning into one or more Go types. Oftentime these Go types are
-interfaces rather than explicit types. For example, PointCodec can use any Go type that implements the PointScanner and
-PointValuer interfaces.
+The first step is to use the OID to lookup the correct Codec. The Map will call the Codec's PlanScan method to get a
+plan for scanning into the Go value. A Codec will support scanning into one or more Go types. Oftentimes these Go types
+are interfaces rather than explicit types. For example, PointCodec can use any Go type that implements the PointScanner
+and PointValuer interfaces.
+
+If a Go value is not supported directly by a Codec then Map will try to see if it is a sql.Scanner. If it is, then that
+interface will be used to scan the value. Most sql.Scanners require the input to be in the text format (e.g. UUIDs and
+numeric). However, pgx will typically have received the value in the binary format. In this case the binary value will be
+parsed, reencoded as text, and then passed to the sql.Scanner. This may incur additional overhead for query results with
+a large number of affected values.

 If a Go value is not supported directly by a Codec then Map will try wrapping it with additional logic and try again.
 For example, Int8Codec does not support scanning into a renamed type (e.g. type myInt64 int64). But Map will detect that
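A hypothetical sql.Scanner destination illustrating the new paragraph (this type is not part of pgx):

type money struct{ raw string }

func (m *money) Scan(src any) error {
	switch v := src.(type) {
	case string:
		m.raw = v // pgx re-encodes binary numerics to text before calling Scan
	case []byte:
		m.raw = string(v)
	default:
		return fmt.Errorf("unsupported type %T", src)
	}
	return nil
}

// var m money
// err := conn.QueryRow(ctx, "select 123.45::numeric").Scan(&m)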


@@ -1,4 +1,5 @@
-// Do not edit. Generated from pgtype/int.go.erb
+// Code generated from pgtype/int.go.erb. DO NOT EDIT.
+
 package pgtype

 import (


@@ -71,6 +71,27 @@ func (c *JSONCodec) PlanEncode(m *Map, oid uint32, format int16, value any) Enco
 	}
 }

+// JSON needs its own scan plan for pointers to handle 'null'::json(b).
+// Consider making pointerPointerScanPlan more flexible in the future.
+type jsonPointerScanPlan struct {
+	next ScanPlan
+}
+
+func (p jsonPointerScanPlan) Scan(src []byte, dst any) error {
+	el := reflect.ValueOf(dst).Elem()
+	if src == nil || string(src) == "null" {
+		el.SetZero()
+		return nil
+	}
+
+	el.Set(reflect.New(el.Type().Elem()))
+	if p.next != nil {
+		return p.next.Scan(src, el.Interface())
+	}
+
+	return nil
+}
+
 type encodePlanJSONCodecEitherFormatString struct{}

 func (encodePlanJSONCodecEitherFormatString) Encode(value any, buf []byte) (newBuf []byte, err error) {
@@ -117,58 +138,36 @@ func (e *encodePlanJSONCodecEitherFormatMarshal) Encode(value any, buf []byte) (
 	return buf, nil
 }

-func (c *JSONCodec) PlanScan(m *Map, oid uint32, format int16, target any) ScanPlan {
+func (c *JSONCodec) PlanScan(m *Map, oid uint32, formatCode int16, target any) ScanPlan {
+	return c.planScan(m, oid, formatCode, target, 0)
+}
+
+// JSON cannot fallback to pointerPointerScanPlan because of 'null'::json(b),
+// so we need to duplicate the logic here.
+func (c *JSONCodec) planScan(m *Map, oid uint32, formatCode int16, target any, depth int) ScanPlan {
+	if depth > 8 {
+		return &scanPlanFail{m: m, oid: oid, formatCode: formatCode}
+	}
+
 	switch target.(type) {
 	case *string:
-		return scanPlanAnyToString{}
-	case **string:
-		// This is to fix **string scanning. It seems wrong to special case **string, but it's not clear what a better
-		// solution would be.
-		//
-		// https://github.com/jackc/pgx/issues/1470 -- **string
-		// https://github.com/jackc/pgx/issues/1691 -- ** anything else
-		if wrapperPlan, nextDst, ok := TryPointerPointerScanPlan(target); ok {
-			if nextPlan := m.planScan(oid, format, nextDst, 0); nextPlan != nil {
-				if _, failed := nextPlan.(*scanPlanFail); !failed {
-					wrapperPlan.SetNext(nextPlan)
-					return wrapperPlan
-				}
-			}
-		}
+		return &scanPlanAnyToString{}
 	case *[]byte:
-		return scanPlanJSONToByteSlice{}
+		return &scanPlanJSONToByteSlice{}
 	case BytesScanner:
-		return scanPlanBinaryBytesToBytesScanner{}
+		return &scanPlanBinaryBytesToBytesScanner{}
+	case sql.Scanner:
+		return &scanPlanSQLScanner{formatCode: formatCode}
 	}

-	// Cannot rely on sql.Scanner being handled later because scanPlanJSONToJSONUnmarshal will take precedence.
-	//
-	// https://github.com/jackc/pgx/issues/1418
-	if isSQLScanner(target) {
-		return &scanPlanSQLScanner{formatCode: format}
-	}
-
-	return &scanPlanJSONToJSONUnmarshal{
-		unmarshal: c.Unmarshal,
-	}
-}
-
-// we need to check if the target is a pointer to a sql.Scanner (or any of the pointer ref tree implements a sql.Scanner).
-//
-// https://github.com/jackc/pgx/issues/2146
-func isSQLScanner(v any) bool {
-	val := reflect.ValueOf(v)
-	for val.Kind() == reflect.Ptr {
-		if _, ok := val.Interface().(sql.Scanner); ok {
-			return true
-		}
-		val = val.Elem()
-	}
-	return false
+	rv := reflect.ValueOf(target)
+	if rv.Kind() == reflect.Pointer && rv.Elem().Kind() == reflect.Pointer {
+		var plan jsonPointerScanPlan
+		plan.next = c.planScan(m, oid, formatCode, rv.Elem().Interface(), depth+1)
+		return plan
+	} else {
+		return &scanPlanJSONToJSONUnmarshal{unmarshal: c.Unmarshal}
+	}
 }

 type scanPlanAnyToString struct{}
@@ -198,7 +197,7 @@ type scanPlanJSONToJSONUnmarshal struct {
 }

 func (s *scanPlanJSONToJSONUnmarshal) Scan(src []byte, dst any) error {
-	if src == nil {
+	if src == nil || string(src) == "null" {
 		dstValue := reflect.ValueOf(dst)
 		if dstValue.Kind() == reflect.Ptr {
 			el := dstValue.Elem()
@@ -212,7 +211,12 @@ func (s *scanPlanJSONToJSONUnmarshal) Scan(src []byte, dst any) error {
 		return fmt.Errorf("cannot scan NULL into %T", dst)
 	}

-	elem := reflect.ValueOf(dst).Elem()
+	v := reflect.ValueOf(dst)
+	if v.Kind() != reflect.Pointer || v.IsNil() {
+		return fmt.Errorf("cannot scan into non-pointer or nil destinations %T", dst)
+	}
+
+	elem := v.Elem()
 	elem.Set(reflect.Zero(elem.Type()))

 	return s.unmarshal(src, dst)
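The user-visible effect of jsonPointerScanPlan plus the 'null' check, sketched (connection setup elided):

var name *string
_ = conn.QueryRow(ctx, `select 'null'::jsonb`).Scan(&name) // name stays nil

var doc map[string]any
_ = conn.QueryRow(ctx, `select 'null'::jsonb`).Scan(&doc) // doc is zeroed to nil, no error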


@@ -396,11 +396,7 @@ type scanPlanSQLScanner struct {
 }

 func (plan *scanPlanSQLScanner) Scan(src []byte, dst any) error {
-	scanner := getSQLScanner(dst)
-
-	if scanner == nil {
-		return fmt.Errorf("cannot scan into %T", dst)
-	}
+	scanner := dst.(sql.Scanner)

 	if src == nil {
 		// This is necessary because interface value []byte:nil does not equal nil:nil for the binary format path and the
@@ -413,21 +409,6 @@ func (plan *scanPlanSQLScanner) Scan(src []byte, dst any) error {
 	}
 }

-// we don't know if the target is a sql.Scanner or a pointer on a sql.Scanner, so we need to check recursively
-func getSQLScanner(target any) sql.Scanner {
-	val := reflect.ValueOf(target)
-	for val.Kind() == reflect.Ptr {
-		if _, ok := val.Interface().(sql.Scanner); ok {
-			if val.IsNil() {
-				val.Set(reflect.New(val.Type().Elem()))
-			}
-			return val.Interface().(sql.Scanner)
-		}
-		val = val.Elem()
-	}
-	return nil
-}
-
 type scanPlanString struct{}

 func (scanPlanString) Scan(src []byte, dst any) error {


@@ -91,7 +91,25 @@ func initDefaultMap() {
 	defaultMap.RegisterType(&Type{Name: "varchar", OID: VarcharOID, Codec: TextCodec{}})
 	defaultMap.RegisterType(&Type{Name: "xid", OID: XIDOID, Codec: Uint32Codec{}})
 	defaultMap.RegisterType(&Type{Name: "xid8", OID: XID8OID, Codec: Uint64Codec{}})
-	defaultMap.RegisterType(&Type{Name: "xml", OID: XMLOID, Codec: &XMLCodec{Marshal: xml.Marshal, Unmarshal: xml.Unmarshal}})
+	defaultMap.RegisterType(&Type{Name: "xml", OID: XMLOID, Codec: &XMLCodec{
+		Marshal: xml.Marshal,
+		// xml.Unmarshal does not support unmarshalling into *any. However, XMLCodec.DecodeValue calls Unmarshal with a
+		// *any. Wrap xml.Unmarshal with a function that copies the data into a new byte slice in this case. Not implementing
+		// directly in XMLCodec.DecodeValue to allow for the unlikely possibility that someone uses an alternative XML
+		// unmarshaler that does support unmarshalling into *any.
+		//
+		// https://github.com/jackc/pgx/issues/2227
+		// https://github.com/jackc/pgx/pull/2228
+		Unmarshal: func(data []byte, v any) error {
+			if v, ok := v.(*any); ok {
+				dstBuf := make([]byte, len(data))
+				copy(dstBuf, data)
+				*v = dstBuf
+				return nil
+			}
+			return xml.Unmarshal(data, v)
+		},
+	}})

 	// Range types
 	defaultMap.RegisterType(&Type{Name: "daterange", OID: DaterangeOID, Codec: &RangeCodec{ElementType: defaultMap.oidToType[DateOID]}})


@@ -12,6 +12,7 @@ import (
 )

 const pgTimestampFormat = "2006-01-02 15:04:05.999999999"
+const jsonISO8601 = "2006-01-02T15:04:05.999999999"

 type TimestampScanner interface {
 	ScanTimestamp(v Timestamp) error
@@ -76,7 +77,7 @@ func (ts Timestamp) MarshalJSON() ([]byte, error) {
 	switch ts.InfinityModifier {
 	case Finite:
-		s = ts.Time.Format(time.RFC3339Nano)
+		s = ts.Time.Format(jsonISO8601)
 	case Infinity:
 		s = "infinity"
 	case NegativeInfinity:
@@ -104,15 +105,23 @@ func (ts *Timestamp) UnmarshalJSON(b []byte) error {
 	case "-infinity":
 		*ts = Timestamp{Valid: true, InfinityModifier: -Infinity}
 	default:
-		// PostgreSQL uses ISO 8601 wihout timezone for to_json function and casting from a string to timestampt
-		tim, err := time.Parse(time.RFC3339Nano, *s+"Z")
-		if err != nil {
-			return err
-		}
-
-		*ts = Timestamp{Time: tim, Valid: true}
+		// Parse time with or without timezone
+		tss := *s
+		// PostgreSQL uses ISO 8601 without timezone for to_json function and casting from a string to timestamp
+		tim, err := time.Parse(time.RFC3339Nano, tss)
+		if err == nil {
+			*ts = Timestamp{Time: tim, Valid: true}
+			return nil
+		}
+
+		tim, err = time.ParseInLocation(jsonISO8601, tss, time.UTC)
+		if err == nil {
+			*ts = Timestamp{Time: tim, Valid: true}
+			return nil
+		}
+
+		ts.Valid = false
+		return fmt.Errorf("cannot unmarshal %s to timestamp with layout %s or %s (%w)",
+			*s, time.RFC3339Nano, jsonISO8601, err)
 	}

 	return nil
 }
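Round-trip behavior after this change, sketched:

ts := pgtype.Timestamp{Time: time.Date(2025, 3, 21, 12, 0, 0, 0, time.UTC), Valid: true}
b, _ := json.Marshal(ts)
// string(b) == `"2025-03-21T12:00:00"` (ISO 8601 without an offset, like to_json)

var back pgtype.Timestamp
_ = json.Unmarshal(b, &back) // also still accepts RFC 3339 strings with an offset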


@@ -17,6 +17,7 @@ import (

 var defaultMaxConns = int32(4)
 var defaultMinConns = int32(0)
+var defaultMinIdleConns = int32(0)
 var defaultMaxConnLifetime = time.Hour
 var defaultMaxConnIdleTime = time.Minute * 30
 var defaultHealthCheckPeriod = time.Minute
@@ -87,6 +88,7 @@ type Pool struct {
 	afterRelease          func(*pgx.Conn) bool
 	beforeClose           func(*pgx.Conn)
 	minConns              int32
+	minIdleConns          int32
 	maxConns              int32
 	maxConnLifetime       time.Duration
 	maxConnLifetimeJitter time.Duration
@@ -144,6 +146,13 @@ type Config struct {
 	// to create new connections.
 	MinConns int32

+	// MinIdleConns is the minimum number of idle connections in the pool. You can increase this to ensure that
+	// there are always idle connections available. This can help reduce tail latencies during request processing,
+	// as you can avoid the latency of establishing a new connection while handling requests. It is superior
+	// to MinConns for this purpose.
+	// Similar to MinConns, the pool might temporarily dip below MinIdleConns after connection closes.
+	MinIdleConns int32
+
 	// HealthCheckPeriod is the duration between checks of the health of idle connections.
 	HealthCheckPeriod time.Duration
@@ -189,6 +198,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
 		afterRelease:          config.AfterRelease,
 		beforeClose:           config.BeforeClose,
 		minConns:              config.MinConns,
+		minIdleConns:          config.MinIdleConns,
 		maxConns:              config.MaxConns,
 		maxConnLifetime:       config.MaxConnLifetime,
 		maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
@@ -271,7 +281,8 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
 	}

 	go func() {
-		p.createIdleResources(ctx, int(p.minConns))
+		targetIdleResources := max(int(p.minConns), int(p.minIdleConns))
+		p.createIdleResources(ctx, targetIdleResources)
 		p.backgroundHealthCheck()
 	}()
@@ -334,6 +345,17 @@ func ParseConfig(connString string) (*Config, error) {
 		config.MinConns = defaultMinConns
 	}

+	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_idle_conns"]; ok {
+		delete(connConfig.Config.RuntimeParams, "pool_min_idle_conns")
+		n, err := strconv.ParseInt(s, 10, 32)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse pool_min_idle_conns: %w", err)
+		}
+		config.MinIdleConns = int32(n)
+	} else {
+		config.MinIdleConns = defaultMinIdleConns
+	}
+
 	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
 		delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
 		d, err := time.ParseDuration(s)
@@ -472,7 +494,9 @@ func (p *Pool) checkMinConns() error {
 	// TotalConns can include ones that are being destroyed but we should have
 	// sleep(500ms) around all of the destroys to help prevent that from throwing
 	// off this check
-	toCreate := p.minConns - p.Stat().TotalConns()
+
+	// Create the number of connections needed to get to both minConns and minIdleConns
+	toCreate := max(p.minConns-p.Stat().TotalConns(), p.minIdleConns-p.Stat().IdleConns())
 	if toCreate > 0 {
 		return p.createIdleResources(context.Background(), int(toCreate))
 	}
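Both ways of setting the new option, per the parsing code above (the DSN and its credentials are placeholders):

cfg, err := pgxpool.ParseConfig("postgres://user:pass@localhost:5432/app?pool_min_idle_conns=4")
if err != nil {
	return err
}
cfg.MinIdleConns = 4 // equivalent programmatic form

pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err != nil {
	return err
}
defer pool.Close()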


@@ -82,3 +82,10 @@ func (s *Stat) MaxLifetimeDestroyCount() int64 {
 func (s *Stat) MaxIdleDestroyCount() int64 {
 	return s.idleDestroyCount
 }
+
+// EmptyAcquireWaitTime returns the cumulative time waited for successful acquires
+// from the pool for a resource to be released or constructed because the pool was
+// empty.
+func (s *Stat) EmptyAcquireWaitTime() time.Duration {
+	return s.s.EmptyAcquireWaitTime()
+}
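A sketch of exporting the new counter alongside existing Stat fields, e.g. on a monitoring tick:

stat := pool.Stat()
log.Printf("empty_acquire_wait=%s acquires=%d idle=%d",
	stat.EmptyAcquireWaitTime(), stat.AcquireCount(), stat.IdleConns())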


@@ -272,7 +272,7 @@ func (rows *baseRows) Scan(dest ...any) error {
 		err := rows.scanPlans[i].Scan(values[i], dst)
 		if err != nil {
-			err = ScanArgError{ColumnIndex: i, Err: err}
+			err = ScanArgError{ColumnIndex: i, FieldName: fieldDescriptions[i].Name, Err: err}
 			rows.fatal(err)
 			return err
 		}
@@ -334,13 +334,18 @@ func (rows *baseRows) Conn() *Conn {

 type ScanArgError struct {
 	ColumnIndex int
+	FieldName   string
 	Err         error
 }

 func (e ScanArgError) Error() string {
+	if e.FieldName == "?column?" { // Don't include the fieldname if it's unknown
 		return fmt.Sprintf("can't scan into dest[%d]: %v", e.ColumnIndex, e.Err)
+	}
+
+	return fmt.Sprintf("can't scan into dest[%d] (col: %s): %v", e.ColumnIndex, e.FieldName, e.Err)
 }

 func (e ScanArgError) Unwrap() error {
 	return e.Err
 }
@@ -366,7 +371,7 @@ func ScanRow(typeMap *pgtype.Map, fieldDescriptions []pgconn.FieldDescription, v
 		err := typeMap.Scan(fieldDescriptions[i].DataTypeOID, fieldDescriptions[i].Format, values[i], d)
 		if err != nil {
-			return ScanArgError{ColumnIndex: i, Err: err}
+			return ScanArgError{ColumnIndex: i, FieldName: fieldDescriptions[i].Name, Err: err}
 		}
 	}
@@ -468,6 +473,8 @@ func CollectOneRow[T any](rows Rows, fn RowToFunc[T]) (T, error) {
 		return value, err
 	}

+	// The defer rows.Close() won't have executed yet. If the query returned more than one row, rows would still be open.
+	// rows.Close() must be called before rows.Err() so we explicitly call it here.
 	rows.Close()

 	return value, rows.Err()
 }
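What the enriched error looks like in practice (the table and types are illustrative):

var id int64
var createdAt time.Time
err := conn.QueryRow(ctx, "select id, created_at from users where id = $1", 1).
	Scan(&id, &createdAt)
// A type mismatch now reports e.g.
//   can't scan into dest[1] (col: created_at): ...
// while unnamed expressions ("?column?") keep the shorter message.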


@@ -3,7 +3,6 @@ package pgx

 import (
 	"context"
 	"errors"
-	"fmt"
 	"strconv"
 	"strings"
@@ -103,7 +102,7 @@ func (c *Conn) BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error) {
 	if err != nil {
 		// begin should never fail unless there is an underlying connection issue or
 		// a context timeout. In either case, the connection is possibly broken.
-		c.die(errors.New("failed to begin transaction"))
+		c.die()
 		return nil, err
 	}
@@ -216,7 +215,7 @@ func (tx *dbTx) Rollback(ctx context.Context) error {
 	tx.closed = true
 	if err != nil {
 		// A rollback failure leaves the connection in an undefined state
-		tx.conn.die(fmt.Errorf("rollback failed: %w", err))
+		tx.conn.die()
 		return err
 	}

vendor/modules.txt (vendored)

@@ -587,7 +587,7 @@ github.com/jackc/pgpassfile
 # github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761
 ## explicit; go 1.14
 github.com/jackc/pgservicefile
-# github.com/jackc/pgx/v5 v5.7.2
+# github.com/jackc/pgx/v5 v5.7.3
 ## explicit; go 1.21
 github.com/jackc/pgx/v5
 github.com/jackc/pgx/v5/internal/iobufpool