-
-
Save willium/8fdbd2e133e9f5b738d51ac88dc08a58 to your computer and use it in GitHub Desktop.
Examples of how I'm using Mosaic within a React app
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Mosaic Data Client reusable subclass | |
"use client"; | |
import type * as Arrow from "apache-arrow"; | |
import type * as Mosaic from "@uwdata/mosaic-core"; | |
import { MosaicClient } from "@uwdata/mosaic-core"; | |
import * as SQL from "@uwdata/mosaic-sql"; | |
import { uniqueId } from "lodash"; | |
import { z } from "zod"; | |
import { FieldName, type TableName } from "@/utils/models/sql"; | |
// Wildcard column; used below as the default column list when the caller
// does not pass explicit columns.
const SELECT_STAR = FieldName.parse("*"); | |
// Schema for a branded string ("client" brand) that identifies a DataClient.
export const ClientId = z.string().brand<"client">(); | |
// Static type inferred from the ClientId schema (shares the schema's name).
export type ClientId = z.infer<typeof ClientId>; | |
// Constructor options for DataClient. T is the Arrow type map of the rows
// delivered to onQueryResult.
export type DataClientProps<T extends Arrow.TypeMap> = { | |
// Optional identifier; DataClient generates one with a "client_" prefix when omitted.
id?: ClientId; | |
// Table the default query selects from.
table: TableName; | |
// Columns to request; DataClient defaults this to the "*" wildcard.
columns?: FieldName[]; | |
// Selection passed to the MosaicClient base constructor and used by
// DataClient.batchQuery to derive the filter predicate.
selection?: Mosaic.Selection; | |
// if query is undefined, uses this.defaultQuery; if query is null,
// DataClient.query() returns null and batchQuery issues no request
query?: SQL.Query | null; | |
// Ordering expressions; DataClient wraps each one in SQL.desc when building the query.
orderby?: unknown[]; | |
// Row offset; only applied to the query when greater than 0 (default 0).
offset?: number; | |
// Row limit; only applied to the query when greater than 0 (default -1, meaning no limit).
limit?: number; | |
// callbacks — each is invoked by the DataClient method noted beside it
// fieldInfo(): receives the schema handed to the client.
onSchemaChange?: (schema: Mosaic.FieldInfo[]) => void; | |
// requestQuery(): receives the query about to be requested.
onRequestQuery?: (query?: SQL.Query | null) => void; | |
// requestUpdate(): fired before delegating to the base class.
onRequestUpdate?: () => void; | |
// batchQuery(): receives the query for the next page just before it is prefetched.
onPrefetchQuery?: (nextQuery: SQL.Query) => void; | |
// queryPending(): fired when the client is told a query is pending.
onQueryPending?: () => void; | |
// queryResult(): receives the Arrow table delivered to the client.
onQueryResult?: (data: Arrow.Table<T>) => void; | |
// queryError(): receives the error before the base class handles it.
onQueryError?: (error: unknown) => void; | |
// update(): fired when the client is asked to update.
onUpdate?: () => void; | |
}; | |
/**
 * Reusable Mosaic client that queries a single table and forwards each
 * client hook (schema, pending, result, error, update) to optional callbacks.
 *
 * The client keeps the most recent result table, supports offset/limit
 * paging with prefetch of the following page, and supports toggling
 * descending sort columns.
 *
 * @typeParam T - Arrow type map describing the rows of the result table.
 */
export class DataClient<T extends Arrow.TypeMap> extends MosaicClient {
  private _data: Arrow.Table<T> | null = null;
  private _selection: Mosaic.Selection | undefined;
  private _table: TableName;
  private _columns: FieldName[] = [SELECT_STAR];
  private _query?: SQL.Query | null;
  private _orderby: unknown[];
  private _offset: number;
  private _limit: number;
  private _schema: Mosaic.FieldInfo[] = [];
  private _onSchemaChange?: (schema: Mosaic.FieldInfo[]) => void;
  private _onRequestQuery?: (query?: SQL.Query | null) => void;
  private _onRequestUpdate?: () => void;
  private _onPrefetchQuery?: (nextQuery: SQL.Query) => void;
  private _onQueryPending?: () => void;
  private _onQueryResult?: (data: Arrow.Table<T>) => void;
  private _onQueryError?: (error: unknown) => void;
  private _onUpdate?: () => void;

  /** Identifier of this client; generated with a `client_` prefix when not supplied. */
  public readonly id: ClientId;

  /**
   * @param props - Client options. `columns` defaults to the `*` wildcard,
   *   `orderby` to no ordering, `offset` to 0 and `limit` to -1 (no limit).
   */
  constructor({
    id,
    table,
    columns = [SELECT_STAR],
    selection,
    query,
    orderby = [],
    offset = 0,
    limit = -1, // -1 means no limit
    onSchemaChange,
    onRequestQuery,
    onRequestUpdate,
    onPrefetchQuery,
    onQueryPending,
    onQueryResult,
    onQueryError,
    onUpdate,
  }: DataClientProps<T>) {
    super(selection); // Call the parent constructor with the filterBy selection

    // Initialize client properties from the options
    this.id = id ?? ClientId.parse(uniqueId("client_"));
    this._table = table;
    this._columns = columns;
    this._selection = selection;
    this._query = query;
    this._orderby = orderby;
    this._offset = offset;
    this._limit = limit;
    this._onSchemaChange = onSchemaChange;
    this._onRequestQuery = onRequestQuery;
    this._onRequestUpdate = onRequestUpdate;
    this._onPrefetchQuery = onPrefetchQuery;
    this._onQueryPending = onQueryPending;
    this._onQueryResult = onQueryResult;
    this._onQueryError = onQueryError;
    this._onUpdate = onUpdate;
  }

  /** Most recent result table, or `null` before the first result arrives. */
  get data(): Arrow.Table<T> | null {
    return this._data;
  }

  /** Selection supplied at construction, if any. */
  get selection(): Mosaic.Selection | undefined {
    return this._selection;
  }

  /** Name of the table this client queries. */
  get table(): string {
    return this._table;
  }

  /** Columns requested at construction (the `*` wildcard by default). */
  get columns(): string[] {
    return this._columns;
  }

  /** Field descriptors (one per configured column) for this client's table. */
  fields(): Mosaic.Field[] | null {
    return this._columns.map((column) => {
      // HACK: https://github.com/uwdata/mosaic/issues/366
      return SQL.column(this._table, column) as Mosaic.Field;
    });
  }

  /**
   * Store the schema handed to the client and notify `onSchemaChange`.
   *
   * The schema is stored before the callback runs so that a listener
   * reading from this client (e.g. `defaultQuery`) sees the new schema.
   */
  fieldInfo(schema: Mosaic.FieldInfo[]): this {
    this._schema = schema;
    this._onSchemaChange?.(schema);
    return this;
  }

  /**
   * Query used when no custom query was supplied: selects the known schema
   * columns from the table, falling back to the configured columns while
   * the schema is still empty.
   */
  get defaultQuery(): SQL.Query {
    // `_schema` is always an array, so a `??` fallback can never fire; test
    // for emptiness instead to avoid building a query with no select list.
    const selected =
      this._schema.length > 0
        ? this._schema.map(({ column }) => column)
        : this._columns;
    return SQL.Query.from(this._table).select(selected);
  }

  /**
   * Toggle `column` in the ordering list (remove it if present, append it
   * otherwise) and request a fresh batch.
   */
  sort(column: string): void {
    if (this._orderby.includes(column)) {
      this._orderby = this._orderby.filter((_column) => _column !== column);
    } else {
      this._orderby = [...this._orderby, column];
    }
    // Fire-and-forget: the result is delivered through queryResult().
    void this.batchQuery();
  }

  /**
   * Build the query for the current paging/sorting state.
   *
   * @param filter - Predicate(s) passed to the query's WHERE clause.
   * @returns The query, or `null` when the client was configured with
   *   `query: null`.
   */
  query(filter: unknown = []): SQL.Query | null {
    if (this._query === null) return null;
    let query = this._query?.clone() ?? this.defaultQuery.clone();
    query = query.where(filter);
    // Non-positive offset/limit values mean "not set" and are not applied.
    if (this._offset > 0) query = query.offset(this._offset);
    if (this._limit > 0) query = query.limit(this._limit);
    // Wrap explicitly so Array.prototype.map's index/array arguments are
    // not forwarded into SQL.desc as extra parameters.
    query = query.orderby(
      Array.from(this._orderby).map((expr) => SQL.desc(expr)),
    );
    return query;
  }

  /** Notify `onRequestQuery`, then delegate to the base implementation. */
  async requestQuery(query?: SQL.Query | null) {
    this._onRequestQuery?.(query);
    await super.requestQuery(query);
    return this;
  }

  /** Notify `onRequestUpdate`, then delegate to the base implementation. */
  requestUpdate() {
    this._onRequestUpdate?.();
    super.requestUpdate();
  }

  /**
   * Request a batch of rows, optionally moving the paging window first.
   *
   * When a positive limit is in effect, the query for the following page is
   * also handed to `onPrefetchQuery` and to the coordinator's prefetch.
   *
   * @param opts - New `offset` and/or `limit`; omitted values keep the
   *   current setting.
   * @returns The stored result table after the request settles, or `null`
   *   when the client has no query.
   */
  async batchQuery(opts?: {
    offset?: number;
    limit?: number;
  }): Promise<Arrow.Table<T> | null> {
    this._offset = opts?.offset ?? this._offset;
    this._limit = opts?.limit ?? this._limit;

    const query = this.query(this._selection?.predicate(this));
    if (!query) return null;

    // request next data batch
    await this.requestQuery(query);

    if (this._limit > 0) {
      // prefetch subsequent data batch
      const nextQuery = query.clone().offset(this._offset + this._limit);
      this._onPrefetchQuery?.(nextQuery);
      this.coordinator?.prefetch(nextQuery);
    }
    return this.data;
  }

  /** Notify `onQueryPending`. */
  queryPending() {
    this._onQueryPending?.();
    return this;
  }

  /**
   * Store the delivered table and notify `onQueryResult`.
   *
   * The table is stored before the callback runs so that a listener reading
   * `client.data` observes the new result rather than the previous one.
   */
  queryResult(data: Arrow.Table): this {
    this._data = data;
    this._onQueryResult?.(data);
    return this;
  }

  /** Notify `onQueryError`, then delegate to the base implementation. */
  queryError(error: unknown): this {
    this._onQueryError?.(error);
    const result = super.queryError(error); // Call the parent method to log the error
    return result;
  }

  /** Notify `onUpdate`. */
  update(): this {
    this._onUpdate?.();
    return this;
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Mosaic Coordinator Provider | |
"use client"; | |
import * as Mosaic from "@uwdata/mosaic-core"; | |
import { createContext, useContext, useMemo } from "react"; | |
import * as Arrow from "apache-arrow"; | |
import assert from "assert"; | |
import { useAnalyticalData } from "./analytical-data-provider"; | |
/** Context carrying the active Mosaic coordinator (null until a connection exists). */
const DataCoordinatorContext = createContext<Mosaic.Coordinator | null>(null);

/**
 * Builds a Mosaic coordinator on top of the analytical-data connection and
 * exposes it to descendants through context.
 *
 * A new coordinator is created whenever the connection changes; while there
 * is no connection the context value is `null`.
 */
export function DataCoordinatorProvider({
  children,
}: {
  children: React.ReactNode;
}) {
  const connection = useAnalyticalData();

  const coordinator = useMemo(() => {
    if (!connection) return null;

    // Adapter from Mosaic's connector interface to the streaming connection.
    const connector: Mosaic.Connector = {
      async query({ sql, type: resultType }) {
        const stream = await connection.evaluateStreamingQuery(sql);
        // "exec" requests carry no result payload.
        if (resultType === "exec") return undefined;

        const recordBatches = await stream.arrowStream.readAll();
        const arrowTable = new Arrow.Table(recordBatches);
        if (resultType === "json") return Array.from(arrowTable);

        assert(resultType === "arrow");
        return arrowTable;
      },
    };

    const options: Mosaic.CoordinatorOptions = { consolidate: false };
    const instance = new Mosaic.Coordinator(connector, options);
    // NOTE(review): Mosaic.coordinator(instance) presumably also registers
    // the instance as Mosaic's global coordinator — confirm against the API.
    return Mosaic.coordinator(instance);
  }, [connection]);

  return (
    <DataCoordinatorContext.Provider value={coordinator}>
      {children}
    </DataCoordinatorContext.Provider>
  );
}
/**
 * Reads the coordinator from the nearest DataCoordinatorContext provider.
 *
 * @returns The coordinator, or `null` when none is available (the context
 *   default, or a provider without a connection).
 */
export function useDataCoordinator() {
  return useContext(DataCoordinatorContext);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Mosaic Data Client wrapper hooks | |
"use client"; | |
import type * as Arrow from "apache-arrow"; | |
import type * as Mosaic from "@uwdata/mosaic-core"; | |
import type * as SQL from "@uwdata/mosaic-sql"; | |
import { useEffect, useMemo, useState } from "react"; | |
import { useDataCoordinator } from "../data-coordinator-provider"; | |
import { DataClient, type DataClientProps } from "./data-client"; | |
// Props accepted by the data-client hooks: the DataClient options, except
// that onQueryResult receives the parsed result (type R) instead of the raw
// Arrow table.
// NOTE(review): "tableUpdatedAt" is not a key of DataClientProps as declared
// in data-client above, so omitting it appears to be a no-op — confirm.
export type UseDataClientProps< | |
R = Arrow.Table, | |
T extends Arrow.TypeMap = Arrow.TypeMap, | |
> = Omit<DataClientProps<T>, "tableUpdatedAt" | "onQueryResult"> & { | |
// Called with the output of the hook's result parser for each query result.
onQueryResult?: (data: R) => void; | |
}; | |
// Value returned by the data-client hooks.
export type UseDataClientResponse< | |
R = Arrow.Table, | |
T extends Arrow.TypeMap = Arrow.TypeMap, | |
> = { | |
// True once the coordinator's connect() for the client has resolved;
// reset to false when the connecting effect is cleaned up.
isConnected: boolean; | |
// True after the client's update hook has fired at least once.
isReady: boolean; | |
// The DataClient instance created by the hook.
client: DataClient<T> | null; | |
// The initial result until a query result arrives, then the parsed result.
result: R; | |
// Schema most recently reported through the client's schema callback.
schema: Mosaic.FieldInfo[]; | |
// String form of a query error reported by the client, or null if there is none.
error: string | null; | |
}; | |
/**
 * Identity result parser: hands back the Arrow table (or `null`) unchanged.
 */
export const PASSTHROUGH_RESULT_PARSER = <
  T extends Arrow.TypeMap = Arrow.TypeMap,
>(
  table: Arrow.Table<T> | null,
) => {
  return table;
};
/**
 * Turn an arbitrary thrown value into a readable message.
 *
 * `JSON.stringify` alone renders `Error` instances as `"{}"`, returns
 * `undefined` for some inputs and throws on cyclic values, so it is only
 * used as a guarded fallback.
 */
function describeDataClientError(error: unknown): string {
  if (error instanceof Error) return error.message;
  if (typeof error === "string") return error;
  try {
    return JSON.stringify(error) ?? String(error);
  } catch {
    return String(error);
  }
}

/**
 * Creates a DataClient once, connects it to the coordinator from context and
 * mirrors the client's hooks into React state.
 *
 * `props` and `resultParser` are captured on the first render only: the
 * client is memoized with an empty dependency list, so later changes to them
 * are not observed.
 *
 * @param props - DataClient options plus optional pass-through callbacks.
 * @param resultParser - Maps each Arrow result table to the exposed result.
 * @param initialResult - Value of `result` before the first query result.
 * @returns Connection/readiness flags, the client, the latest parsed result,
 *   the latest schema and the latest error message (or `null`).
 */
export function useCustomDataClient<
  R = Arrow.Table,
  T extends Arrow.TypeMap = Arrow.TypeMap,
>(
  props: UseDataClientProps<R, T>,
  resultParser: (table: Arrow.Table<T> | null) => R,
  initialResult: R,
): UseDataClientResponse<R, T> {
  const [isReady, setIsReady] = useState(false);
  const [schema, setSchema] = useState<Mosaic.FieldInfo[]>([]);
  const [error, setError] = useState<string | null>(null);
  const [result, setResult] = useState<R>(initialResult);
  const coordinator = useDataCoordinator();

  const client = useMemo(() => {
    function onSchemaChange(_schema: Mosaic.FieldInfo[]) {
      props.onSchemaChange?.(_schema);
      setSchema(_schema);
    }
    function onRequestQuery(query?: SQL.Query | null) {
      props.onRequestQuery?.(query);
    }
    function onQueryError(_error: unknown) {
      props.onQueryError?.(_error);
      setError(describeDataClientError(_error));
    }
    function onQueryResult(_result: Arrow.Table<T>) {
      // Parse once so the callback and the state receive the same value.
      const parsed = resultParser(_result);
      props.onQueryResult?.(parsed);
      // A successful result supersedes any error from an earlier query.
      setError(null);
      setResult(parsed);
    }
    function onUpdate() {
      props.onUpdate?.();
      setIsReady(true);
    }
    return new DataClient<T>({
      ...props,
      onSchemaChange,
      onRequestQuery,
      onQueryError,
      onQueryResult,
      onUpdate,
    });
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);

  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    if (!coordinator || !client) return;

    // Set by the cleanup so a connect() that settles after disconnect
    // cannot flip state back for a torn-down effect.
    let cancelled = false;
    const startTime = performance.now();

    coordinator
      .connect(client)
      .then(() => {
        if (cancelled) return;
        const endTime = performance.now();
        console.log(`Connection time: ${endTime - startTime} milliseconds`);
        setIsConnected(true);
      })
      .catch((connectError: unknown) => {
        // Surface connection failures instead of leaving an unhandled rejection.
        if (!cancelled) setError(describeDataClientError(connectError));
      });

    return () => {
      cancelled = true;
      coordinator.disconnect(client);
      setIsConnected(false);
    };
  }, [client, coordinator]);

  return { isConnected, isReady, client, result, schema, error };
}
/**
 * Data-client hook that exposes query results as raw Arrow tables.
 *
 * Delegates to useCustomDataClient with the pass-through parser and a
 * `null` initial result.
 *
 * @param props - DataClient options plus optional pass-through callbacks.
 */
export function useDataClient<T extends Arrow.TypeMap = Arrow.TypeMap>(
  props: UseDataClientProps<Arrow.Table | null, T>,
): UseDataClientResponse<Arrow.Table | null, T> {
  const initialResult: Arrow.Table | null = null;
  const response = useCustomDataClient<Arrow.Table | null, T>(
    props,
    PASSTHROUGH_RESULT_PARSER,
    initialResult,
  );
  return response;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment