Skip to content

Instantly share code, notes, and snippets.

@urso
Created June 19, 2024 10:32
Show Gist options
  • Save urso/a3ab0418ce7f90e884de9aef139aaa39 to your computer and use it in GitHub Desktop.
Save urso/a3ab0418ce7f90e884de9aef139aaa39 to your computer and use it in GitHub Desktop.
const std = @import("std");
const pgzx = @import("pgzx");
const pg = pgzx.c;
const fdw = pgzx.fdw;
const pq = pgzx.pq;
// postgres_fdw options not related to connection or transaction handling.
const postgres_fdw_options = fdw.OptionList.init(&[_]fdw.Option{
fdw.Option.init("xact_handler", pg.ForeignDataWrapperRelationId, pgfdwValidatorXactHandler),
fdw.Option.String("schema_name", pg.ForeignTableRelationId),
fdw.Option.String("table_name", pg.ForeignTableRelationId),
fdw.Option.String("column_name", pg.AttributeRelationId),
// use_remote_estimate is available on both server and table
fdw.Option.Bool("use_remote_estimate", pg.ForeignServerRelationId),
fdw.Option.Bool("use_remote_estimate", pg.ForeignTableRelationId),
// cost factors
fdw.Option.RealPos("fdw_startup_cost", pg.ForeignServerRelationId),
fdw.Option.RealPos("fdw_tuple_cost", pg.ForeignServerRelationId),
// shippable extensions
fdw.Option.init("extensions", pg.ForeignServerRelationId, &pgfdwValidatorExtensionList),
// updatable is available on both server and table
fdw.Option.Bool("updatable", pg.ForeignServerRelationId),
fdw.Option.Bool("updatable", pg.ForeignTableRelationId),
// truncatable is available on both server and table
fdw.Option.Bool("truncatable", pg.ForeignServerRelationId),
fdw.Option.Bool("truncatable", pg.ForeignTableRelationId),
// fetch_size is available on both server and table
fdw.Option.IntPos("fetch_size", pg.ForeignServerRelationId),
fdw.Option.IntPos("fetch_size", pg.ForeignTableRelationId),
// batch_size is available on both server and table
fdw.Option.IntPos("batch_size", pg.ForeignServerRelationId),
fdw.Option.IntPos("batch_size", pg.ForeignTableRelationId),
// async_capable is available on both server and table
fdw.Option.Bool("async_capable", pg.ForeignServerRelationId),
fdw.Option.Bool("async_capable", pg.ForeignTableRelationId),
// sampling is available on both server and table
fdw.Option.init("analyze_sampling", pg.ForeignServerRelationId, &pgfdwValidatorAnalyzeSampling),
fdw.Option.init("analyze_sampling", pg.ForeignTableRelationId, &pgfdwValidatorAnalyzeSampling),
});
// internal options for connection and transaction handling.
const xact_conn_options = fdw.OptionList.init(&[_]fdw.Option{
fdw.Option.Bool("keep_connections", pg.ForeignServerRelationId),
fdw.Option.Bool("password_required", pg.UserMappingRelationId),
});
// libpq connection options.
var extension_libpq_options: ?[]fdw.Option = undefined;
pub fn pgdc_postgres_fdw_validator(list_datum: pg.Datum, catalog: pg.Oid) void {
initPqOptions();
// use C-API to convert the datum to a proper List of DefElem pointers
const options = pg.untransformRelOptions(list_datum);
// temporary list of all options supported by the extension. Static options and options read from libpq
const extension_options = fdw.MultiOptionList.init(&[_]fdw.OptionList{
postgres_fdw_options,
xact_conn_options,
fdw.OptionList.init(extension_libpq_options.?),
});
// Validate user options against our list of options
fdw.validateOptions(extension_options, options, catalog);
}
fn initPqOptions() void {
if (extension_libpq_options != null)
return;
const LIBPQ_OPTIONS_EXTRA = [_]fdw.Option{
// Additional libpq options that we want to use with user mappings.
fdw.Option.OptString("sslcert", pg.UserMappingRelationId),
fdw.Option.OptString("sslkey", pg.UserMappingRelationId),
fdw.Option.OptString("gssdelegation", pg.UserMappingRelationId),
};
const libpq_options = pg.PQconndefaults();
if (libpq_options == null) {
pgzx.elog.ErrorThrow(@src(), "could not get libpq options", .{});
}
var num_options: usize = 0;
var l_opt: [*c]pg.PQconninfoOption = libpq_options;
while (l_opt.*.keyword != null) : (l_opt = l_opt + 1) {
num_options += 1;
}
// Create array of valid libpq options
// We use the systems C allocator because the options are valid as long as
// the current process lives.
const options = std.heap.c_allocator.alloc(fdw.Option, num_options + LIBPQ_OPTIONS_EXTRA.len) catch {
pgzx.elog.ErrorThrow(@src(), "could not allocate memory for libpq options", .{});
unreachable;
};
l_opt = libpq_options;
var i: usize = 0;
while (l_opt.*.keyword != null) : (l_opt = l_opt + 1) {
const disp = std.mem.span(l_opt.*.dispchar);
// hide debug options
if (std.mem.indexOfScalar(u8, disp, 'D') != null)
continue;
const keyword = std.mem.span(l_opt.*.keyword);
// hide internal options that we overwrite
if (std.mem.eql(u8, keyword, "fallback_application_name") or std.mem.eql(u8, keyword, "client_encoding"))
continue;
// user and 'secrets' are only allowed on the user mappings. Otherwise
// options are assumed to be applied to the server.
const is_user_option = std.mem.eql(u8, keyword, "user") or std.mem.indexOf(u8, disp, "*") != null;
const context: pg.Oid = if (is_user_option) pg.UserMappingRelationId else pg.ForeignServerRelationId;
options[i] = fdw.Option.init(keyword, context, null);
i += 1;
}
for (LIBPQ_OPTIONS_EXTRA) |opt| {
options[i] = opt;
i += 1;
}
extension_libpq_options = options[0..i];
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment