-
-
Save alpgul/f1bd5ca5d265015800010109587a5078 to your computer and use it in GitHub Desktop.
TypeScript Node.js/Browser Mutex + Read-Write Mutex using Atomics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AsyncLock { | |
private _lock = new Int32Array(new SharedArrayBuffer(4)) // SharedArrayBuffer for multi-threading, 4 bytes for 32-bit integer | |
static INDEX = 0 | |
static UNLOCKED = 0 | |
static LOCKED = 1 | |
lock() { | |
while (true) { | |
console.log("lock") | |
const oldValue = Atomics.compareExchange( | |
this._lock, | |
AsyncLock.INDEX, | |
/* old value >>> */ AsyncLock.UNLOCKED, | |
/* new value >>> */ AsyncLock.LOCKED | |
) | |
if (oldValue == AsyncLock.UNLOCKED) { | |
return | |
} | |
Atomics.wait(this._lock, AsyncLock.INDEX, AsyncLock.LOCKED) // <<< expected value at start | |
} | |
} | |
unlock() { | |
console.log("unlock") | |
const oldValue = Atomics.compareExchange( | |
this._lock, | |
AsyncLock.INDEX, | |
/* old value >>> */ AsyncLock.LOCKED, | |
/* new value >>> */ AsyncLock.UNLOCKED | |
) | |
if (oldValue != AsyncLock.LOCKED) { | |
throw new Error("Tried to unlock while not holding the mutex") | |
} | |
Atomics.notify(this._lock, AsyncLock.INDEX, 1) | |
} | |
executeLocked(fn: () => void): Promise<"ok" | "not-equal" | "timed-out" | undefined> { | |
const self = this | |
async function tryGetLock() { | |
while (true) { | |
const oldValue = Atomics.compareExchange( | |
self._lock, | |
AsyncLock.INDEX, | |
/* old value >>> */ AsyncLock.UNLOCKED, | |
/* new value >>> */ AsyncLock.LOCKED | |
) | |
if (oldValue == AsyncLock.UNLOCKED) { | |
fn() | |
self.unlock() | |
return | |
} | |
// @ts-expect-error | |
const result = Atomics.waitAsync(self._lock, AsyncLock.INDEX, AsyncLock.LOCKED) | |
// ^ expected value at start | |
return await result.value | |
} | |
} | |
return tryGetLock() | |
} | |
} | |
class AsyncReadWriteLock { | |
private _lock = new Int32Array(new SharedArrayBuffer(4)) // SharedArrayBuffer for multi-threading, 4 bytes for 32-bit integer | |
static INDEX = 0 | |
static UNLOCKED = 0 | |
static WRITER_LOCKED = -1 | |
static READERS_UPDATE = 2 | |
async lockRead() { | |
while (true) { | |
const lockState = Atomics.load(this._lock, AsyncReadWriteLock.INDEX) | |
if (lockState == AsyncReadWriteLock.UNLOCKED) { | |
const newLockState = lockState + AsyncReadWriteLock.READERS_UPDATE | |
const oldValue = Atomics.compareExchange(this._lock, AsyncReadWriteLock.INDEX, lockState, newLockState) | |
if (oldValue == lockState) { | |
return | |
} | |
} | |
// @ts-expect-error | |
await Atomics.waitAsync(this._lock, AsyncReadWriteLock.INDEX, lockState) | |
} | |
} | |
async lockWrite() { | |
while (true) { | |
const lockState = Atomics.load(this._lock, AsyncReadWriteLock.INDEX) | |
if (lockState == AsyncReadWriteLock.UNLOCKED) { | |
const oldValue = Atomics.compareExchange( | |
this._lock, | |
AsyncReadWriteLock.INDEX, | |
AsyncReadWriteLock.UNLOCKED, | |
AsyncReadWriteLock.WRITER_LOCKED | |
) | |
if (oldValue == AsyncReadWriteLock.UNLOCKED) { | |
return | |
} | |
} | |
// @ts-expect-error | |
await Atomics.waitAsync(this._lock, AsyncReadWriteLock.INDEX, lockState) | |
} | |
} | |
unlockRead() { | |
const lockState = Atomics.sub(this._lock, AsyncReadWriteLock.INDEX, AsyncReadWriteLock.READERS_UPDATE) | |
if (lockState < AsyncReadWriteLock.READERS_UPDATE) { | |
throw new Error("Tried to unlock read lock while not holding the lock") | |
} | |
Atomics.notify(this._lock, AsyncReadWriteLock.INDEX, 1) | |
} | |
unlockWrite() { | |
const oldValue = Atomics.compareExchange( | |
this._lock, | |
AsyncReadWriteLock.INDEX, | |
AsyncReadWriteLock.WRITER_LOCKED, | |
AsyncReadWriteLock.UNLOCKED | |
) | |
if (oldValue != AsyncReadWriteLock.WRITER_LOCKED) { | |
throw new Error("Tried to unlock write lock while not holding the lock") | |
} | |
Atomics.notify(this._lock, AsyncReadWriteLock.INDEX, 1) | |
} | |
async executeReadLocked(fn: () => Promise<void>): Promise<"ok" | "not-equal" | "timed-out" | undefined> { | |
const self = this | |
async function tryGetReadLock() { | |
while (true) { | |
const lockState = Atomics.load(self._lock, AsyncReadWriteLock.INDEX) | |
if (lockState >= AsyncReadWriteLock.UNLOCKED) { | |
const newLockState = lockState + AsyncReadWriteLock.READERS_UPDATE | |
const oldValue = Atomics.compareExchange( | |
self._lock, | |
AsyncReadWriteLock.INDEX, | |
lockState, | |
newLockState | |
) | |
if (oldValue == lockState) { | |
await fn() | |
self.unlockRead() | |
return | |
} | |
} | |
// @ts-expect-error | |
const result = Atomics.waitAsync(self._lock, AsyncReadWriteLock.INDEX, lockState) | |
return await result.value | |
} | |
} | |
return tryGetReadLock() | |
} | |
async executeWriteLocked(fn: () => Promise<void>): Promise<"ok" | "not-equal" | "timed-out" | undefined> { | |
const self = this | |
async function tryGetWriteLock() { | |
while (true) { | |
const lockState = Atomics.load(self._lock, AsyncReadWriteLock.INDEX) | |
if (lockState == AsyncReadWriteLock.UNLOCKED) { | |
const oldValue = Atomics.compareExchange( | |
self._lock, | |
AsyncReadWriteLock.INDEX, | |
AsyncReadWriteLock.UNLOCKED, | |
AsyncReadWriteLock.WRITER_LOCKED | |
) | |
if (oldValue == AsyncReadWriteLock.UNLOCKED) { | |
await fn() | |
self.unlockWrite() | |
return | |
} | |
} | |
// @ts-expect-error | |
const result = Atomics.waitAsync(self._lock, AsyncReadWriteLock.INDEX, lockState) | |
return await result.value | |
} | |
} | |
return tryGetWriteLock() | |
} | |
} | |
class ReadWriteLock { | |
private lock = new Int32Array(new SharedArrayBuffer(4)) // SharedArrayBuffer for multi-threading, 4 bytes for 32-bit integer | |
private writers = 0 | |
private readers = 0 | |
private writerQueue = 0 | |
private readerQueue = 0 | |
public async lockRead() { | |
Atomics.add(this.lock, 0, 1) | |
if (this.writers > 0 || this.writerQueue > 0) { | |
this.readerQueue++ | |
while (this.writers > 0 || this.writerQueue > 0) { | |
await new Promise((resolve) => setImmediate(resolve)) | |
} | |
this.readerQueue-- | |
} | |
this.readers++ | |
Atomics.sub(this.lock, 0, 1) | |
} | |
public unlockRead() { | |
Atomics.add(this.lock, 0, 1) | |
this.readers-- | |
Atomics.sub(this.lock, 0, 1) | |
} | |
public async lockWrite() { | |
Atomics.add(this.lock, 0, 1) | |
if (this.readers > 0 || this.writers > 0) { | |
this.writerQueue++ | |
while (this.readers > 0 || this.writers > 0) { | |
await new Promise((resolve) => setImmediate(resolve)) | |
} | |
this.writerQueue-- | |
} | |
this.writers++ | |
Atomics.sub(this.lock, 0, 1) | |
} | |
public unlockWrite() { | |
Atomics.add(this.lock, 0, 1) | |
this.writers-- | |
Atomics.sub(this.lock, 0, 1) | |
} | |
} | |
import { describe, it } from "node:test" | |
describe("ReadWriteLock", () => { | |
it("should lock and unlock", async () => { | |
const lock = new ReadWriteLock() | |
await lock.lockRead() | |
lock.unlockRead() | |
await lock.lockWrite() | |
lock.unlockWrite() | |
}) | |
it("should lock and unlock multiple times in parallel", async () => { | |
const lock = new ReadWriteLock() | |
await Promise.all([ | |
(async () => { | |
await lock.lockRead() | |
lock.unlockRead() | |
console.log("Finished 1") | |
})(), | |
(async () => { | |
await lock.lockWrite() | |
lock.unlockWrite() | |
console.log("Finished 2") | |
})(), | |
]) | |
}) | |
}) | |
describe("AsyncReadWriteLock", () => { | |
it("should lock and unlock", async () => { | |
const lock = new AsyncReadWriteLock() | |
await lock.lockRead() | |
lock.unlockRead() | |
await lock.lockWrite() | |
lock.unlockWrite() | |
}) | |
it("should lock and unlock multiple times in parallel", async () => { | |
const lock = new AsyncReadWriteLock() | |
await Promise.all([ | |
(async () => { | |
await lock.lockRead() | |
lock.unlockRead() | |
console.log("Finished 1") | |
})(), | |
(async () => { | |
await lock.lockWrite() | |
lock.unlockWrite() | |
console.log("Finished 2") | |
})(), | |
]) | |
}) | |
it("should fail when unlocking read lock while not holding it", async () => { | |
const lock = new AsyncReadWriteLock() | |
try { | |
lock.unlockRead() | |
} catch (e) { | |
console.log("Failed as expected") | |
return | |
} | |
}) | |
it("should fail when unlocking write lock while not holding it", async () => { | |
const lock = new AsyncReadWriteLock() | |
try { | |
lock.unlockWrite() | |
} catch (e) { | |
console.log("Failed as expected") | |
return | |
} | |
}) | |
it("should fail when parallel locks have invalid order", async () => { | |
const lock = new AsyncReadWriteLock() | |
Promise.all([ | |
(async () => { | |
await lock.lockWrite() | |
})(), | |
(async () => { | |
await lock.unlockRead() | |
})(), | |
]).catch((e) => { | |
console.log("Failed as expected") | |
}) | |
}) | |
}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment