Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save schmidt-sebastian/702ba33ce0c3515d096089226a762934 to your computer and use it in GitHub Desktop.
Save schmidt-sebastian/702ba33ce0c3515d096089226a762934 to your computer and use it in GitHub Desktop.
diff --git a/dev/src/bulk-writer.ts b/dev/src/bulk-writer.ts
index ee09c4c..5792274 100644
--- a/dev/src/bulk-writer.ts
+++ b/dev/src/bulk-writer.ts
@@ -82,15 +82,12 @@ class BulkCommitBatch {
*/
state = BatchState.OPEN;
- // The set of document reference paths present in the WriteBatch.
- readonly docPaths = new Set<string>();
-
// A deferred promise that is resolved after the batch has been sent, and a
// response is received.
private completedDeferred = new Deferred<void>();
// A map from each WriteBatch operation to its corresponding result.
- private resultsMap = new Map<number, Deferred<BatchWriteResult>>();
+ private pendingOps = new Map<string, Deferred<BatchWriteResult>>();
private readonly backoff: ExponentialBackoff;
@@ -106,7 +103,7 @@ class BulkCommitBatch {
* The number of writes in this batch.
*/
get opCount(): number {
- return this.resultsMap.size;
+ return this.pendingOps.size;
}
/**
@@ -183,16 +180,15 @@ class BulkCommitBatch {
documentRef: firestore.DocumentReference<T>
): Promise<WriteResult> {
assert(
- !this.docPaths.has(documentRef.path),
+ !this.pendingOps.has(documentRef.path),
'Batch should not contain writes to the same document'
);
assert(
this.state === BatchState.OPEN,
'Batch should be OPEN when adding writes'
);
- this.docPaths.add(documentRef.path);
const deferred = new Deferred<BatchWriteResult>();
- this.resultsMap.set(this.opCount, deferred);
+ this.pendingOps.set(documentRef.path, deferred);
if (this.opCount === this.maxBatchSize) {
this.state = BatchState.READY_TO_SEND;
@@ -224,46 +220,34 @@ class BulkCommitBatch {
// Capture the error stack to preserve stack tracing across async calls.
const stack = Error().stack!;
- let originalIndexMap: Map<number, number> = new Map(
- Array.from(new Array(this.opCount), (_, i) => [i, i])
- );
-
- let retryAttempts = 0;
- while (retryAttempts < MAX_RETRY_ATTEMPTS) {
- let retryIndexes: number[] = [];
+ for (let attempt = 0; attempt < MAX_RETRY_ATTEMPTS; ++attempt) {
await this.backoff.backoffAndWait();
+
+ let results: BatchWriteResult[];
try {
- const results = await this.writeBatch.bulkCommit();
- retryIndexes = this.processResults(originalIndexMap, results);
+ results = await this.writeBatch.bulkCommit();
} catch (err) {
- retryIndexes = this.processResults(
- originalIndexMap,
- [],
- wrapError(err, stack)
- );
+ results = [...this.pendingOps.keys()].map(key => {
+ return { key, writeTime: null, status: wrapError(err, stack) };
+ });
}
- // Map the indexes that are going to be retried back to their
- // corresponding indexes in the original batch.
- originalIndexMap = new Map(
- Array.from(new Array(retryIndexes.length), (_, i) => [
- i,
- originalIndexMap.get(retryIndexes[i])!,
- ])
- );
- if (retryIndexes.length > 0) {
+ this.processResults(results);
+
+ if (this.pendingOps.size > 0) {
logger(
'BulkWriter.bulkCommit',
null,
- `Current batch failed at retry #${retryAttempts}. Num failures: ` +
- `${retryIndexes.length}.`
+ `Current batch failed at retry #${attempt}. Num failures: ` +
+ `${this.pendingOps.size}.`
);
-
- this.writeBatch = this.writeBatch._sliceIndexes(retryIndexes);
+ this.writeBatch = new WriteBatch(this.firestore, this.writeBatch, [
+ ...this.pendingOps.keys(),
+ ]);
} else {
+ this.completedDeferred.resolve();
break;
}
- retryAttempts++;
}
}
@@ -274,37 +258,16 @@ class BulkCommitBatch {
* on resultsMap
* @return The indexes of writes that failed with a retryable error.
*/
- processResults(
- originalIndexMap: Map<number, number>,
- results: BatchWriteResult[],
- error?: Error
- ): number[] {
- const indexesToRetry: number[] = [];
- if (error) {
- results = Array.from({length: originalIndexMap.size}, () => {
- return {
- writeTime: null,
- status: error,
- };
- });
- }
-
- for (let i = 0; i < results.length; i++) {
- const writeResult = results[i];
- const originalIndex = originalIndexMap.get(i)!;
- if (writeResult.status.code === Status.OK) {
- this.resultsMap.get(originalIndex)!.resolve(results[i]);
- } else if (this.shouldRetry(writeResult.status.code)) {
- indexesToRetry.push(i);
- } else {
- this.resultsMap.get(originalIndex)!.reject(writeResult.status);
+ processResults(results: BatchWriteResult[]): void {
+ for (const result of results) {
+ if (result.status.code === Status.OK) {
+ this.pendingOps.get(result.key)!.resolve(result);
+ this.pendingOps.delete(result.key);
+ } else if (!this.shouldRetry(result.status.code)) {
+ this.pendingOps.get(result.key)!.reject(result.status);
+ this.pendingOps.delete(result.key);
}
}
-
- if (indexesToRetry.length === 0) {
- this.completedDeferred.resolve();
- }
- return indexesToRetry;
}
private shouldRetry(code: Status | undefined): boolean {
@@ -326,6 +289,17 @@ class BulkCommitBatch {
this.state = BatchState.READY_TO_SEND;
}
}
+
+ has(path: string) {
+ for (const [key, _] of this.pendingOps) {
+ if (key == path) return true;
+ }
+ return false;
+ }
+
+ keys() {
+ return this.pendingOps.keys();
+ }
}
/**
@@ -646,10 +620,7 @@ export class BulkWriter {
): BulkCommitBatch {
if (this.batchQueue.length > 0) {
const lastBatch = this.batchQueue[this.batchQueue.length - 1];
- if (
- lastBatch.state === BatchState.OPEN &&
- !lastBatch.docPaths.has(ref.path)
- ) {
+ if (lastBatch.state === BatchState.OPEN && !lastBatch.has(ref.path)) {
return lastBatch;
}
}
@@ -743,11 +714,11 @@ export class BulkWriter {
return false;
}
- for (const path of batch.docPaths) {
+ for (const path of batch.keys()) {
const isRefInFlight =
this.batchQueue
.filter(batch => batch.state === BatchState.SENT)
- .find(batch => batch.docPaths.has(path)) !== undefined;
+ .find(batch => batch.has(path)) !== undefined;
if (isRefInFlight) {
// eslint-disable-next-line no-console
console.warn(
diff --git a/dev/src/write-batch.ts b/dev/src/write-batch.ts
index bc3a65b..6e61991 100644
--- a/dev/src/write-batch.ts
+++ b/dev/src/write-batch.ts
@@ -104,6 +104,7 @@ export class WriteResult implements firestore.WriteResult {
*/
export class BatchWriteResult {
constructor(
+ readonly key: string,
readonly writeTime: Timestamp | null,
readonly status: GoogleError
) {}
@@ -132,7 +133,7 @@ export class WriteBatch implements firestore.WriteBatch {
* resulting `api.IWrite` will be sent to the backend.
* @private
*/
- private _ops: Array<PendingWriteOp> = [];
+ private _ops: Array<{key: string; op: PendingWriteOp}> = [];
private _committed = false;
@@ -141,10 +142,26 @@ export class WriteBatch implements firestore.WriteBatch {
*
* @param firestore The Firestore Database client.
*/
- constructor(firestore: Firestore) {
+ constructor(firestore: Firestore);
+ constructor(
+ firestore: Firestore,
+ retryBatch: WriteBatch,
+ keysToRetry: string[]
+ );
+ constructor(
+ firestore: Firestore,
+ retryBatch?: WriteBatch,
+ keysToRetry?: string[]
+ ) {
this._firestore = firestore;
this._serializer = new Serializer(firestore);
this._allowUndefined = !!firestore._settings.ignoreUndefinedProperties;
+
+ if (retryBatch) {
+ this._ops = retryBatch._ops.filter(
+ v => keysToRetry!.indexOf(v.key) !== -1
+ );
+ }
}
/**
@@ -214,7 +231,7 @@ export class WriteBatch implements firestore.WriteBatch {
return write;
};
- this._ops.push(op);
+ this._ops.push({key: documentRef.path, op});
return this;
}
@@ -261,7 +278,7 @@ export class WriteBatch implements firestore.WriteBatch {
return write;
};
- this._ops.push(op);
+ this._ops.push({key: documentRef.path, op});
return this;
}
@@ -362,7 +379,7 @@ export class WriteBatch implements firestore.WriteBatch {
return write;
};
- this._ops.push(op);
+ this._ops.push({key: documentRef.path, op});
return this;
}
@@ -518,7 +535,7 @@ export class WriteBatch implements firestore.WriteBatch {
return write;
};
- this._ops.push(op);
+ this._ops.push({key: documentRef.path, op});
return this;
}
@@ -566,7 +583,7 @@ export class WriteBatch implements firestore.WriteBatch {
const database = this._firestore.formattedName;
const request: api.IBatchWriteRequest = {
database,
- writes: this._ops.map(op => op()),
+ writes: this._ops.map(op => op.op()),
};
const retryCodes = getRetryCodes('batchWrite');
@@ -589,23 +606,8 @@ export class WriteBatch implements firestore.WriteBatch {
error.code === Status.OK
? Timestamp.fromProto(result.updateTime || DELETE_TIMESTAMP_SENTINEL)
: null;
- return new BatchWriteResult(updateTime, error);
- });
- }
-
- /**
- * Creates a new WriteBatch containing only the operations located at the
- * provided indexes.
- *
- * @param indexes List of operation indexes to keep
- * @private
- */
- _sliceIndexes(indexes: number[]): WriteBatch {
- const writeBatch = new WriteBatch(this._firestore);
- writeBatch._ops = this._ops.filter((_, i) => {
- return indexes.includes(i);
+ return new BatchWriteResult(this._ops[i].key, updateTime, error);
});
- return writeBatch;
}
/**
@@ -633,7 +635,7 @@ export class WriteBatch implements firestore.WriteBatch {
const request: api.ICommitRequest = {
database,
- writes: this._ops.map(op => op()),
+ writes: this._ops.map(op => op.op()),
};
if (commitOptions?.transactionId) {
request.transaction = commitOptions.transactionId;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment