Created
February 6, 2018 03:52
-
-
Save schmidt-sebastian/69aecafd8adffa87d369926dacf070f5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright 2017 Google Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
import { assert, fail } from './assert'; | |
import * as log from './log'; | |
import { AnyDuringMigration, AnyJs } from './misc'; | |
import { Deferred, CancelablePromise } from './promise'; | |
import { Code, FirestoreError } from './error'; | |
// tslint:disable-next-line:no-any Accept any return type from setTimeout(). | |
type TimerHandle = any; | |
/** | |
* Represents an operation scheduled to be run in the future on an AsyncQueue. | |
* | |
* It is created via DelayedOperation.createAndSchedule(). | |
* | |
* Supports cancellation (via cancel()) and early execution (via skipDelay()). | |
*/ | |
class DelayedOperation<T> implements CancelablePromise<T> { | |
// handle for use with clearTimeout(), or null if the operation has been | |
// executed or canceled already. | |
private timerHandle: TimerHandle | null; | |
private readonly deferred = new Deferred<T>(); | |
private constructor( | |
private op: () => Promise<T> | |
) {} | |
/** | |
* Creates and returns a DelayedOperation that has been scheduled to be | |
* executed on the provided asyncQueue after the provided delayMs. | |
*/ | |
static createAndSchedule<T>( | |
op: () => Promise<T>, | |
delayMs: number | |
) { | |
const delayedOp = new DelayedOperation(op); | |
delayedOp.start(delayMs); | |
return delayedOp; | |
} | |
private start(delayMs:number) : void { | |
this.timerHandle = setTimeout(() => this.runImmediately()); | |
} | |
/** | |
* Queues the operation to runImmediately immediately (if it hasn't already been runImmediately or | |
* canceled). | |
*/ | |
runImmediately(): Promise<void> { | |
if (this.timerHandle !== null) { | |
this.clearTimeout(); | |
return this.op().then(result => { | |
return this.deferred.resolve(result); | |
}); | |
} | |
} | |
/** | |
* Cancels the operation if it hasn't already been executed or canceled. The | |
* promise will be rejected. | |
*/ | |
cancel(reason?: string): void { | |
if (this.timerHandle !== null) { | |
this.clearTimeout(); | |
this.deferred.reject( | |
new FirestoreError( | |
Code.CANCELLED, | |
'Operation cancelled' + (reason ? ': ' + reason : '') | |
) | |
); | |
} | |
} | |
// Promise implementation. | |
readonly [Symbol.toStringTag]: 'Promise'; | |
then = this.deferred.promise.then.bind(this.deferred.promise); | |
catch = this.deferred.promise.catch.bind(this.deferred.promise); | |
private clearTimeout() { | |
if (this.timerHandle) { | |
clearTimeout(this.timerHandle); | |
this.timerHandle = null; | |
} | |
} | |
} | |
export class AsyncQueue { | |
// The last promise in the queue. | |
private tail: Promise<AnyJs | void> = Promise.resolve(); | |
// A list with timeout handles and their respective deferred promises. | |
// Contains an entry for each operation that is queued to runImmediately in the future | |
// (i.e. it has a delay that has not yet elapsed). | |
private delayedOperations: Array<DelayedOperation<AnyJs>> = []; | |
// The number of operations that are queued to be runImmediately in the future (i.e. they | |
// have a delay that has not yet elapsed). Used for testing. | |
get delayedOperationsCount() { | |
return this.delayedOperations.length; | |
} | |
// visible for testing | |
failure: Error; | |
// Flag set while there's an outstanding AsyncQueue operation, used for | |
// assertion sanity-checks. | |
private operationInProgress = false; | |
/** | |
* Adds a new operation to the queue. Returns a promise that will be resolved | |
* when the promise returned by the new operation is (with its value). | |
*/ | |
schedule<T>(op: () => Promise<T>): Promise<T>; | |
schedule<T>(op: () => Promise<T>, delayMs: number): CancelablePromise<T>; | |
schedule<T>(op: () => Promise<T>, delayMs?: number): Promise<T> { | |
this.verifyNotFailed(); | |
if (delayMs !== undefined) { | |
return this.runImmediately(op); | |
} else { | |
const delayedOp = DelayedOperation.createAndSchedule(() => this.runImmediately(op), delayMs); | |
this.delayedOperations.push(delayedOp); | |
delayedOp.catch(err => {}).then(() => { | |
// NOTE: indexOf / slice are O(n), but delayedOperations is expected to be small. | |
const index = this.delayedOperations.indexOf(delayedOp); | |
assert(index >= 0, 'Delayed operation not found.'); | |
this.delayedOperations.slice(index, 1); | |
}); | |
return delayedOp; | |
} | |
} | |
private runImmediately<T>(op: () => Promise<T>) : Promise<T> { | |
const newTail = this.tail.then(() => { | |
this.operationInProgress = true; | |
return op() | |
.catch(error => { | |
this.failure = error; | |
this.operationInProgress = false; | |
const message = error.stack || error.message || ''; | |
log.error('INTERNAL UNHANDLED ERROR: ', message); | |
// Escape the promise chain and throw the error globally so that | |
// e.g. any global crash reporting library detects and reports it. | |
// (but not for simulated errors in our tests since this breaks mocha) | |
if (message.indexOf('Firestore Test Simulated Error') < 0) { | |
setTimeout(() => { | |
throw error; | |
}, 0); | |
} | |
// Re-throw the error so that this.tail becomes a rejected Promise and | |
// all further attempts to chain (via .then) will just short-circuit | |
// and return the rejected Promise. | |
throw error; | |
}) | |
.then(result => { | |
this.operationInProgress = false; | |
return result; | |
}); | |
}); | |
this.tail = newTail; | |
return newTail; | |
} | |
private verifyNotFailed(): void { | |
if (this.failure) { | |
fail( | |
'AsyncQueue is already failed: ' + | |
(this.failure.stack || this.failure.message) | |
); | |
} | |
} | |
/** | |
* Verifies there's an operation currently in-progress on the AsyncQueue. | |
* Unfortunately we can't verify that the running code is in the promise chain | |
* of that operation, so this isn't a foolproof check, but it should be enough | |
* to catch some bugs. | |
*/ | |
verifyOperationInProgress(): void { | |
assert( | |
this.operationInProgress, | |
'verifyOpInProgress() called when no op in progress on this queue.' | |
); | |
} | |
/** | |
* Waits until all currently scheduled tasks are finished executing. Tasks | |
* scheduled with a delay can be rejected or queued for immediate execution. | |
*/ | |
drain(executeDelayedTasks: boolean): Promise<void> { | |
this.delayedOperations.forEach(delayedOp => { | |
if (executeDelayedTasks) { | |
delayedOp.runImmediately(); | |
} else { | |
delayedOp.cancel('shutdown'); | |
} | |
}); | |
return this.schedule(() => Promise.resolve()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment