Pony PostgreSQL Driver¶
Pure Pony driver for PostgreSQL. Beta-level — we feel good about it but it needs real-world usage. Expect breaking changes between releases.
Connecting¶
Create a Session with server and database connection info plus a
SessionStatusNotify to receive lifecycle events:
use "postgres"
use lori = "lori"
actor Main
new create(env: Env) =>
let session = Session(
ServerConnectInfo(lori.TCPConnectAuth(env.root), "localhost", "5432"),
DatabaseConnectInfo("myuser", "mypassword", "mydb"),
MyNotify(env))
actor MyNotify is (SessionStatusNotify & ResultReceiver)
let _env: Env
new create(env: Env) =>
_env = env
be pg_session_authenticated(session: Session) =>
session.execute(SimpleQuery("SELECT 1"), this)
be pg_query_result(session: Session, result: Result) =>
_env.out.print("Got a result!")
session.close()
be pg_query_failed(session: Session, query: Query,
failure: (ErrorResponseMessage | ClientQueryError))
=>
_env.out.print("Query failed")
session.close()
SessionStatusNotify callbacks are all optional (default no-op).
Implement the ones you need:
pg_session_connected— TCP connection establishedpg_session_connection_failed— the session failed to reach the ready state. Fires for any pre-ready failure: transport-level (DNS, TCP, TLS), unsupported authentication method, bad password, server rejection (e.g.,max_connectionsexhausted), or SCRAM server verification failurepg_session_authenticated— login succeededpg_transaction_status— fires on everyReadyForQuerywithTransactionIdle,TransactionInBlock, orTransactionFailedpg_notification— LISTEN/NOTIFY notificationspg_notice— non-fatal server notices (e.g.,DROP IF EXISTSon a nonexistent table,RAISE NOTICEfrom PL/pgSQL)pg_parameter_status— runtime parameter values (sent during startup and after SET commands)pg_session_shutdown— session has shut down
SSL/TLS¶
Two SSL modes are available:
SSLRequired— aborts if the server refuses SSL. Use when encryption is mandatory.SSLPreferred— attempts SSL, falls back to plaintext if the server refuses. Equivalent to PostgreSQL'ssslmode=prefer. A TLS handshake failure (server accepts but handshake fails) is NOT retried as plaintext —pg_session_connection_failedfires.
use "ssl/net"
let sslctx = recover val
SSLContext
.> set_client_verify(true)
.> set_authority(FilePath(FileAuth(env.root), "/path/to/ca.pem"))?
end
// Require SSL — fail if server refuses
let session = Session(
ServerConnectInfo(
lori.TCPConnectAuth(env.root), "localhost", "5432",
SSLRequired(sslctx)),
DatabaseConnectInfo("myuser", "mypassword", "mydb"),
MyNotify(env))
// Prefer SSL — fall back to plaintext if server refuses
let session2 = Session(
ServerConnectInfo(
lori.TCPConnectAuth(env.root), "localhost", "5432",
SSLPreferred(sslctx)),
DatabaseConnectInfo("myuser", "mypassword", "mydb"),
MyNotify(env))
Authentication¶
The driver supports cleartext password, MD5 password, and SCRAM-SHA-256
authentication. ServerConnectInfo.auth_requirement selects the client's
policy on what the server is allowed to ask for:
AuthRequireSCRAM(default) — rejectsAuthenticationOk(trust), cleartext, and MD5 challenges withpg_session_connection_failed(AuthenticationMethodRejected). SCRAM is the only PostgreSQL authentication method that verifies the server knows the password; requiring it closes a server-driven downgrade vector where a malicious or compromised server asks for a weaker scheme.AllowAnyAuth— accepts any authentication method the server offers. Required for connecting to servers configured for MD5, cleartext, or trust authentication.
The default is a breaking change from earlier releases. To connect to a
non-SCRAM server, pass AllowAnyAuth:
let session = Session(
ServerConnectInfo(
lori.TCPConnectAuth(env.root), "localhost", "5432",
SSLDisabled, AllowAnyAuth),
DatabaseConnectInfo("myuser", "mypassword", "mydb"),
MyNotify(env))
If authentication fails, the pg_session_connection_failed callback
fires with a ConnectionFailureReason. Authentication-specific variants:
AuthenticationMethodRejected— the server-offered method is disallowed by the session'sAuthRequirement(the defaultAuthRequireSCRAMrejectsAuthenticationOk, cleartext, and MD5)InvalidPassword— wrong password (SQLSTATE 28P01). Callresponse()for the fullErrorResponseMessageInvalidAuthorizationSpecification— nonexistent user, user not permitted to connect, or pg_hba.conf rejection (SQLSTATE 28000). Callresponse()for the fullErrorResponseMessageUnsupportedAuthenticationMethod— the server requested a method the driver doesn't support (e.g., Kerberos, GSSAPI). Distinct fromAuthenticationMethodRejected: that's the driver's policy refusing a method it could perform; this is the driver being unable to perform what the server asked forServerVerificationFailed— the server's SCRAM signature didn't match (possible MITM or misconfigured server)
Executing Queries¶
Three query types are available, all executed via session.execute():
-
SimpleQuery— an unparameterized SQL string. Can contain multiple semicolon-separated statements; each produces a separate result callback. -
PreparedQuery— a parameterized single statement using$1,$2, etc. Parameters areArray[FieldDataTypes] val— typed values (I16,I32,I64,F32,F64,Bool,Array[U8] val,PgArray,PgComposite,PgTimestamp,PgTime,PgDate,PgInterval) are sent in binary format with explicit OIDs, whileStringandNoneuse text format with server-inferred types. Uses an unnamed server-side prepared statement (created and destroyed per execution). -
NamedPreparedQuery— executes a previously prepared named statement (seesession.prepare()). Same typed parameter semantics asPreparedQuery. Use this when executing the same parameterized query many times to avoid repeated parsing.
For one-off parameterized queries, prefer PreparedQuery. Use
NamedPreparedQuery only when you need to reuse a prepared statement
across multiple executions.
Results arrive via ResultReceiver:
pg_query_resultdelivers aResult, which is one of:ResultSet— rows returned (SELECT, RETURNING, etc.)RowModifying— row count returned (INSERT, UPDATE, DELETE)-
SimpleResult— no data (empty query) -
pg_query_faileddelivers anErrorResponseMessage(server error) or aClientQueryError(SessionClosed,SessionNeverOpened,SessionNotAuthenticated,DataError).
Working with Results¶
ResultSet contains typed Rows:
be pg_query_result(session: Session, result: Result) =>
match result
| let rs: ResultSet =>
for row in rs.rows().values() do
for field in row.fields.values() do
match field.value
| let s: String => _env.out.print(field.name + ": " + s)
| let i: I32 => _env.out.print(field.name + ": " + i.string())
| let b: Bool => _env.out.print(field.name + ": " + b.string())
| let v: Bytea =>
_env.out.print(field.name + ": " + v.data.size().string() + " bytes")
| let a: PgArray =>
_env.out.print(field.name + ": " + a.string())
| let c: PgComposite =>
_env.out.print(field.name + ": " + c.string())
| let t: PgTimestamp => _env.out.print(field.name + ": " + t.string())
| let t: PgDate => _env.out.print(field.name + ": " + t.string())
| let t: PgTime => _env.out.print(field.name + ": " + t.string())
| let t: PgInterval => _env.out.print(field.name + ": " + t.string())
| None => _env.out.print(field.name + ": NULL")
// Also: I16, I64, F32, F64
end
end
end
| let rm: RowModifying =>
_env.out.print(rm.command() + ": " + rm.impacted().string() + " rows")
end
Field values are typed based on the PostgreSQL column OID:
bytea → Bytea, bool → Bool, int2 → I16, int4 → I32,
int8 → I64, float4 → F32, float8 → F64, date → PgDate,
time → PgTime, timestamp/timestamptz → PgTimestamp,
interval → PgInterval, array types → PgArray,
registered composite types → PgComposite, NULL → None.
Extended query results use binary format — unknown OIDs produce
RawBytes. Simple query results use text format — unknown OIDs
produce String.
timestamptz and query format: PreparedQuery results use binary
format where timestamptz values are always UTC microseconds.
SimpleQuery results use text format where the server renders the
value in the session's timezone and the driver strips the timezone
suffix — the resulting PgTimestamp microseconds represent
session-local time, not UTC. If your session timezone is not UTC and
you use both query types on the same timestamptz column, the
microsecond values will differ for the same row.
Named Prepared Statements¶
Prepare once, execute many times:
be pg_session_authenticated(session: Session) =>
session.prepare("find_user", "SELECT * FROM users WHERE id = $1", this)
be pg_statement_prepared(session: Session, name: String) =>
session.execute(
NamedPreparedQuery(name,
recover val [as FieldDataTypes: I32(42)] end),
this)
session.execute(
NamedPreparedQuery(name,
recover val [as FieldDataTypes: I32(99)] end),
this)
// When done, optionally: session.close_statement(name)
This requires implementing PrepareReceiver alongside
ResultReceiver.
Bulk Loading with COPY IN¶
session.copy_in() sends data to the server via the COPY FROM STDIN
protocol. It uses a pull-based flow — the session calls
pg_copy_ready on the CopyInReceiver to request each chunk:
be pg_session_authenticated(session: Session) =>
session.copy_in(
"COPY my_table (col1, col2) FROM STDIN WITH (FORMAT text)", this)
be pg_copy_ready(session: Session) =>
if _has_more_data then
session.send_copy_data("val1\tval2\n".array())
else
session.finish_copy()
end
be pg_copy_complete(session: Session, count: USize) =>
_env.out.print("Loaded " + count.string() + " rows")
Call session.abort_copy(reason) instead of finish_copy() to
abort the operation.
Bulk Export with COPY OUT¶
session.copy_out() exports data from the server via the COPY TO STDOUT
protocol. The server drives the flow — data arrives via pg_copy_data
callbacks on the CopyOutReceiver:
be pg_session_authenticated(session: Session) =>
session.copy_out("COPY my_table TO STDOUT", this)
be pg_copy_data(session: Session, data: Array[U8] val) =>
_buffer.append(data)
be pg_copy_complete(session: Session, count: USize) =>
_env.out.print("Exported " + count.string() + " rows")
Row Streaming¶
session.stream() delivers rows in windowed batches using the
extended query protocol's portal suspension mechanism. Unlike
execute() which buffers all rows before delivery, streaming enables
pull-based paged result consumption with bounded memory:
be pg_session_authenticated(session: Session) =>
session.stream(
PreparedQuery("SELECT * FROM big_table",
recover val Array[FieldDataTypes] end),
100, this) // window_size = 100 rows per batch
be pg_stream_batch(session: Session, rows: Rows) =>
// Process this batch of up to 100 rows
for row in rows.values() do
// ...
end
session.fetch_more() // Pull the next batch
be pg_stream_complete(session: Session) =>
_env.out.print("All rows processed")
Call session.close_stream() to end streaming early. Only
PreparedQuery and NamedPreparedQuery are supported — streaming
requires the extended query protocol.
Query Pipelining¶
session.pipeline() sends multiple queries to the server in a single TCP
write, reducing round-trip latency from N round trips to 1. Each query has
its own Sync boundary for error isolation — if one query fails, subsequent
queries continue executing.
Only PreparedQuery and NamedPreparedQuery are supported. Results arrive
via PipelineReceiver:
pg_pipeline_result— individual query succeeded, with its pipeline indexpg_pipeline_failed— individual query failed, with its pipeline indexpg_pipeline_complete— all queries processed (always fires last)
be pg_session_authenticated(session: Session) =>
let queries = recover val
[as (PreparedQuery | NamedPreparedQuery):
PreparedQuery("SELECT * FROM users WHERE id = $1",
recover val [as FieldDataTypes: I32(1)] end)
PreparedQuery("SELECT * FROM users WHERE id = $1",
recover val [as FieldDataTypes: I32(2)] end)
PreparedQuery("SELECT * FROM users WHERE id = $1",
recover val [as FieldDataTypes: I32(3)] end)
]
end
session.pipeline(queries, this)
be pg_pipeline_result(session: Session, index: USize, result: Result) =>
_env.out.print("Query " + index.string() + " succeeded")
be pg_pipeline_failed(session: Session, index: USize,
query: (PreparedQuery | NamedPreparedQuery),
failure: (ErrorResponseMessage | ClientQueryError))
=>
_env.out.print("Query " + index.string() + " failed")
be pg_pipeline_complete(session: Session) =>
_env.out.print("All pipeline queries processed")
Query Cancellation¶
session.cancel() requests cancellation of the currently executing
query. Cancellation is best-effort — the server may or may not honor
it. If cancelled, the query's ResultReceiver receives
pg_query_failed with SQLSTATE 57014. Queued queries are not
affected.
Array Types¶
1-dimensional PostgreSQL arrays are automatically decoded into PgArray
values. All built-in element types are supported (int2, int4, int8,
float4, float8, bool, text, bytea, date, time, timestamp, timestamptz,
interval, uuid, jsonb, numeric, and text-like types).
PgArray can also be used as a query parameter:
let arr = PgArray(23,
recover val [as (FieldData | None): I32(1); I32(2); None; I32(4)] end)
session.execute(PreparedQuery("SELECT $1::int4[]",
recover val [as FieldDataTypes: arr] end), receiver)
For custom array types (arrays of custom codec-registered OIDs), use
CodecRegistry.with_array_type():
Composite Types¶
User-defined composite types (created with CREATE TYPE ... AS (...)) are
decoded as PgComposite when registered with CodecRegistry.with_composite_type().
Register the composite OID and its field descriptors (name + OID pairs):
// CREATE TYPE address AS (street text, city text, zip_code int4)
// OID discovered via: SELECT oid FROM pg_type WHERE typname = 'address'
let registry = CodecRegistry
.with_composite_type(16400,
recover val
[as (String, U32): ("street", 25); ("city", 25); ("zip_code", 23)]
end)?
.with_array_type(16401, 16400)? // address[]
let session = Session(server_info, db_info, notify where registry = registry)
Access fields by position or name:
match field.value
| let addr: PgComposite =>
match try addr(0)? end // positional
| let street: String => // ...
end
match try addr.field("city")? end // named
| let city: String => // ...
end
end
PgComposite can also be sent as a query parameter using from_fields
for safe construction:
let addr = PgComposite.from_fields(16400,
recover val
[as (String, U32, (FieldData | None)):
("street", 25, "123 Main St")
("city", 25, "Springfield")
("zip_code", 23, I32(62704))]
end)
session.execute(PreparedQuery("INSERT INTO users (home) VALUES ($1)",
recover val [as FieldDataTypes: addr] end), receiver)
Nested composites and composite arrays are supported. Both PreparedQuery
(binary format) and SimpleQuery (text format) decode registered
composites.
Custom Codecs¶
Extend the driver with custom type decoders. Implement Codec to decode
a PostgreSQL type, then register it with CodecRegistry.with_codec():
class val Point is FieldData
let x: F64
let y: F64
new val create(x': F64, y': F64) =>
x = x'
y = y'
fun string(): String iso^ =>
recover iso String .> append("(" + x.string() + "," + y.string() + ")") end
primitive PointBinaryCodec is Codec
fun format(): U16 => 1 // binary
fun encode(value: FieldDataTypes): Array[U8] val ? =>
error // encode not needed for result-only types
fun decode(data: Array[U8] val): FieldData ? =>
if data.size() != 16 then error end
let x = ifdef bigendian then
F64.from_bits(data.read_u64(0)?)
else
F64.from_bits(data.read_u64(0)?.bswap())
end
let y = ifdef bigendian then
F64.from_bits(data.read_u64(8)?)
else
F64.from_bits(data.read_u64(8)?.bswap())
end
Point(x, y)
// Register and pass to Session
let registry = CodecRegistry.with_codec(600, PointBinaryCodec)?
let session = Session(server_info, db_info, notify where registry = registry)
Result fields from decoded custom types can be matched directly:
Custom types that need to participate in Field.eq() comparisons should
also implement FieldDataEquatable.
Startup Rejection¶
pg_session_connection_failed also fires for non-authentication
rejections the server raises before the session reaches the ready state:
TooManyConnections— server'smax_connectionshas been reached (SQLSTATE 53300). Carries the fullErrorResponseMessageInvalidDatabaseName— the requested database does not exist (SQLSTATE 3D000). Carries the fullErrorResponseMessageServerRejected— fallback for any other server ErrorResponse during startup. Callresponse()for the fullErrorResponseMessage; inspectresponse().code(SQLSTATE) to distinguish specific failure modes
Connection and Transport Failures¶
pg_session_connection_failed also fires when the session never reaches
the server or fails to complete TLS negotiation:
ConnectionFailedDNS— name resolution failedConnectionFailedTCP— TCP connection refused or unreachableConnectionFailedTimeout— TCP/TLS did not complete within the optionalconnection_timeoutConnectionFailedTimerError— the connect-timeout timer could not be armedSSLServerRefused— the server refused the SSLRequest and the client was configured withSSLRequiredTLSHandshakeFailed— the TLS handshake failedTLSAuthFailed— TLS certificate or authentication verification failed
Supported Features¶
- Simple and extended query protocols
- Typed parameterized queries with binary encoding for numeric, boolean, bytea, and temporal types (unnamed and named prepared statements)
- SSL/TLS via
SSLRequiredandSSLPreferred - Cleartext password, MD5, and SCRAM-SHA-256 authentication
- Transaction status tracking (
TransactionStatus) - LISTEN/NOTIFY notifications
- NoticeResponse delivery (non-fatal server messages)
- COPY FROM STDIN (bulk data loading)
- COPY TO STDOUT (bulk data export)
- Row streaming (windowed batch delivery)
- Query pipelining (batched multi-query execution)
- Query cancellation
- Statement timeout (automatic cancellation after a deadline)
- Connection timeout (bounded TCP connection phase)
- ParameterStatus tracking (server runtime parameters)
- 1-dimensional array types (decode and encode via
PgArray) - User-defined enum types via
CodecRegistry.with_enum_type() - User-defined composite types (decode and encode via
PgComposite) - Custom codecs via
CodecRegistry.with_codec() - Custom array types via
CodecRegistry.with_array_type()
Public Types¶
- primitive AllowAnyAuth
- primitive AuthRequireSCRAM
- type AuthRequirement
- primitive AuthenticationMethodRejected
- class Bytea
- type ClientQueryError
- interface Codec
- class CodecRegistry
- primitive ConnectionClosedByServer
- primitive ConnectionFailedDNS
- primitive ConnectionFailedTCP
- primitive ConnectionFailedTimeout
- primitive ConnectionFailedTimerError
- type ConnectionFailureReason
- interface CopyInReceiver
- interface CopyOutReceiver
- primitive DataError
- class DatabaseConnectInfo
- class ErrorResponseMessage
- class Field
- interface FieldData
- interface FieldDataEquatable
- type FieldDataTypes
- class InvalidAuthorizationSpecification
- class InvalidDatabaseName
- class InvalidPassword
- type MakePgTimeMicroseconds
- class NamedPreparedQuery
- class NoticeResponseMessage
- class Notification
- class ParameterStatus
- class PgArray
- class PgComposite
- class PgDate
- class PgInterval
- class PgTime
- type PgTimeMicroseconds
- primitive PgTimeValidator
- class PgTimestamp
- interface PipelineReceiver
- interface PrepareReceiver
- class PreparedQuery
- primitive ProtocolViolation
- type Query
- class RawBytes
- type Result
- interface ResultReceiver
- class ResultSet
- class Row
- class RowIterator
- class RowModifying
- class Rows
- primitive SSLDisabled
- type SSLMode
- class SSLPreferred
- class SSLRequired
- primitive SSLServerRefused
- class ServerConnectInfo
- class ServerRejected
- primitive ServerVerificationFailed
- actor Session
- primitive SessionClosed
- primitive SessionNeverOpened
- primitive SessionNotAuthenticated
- interface SessionStatusNotify
- class SimpleQuery
- class SimpleResult
- interface StreamingResultReceiver
- primitive TLSAuthFailed
- primitive TLSHandshakeFailed
- class TooManyConnections
- primitive TransactionFailed
- primitive TransactionIdle
- primitive TransactionInBlock
- type TransactionStatus
- primitive UnsupportedAuthenticationMethod
- primitive WriterToByteArray