Pony PostgreSQL Driver

Pure Pony driver for PostgreSQL. Beta-level — we feel good about it but it needs real-world usage. Expect breaking changes between releases.

Connecting

Create a Session with server and database connection info plus a SessionStatusNotify to receive lifecycle events:

use "postgres"
use lori = "lori"

actor Main
  new create(env: Env) =>
    let session = Session(
      ServerConnectInfo(lori.TCPConnectAuth(env.root), "localhost", "5432"),
      DatabaseConnectInfo("myuser", "mypassword", "mydb"),
      MyNotify(env))

actor MyNotify is (SessionStatusNotify & ResultReceiver)
  let _env: Env

  new create(env: Env) =>
    _env = env

  be pg_session_authenticated(session: Session) =>
    session.execute(SimpleQuery("SELECT 1"), this)

  be pg_query_result(session: Session, result: Result) =>
    _env.out.print("Got a result!")
    session.close()

  be pg_query_failed(session: Session, query: Query,
    failure: (ErrorResponseMessage | ClientQueryError))
  =>
    _env.out.print("Query failed")
    session.close()

SessionStatusNotify callbacks are all optional (default no-op). Implement the ones you need:

  • pg_session_connected — TCP connection established
  • pg_session_connection_failed — the session failed to reach the ready state. Fires for any pre-ready failure: transport-level (DNS, TCP, TLS), unsupported authentication method, bad password, server rejection (e.g., max_connections exhausted), or SCRAM server verification failure
  • pg_session_authenticated — login succeeded
  • pg_transaction_status — fires on every ReadyForQuery with TransactionIdle, TransactionInBlock, or TransactionFailed
  • pg_notification — LISTEN/NOTIFY notifications
  • pg_notice — non-fatal server notices (e.g., DROP IF EXISTS on a nonexistent table, RAISE NOTICE from PL/pgSQL)
  • pg_parameter_status — runtime parameter values (sent during startup and after SET commands)
  • pg_session_shutdown — session has shut down

SSL/TLS

Two SSL modes are available:

  • SSLRequired — aborts if the server refuses SSL. Use when encryption is mandatory.
  • SSLPreferred — attempts SSL, falls back to plaintext if the server refuses. Equivalent to PostgreSQL's sslmode=prefer. A TLS handshake failure (server accepts but handshake fails) is NOT retried as plaintext — pg_session_connection_failed fires.

use "files"
use "ssl/net"

let sslctx = recover val
  SSLContext
    .> set_client_verify(true)
    .> set_authority(FilePath(FileAuth(env.root), "/path/to/ca.pem"))?
end

// Require SSL — fail if server refuses
let session = Session(
  ServerConnectInfo(
    lori.TCPConnectAuth(env.root), "localhost", "5432",
    SSLRequired(sslctx)),
  DatabaseConnectInfo("myuser", "mypassword", "mydb"),
  MyNotify(env))

// Prefer SSL — fall back to plaintext if server refuses
let session2 = Session(
  ServerConnectInfo(
    lori.TCPConnectAuth(env.root), "localhost", "5432",
    SSLPreferred(sslctx)),
  DatabaseConnectInfo("myuser", "mypassword", "mydb"),
  MyNotify(env))

Authentication

The driver supports cleartext password, MD5 password, and SCRAM-SHA-256 authentication. ServerConnectInfo.auth_requirement selects the client's policy on what the server is allowed to ask for:

  • AuthRequireSCRAM (default) — rejects AuthenticationOk (trust), cleartext, and MD5 challenges with pg_session_connection_failed(AuthenticationMethodRejected). SCRAM is the only PostgreSQL authentication method that verifies the server knows the password; requiring it closes a server-driven downgrade vector where a malicious or compromised server asks for a weaker scheme.
  • AllowAnyAuth — accepts any authentication method the server offers. Required for connecting to servers configured for MD5, cleartext, or trust authentication.

The default is a breaking change from earlier releases. To connect to a non-SCRAM server, pass AllowAnyAuth:

let session = Session(
  ServerConnectInfo(
    lori.TCPConnectAuth(env.root), "localhost", "5432",
    SSLDisabled, AllowAnyAuth),
  DatabaseConnectInfo("myuser", "mypassword", "mydb"),
  MyNotify(env))

If authentication fails, the pg_session_connection_failed callback fires with a ConnectionFailureReason. Authentication-specific variants:

  • AuthenticationMethodRejected — the server-offered method is disallowed by the session's AuthRequirement (the default AuthRequireSCRAM rejects AuthenticationOk, cleartext, and MD5)
  • InvalidPassword — wrong password (SQLSTATE 28P01). Call response() for the full ErrorResponseMessage
  • InvalidAuthorizationSpecification — nonexistent user, user not permitted to connect, or pg_hba.conf rejection (SQLSTATE 28000). Call response() for the full ErrorResponseMessage
  • UnsupportedAuthenticationMethod — the server requested a method the driver doesn't support (e.g., Kerberos, GSSAPI). Distinct from AuthenticationMethodRejected: that's the driver's policy refusing a method it could perform; this is the driver being unable to perform what the server asked for
  • ServerVerificationFailed — the server's SCRAM signature didn't match (possible MITM or misconfigured server)
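
As a rough sketch of how these variants might be told apart, the fragment below matches on the failure reason inside an actor that implements SessionStatusNotify. The callback signature (a Session followed by a ConnectionFailureReason), the `_env` field, and the assumption that `code` is a String are inferred from other parts of this document and are not confirmed API, so check them against the generated docs before relying on them.

```pony
// Fragment: belongs inside an actor implementing SessionStatusNotify.
// ASSUMPTION: the callback takes (Session, ConnectionFailureReason).
be pg_session_connection_failed(session: Session,
  reason: ConnectionFailureReason)
=>
  match reason
  | let r: AuthenticationMethodRejected =>
    // Client policy refused the method; AllowAnyAuth would accept it.
    _env.out.print("Server offered a method the auth policy disallows")
  | let r: InvalidPassword =>
    // ASSUMPTION: response().code is the SQLSTATE as a String.
    _env.out.print("Wrong password, SQLSTATE " + r.response().code)
  | let r: InvalidAuthorizationSpecification =>
    _env.out.print("Login refused, SQLSTATE " + r.response().code)
  | let r: UnsupportedAuthenticationMethod =>
    _env.out.print("Driver cannot perform the requested method")
  | let r: ServerVerificationFailed =>
    _env.out.print("SCRAM server signature did not match")
  else
    // Startup rejections and transport failures land here.
    _env.out.print("Connection failed for a non-authentication reason")
  end
```

Capturing each variant with `let r: Type` works whether the variant is a primitive or a class, which is why the sketch avoids matching on bare type names.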

Executing Queries

Three query types are available, all executed via session.execute():

  • SimpleQuery — an unparameterized SQL string. Can contain multiple semicolon-separated statements; each produces a separate result callback.

  • PreparedQuery — a parameterized single statement using $1, $2, etc. Parameters are Array[FieldDataTypes] val — typed values (I16, I32, I64, F32, F64, Bool, Array[U8] val, PgArray, PgComposite, PgTimestamp, PgTime, PgDate, PgInterval) are sent in binary format with explicit OIDs, while String and None use text format with server-inferred types. Uses an unnamed server-side prepared statement (created and destroyed per execution).

  • NamedPreparedQuery — executes a previously prepared named statement (see session.prepare()). Same typed parameter semantics as PreparedQuery. Use this when executing the same parameterized query many times to avoid repeated parsing.

For one-off parameterized queries, prefer PreparedQuery. Use NamedPreparedQuery only when you need to reuse a prepared statement across multiple executions.

Results arrive via ResultReceiver:

  • pg_query_result delivers a Result, which is one of: ResultSet (rows returned: SELECT, RETURNING, etc.), RowModifying (row count returned: INSERT, UPDATE, DELETE), or SimpleResult (no data: empty query).

  • pg_query_failed delivers an ErrorResponseMessage (server error) or a ClientQueryError (SessionClosed, SessionNeverOpened, SessionNotAuthenticated, DataError).
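
To make the difference between the query types concrete, here is a minimal sketch that issues one multi-statement SimpleQuery and two PreparedQuery executions. The table `t` and its columns are hypothetical; the constructors and the array literal form are the ones used elsewhere in this document.

```pony
// Fragment: belongs inside an actor implementing SessionStatusNotify
// and ResultReceiver. Table and column names are hypothetical.
be pg_session_authenticated(session: Session) =>
  // Two semicolon-separated statements in one SimpleQuery, so two
  // result callbacks are expected, one per statement.
  session.execute(
    SimpleQuery("CREATE TEMP TABLE t (id int4, name text); SELECT 1"),
    this)

  // One parameterized statement per PreparedQuery. The I32 is sent in
  // binary with an explicit OID; the String is sent as text and typed
  // by the server.
  session.execute(
    PreparedQuery("INSERT INTO t (id, name) VALUES ($1, $2)",
      recover val [as FieldDataTypes: I32(1); "alice"] end),
    this)

  // None is sent as SQL NULL, again with a server-inferred type.
  session.execute(
    PreparedQuery("INSERT INTO t (id, name) VALUES ($1, $2)",
      recover val [as FieldDataTypes: I32(2); None] end),
    this)
```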

Working with Results

ResultSet contains typed Rows:

be pg_query_result(session: Session, result: Result) =>
  match result
  | let rs: ResultSet =>
    for row in rs.rows().values() do
      for field in row.fields.values() do
        match field.value
        | let s: String => _env.out.print(field.name + ": " + s)
        | let i: I32 => _env.out.print(field.name + ": " + i.string())
        | let b: Bool => _env.out.print(field.name + ": " + b.string())
        | let v: Bytea =>
          _env.out.print(field.name + ": " + v.data.size().string() + " bytes")
        | let a: PgArray =>
          _env.out.print(field.name + ": " + a.string())
        | let c: PgComposite =>
          _env.out.print(field.name + ": " + c.string())
        | let t: PgTimestamp => _env.out.print(field.name + ": " + t.string())
        | let t: PgDate => _env.out.print(field.name + ": " + t.string())
        | let t: PgTime => _env.out.print(field.name + ": " + t.string())
        | let t: PgInterval => _env.out.print(field.name + ": " + t.string())
        | None => _env.out.print(field.name + ": NULL")
        // Also: I16, I64, F32, F64
        end
      end
    end
  | let rm: RowModifying =>
    _env.out.print(rm.command() + ": " + rm.impacted().string() + " rows")
  end

Field values are typed based on the PostgreSQL column OID: bytea → Bytea, bool → Bool, int2 → I16, int4 → I32, int8 → I64, float4 → F32, float8 → F64, date → PgDate, time → PgTime, timestamp/timestamptz → PgTimestamp, interval → PgInterval, array types → PgArray, registered composite types → PgComposite, NULL → None. Extended query results use binary format — unknown OIDs produce RawBytes. Simple query results use text format — unknown OIDs produce String.

timestamptz and query format: PreparedQuery results use binary format where timestamptz values are always UTC microseconds. SimpleQuery results use text format where the server renders the value in the session's timezone and the driver strips the timezone suffix — the resulting PgTimestamp microseconds represent session-local time, not UTC. If your session timezone is not UTC and you use both query types on the same timestamptz column, the microsecond values will differ for the same row.
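
One way to sidestep the mismatch, sketched here under the assumption that the application can treat all timestamps as UTC, is to pin the session timezone right after login. SET TIME ZONE is standard PostgreSQL; the driver calls are the ones shown earlier in this document. With the session in UTC, the text rendering and the binary representation should describe the same instant.

```pony
// Fragment: belongs inside an actor implementing SessionStatusNotify
// and ResultReceiver.
be pg_session_authenticated(session: Session) =>
  // Render text-format timestamptz values in UTC so that SimpleQuery
  // and PreparedQuery results agree on the microsecond value.
  session.execute(SimpleQuery("SET TIME ZONE 'UTC'"), this)
```

The SET statement produces its own result callback like any other query, so the receiver should expect it before the results of later queries.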

Named Prepared Statements

Prepare once, execute many times:

be pg_session_authenticated(session: Session) =>
  session.prepare("find_user", "SELECT * FROM users WHERE id = $1", this)

be pg_statement_prepared(session: Session, name: String) =>
  session.execute(
    NamedPreparedQuery(name,
      recover val [as FieldDataTypes: I32(42)] end),
    this)
  session.execute(
    NamedPreparedQuery(name,
      recover val [as FieldDataTypes: I32(99)] end),
    this)
  // When done, optionally: session.close_statement(name)

This requires implementing PrepareReceiver alongside ResultReceiver.

Bulk Loading with COPY IN

session.copy_in() sends data to the server via the COPY FROM STDIN protocol. It uses a pull-based flow — the session calls pg_copy_ready on the CopyInReceiver to request each chunk:

be pg_session_authenticated(session: Session) =>
  session.copy_in(
    "COPY my_table (col1, col2) FROM STDIN WITH (FORMAT text)", this)

be pg_copy_ready(session: Session) =>
  if _has_more_data then
    session.send_copy_data("val1\tval2\n".array())
  else
    session.finish_copy()
  end

be pg_copy_complete(session: Session, count: USize) =>
  _env.out.print("Loaded " + count.string() + " rows")

Call session.abort_copy(reason) instead of finish_copy() to abort the operation.
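
The pull-based flow can be sketched with a receiver that hands over one pre-formatted line per pg_copy_ready call. The `_lines` and `_next` fields are hypothetical names introduced for this example; `_lines` would be assigned in the actor's constructor.

```pony
// Fragment: fields and a behaviour inside an actor implementing
// SessionStatusNotify and CopyInReceiver.
// HYPOTHETICAL fields: _lines holds tab-separated, newline-terminated
// rows; _next is the index of the next row to send.
let _lines: Array[String] val
var _next: USize = 0

be pg_copy_ready(session: Session) =>
  try
    // Send exactly one chunk, then wait for the next pg_copy_ready.
    let line = _lines(_next)?
    _next = _next + 1
    session.send_copy_data(line.array())
  else
    // Index out of range: every line has been sent.
    session.finish_copy()
  end
```

Sending a single chunk per callback keeps the session in control of pacing, which is the point of the pull-based design.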

Bulk Export with COPY OUT

session.copy_out() exports data from the server via the COPY TO STDOUT protocol. The server drives the flow — data arrives via pg_copy_data callbacks on the CopyOutReceiver:

be pg_session_authenticated(session: Session) =>
  session.copy_out("COPY my_table TO STDOUT", this)

be pg_copy_data(session: Session, data: Array[U8] val) =>
  _buffer.append(data)

be pg_copy_complete(session: Session, count: USize) =>
  _env.out.print("Exported " + count.string() + " rows")

Row Streaming

session.stream() delivers rows in windowed batches using the extended query protocol's portal suspension mechanism. Unlike execute(), which buffers all rows before delivery, stream() lets the receiver pull one batch at a time, so memory use is bounded by the window size:

be pg_session_authenticated(session: Session) =>
  session.stream(
    PreparedQuery("SELECT * FROM big_table",
      recover val Array[FieldDataTypes] end),
    100, this)  // window_size = 100 rows per batch

be pg_stream_batch(session: Session, rows: Rows) =>
  // Process this batch of up to 100 rows
  for row in rows.values() do
    // ...
  end
  session.fetch_more()  // Pull the next batch

be pg_stream_complete(session: Session) =>
  _env.out.print("All rows processed")

Call session.close_stream() to end streaming early. Only PreparedQuery and NamedPreparedQuery are supported — streaming requires the extended query protocol.
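
As an illustration of ending a stream early, the sketch below stops once a row budget is reached. The `_seen` and `_limit` fields are hypothetical; only fetch_more() and close_stream() come from this document.

```pony
// Fragment: fields and a behaviour inside an actor implementing the
// stream receiver. _seen and _limit are HYPOTHETICAL names.
var _seen: USize = 0
let _limit: USize = 1000

be pg_stream_batch(session: Session, rows: Rows) =>
  for row in rows.values() do
    // Process the row here; this sketch only counts it.
    _seen = _seen + 1
  end
  if _seen >= _limit then
    // Enough rows consumed: end the stream instead of pulling more.
    session.close_stream()
  else
    session.fetch_more()
  end
```

Because each batch is requested explicitly, the decision to stop can be made between batches without discarding rows that were already buffered by the driver.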

Query Pipelining

session.pipeline() sends multiple queries to the server in a single TCP write, reducing round-trip latency from N round trips to 1. Each query has its own Sync boundary for error isolation — if one query fails, subsequent queries continue executing.

Only PreparedQuery and NamedPreparedQuery are supported. Results arrive via PipelineReceiver:

  • pg_pipeline_result — individual query succeeded, with its pipeline index
  • pg_pipeline_failed — individual query failed, with its pipeline index
  • pg_pipeline_complete — all queries processed (always fires last)

be pg_session_authenticated(session: Session) =>
  let queries = recover val
    [as (PreparedQuery | NamedPreparedQuery):
      PreparedQuery("SELECT * FROM users WHERE id = $1",
        recover val [as FieldDataTypes: I32(1)] end)
      PreparedQuery("SELECT * FROM users WHERE id = $1",
        recover val [as FieldDataTypes: I32(2)] end)
      PreparedQuery("SELECT * FROM users WHERE id = $1",
        recover val [as FieldDataTypes: I32(3)] end)
    ]
  end
  session.pipeline(queries, this)

be pg_pipeline_result(session: Session, index: USize, result: Result) =>
  _env.out.print("Query " + index.string() + " succeeded")

be pg_pipeline_failed(session: Session, index: USize,
  query: (PreparedQuery | NamedPreparedQuery),
  failure: (ErrorResponseMessage | ClientQueryError))
=>
  _env.out.print("Query " + index.string() + " failed")

be pg_pipeline_complete(session: Session) =>
  _env.out.print("All pipeline queries processed")
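
Pipelining also combines with named prepared statements. The following sketch, which assumes a statement was prepared with session.prepare() as in the Named Prepared Statements section, builds the query array in a loop and sends it as one pipeline. The id values are made up for illustration.

```pony
// Fragment: belongs inside an actor implementing PrepareReceiver and
// PipelineReceiver.
be pg_statement_prepared(session: Session, name: String) =>
  let queries = recover val
    let qs = Array[(PreparedQuery | NamedPreparedQuery)]
    // Illustrative ids; each becomes one execution of the statement.
    for id in [as I32: 1; 2; 3].values() do
      qs.push(NamedPreparedQuery(name,
        recover val [as FieldDataTypes: id] end))
    end
    qs
  end
  // All three executions go out together; results come back by index.
  session.pipeline(queries, this)
```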

Query Cancellation

session.cancel() requests cancellation of the currently executing query. Cancellation is best-effort — the server may or may not honor it. If cancelled, the query's ResultReceiver receives pg_query_failed with SQLSTATE 57014. Queued queries are not affected.
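
A minimal sketch of recognising a cancelled query follows. It assumes ErrorResponseMessage exposes the SQLSTATE as a String field named `code`, as the `response().code` reference elsewhere in this document suggests. `pg_sleep` is a standard PostgreSQL function used here only to keep a query running long enough to cancel.

```pony
// Fragment: belongs inside an actor implementing SessionStatusNotify
// and ResultReceiver.
be pg_session_authenticated(session: Session) =>
  session.execute(SimpleQuery("SELECT pg_sleep(60)"), this)
  // Best-effort: the server may finish or ignore the request.
  session.cancel()

be pg_query_failed(session: Session, query: Query,
  failure: (ErrorResponseMessage | ClientQueryError))
=>
  match failure
  | let e: ErrorResponseMessage =>
    // ASSUMPTION: e.code is the SQLSTATE as a String.
    if e.code == "57014" then
      _env.out.print("Query was cancelled")
    else
      _env.out.print("Server error, SQLSTATE " + e.code)
    end
  else
    _env.out.print("Client-side failure")
  end
```

Since cancellation is not guaranteed, the receiver should also be prepared for pg_query_result to fire for the same query.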

Array Types

1-dimensional PostgreSQL arrays are automatically decoded into PgArray values. All built-in element types are supported (int2, int4, int8, float4, float8, bool, text, bytea, date, time, timestamp, timestamptz, interval, uuid, jsonb, numeric, and text-like types).

PgArray can also be used as a query parameter:

let arr = PgArray(23,
  recover val [as (FieldData | None): I32(1); I32(2); None; I32(4)] end)
session.execute(PreparedQuery("SELECT $1::int4[]",
  recover val [as FieldDataTypes: arr] end), receiver)

For custom array types (arrays of custom codec-registered OIDs), use CodecRegistry.with_array_type():

let registry = CodecRegistry
  .with_codec(600, PointBinaryCodec)?
  .with_array_type(1017, 600)?

Composite Types

User-defined composite types (created with CREATE TYPE ... AS (...)) are decoded as PgComposite when registered with CodecRegistry.with_composite_type(). Register the composite OID and its field descriptors (name + OID pairs):

// CREATE TYPE address AS (street text, city text, zip_code int4)
// OID discovered via: SELECT oid FROM pg_type WHERE typname = 'address'

let registry = CodecRegistry
  .with_composite_type(16400,
    recover val
      [as (String, U32): ("street", 25); ("city", 25); ("zip_code", 23)]
    end)?
  .with_array_type(16401, 16400)?  // address[]
let session = Session(server_info, db_info, notify where registry = registry)

Access fields by position or name:

match field.value
| let addr: PgComposite =>
  match try addr(0)? end       // positional
  | let street: String => // ...
  end
  match try addr.field("city")? end  // named
  | let city: String => // ...
  end
end

PgComposite can also be sent as a query parameter using from_fields for safe construction:

let addr = PgComposite.from_fields(16400,
  recover val
    [as (String, U32, (FieldData | None)):
      ("street", 25, "123 Main St")
      ("city", 25, "Springfield")
      ("zip_code", 23, I32(62704))]
  end)
session.execute(PreparedQuery("INSERT INTO users (home) VALUES ($1)",
  recover val [as FieldDataTypes: addr] end), receiver)

Nested composites and composite arrays are supported. Both PreparedQuery (binary format) and SimpleQuery (text format) decode registered composites.

Custom Codecs

Extend the driver with custom type decoders. Implement Codec to decode a PostgreSQL type, then register it with CodecRegistry.with_codec():

class val Point is FieldData
  let x: F64
  let y: F64

  new val create(x': F64, y': F64) =>
    x = x'
    y = y'

  fun string(): String iso^ =>
    recover iso String .> append("(" + x.string() + "," + y.string() + ")") end

primitive PointBinaryCodec is Codec
  fun format(): U16 => 1  // binary

  fun encode(value: FieldDataTypes): Array[U8] val ? =>
    error  // encode not needed for result-only types

  fun decode(data: Array[U8] val): FieldData ? =>
    if data.size() != 16 then error end
    let x = ifdef bigendian then
      F64.from_bits(data.read_u64(0)?)
    else
      F64.from_bits(data.read_u64(0)?.bswap())
    end
    let y = ifdef bigendian then
      F64.from_bits(data.read_u64(8)?)
    else
      F64.from_bits(data.read_u64(8)?.bswap())
    end
    Point(x, y)

// Register and pass to Session
let registry = CodecRegistry.with_codec(600, PointBinaryCodec)?
let session = Session(server_info, db_info, notify where registry = registry)

Result fields from decoded custom types can be matched directly:

match field.value
| let p: Point => _env.out.print(p.string())
end

Custom types that need to participate in Field.eq() comparisons should also implement FieldDataEquatable.

Startup Rejection

pg_session_connection_failed also fires for non-authentication rejections the server raises before the session reaches the ready state:

  • TooManyConnections — server's max_connections has been reached (SQLSTATE 53300). Carries the full ErrorResponseMessage
  • InvalidDatabaseName — the requested database does not exist (SQLSTATE 3D000). Carries the full ErrorResponseMessage
  • ServerRejected — fallback for any other server ErrorResponse during startup. Call response() for the full ErrorResponseMessage; inspect response().code (SQLSTATE) to distinguish specific failure modes

Connection and Transport Failures

pg_session_connection_failed also fires when the session never reaches the server or fails to complete TLS negotiation:

  • ConnectionFailedDNS — name resolution failed
  • ConnectionFailedTCP — TCP connection refused or unreachable
  • ConnectionFailedTimeout — TCP/TLS did not complete within the optional connection_timeout
  • ConnectionFailedTimerError — the connect-timeout timer could not be armed
  • SSLServerRefused — the server refused the SSLRequest and the client was configured with SSLRequired
  • TLSHandshakeFailed — the TLS handshake failed
  • TLSAuthFailed — TLS certificate or authentication verification failed
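
One possible use of these variants is deciding whether a reconnect attempt is worthwhile. The sketch below treats DNS, TCP, and timeout failures as transient and everything else as final. That split is an illustrative policy, not something the driver prescribes, and the callback signature is assumed as (Session, ConnectionFailureReason).

```pony
// Fragment: belongs inside an actor implementing SessionStatusNotify.
// ASSUMPTION: the callback takes (Session, ConnectionFailureReason).
be pg_session_connection_failed(session: Session,
  reason: ConnectionFailureReason)
=>
  let transient: Bool =
    match reason
    | let r: ConnectionFailedDNS => true
    | let r: ConnectionFailedTCP => true
    | let r: ConnectionFailedTimeout => true
    else
      false
    end
  if transient then
    _env.out.print("Transient failure; reconnecting later may succeed")
  else
    _env.out.print("Not retrying automatically")
  end
```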

Supported Features

  • Simple and extended query protocols
  • Typed parameterized queries with binary encoding for numeric, boolean, bytea, and temporal types (unnamed and named prepared statements)
  • SSL/TLS via SSLRequired and SSLPreferred
  • Cleartext password, MD5, and SCRAM-SHA-256 authentication
  • Transaction status tracking (TransactionStatus)
  • LISTEN/NOTIFY notifications
  • NoticeResponse delivery (non-fatal server messages)
  • COPY FROM STDIN (bulk data loading)
  • COPY TO STDOUT (bulk data export)
  • Row streaming (windowed batch delivery)
  • Query pipelining (batched multi-query execution)
  • Query cancellation
  • Statement timeout (automatic cancellation after a deadline)
  • Connection timeout (bounded TCP connection phase)
  • ParameterStatus tracking (server runtime parameters)
  • 1-dimensional array types (decode and encode via PgArray)
  • User-defined enum types via CodecRegistry.with_enum_type()
  • User-defined composite types (decode and encode via PgComposite)
  • Custom codecs via CodecRegistry.with_codec()
  • Custom array types via CodecRegistry.with_array_type()

Public Types