Created
June 26, 2020 06:18
-
-
Save schmidt-sebastian/702ba33ce0c3515d096089226a762934 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/dev/src/bulk-writer.ts b/dev/src/bulk-writer.ts | |
index ee09c4c..5792274 100644 | |
--- a/dev/src/bulk-writer.ts | |
+++ b/dev/src/bulk-writer.ts | |
@@ -82,15 +82,12 @@ class BulkCommitBatch { | |
*/ | |
state = BatchState.OPEN; | |
- // The set of document reference paths present in the WriteBatch. | |
- readonly docPaths = new Set<string>(); | |
- | |
// A deferred promise that is resolved after the batch has been sent, and a | |
// response is received. | |
private completedDeferred = new Deferred<void>(); | |
// A map from each WriteBatch operation to its corresponding result. | |
- private resultsMap = new Map<number, Deferred<BatchWriteResult>>(); | |
+ private pendingOps = new Map<string, Deferred<BatchWriteResult>>(); | |
private readonly backoff: ExponentialBackoff; | |
@@ -106,7 +103,7 @@ class BulkCommitBatch { | |
* The number of writes in this batch. | |
*/ | |
get opCount(): number { | |
- return this.resultsMap.size; | |
+ return this.pendingOps.size; | |
} | |
/** | |
@@ -183,16 +180,15 @@ class BulkCommitBatch { | |
documentRef: firestore.DocumentReference<T> | |
): Promise<WriteResult> { | |
assert( | |
- !this.docPaths.has(documentRef.path), | |
+ !this.pendingOps.has(documentRef.path), | |
'Batch should not contain writes to the same document' | |
); | |
assert( | |
this.state === BatchState.OPEN, | |
'Batch should be OPEN when adding writes' | |
); | |
- this.docPaths.add(documentRef.path); | |
const deferred = new Deferred<BatchWriteResult>(); | |
- this.resultsMap.set(this.opCount, deferred); | |
+ this.pendingOps.set(documentRef.path, deferred); | |
if (this.opCount === this.maxBatchSize) { | |
this.state = BatchState.READY_TO_SEND; | |
@@ -224,46 +220,34 @@ class BulkCommitBatch { | |
// Capture the error stack to preserve stack tracing across async calls. | |
const stack = Error().stack!; | |
- let originalIndexMap: Map<number, number> = new Map( | |
- Array.from(new Array(this.opCount), (_, i) => [i, i]) | |
- ); | |
- | |
- let retryAttempts = 0; | |
- while (retryAttempts < MAX_RETRY_ATTEMPTS) { | |
- let retryIndexes: number[] = []; | |
+ for (let attempt = 0; attempt < MAX_RETRY_ATTEMPTS; ++attempt) { | |
await this.backoff.backoffAndWait(); | |
+ | |
+ let results: BatchWriteResult[]; | |
try { | |
- const results = await this.writeBatch.bulkCommit(); | |
- retryIndexes = this.processResults(originalIndexMap, results); | |
+ results = await this.writeBatch.bulkCommit(); | |
} catch (err) { | |
- retryIndexes = this.processResults( | |
- originalIndexMap, | |
- [], | |
- wrapError(err, stack) | |
- ); | |
+ results = [...this.pendingOps.keys()].map(key => { | |
+ return { key, writeTime: null, status: wrapError(err, stack) }; | |
+ }); | |
} | |
- // Map the indexes that are going to be retried back to their | |
- // corresponding indexes in the original batch. | |
- originalIndexMap = new Map( | |
- Array.from(new Array(retryIndexes.length), (_, i) => [ | |
- i, | |
- originalIndexMap.get(retryIndexes[i])!, | |
- ]) | |
- ); | |
- if (retryIndexes.length > 0) { | |
+ this.processResults(results); | |
+ | |
+ if (this.pendingOps.size > 0) { | |
logger( | |
'BulkWriter.bulkCommit', | |
null, | |
- `Current batch failed at retry #${retryAttempts}. Num failures: ` + | |
- `${retryIndexes.length}.` | |
+ `Current batch failed at retry #${attempt}. Num failures: ` + | |
+ `${this.pendingOps.size}.` | |
); | |
- | |
- this.writeBatch = this.writeBatch._sliceIndexes(retryIndexes); | |
+ this.writeBatch = new WriteBatch(this.firestore, this.writeBatch, [ | |
+ ...this.pendingOps.keys(), | |
+ ]); | |
} else { | |
+ this.completedDeferred.resolve(); | |
break; | |
} | |
- retryAttempts++; | |
} | |
} | |
@@ -274,37 +258,16 @@ class BulkCommitBatch { | |
* on resultsMap | |
* @return The indexes of writes that failed with a retryable error. | |
*/ | |
- processResults( | |
- originalIndexMap: Map<number, number>, | |
- results: BatchWriteResult[], | |
- error?: Error | |
- ): number[] { | |
- const indexesToRetry: number[] = []; | |
- if (error) { | |
- results = Array.from({length: originalIndexMap.size}, () => { | |
- return { | |
- writeTime: null, | |
- status: error, | |
- }; | |
- }); | |
- } | |
- | |
- for (let i = 0; i < results.length; i++) { | |
- const writeResult = results[i]; | |
- const originalIndex = originalIndexMap.get(i)!; | |
- if (writeResult.status.code === Status.OK) { | |
- this.resultsMap.get(originalIndex)!.resolve(results[i]); | |
- } else if (this.shouldRetry(writeResult.status.code)) { | |
- indexesToRetry.push(i); | |
- } else { | |
- this.resultsMap.get(originalIndex)!.reject(writeResult.status); | |
+ processResults(results: BatchWriteResult[]): void { | |
+ for (const result of results) { | |
+ if (result.status.code === Status.OK) { | |
+ this.pendingOps.get(result.key)!.resolve(result); | |
+ this.pendingOps.delete(result.key); | |
+ } else if (!this.shouldRetry(result.status.code)) { | |
+ this.pendingOps.get(result.key)!.reject(result.status); | |
+ this.pendingOps.delete(result.key); | |
} | |
} | |
- | |
- if (indexesToRetry.length === 0) { | |
- this.completedDeferred.resolve(); | |
- } | |
- return indexesToRetry; | |
} | |
private shouldRetry(code: Status | undefined): boolean { | |
@@ -326,6 +289,17 @@ class BulkCommitBatch { | |
this.state = BatchState.READY_TO_SEND; | |
} | |
} | |
+ | |
+ has(path: string) { | |
+ for (const [key, _] of this.pendingOps) { | |
+ if (key == path) return true; | |
+ } | |
+ return false; | |
+ } | |
+ | |
+ keys() { | |
+ return this.pendingOps.keys(); | |
+ } | |
} | |
/** | |
@@ -646,10 +620,7 @@ export class BulkWriter { | |
): BulkCommitBatch { | |
if (this.batchQueue.length > 0) { | |
const lastBatch = this.batchQueue[this.batchQueue.length - 1]; | |
- if ( | |
- lastBatch.state === BatchState.OPEN && | |
- !lastBatch.docPaths.has(ref.path) | |
- ) { | |
+ if (lastBatch.state === BatchState.OPEN && !lastBatch.has(ref.path)) { | |
return lastBatch; | |
} | |
} | |
@@ -743,11 +714,11 @@ export class BulkWriter { | |
return false; | |
} | |
- for (const path of batch.docPaths) { | |
+ for (const path of batch.keys()) { | |
const isRefInFlight = | |
this.batchQueue | |
.filter(batch => batch.state === BatchState.SENT) | |
- .find(batch => batch.docPaths.has(path)) !== undefined; | |
+ .find(batch => batch.has(path)) !== undefined; | |
if (isRefInFlight) { | |
// eslint-disable-next-line no-console | |
console.warn( | |
diff --git a/dev/src/write-batch.ts b/dev/src/write-batch.ts | |
index bc3a65b..6e61991 100644 | |
--- a/dev/src/write-batch.ts | |
+++ b/dev/src/write-batch.ts | |
@@ -104,6 +104,7 @@ export class WriteResult implements firestore.WriteResult { | |
*/ | |
export class BatchWriteResult { | |
constructor( | |
+ readonly key: string, | |
readonly writeTime: Timestamp | null, | |
readonly status: GoogleError | |
) {} | |
@@ -132,7 +133,7 @@ export class WriteBatch implements firestore.WriteBatch { | |
* resulting `api.IWrite` will be sent to the backend. | |
* @private | |
*/ | |
- private _ops: Array<PendingWriteOp> = []; | |
+ private _ops: Array<{key: string; op: PendingWriteOp}> = []; | |
private _committed = false; | |
@@ -141,10 +142,26 @@ export class WriteBatch implements firestore.WriteBatch { | |
* | |
* @param firestore The Firestore Database client. | |
*/ | |
- constructor(firestore: Firestore) { | |
+ constructor(firestore: Firestore); | |
+ constructor( | |
+ firestore: Firestore, | |
+ retryBatch: WriteBatch, | |
+ keysToRetry: string[] | |
+ ); | |
+ constructor( | |
+ firestore: Firestore, | |
+ retryBatch?: WriteBatch, | |
+ keysToRetry?: string[] | |
+ ) { | |
this._firestore = firestore; | |
this._serializer = new Serializer(firestore); | |
this._allowUndefined = !!firestore._settings.ignoreUndefinedProperties; | |
+ | |
+ if (retryBatch) { | |
+ this._ops = retryBatch._ops.filter( | |
+ v => keysToRetry!.indexOf(v.key) !== -1 | |
+ ); | |
+ } | |
} | |
/** | |
@@ -214,7 +231,7 @@ export class WriteBatch implements firestore.WriteBatch { | |
return write; | |
}; | |
- this._ops.push(op); | |
+ this._ops.push({key: documentRef.path, op}); | |
return this; | |
} | |
@@ -261,7 +278,7 @@ export class WriteBatch implements firestore.WriteBatch { | |
return write; | |
}; | |
- this._ops.push(op); | |
+ this._ops.push({key: documentRef.path, op}); | |
return this; | |
} | |
@@ -362,7 +379,7 @@ export class WriteBatch implements firestore.WriteBatch { | |
return write; | |
}; | |
- this._ops.push(op); | |
+ this._ops.push({key: documentRef.path, op}); | |
return this; | |
} | |
@@ -518,7 +535,7 @@ export class WriteBatch implements firestore.WriteBatch { | |
return write; | |
}; | |
- this._ops.push(op); | |
+ this._ops.push({key: documentRef.path, op}); | |
return this; | |
} | |
@@ -566,7 +583,7 @@ export class WriteBatch implements firestore.WriteBatch { | |
const database = this._firestore.formattedName; | |
const request: api.IBatchWriteRequest = { | |
database, | |
- writes: this._ops.map(op => op()), | |
+ writes: this._ops.map(op => op.op()), | |
}; | |
const retryCodes = getRetryCodes('batchWrite'); | |
@@ -589,23 +606,8 @@ export class WriteBatch implements firestore.WriteBatch { | |
error.code === Status.OK | |
? Timestamp.fromProto(result.updateTime || DELETE_TIMESTAMP_SENTINEL) | |
: null; | |
- return new BatchWriteResult(updateTime, error); | |
- }); | |
- } | |
- | |
- /** | |
- * Creates a new WriteBatch containing only the operations located at the | |
- * provided indexes. | |
- * | |
- * @param indexes List of operation indexes to keep | |
- * @private | |
- */ | |
- _sliceIndexes(indexes: number[]): WriteBatch { | |
- const writeBatch = new WriteBatch(this._firestore); | |
- writeBatch._ops = this._ops.filter((_, i) => { | |
- return indexes.includes(i); | |
+ return new BatchWriteResult(this._ops[i].key, updateTime, error); | |
}); | |
- return writeBatch; | |
} | |
/** | |
@@ -633,7 +635,7 @@ export class WriteBatch implements firestore.WriteBatch { | |
const request: api.ICommitRequest = { | |
database, | |
- writes: this._ops.map(op => op()), | |
+ writes: this._ops.map(op => op.op()), | |
}; | |
if (commitOptions?.transactionId) { | |
request.transaction = commitOptions.transactionId; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment