Skip to content

Instantly share code, notes, and snippets.

@willium
Created August 15, 2024 04:45
Show Gist options
  • Save willium/8fdbd2e133e9f5b738d51ac88dc08a58 to your computer and use it in GitHub Desktop.
Save willium/8fdbd2e133e9f5b738d51ac88dc08a58 to your computer and use it in GitHub Desktop.
Examples of how I'm using Mosaic within a React app
// Mosaic Data Client reusable subclass
"use client";
import type * as Arrow from "apache-arrow";
import type * as Mosaic from "@uwdata/mosaic-core";
import { MosaicClient } from "@uwdata/mosaic-core";
import * as SQL from "@uwdata/mosaic-sql";
import { uniqueId } from "lodash";
import { z } from "zod";
import { FieldName, type TableName } from "@/utils/models/sql";
// Field name "*", validated through the FieldName schema; used as the default
// column list of a DataClient.
const SELECT_STAR = FieldName.parse("*");
// Zod schema for client identifiers: a string carrying the "client" brand.
export const ClientId = z.string().brand<"client">();
// Static type inferred from the ClientId schema above.
export type ClientId = z.infer<typeof ClientId>;
/**
 * Options accepted by the DataClient constructor.
 *
 * `T` is the Arrow type map of the tables delivered to `onQueryResult`.
 */
export type DataClientProps<T extends Arrow.TypeMap> = {
// Identifier of the client; when omitted the constructor generates one with
// the "client_" prefix.
id?: ClientId;
// Table the default query selects from and that `fields()` refers to.
table: TableName;
// Columns reported by `fields()`; the constructor defaults this to ["*"].
columns?: FieldName[];
// Selection handed to the MosaicClient constructor (filterBy) and used by
// `batchQuery()` to obtain the filter predicate.
selection?: Mosaic.Selection;
// Base query. If undefined, this.defaultQuery is used; if null, `query()`
// returns null and `batchQuery()` resolves to null without requesting data.
query?: SQL.Query | null;
// Sort expressions; each entry is wrapped in SQL.desc when the query is built.
orderby?: unknown[];
// Row offset; only applied when greater than 0 (constructor default: 0).
offset?: number;
// Row limit; only applied when greater than 0 (constructor default: -1,
// meaning no limit).
limit?: number;
// Lifecycle callbacks. Each one is invoked by the DataClient method of the
// matching name (fieldInfo -> onSchemaChange, requestQuery -> onRequestQuery,
// and so on); onPrefetchQuery fires from batchQuery with the next-page query.
onSchemaChange?: (schema: Mosaic.FieldInfo[]) => void;
onRequestQuery?: (query?: SQL.Query | null) => void;
onRequestUpdate?: () => void;
onPrefetchQuery?: (nextQuery: SQL.Query) => void;
onQueryPending?: () => void;
onQueryResult?: (data: Arrow.Table<T>) => void;
onQueryError?: (error: unknown) => void;
onUpdate?: () => void;
};
/**
 * Mosaic client bound to a single table.
 *
 * It builds its query from an optional base query (or a default
 * `SELECT <columns> FROM <table>`), the current filter, offset/limit and a
 * descending sort list, keeps the most recent result in `data`, and forwards
 * every lifecycle hook to the optional callbacks supplied at construction.
 */
export class DataClient<T extends Arrow.TypeMap> extends MosaicClient {
  private _data: Arrow.Table<T> | null = null;
  private _selection: Mosaic.Selection | undefined;
  private _table: TableName;
  private _columns: FieldName[] = [SELECT_STAR];
  private _query?: SQL.Query | null;
  private _orderby: unknown[];
  private _offset: number;
  private _limit: number;
  private _schema: Mosaic.FieldInfo[] = [];
  private _onSchemaChange?: (schema: Mosaic.FieldInfo[]) => void;
  private _onRequestQuery?: (query?: SQL.Query | null) => void;
  private _onRequestUpdate?: () => void;
  private _onPrefetchQuery?: (nextQuery: SQL.Query) => void;
  private _onQueryPending?: () => void;
  private _onQueryResult?: (data: Arrow.Table<T>) => void;
  private _onQueryError?: (error: unknown) => void;
  private _onUpdate?: () => void;
  public readonly id: ClientId;

  /**
   * @param props Client options; see `DataClientProps`. `columns` defaults to
   *   `["*"]`, `orderby` to `[]`, `offset` to `0` and `limit` to `-1`
   *   (no limit). When `id` is omitted a unique "client_"-prefixed id is
   *   generated.
   */
  constructor({
    id,
    table,
    columns = [SELECT_STAR],
    selection,
    query,
    orderby = [],
    offset = 0,
    limit = -1, // -1 means no limit
    onSchemaChange,
    onRequestQuery,
    onRequestUpdate,
    onPrefetchQuery,
    onQueryPending,
    onQueryResult,
    onQueryError,
    onUpdate,
  }: DataClientProps<T>) {
    super(selection); // Call the parent constructor with the filterBy selection
    // Initialize client properties from the options
    this.id = id ?? ClientId.parse(uniqueId("client_"));
    this._table = table;
    this._columns = columns;
    this._selection = selection;
    this._query = query;
    this._orderby = orderby;
    this._offset = offset;
    this._limit = limit;
    this._onSchemaChange = onSchemaChange;
    this._onRequestQuery = onRequestQuery;
    this._onRequestUpdate = onRequestUpdate;
    this._onPrefetchQuery = onPrefetchQuery;
    this._onQueryPending = onQueryPending;
    this._onQueryResult = onQueryResult;
    this._onQueryError = onQueryError;
    this._onUpdate = onUpdate;
  }

  /** Most recent query result, or `null` before the first result arrives. */
  get data(): Arrow.Table<T> | null {
    return this._data;
  }

  /** Selection supplied at construction, if any. */
  get selection(): Mosaic.Selection | undefined {
    return this._selection;
  }

  /** Name of the table this client queries. */
  get table(): string {
    return this._table;
  }

  /** Columns supplied at construction (defaults to `["*"]`). */
  get columns(): string[] {
    return this._columns;
  }

  /** Describes the configured columns as fields of the configured table. */
  fields(): Mosaic.Field[] | null {
    return this._columns.map((column) => {
      // HACK: https://github.com/uwdata/mosaic/issues/366
      return SQL.column(this._table, column) as Mosaic.Field;
    });
  }

  /**
   * Receives the resolved schema: notifies `onSchemaChange`, then stores the
   * schema for use by `defaultQuery`.
   */
  fieldInfo(schema: Mosaic.FieldInfo[]): this {
    this._onSchemaChange?.(schema);
    this._schema = schema;
    return this;
  }

  /**
   * Query used when no base query was configured: selects the schema's columns
   * from the table, or the configured columns while no schema is known.
   */
  get defaultQuery(): SQL.Query {
    // `_schema` is always an array, so an emptiness check (not `??`) is needed
    // for the fallback to `_columns` to ever apply; otherwise an empty schema
    // would produce a SELECT with no columns.
    const selected =
      this._schema.length > 0
        ? this._schema.map(({ column }) => column)
        : this._columns;
    return SQL.Query.from(this._table).select(selected);
  }

  /**
   * Toggles `column` in the sort list (added if absent, removed if present)
   * and requests a fresh batch. Every sort entry is applied in descending
   * order by `query()`.
   */
  sort(column: string) {
    if (this._orderby.includes(column)) {
      this._orderby = this._orderby.filter((entry) => entry !== column);
    } else {
      this._orderby = [...this._orderby, column];
    }
    // Fire-and-forget: the result is delivered through `queryResult`.
    void this.batchQuery();
  }

  /**
   * Builds the query to run for the given filter.
   *
   * @param filter Predicate(s) passed to `where`; defaults to an empty list.
   * @returns `null` when the base query was explicitly set to `null`;
   *   otherwise a clone of the base (or default) query with the filter, the
   *   offset and limit (each only when greater than 0) and the descending
   *   sort list applied.
   */
  query(filter: unknown = []): SQL.Query | null {
    if (this._query === null) return null;
    let query = this._query?.clone() ?? this.defaultQuery.clone();
    query = query.where(filter);
    if (this._offset > 0) query = query.offset(this._offset);
    if (this._limit > 0) query = query.limit(this._limit);
    query = query.orderby(this._orderby.map((entry) => SQL.desc(entry)));
    return query;
  }

  /** Notifies `onRequestQuery`, then delegates to the parent implementation. */
  async requestQuery(query?: SQL.Query | null) {
    this._onRequestQuery?.(query);
    await super.requestQuery(query);
    return this;
  }

  /** Notifies `onRequestUpdate`, then delegates to the parent implementation. */
  requestUpdate() {
    this._onRequestUpdate?.();
    super.requestUpdate();
  }

  /**
   * Requests one page of data and, when a positive limit is active, asks the
   * coordinator to prefetch the following page.
   *
   * @param opts Optional new `offset` / `limit`; omitted values keep the
   *   current setting. The new values are stored on the client.
   * @returns The stored result after the request completes, or `null` when no
   *   query can be built (base query is `null`).
   */
  async batchQuery(opts?: {
    offset?: number;
    limit?: number;
  }): Promise<Arrow.Table<T> | null> {
    this._offset = opts?.offset ?? this._offset;
    this._limit = opts?.limit ?? this._limit;
    const query = this.query(this._selection?.predicate(this));
    if (!query) return null;
    // request next data batch
    await this.requestQuery(query);
    if (this._limit > 0) {
      // prefetch subsequent data batch
      const nextQuery = query.clone().offset(this._offset + this._limit);
      this._onPrefetchQuery?.(nextQuery);
      this.coordinator?.prefetch(nextQuery);
    }
    return this.data;
  }

  /** Notifies `onQueryPending`. */
  queryPending() {
    this._onQueryPending?.();
    return this;
  }

  /** Notifies `onQueryResult` with the new table, then stores it as `data`. */
  queryResult(data: Arrow.Table<T>): this {
    this._onQueryResult?.(data);
    this._data = data; // store previous result?
    return this;
  }

  /** Notifies `onQueryError`, then delegates to the parent implementation. */
  queryError(error: unknown): this {
    this._onQueryError?.(error);
    const result = super.queryError(error); // Call the parent method to log the error
    return result;
  }

  /** Notifies `onUpdate`. */
  update(): this {
    this._onUpdate?.();
    return this;
  }
}
// Mosaic Coordinator Provider
"use client";
import * as Mosaic from "@uwdata/mosaic-core";
import { createContext, useContext, useMemo } from "react";
import * as Arrow from "apache-arrow";
import assert from "assert";
import { useAnalyticalData } from "./analytical-data-provider";
// Context carrying the coordinator; null until a connection is available.
const DataCoordinatorContext = createContext<Mosaic.Coordinator | null>(null);

/**
 * Builds a Mosaic coordinator on top of the analytical-data connection and
 * exposes it to descendants via context. The coordinator is rebuilt whenever
 * the connection changes and is null while there is no connection.
 */
export function DataCoordinatorProvider(props: { children: React.ReactNode }) {
  const connection = useAnalyticalData();

  const coordinator = useMemo(() => {
    if (!connection) {
      return null;
    }

    const connector: Mosaic.Connector = {
      query: async (request) => {
        const { sql, type: resultType } = request;
        const streaming = await connection.evaluateStreamingQuery(sql);
        // "exec" requests produce no value; the statement was evaluated above.
        if (resultType === "exec") {
          return undefined;
        }
        const recordBatches = await streaming.arrowStream.readAll();
        const arrowTable = new Arrow.Table(recordBatches);
        if (resultType === "json") {
          return Array.from(arrowTable);
        }
        // Anything that reaches this point must be an Arrow request.
        assert(resultType === "arrow");
        return arrowTable;
      },
    };

    const coordinatorOptions: Mosaic.CoordinatorOptions = {
      consolidate: false,
    };
    const instance = new Mosaic.Coordinator(connector, coordinatorOptions);
    return Mosaic.coordinator(instance);
  }, [connection]);

  return (
    <DataCoordinatorContext.Provider value={coordinator}>
      {props.children}
    </DataCoordinatorContext.Provider>
  );
}
/**
 * Reads the coordinator published by DataCoordinatorProvider.
 * Yields null when the context holds no coordinator.
 */
export function useDataCoordinator() {
  return useContext(DataCoordinatorContext);
}
// Mosaic Data Client wrapper hooks
"use client";
import type * as Arrow from "apache-arrow";
import type * as Mosaic from "@uwdata/mosaic-core";
import type * as SQL from "@uwdata/mosaic-sql";
import { useEffect, useMemo, useState } from "react";
import { useDataCoordinator } from "../data-coordinator-provider";
import { DataClient, type DataClientProps } from "./data-client";
/**
 * Props accepted by the data-client hooks: the DataClient options, except that
 * `onQueryResult` receives the parsed result `R` instead of the raw Arrow table.
 */
export type UseDataClientProps<
R = Arrow.Table,
T extends Arrow.TypeMap = Arrow.TypeMap,
// NOTE(review): "tableUpdatedAt" is not a key of the DataClientProps visible
// here, so omitting it has no effect — confirm whether it is a leftover.
> = Omit<DataClientProps<T>, "tableUpdatedAt" | "onQueryResult"> & {
// Called with the output of the hook's result parser for every query result.
onQueryResult?: (data: R) => void;
};
/** State returned by the data-client hooks. */
export type UseDataClientResponse<
R = Arrow.Table,
T extends Arrow.TypeMap = Arrow.TypeMap,
> = {
// True once the coordinator has finished connecting the client; reset to
// false when the hook's effect is cleaned up.
isConnected: boolean;
// True after the client's update callback has fired at least once.
isReady: boolean;
// The DataClient instance created by the hook.
client: DataClient<T> | null;
// Latest parsed query result; starts as the hook's initial result.
result: R;
// Latest schema reported by the client; starts empty.
schema: Mosaic.FieldInfo[];
// String describing the most recent reported error, or null if none.
error: string | null;
};
/**
 * Identity result parser: hands back the table (or null) it was given,
 * without copying or transforming it.
 */
export const PASSTHROUGH_RESULT_PARSER = <
  T extends Arrow.TypeMap = Arrow.TypeMap,
>(
  table: Arrow.Table<T> | null,
) => {
  return table;
};
/**
 * Converts an arbitrary thrown or rejected value into a readable message.
 * `Error` instances would serialise to "{}" with JSON.stringify, so their
 * message (or name, when the message is empty) is used instead.
 */
function describeQueryError(error: unknown): string {
  if (error instanceof Error) return error.message || error.name;
  if (typeof error === "string") return error;
  try {
    // JSON.stringify yields undefined for values such as `undefined` or functions.
    return JSON.stringify(error) ?? String(error);
  } catch {
    // JSON.stringify throws on e.g. circular structures.
    return String(error);
  }
}

/**
 * Creates a DataClient once per component instance, connects it to the
 * coordinator from context, and mirrors its lifecycle into React state.
 *
 * @param props Client options and callbacks. They are read only when the
 *   client is created on the first render; later changes to `props` (including
 *   new callback identities) are not picked up.
 * @param resultParser Maps each raw Arrow table to the value exposed as
 *   `result` and passed to `props.onQueryResult`. Captured on first render.
 * @param initialResult Value of `result` before the first query result.
 * @returns Connection/ready flags, the client, the latest parsed result, the
 *   latest schema, and the latest error message (or null).
 */
export function useCustomDataClient<
  R = Arrow.Table,
  T extends Arrow.TypeMap = Arrow.TypeMap,
>(
  props: UseDataClientProps<R, T>,
  resultParser: (table: Arrow.Table<T> | null) => R,
  initialResult: R,
): UseDataClientResponse<R, T> {
  const [isReady, setIsReady] = useState(false);
  const [schema, setSchema] = useState<Mosaic.FieldInfo[]>([]);
  const [error, setError] = useState<string | null>(null);
  const [result, setResult] = useState<R>(initialResult);
  const coordinator = useDataCoordinator();

  const client = useMemo(() => {
    function onSchemaChange(_schema: Mosaic.FieldInfo[]) {
      props.onSchemaChange?.(_schema);
      setSchema(_schema);
    }
    function onRequestQuery(query?: SQL.Query | null) {
      props.onRequestQuery?.(query);
    }
    function onQueryError(_error: unknown) {
      props.onQueryError?.(_error);
      setError(describeQueryError(_error));
    }
    function onQueryResult(_result: Arrow.Table<T>) {
      // Parse once so the callback and the state receive the very same value.
      const parsed = resultParser(_result);
      props.onQueryResult?.(parsed);
      // Updater form: a function-typed R must not be mistaken for an updater.
      setResult(() => parsed);
    }
    function onUpdate() {
      props.onUpdate?.();
      setIsReady(true);
    }
    return new DataClient<T>({
      ...props,
      onSchemaChange,
      onRequestQuery,
      onQueryError,
      onQueryResult,
      onUpdate,
    });
    // The client is intentionally created exactly once.
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);

  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    if (!coordinator || !client) return;
    // Flipped by the cleanup so a late connect() settlement cannot touch state
    // after the client has already been disconnected.
    let isActive = true;
    const startTime = performance.now();
    coordinator
      .connect(client)
      .then(() => {
        if (!isActive) return;
        const endTime = performance.now();
        console.log(`Connection time: ${endTime - startTime} milliseconds`);
        setIsConnected(true);
      })
      .catch((connectError: unknown) => {
        // Surface connection failures instead of leaving an unhandled rejection.
        if (isActive) setError(describeQueryError(connectError));
      });
    return () => {
      isActive = false;
      coordinator.disconnect(client);
      setIsConnected(false);
    };
  }, [client, coordinator]);

  return { isConnected, isReady, client, result, schema, error };
}
/**
 * Data-client hook that exposes the raw Arrow table as its result:
 * `useCustomDataClient` with the pass-through parser and a null initial result.
 */
export function useDataClient<T extends Arrow.TypeMap = Arrow.TypeMap>(
  props: UseDataClientProps<Arrow.Table | null, T>,
): UseDataClientResponse<Arrow.Table | null, T> {
  const response = useCustomDataClient<Arrow.Table | null, T>(
    props,
    PASSTHROUGH_RESULT_PARSER,
    null,
  );
  return response;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment