Skip to content

Instantly share code, notes, and snippets.

@tim-evans
Last active June 15, 2019 01:05
Show Gist options
  • Save tim-evans/d6b79d62c01793c3f119f67e2f6f4268 to your computer and use it in GitHub Desktop.
Save tim-evans/d6b79d62c01793c3f119f67e2f6f4268 to your computer and use it in GitHub Desktop.
import { Reducer, useReducer, useDebugValue } from 'react';
/**
 * Tuple type of the arguments accepted by the function type `T`.
 *
 * Resolves to `undefined` if `T` cannot be matched against a function
 * signature.
 */
export type ParameterType<T extends (...args: any[]) => any> =
  T extends (...args: infer Args) => any ? Args : undefined;
/**
 * A task body: any function that returns an iterator when called
 * (typically a generator function). The scheduler steps the iterator and
 * waits on each produced value before resuming it.
 */
type TaskRunner = (
  ...params: any[]
) => IterableIterator<any>;
/**
 * Action creator: request that `task` be run with the given `params`.
 * How the request is honoured depends on the reducer's strategy and
 * concurrency settings.
 */
const startTask = (task: TaskRunner, ...params: ParameterType<TaskRunner>) => (
  {
    action: 'START_TASK',
    task,
    params
  } as const
);
/**
 * Action creator: ask the reducer to start every task that is ready, plus
 * queued tasks for which a slot is available.
 */
const startWork = () => (
  {
    action: 'START_WORK',
  } as const
);
/**
 * Action creator: the promise `task` was waiting on has resolved with
 * `value`, so the task can take its next step.
 */
const continueWork = (task: Task, value: any) => (
  {
    action: 'CONTINUE_WORK',
    task,
    value
  } as const
);
/**
 * Action creator: cancel the given tasks. The reducer matches them by
 * object identity.
 */
const cancelTasks = (tasks: Task[]) => (
  {
    action: 'CANCEL_TASKS',
    tasks
  } as const
);
/**
 * Action creator: mark the given tasks as tracked, i.e. a continuation has
 * been attached to their pending promise. The reducer matches them by
 * object identity.
 */
const trackWork = (tasks: Task[]) => (
  {
    action: 'TRACK_WORK',
    tasks
  } as const
);
/**
 * Every action the reducer understands, derived from the action creators
 * so the two cannot drift apart. The `action` field is the discriminant.
 */
type Actions =
  | ReturnType<typeof startTask>
  | ReturnType<typeof startWork>
  | ReturnType<typeof continueWork>
  | ReturnType<typeof cancelTasks>
  | ReturnType<typeof trackWork>;
/**
 * Lifecycle states of a task as stored in the reducer.
 *
 * - `queued`:     held back under the 'queue' strategy until a slot frees up.
 * - `ready`:      accepted; the next START_WORK creates and steps its iterator.
 * - `dropped`:    never started (rejected by the 'drop' strategy, or
 *                 cancelled while still `ready`/`queued`).
 * - `working`:    the iterator yielded; its promise is not tracked yet.
 * - `working*`:   `working`, after TRACK_WORK marked the promise as tracked.
 * - `finishing`:  the iterator reported `done`; its promise is not tracked yet.
 * - `finishing*`: `finishing`, after TRACK_WORK marked the promise as tracked.
 * - `cancelled`:  stopped after it was started.
 * - `finished`:   the final promise resolved.
 */
type TaskStatus =
  | 'queued'
  | 'ready'
  | 'dropped'
  | 'working'
  | 'working*'
  | 'finishing'
  | 'finishing*'
  | 'cancelled'
  | 'finished';
/**
 * One invocation of a task runner, as tracked in the reducer state.
 */
interface Task {
// Function that produces the iterator for this invocation.
run: TaskRunner;
// Arguments `run` is called with when the task is started.
params: ParameterType<TaskRunner>;
// Current lifecycle state.
status: TaskStatus;
// Iterator returned by `run`; present once the task has been started.
iterator?: ReturnType<TaskRunner>;
// `Promise.resolve` of the value produced by the most recent iterator step.
promise?: Promise<any>;
// Result of the task, surfaced to consumers as `last.value`.
value?: any;
// Surfaced to consumers as `last.error`. NOTE(review): nothing in this file
// assigns it.
error?: Error;
}
/**
 * Reducer state for the task scheduler.
 */
interface State {
// What START_TASK does when the scheduler is at capacity: 'restart' cancels
// the first running task and accepts the new one, 'drop' records the new
// task as dropped, 'queue' records it as queued.
strategy: 'restart' | 'drop' | 'queue';
// Number of running tasks at which `strategy` takes effect.
maxConcurrency: number;
// Known tasks in the order they were requested.
tasks: Task[];
}
/**
 * Returns a new list without the tasks that have reached a terminal state
 * ('dropped', 'cancelled' or 'finished').
 *
 * The final entry is always kept, whatever its status, so the most recent
 * task stays available to callers. The input array is not modified.
 */
function compact(tasks: Task[]) {
  const lastIndex = tasks.length - 1;
  return tasks.filter((task, index) => {
    const settled =
      task.status === 'dropped' ||
      task.status === 'cancelled' ||
      task.status === 'finished';
    return !settled || index === lastIndex;
  });
}
/**
 * State transition function for the task scheduler.
 *
 * Handled actions:
 * - START_TASK:    record a new task. Below capacity it becomes 'ready'; at
 *                  capacity the configured strategy decides ('restart' cancels
 *                  the first running task, 'drop' records the new task as
 *                  dropped, 'queue' records it as queued).
 * - START_WORK:    create and step the iterator of every 'ready' task, and of
 *                  'queued' tasks while slots remain.
 * - TRACK_WORK:    mark 'working'/'finishing' tasks as tracked ('working*' /
 *                  'finishing*').
 * - CONTINUE_WORK: resume a 'working*' task with the resolved value, or
 *                  complete a 'finishing*' task with it.
 * - CANCEL_TASKS:  drop tasks that have not started and cancel tracked ones.
 *
 * NOTE(review): this reducer advances and closes iterators, so it is not
 * pure. React documents that reducers may be invoked more than once for the
 * same action (e.g. Strict Mode in development) — confirm this is acceptable.
 *
 * @param state  Current scheduler state.
 * @param action Action to apply.
 * @returns The next state; always a new object for the actions listed above.
 */
const reducer: Reducer<State, Actions> = (state, action) => {
  const runningTasks = state.tasks.filter(isRunning);

  // Next state: scheduler settings are carried over, only the task list changes.
  const withTasks = (tasks: Task[]): State => ({
    strategy: state.strategy,
    maxConcurrency: state.maxConcurrency,
    tasks
  });

  // Creates the task's iterator and runs it up to its first yield/return.
  // Any previous `status`, `iterator` and `value` are discarded.
  const begin = (task: Task): Task => {
    const { status: _status, iterator: _iterator, value: _value, ...rest } = task;
    const iterator = task.run(...rest.params);
    const next = iterator.next();
    return {
      ...rest,
      status: next.done ? 'finishing' : 'working',
      promise: Promise.resolve(next.value),
      iterator
    };
  };

  switch (action.action) {
    case 'START_TASK': {
      const { task: run, params } = action;
      const create = (status: TaskStatus): Task => ({ run, params, status });

      // `>=` rather than `===`, so the limit still applies if the number of
      // running tasks is ever above it (or the limit is not an integer).
      if (runningTasks.length >= state.maxConcurrency) {
        switch (state.strategy) {
          case 'restart': {
            const oldest = runningTasks[0];
            const tasks = state.tasks.map((task): Task => {
              if (task !== oldest) {
                return task;
              }
              // A 'ready' task has no iterator to close yet.
              if (task.status !== 'ready' &&
                  task.iterator && task.iterator.return) {
                task.iterator.return();
              }
              return { ...task, status: 'cancelled' };
            });
            return withTasks(compact([...tasks, create('ready')]));
          }
          case 'drop':
            return withTasks([...state.tasks, create('dropped')]);
          case 'queue':
            return withTasks([...state.tasks, create('queued')]);
        }
      }

      // Below capacity: the task can start right away.
      return withTasks([...state.tasks, create('ready')]);
    }

    case 'START_WORK': {
      // Only the 'queue' strategy limits how many queued tasks may start.
      let readySlots = state.strategy === 'queue'
        ? state.maxConcurrency - runningTasks.length
        : Infinity;
      const tasks = compact(state.tasks).map(task => {
        if (task.status === 'ready') {
          return begin(task);
        }
        if (task.status === 'queued' && readySlots > 0) {
          readySlots -= 1;
          return begin(task);
        }
        return task;
      });
      return withTasks(tasks);
    }

    case 'CONTINUE_WORK': {
      const { task: settled, value } = action;
      const tasks = compact(state.tasks).map((task): Task => {
        // Match on the iterator: the task object itself is replaced on every
        // transition, the iterator is not.
        if (task.iterator !== settled.iterator) {
          return task;
        }
        // Only a tracked task may be resumed.
        if (task.status === 'working*' && task.iterator) {
          const next = task.iterator.next(value);
          return {
            ...task,
            status: next.done ? 'finishing' : 'working',
            promise: Promise.resolve(next.value)
          };
        }
        if (task.status === 'finishing*') {
          // The resolved value of the final promise is the task's result.
          // (It used to be read from `task.value`, which is never assigned,
          // so the result was lost.)
          return { ...task, value, status: 'finished' };
        }
        return task;
      });
      return withTasks(tasks);
    }

    case 'CANCEL_TASKS': {
      const targets = action.tasks;
      const tasks = compact(state.tasks).map((task): Task => {
        if (targets.indexOf(task) === -1) {
          return task;
        }
        if (task.status === 'ready' || task.status === 'queued') {
          // Never started, so there is nothing to close.
          return { ...task, status: 'dropped' };
        }
        if (task.status === 'working*' || task.status === 'finishing*') {
          if (task.iterator && task.iterator.return) {
            task.iterator.return();
          }
          return { ...task, status: 'cancelled' };
        }
        return task;
      });
      return withTasks(tasks);
    }

    case 'TRACK_WORK': {
      const targets = action.tasks;
      const tasks = compact(state.tasks).map((task): Task => {
        if (targets.indexOf(task) === -1) {
          return task;
        }
        if (task.status === 'finishing') {
          return { ...task, status: 'finishing*' };
        }
        if (task.status === 'working') {
          return { ...task, status: 'working*' };
        }
        // Listed tasks are always copied, even when the status is unchanged.
        return { ...task };
      });
      return withTasks(tasks);
    }
  }
  return state;
};
/**
 * Tells whether a task occupies a concurrency slot: it is about to start
 * ('ready') or is somewhere between its first step and its completion.
 */
function isRunning(task: Task) {
  switch (task.status) {
    case 'ready':
    case 'working':
    case 'working*':
    case 'finishing':
    case 'finishing*':
      return true;
    default:
      return false;
  }
}
/**
 * Public view of a task, as handed to consumers of `useTask`.
 */
interface TaskAPI {
// Simplified status: every internal running state is reported as 'running'.
status: 'queued' | 'dropped' | 'running' | 'cancelled' | 'finished';
// Copied from the underlying task's `value`.
value?: any;
// Copied from the underlying task's `error`.
error?: Error;
// Dispatches a cancellation request for the underlying task.
cancel(): void;
}
/**
 * React hook that runs an iterator-based task (typically a generator
 * function) with a concurrency policy.
 *
 * Each value the iterator produces is wrapped in a promise; once it
 * resolves, the iterator is resumed with the resolved value.
 *
 * @param task    Function returning the iterator to run.
 * @param options Optional policy. `strategy` selects what happens when a run
 *                is requested at capacity ('restart', 'drop' or 'queue').
 *                `maxConcurrency` defaults to 1 when `options` is given and
 *                to `Infinity` when `options` is omitted. Without `options`
 *                the strategy is 'restart'. The options are only read for
 *                the initial reducer state.
 * @returns A tuple `[{ isRunning, last }, run]`: `isRunning` is true while
 *          any task is running, `last` describes the most recent task (or is
 *          `null` before the first run), and `run(...args)` requests a new
 *          run of `task`.
 */
export function useTask<T extends TaskRunner>(
  task: T,
  options?: {
    strategy: 'restart' | 'drop' | 'queue';
    maxConcurrency?: number;
  }
): [
  {
    isRunning: boolean;
    last: TaskAPI | null;
  },
  (...args: ParameterType<T>) => void
] {
  let maxConcurrency = Infinity;
  if (options) {
    // `||` on purpose: a limit of 0 would never let a task start.
    maxConcurrency = options.maxConcurrency || 1;
  }

  const [state, dispatch] = useReducer(reducer, {
    strategy: options ? options.strategy : 'restart',
    maxConcurrency,
    tasks: []
  });

  // Attach a continuation to every promise that is not tracked yet, then
  // tell the reducer so each promise is only subscribed to once.
  // NOTE(review): no rejection handler is attached, so a rejected promise
  // leaves its task in a tracked state.
  const tracked: Task[] = [];
  state.tasks.forEach(candidate => {
    const untracked =
      candidate.status === 'working' || candidate.status === 'finishing';
    if (untracked && candidate.promise) {
      tracked.push(candidate);
      candidate.promise.then(result => {
        dispatch(continueWork(candidate, result));
      });
    }
  });
  if (tracked.length > 0) {
    dispatch(trackWork(tracked));
  }

  const runningTasks = state.tasks.filter(isRunning);
  const areAnyReady = state.tasks.some(candidate => candidate.status === 'ready');
  const areAnyQueued = state.tasks.some(candidate => candidate.status === 'queued');
  const areAnyRunning = runningTasks.length > 0;

  // Start whatever can be started. The limit is read from the reducer state,
  // because that is the value the reducer uses to hand out slots. Comparing
  // against the local `maxConcurrency` could dispatch START_WORK on every
  // render without the reducer ever starting anything, if the option were
  // raised after the first render.
  if (areAnyReady || (areAnyQueued && runningTasks.length < state.maxConcurrency)) {
    dispatch(startWork());
  }

  useDebugValue(areAnyRunning ? 'Running' : 'Idle');

  const run = (...args: ParameterType<T>) => {
    dispatch(startTask(task, ...args));
  };

  const lastTask = state.tasks[state.tasks.length - 1];
  const last = lastTask ? {
    // All internal running states are reported as 'running'.
    status: isRunning(lastTask) ? 'running' : lastTask.status,
    value: lastTask.value,
    error: lastTask.error,
    cancel() {
      dispatch(cancelTasks([lastTask]));
    }
  } as TaskAPI : null;

  return [{
    isRunning: areAnyRunning,
    last
  }, run];
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment