Created
January 17, 2024 18:15
-
-
Save ZhouXing19/7ec9ea3b1a0d1384d011f9bef7ba3c35 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/fetch/csv_pipe.go b/fetch/csv_pipe.go | |
index 200f86b..e586474 100644 | |
--- a/fetch/csv_pipe.go | |
+++ b/fetch/csv_pipe.go | |
@@ -23,7 +23,7 @@ type csvPipe struct { | |
currRows int | |
numRows int | |
numRowsCh chan int | |
- newWriter func(numRowsCh chan int) io.WriteCloser | |
+ newWriter func(numRowsCh chan int) (io.WriteCloser, error) | |
} | |
func newCSVPipe( | |
@@ -31,7 +31,7 @@ func newCSVPipe( | |
logger zerolog.Logger, | |
flushSize int, | |
flushRows int, | |
- newWriter func(numRowsCh chan int) io.WriteCloser, | |
+ newWriter func(numRowsCh chan int) (io.WriteCloser, error), | |
) *csvPipe { | |
return &csvPipe{ | |
in: in, | |
@@ -60,7 +60,9 @@ func (p *csvPipe) Pipe(tn dbtable.Name) error { | |
} | |
return err | |
} | |
- p.maybeInitWriter() | |
+ if err := p.maybeInitWriter(); err != nil { | |
+ return err | |
+ } | |
p.currRows++ | |
p.numRows++ | |
m.Inc() | |
@@ -105,9 +107,14 @@ func (p *csvPipe) flush() error { | |
return nil | |
} | |
-func (p *csvPipe) maybeInitWriter() { | |
+func (p *csvPipe) maybeInitWriter() error { | |
if p.csvWriter == nil { | |
- p.out = p.newWriter(p.numRowsCh) | |
+ out, err := p.newWriter(p.numRowsCh) | |
+ if err != nil { | |
+ return err | |
+ } | |
+ p.out = out | |
p.csvWriter = csv.NewWriter(p.out) | |
} | |
+ return nil | |
} | |
diff --git a/fetch/datablobstorage/gcp.go b/fetch/datablobstorage/gcp.go | |
index 2c6357d..59559a7 100644 | |
--- a/fetch/datablobstorage/gcp.go | |
+++ b/fetch/datablobstorage/gcp.go | |
@@ -51,14 +51,36 @@ func (s *gcpStore) CreateFromReader( | |
s.logger.Debug().Str("file", key).Msgf("creating new file") | |
wc := s.client.Bucket(s.bucket).Object(key).NewWriter(ctx) | |
+ | |
+ rows := <-numRows | |
+ | |
+ // If we error here, i.e. before io.Copy returns, the | |
+ // error will be propagated to the goroutine in exportTable(), | |
+ // triggering forwardRead.CloseWithError(), which makes p.out.Close() in | |
+ // csvPipe.flush() return with the same error. | |
+ //if true { | |
+ // return nil, errors.Newf("forced error before io.Copy") | |
+ //} | |
+ | |
+ // io.Copy starts execution ONLY after p.csvWriter.Flush() is triggered. | |
if _, err := io.Copy(wc, r); err != nil { | |
return nil, err | |
} | |
+ // Once io.Copy finishes without error, p.csvWriter.Flush() and p.out.Close() | |
+ // return without error. | |
+ | |
+ // If we error here, i.e. after io.Copy returns, the error still triggers | |
+ // forwardRead.CloseWithError() in the goroutine in exportTable(), but that is | |
+ // a no-op for the writer: p.out.Close() has already returned. (CloseWithError | |
+ // always returns nil, so "error closing write goroutine" is never logged.) | |
+ //if true { | |
+ // return nil, errors.Newf("forced error after io.Copy") | |
+ //} | |
+ | |
if err := wc.Close(); err != nil { | |
return nil, err | |
} | |
- rows := <-numRows | |
s.logger.Debug().Str("file", key).Int("rows", rows).Msgf("gcp file creation complete complete") | |
return &gcpResource{ | |
store: s, | |
diff --git a/fetch/export_table.go b/fetch/export_table.go | |
index 7a5a9bc..f8b6509 100644 | |
--- a/fetch/export_table.go | |
+++ b/fetch/export_table.go | |
@@ -3,7 +3,6 @@ package fetch | |
import ( | |
"context" | |
"io" | |
- "sync" | |
"time" | |
"github.com/cockroachdb/errors" | |
@@ -70,20 +69,28 @@ func exportTable( | |
) | |
}) | |
- var resourceWG sync.WaitGroup | |
+ oriCtx := ctx | |
+ resourceWG, ctx := errgroup.WithContext(context.Background()) | |
+ resourceWG.SetLimit(1) | |
+ | |
itNum := 0 | |
// Errors must be buffered, as pipe can exit without taking the error channel. | |
writerErrCh := make(chan error, 1) | |
- pipe := newCSVPipe(sqlRead, logger, cfg.FlushSize, cfg.FlushRows, func(numRowsCh chan int) io.WriteCloser { | |
- resourceWG.Wait() | |
+ pipe := newCSVPipe(sqlRead, logger, cfg.FlushSize, cfg.FlushRows, func(numRowsCh chan int) (io.WriteCloser, error) { | |
+ if err := resourceWG.Wait(); err != nil { | |
+ // We need to check if the last iteration saw any error when creating | |
+ // resource from reader. If so, just exit the current iteration. | |
+ // Otherwise, with the error from the last iteration congesting writerErrCh, | |
+ // the current iteration, upon seeing the same error, will hang at | |
+ // writerErrCh <- err. | |
+ return nil, err | |
+ } | |
forwardRead, forwardWrite := io.Pipe() | |
wrappedWriter := getWriter(forwardWrite, cfg.Compression) | |
- resourceWG.Add(1) | |
- go func() { | |
- defer resourceWG.Done() | |
+ resourceWG.Go(func() error { | |
itNum++ | |
if err := func() error { | |
- resource, err := datasource.CreateFromReader(ctx, forwardRead, table, itNum, importFileExt, numRowsCh) | |
+ resource, err := datasource.CreateFromReader(oriCtx, forwardRead, table, itNum, importFileExt, numRowsCh) | |
if err != nil { | |
return err | |
} | |
@@ -91,19 +98,25 @@ func exportTable( | |
return nil | |
}(); err != nil { | |
logger.Err(err).Msgf("error during data store write") | |
- if err := forwardRead.CloseWithError(err); err != nil { | |
- logger.Err(err).Msgf("error closing write goroutine") | |
+ if closeReadErr := forwardRead.CloseWithError(err); closeReadErr != nil { | |
+ logger.Err(closeReadErr).Msgf("error closing write goroutine") | |
} | |
writerErrCh <- err | |
+ return err | |
} | |
- }() | |
- return wrappedWriter | |
+ return nil | |
+ }) | |
+ return wrappedWriter, nil | |
}) | |
err := pipe.Pipe(table.Name) | |
// Wait for the resource wait group to complete. It may output an error | |
// that is not captured in the pipe. | |
- resourceWG.Wait() | |
+ // TODO(jane): I don't fully understand why the error was originally handled | |
+ // this way; this needs more investigation. | |
+ if dataStoreWriteErr := resourceWG.Wait(); dataStoreWriteErr != nil { | |
+ return ret, dataStoreWriteErr | |
+ } | |
// Check any errors are not left behind - this can happen if the data source | |
// creation fails, but the COPY is already done. | |
select { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment