Skip to content

Instantly share code, notes, and snippets.

@DrSensor
Last active November 15, 2022 03:36
Weekend Project

code related to parallelization

  • main.ts - main logic that both run on main thread and each of worker thread
  • utils.ts

code related to visualization

import type { Neko as DesktopWindow } from "https://deno.land/x/neko/core/Neko.ts";
import type * as skia from "https://deno.land/x/skia_canvas/mod.ts";
export default class Canvas {
#canvas?: skia.Canvas | HTMLCanvasElement;
#window?: DesktopWindow;
#img?: typeof skia.Image;
#ctx?: Context2D;
static async new(
{ title, width, height } = {} as {
title?: string;
width: number;
height: number;
},
) {
if (!("window" in globalThis)) throw "Canvas must be in Main thread";
const self = new Canvas(Canvas._guard);
if ("Deno" in globalThis) {
const [{ Neko }, { createCanvas }, { Image }] = await Promise.all(
[
"neko/core/Neko.ts",
"skia_canvas/src/canvas.ts",
"skia_canvas/src/image.ts",
].map((mod) => import(`https://deno.land/x/${mod}`)),
);
self.#img = Image;
self.#canvas = createCanvas(width, height);
self.#window = new Neko({ title, width, height, resize: true });
} else {
document.body.append(
self.#canvas = (document.querySelector("canvas:not(#memviz)") ??
document.createElement("canvas")) as HTMLCanvasElement,
);
if (!self.#canvas.hasAttribute("width")) self.#canvas.width = width;
if (!self.#canvas.hasAttribute("height")) self.#canvas.height = height;
}
return self;
}
async loadImage(path: string | URL) {
if ("Deno" in globalThis) return this.#img!.load(path);
return createImageBitmap(await fetch(path).then((as) => as.blob()));
}
#cursor = { x: NaN, y: NaN };
#onmousemove?: (e: MouseEvent) => void;
get mouse() {
const p = this.#cursor;
if ("Deno" in globalThis) [p.x, p.y] = this.#window!.mousePosition;
else if (!this.#onmousemove) {
addEventListener(
"mousemove",
this.#onmousemove = (e) => (p.x = e.x, p.y = e.y),
);
}
return p;
}
reset() {
this.#ctx?.clearRect(0, 0, this.#canvas!.width, this.#canvas!.height);
this.#ctx?.resetTransform();
}
draw(fn: (ctx: Context2D) => void) {
this.reset();
fn(this.#ctx ??= this.getContext("2d"));
if ("Deno" in globalThis) {
this.#window!.setFrameBuffer(
(this.#canvas as skia.Canvas).pixels,
this.#canvas!.width,
this.#canvas!.height,
);
}
}
getContext(type: "2d") {
return this.#ctx ??= this.#canvas!.getContext(type, {
desynchronized: true,
})!;
}
private static readonly _guard = Symbol(); // to bad typesccrypt can't do typeof MemViz.#guard
constructor(guard: typeof Canvas._guard) {
if (guard !== Canvas._guard) throw null;
}
}
type Context2D = CanvasRenderingContext2D | skia.CanvasRenderingContext2D;
{
"compilerOptions": {
"lib": [
"dom",
"dom.iterable",
"dom.asynciterable",
"deno.ns",
"deno.unstable"
]
}
}
<canvas/>
<canvas id=memviz/>
#!/usr/bin/env -S deno run --allow-net --allow-env --allow-read --allow-write --unstable --allow-ffi
// Copyright 2023 Fahmi Akbar Wildana <sensorfied@gmail.com>
// SPDX-License-Identifier: BSD-2-Clause
import MemViz from "./memviz.ts";
import Canvas from "./canvas.ts";
import { footer, header } from "./string.ts";
import * as random from "./random.ts";
import { cpus } from "https://deno.land/std/node/os.ts";
import { loop, malloc, WorkerThread } from "./utils.ts";
/////////////////////////////////////////////////////////////////////////////👆 0
//👇 tweakable
const capacity = { min: 100, max: 200 },
instance = 10,
worker = cpus().length - 1;
class Position {
static TypedArray = Uint16Array;
static each = 2 * Position.TypedArray.BYTES_PER_ELEMENT; //👈
x: InstanceType<typeof Position.TypedArray>;
y: InstanceType<typeof Position.TypedArray>;
constructor(public cap: number) {
this.x = make(Position.TypedArray, this.cap);
this.y = make(Position.TypedArray, this.cap);
}
color = random.color();
avatar?: Awaited<ReturnType<Canvas["loadImage"]>> | true;
}
let make: ReturnType<typeof malloc>;
let buffer: ArrayBufferLike;
/////////////////////////////////////////////////////////////////////////////👆 1
const width = 800, height = 600, size = { particle: 2, avatar: 32 }; //👈 tweakable
const radius = 100;
const printMemoryLayout = true;
let zoo: Position[],
capacities: number[] | Uint8Array = [],
border: number,
inc = 0,
mouse = { x: width / 2, y: height / 2 },
isFollowCursor: true | undefined,
actor: WorkerThread | undefined;
const script = import.meta.url;
let actorID: number | undefined;
if ("window" in globalThis) {
const byteLength = Position.each * capacity.max * instance; //👈 console.group(...header("Main thread"));
console.time("full setup");
if (worker) {
actor = new WorkerThread(script, new SharedArrayBuffer(byteLength));
}
make = malloc(buffer = worker ? actor!.buffer : new ArrayBuffer(byteLength));
setup(); // create zoo
if (worker) { // tell worker about the memory layout of zoo
actor!.run(new Uint8Array(capacities = zoo!.map((it) => it.cap)).buffer);
}
console.timeEnd("full setup");
// setTimeout(() => actor!.relayMessage(["no shake"]), 1e2); // debug: freeze particle
console.time("prepare to draw");
const memviz = await MemViz.new(buffer);
const canvas = await Canvas.new({ width, height });
console.timeEnd("prepare to draw");
console.time("fetch avatar");
Promise.all(
zoo!.map((it, i) =>
canvas.loadImage(random.avatar.elonmusk()).then((img) => {
it.avatar = img;
incRect += 2;
actor.relayMessage(["avatar ready", i]);
})
),
).then(() => actor.relayMessage(["follow cursor"]));
console.timeEnd("fetch avatar");
console.groupEnd();
console.info(...footer("Main"));
loop(() => {
mouse = canvas.mouse;
if (worker) actor!.relayMessage(["mousemove", mouse]);
canvas.draw((ctx) => {
const m = mouse;
for (const { cap, x, y, color, avatar } of zoo) {
ctx.fillStyle = color;
for (let i = cap; i--;) {
border = avatar ? size.avatar : size.particle + inc;
const p = { x: Atomics.load(x, i), y: Atomics.load(y, i) };
ctx.translate(p.x, p.y);
if (avatar && !is(p).inside(radius, m)) {
ctx.rotate(
Math.atan2(m.y - (p.y + border / 2), m.x - (p.x + border / 2)),
);
ctx.drawImage(avatar, 0, 0, border, border);
} else {
ctx.fillRect(0, 0, border, border);
}
ctx.resetTransform();
}
}
});
memviz.draw();
});
} else {
console.group(...header("Worker thread"));
console.time("full setup");
const shared = await WorkerThread.selfSpawn(script, worker - 1);
actorID = shared.id;
make = malloc(buffer = shared.memory);
capacities = new Uint8Array(shared.layout);
setup(); // create zoo
console.timeEnd("full setup");
console.groupEnd();
console.info(...footer(`Worker[${actorID}]`));
shared.onrelay = (data) => {
if (!Array.isArray(data)) return;
switch (data[0]) {
case "mousemove":
mouse = data[1];
break;
case "no shake":
isFollowCursor = true;
break;
case "avatar ready":
border = size.avatar;
zoo[data[1]].avatar = true;
break;
}
};
loop(update);
}
/////////////////////////////////////////////////////////////////////////////👆 2
// WARNING: declaring global variables from here then capture it in setup() or update() will not works 😏
function setup() {
zoo = Array.from(
{ length: instance },
(_, i) =>
new Position(
"window" in globalThis
? random.int(capacity) //👈 set random capacity in the main thread
: capacities.at(i)!, //👈 set capactiy in the worker thread from data passed by the parent thread
),
); // NOTE: Position is like DataView for buffer but based on memory layout when in worker thread
// center all entities when setup in main thread
if ("window" in globalThis) {
for (const { x, y } of zoo) {
x.fill(width / 2);
y.fill(height / 2);
}
}
if (printMemoryLayout) printMemTable();
}
function update() {
const clamp = (...$: number[]) => Math.min(Math.max($[1], $[0]), $[2]);
const pad = border ??= size.particle * 2, vw = width - pad, vh = height - pad;
if (isFollowCursor) return; // avatar looking at mouse pointer while wiggle
for (const { cap, x, y, avatar } of zoo) {
// for (let i= cap - 1; i--;) {
let len = Math.ceil(cap / (worker || 1)) - 1;
for (const offset = len * (actorID ?? 0); len--;) { // eratic particles
const i = offset + len;
const p = { x: Atomics.load(x, i), y: Atomics.load(y, i) };
const inside = is(p).inside(radius, mouse);
if (avatar && !inside) continue; // freeze
// eratic particles
const rx = random.int(-inside, +inside);
const ry = random.int(-inside, +inside);
Atomics.store(x, i, clamp(pad, p.x + rx, vw));
Atomics.store(y, i, clamp(pad, p.y + ry, vh));
}
}
}
/////////////////////////////////////////////////////////////////////////////👆 3
function is(p: { x: number; y: number }) {
return {
inside: (r: number, m: { x: number; y: number }) =>
(p.x - m.x + r / 2) ** 2 + (p.y - m.y - r / 2) ** 2 < r ** 2,
};
}
function printMemTable() {
console.time("print memory layout");
const table = zoo.flatMap((
{ x, y, cap },
) => [{
malloc: cap * Position.each,
}, {
"|addr|👉": "|x|",
start: x.byteOffset,
end: x.byteOffset + x.byteLength,
}, {
"|addr|👉": "|y|",
start: y.byteOffset,
end: y.byteOffset + x.byteLength,
}]
), //@ts-ignore: typescript can't infer `group.malloc`
allocated = table.reduce((total, group) => total += group.malloc ?? 0, 0),
available = buffer.byteLength - allocated;
console.info("> every number are in bytes");
console.table(table);
console.table({ "memory buffer": buffer.byteLength, allocated, available });
console.timeEnd("print memory layout");
}
import type { Neko as DesktopWindow } from "https://deno.land/x/neko/core/Neko.ts";
export default class MemViz {
#width?: number;
#height?: number;
#framebuffer?: Uint8Array;
#window?: DesktopWindow;
static async new(buffer: ArrayBufferLike, title = "memviz") {
if (!("window" in globalThis)) throw "MemViz must be in Main thread";
const self = new MemViz(MemViz._guard);
self.#height = self.#width = Math.floor(Math.sqrt(buffer.byteLength) / 2);
if ("Deno" in globalThis) {
self.#window = new (await import("https://deno.land/x/neko/core/Neko.ts"))
.Neko({
width: self.#width,
height: self.#height,
title,
// resize: true,
topmost: true,
});
self.#framebuffer = new Uint8Array(buffer);
} else {
self.#img = new ImageData(
new Uint8ClampedArray(buffer),
self.#width,
self.#height,
);
const canvas: HTMLCanvasElement =
document.querySelector("canvas#memviz") ??
document.createElement("canvas");
self.#ctx = canvas.getContext("2d")!;
document.body.append(canvas);
}
return self;
}
draw() {
if ("Deno" in globalThis) {
this.#window!.setFrameBuffer(
this.#framebuffer!,
this.#width,
this.#height,
);
} else this.#ctx!.putImageData(this.#img!, 0, 0);
}
#ctx?: CanvasRenderingContext2D;
#img?: ImageData;
private static readonly _guard = Symbol(); // to bad typesccrypt can't do typeof MemViz.#guard
constructor(guard: typeof MemViz._guard) {
if (guard !== MemViz._guard) throw null;
}
}
// import {} from "https://esm.sh/@dice-roller/rpg-dice-roller";
import { faker } from "https://esm.sh/@faker-js/faker";
const { round, random: rand } = Math;
export function int(min: number, max: number): number;
export function int($: { min: number; max: number }): number;
export function int(
min: { min: number; max: number } | number,
max?: number,
) {
if (typeof min !== "number") (max = min.max, min = min.min);
return round(min + rand() * (max! - min));
}
export function color(c?: string): string {
const { color: { human } } = faker, { reserved } = color;
return reserved.has(c = human()) || c === "black"
? color(c)
: (reserved.add(c), c);
}
color.reserved = new Set<string>();
export function avatar(url?: string): string {
const { image: { avatar: ipfs } } = faker, { reserved } = avatar;
return reserved.has(url = ipfs()) ? avatar(url) : (reserved.add(url), url);
}
avatar.reserved = new Set<string>();
avatar.elonmusk = Object.assign(function (i?: number): string {
const { elonmusk } = avatar, { at, reserved } = elonmusk, rand = int;
return reserved.has(i = rand(0, hash.elonmusk.length - 1))
? elonmusk(i)
: (reserved.add(i), at(i));
}, {
reserved: new Set<number>(),
at: (index: number) => dir.elonmusk + hash.elonmusk[index] + "_400x400.jpg",
});
const hash = {
elonmusk: [
"3VYnFrzx",
"DgO3WL4S",
"FS8-70Ie",
"IY9Gx6Ok",
"Nyn1HZWF",
"RJ5EI7PS",
"Xz3fufYY",
"Y4s_eu5O",
"dHw9JcrK",
"foUrqiEw",
"itVSA7l0",
"mXIcYtIs",
"my_Sigxw",
],
};
const dir = {
elonmusk:
"https://cloudflare-ipfs.com/ipfs/QmXtX4hzEH3bbb69Sr2MD5B7GLCzktbK1mFG8hM39cWpuZ/",
};
const tcol = Deno.consoleSize().columns;
export const spacer = (n: number, char: string) =>
Array(Math.floor(n)).fill(char).join("");
export function breakline(open: string, title: string, close: string) {
const line = liner(spacer((tcol - title.length - 2) / 2, "─"));
return [line([open]), title, line([, close])];
}
export const liner = (sep: string) =>
(
[open = "", close = ""]: [open?: string, close?: string],
s = open || close,
) => open + sep.slice(0, -s.length) + close;
export const header = (title: string) => breakline("┌─", title, "─┐");
export const footer = (title: string) => breakline("└─", title, "─┘");
// Copyright 2023 Fahmi Akbar Wildana <sensorfied@gmail.com>
// SPDX-License-Identifier: BSD-2-Clause
export function malloc(buffer: ArrayBufferLike) {
let prevType: BYTES_PER_ELEMENT = 1, count = 0, prevSize = 0;
// IIFE
return <T extends TypedArrayConstructor>(TypedArray: T, size: number) =>
(($) => (prevType = $.BYTES_PER_ELEMENT, $ as InstanceType<T>))(
new TypedArray(buffer, prevType * prevSize * count++, prevSize = size), //👈
);
}
export class WorkerThread extends Worker {
constructor(
public url: string,
public buffer: SharedArrayBuffer,
public id: number = 0,
) {
super(url, { type: "module" });
this.postMessage([id, this.buffer]); // share memory buffer with the/another worker thread
}
/** tell worker to run while send info about the memory layout */
run(layout: ArrayBufferLike, config?: { copy: true }) {
this.postMessage(layout, config?.copy ? undefined : { transfer: [layout] });
}
relayMessage(message: unknown, options?: StructuredSerializeOptions) {
this.postMessage(["relay", message, ...options ? [options] : []]);
}
// concurrent? 🤔
static selfSpawn(url: string, limit = 1) {
return new Promise<MayRelay>((resolve) => {
const $ = {} as MayRelay;
self.addEventListener("message", ({ data }) => {
if (data[1] instanceof SharedArrayBuffer) {
[$.id, $.memory] = data;
if ($.id < limit) {
$.relay = new WorkerThread(url, $.memory, ($.id = +$.id) + 1);
}
} else if ($.memory && data instanceof ArrayBuffer) {
$.layout = data;
resolve($);
if ($.relay) $.relay.run($.layout, { copy: true });
} else if (($.relay || $.id === limit) && data[0] === "relay") {
$.onrelay?.(data[1]);
$.relay?.relayMessage(data[1], data[2]);
}
});
});
}
// await-ing will wait Shared memory before returning
static get self(): Promise<SharedData> {
return new Promise((resolve) => {
let memory: SharedArrayBuffer, id: number;
self.onmessage = ({ data }) => {
if (data[1] instanceof SharedArrayBuffer) [id, memory] = data;
else if (memory && data instanceof ArrayBuffer) {
resolve({ id, memory, layout: data });
}
};
});
}
}
export function loop(fn: () => void) {
if ("Deno" in globalThis) {
const chan = new MessageChannel();
chan.port1.postMessage(null);
chan.port2.onmessage = () => {
fn();
chan.port1.postMessage(null);
};
chan.port2.start();
} else {requestAnimationFrame(function loop() {
fn();
requestAnimationFrame(loop);
});}
}
interface MayRelay extends SharedData {
relay: WorkerThread;
onrelay?: (data: Parameters<typeof structuredClone>[0]) => void;
}
interface SharedData {
id: number;
memory: SharedArrayBuffer;
layout: ArrayBuffer;
}
type BYTES_PER_ELEMENT = number;
type TypedArrayConstructor =
| Uint8ClampedArrayConstructor
| Uint8ArrayConstructor
| Uint16ArrayConstructor
| Uint32ArrayConstructor
| BigUint64ArrayConstructor
| Int8ArrayConstructor
| Int16ArrayConstructor
| Int32ArrayConstructor
| BigInt64ArrayConstructor
| Float32ArrayConstructor
| Float64ArrayConstructor;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment