@ZhouXing19
Created January 17, 2024 18:15
diff --git a/fetch/csv_pipe.go b/fetch/csv_pipe.go
index 200f86b..e586474 100644
--- a/fetch/csv_pipe.go
+++ b/fetch/csv_pipe.go
@@ -23,7 +23,7 @@ type csvPipe struct {
currRows int
numRows int
numRowsCh chan int
- newWriter func(numRowsCh chan int) io.WriteCloser
+ newWriter func(numRowsCh chan int) (io.WriteCloser, error)
}
func newCSVPipe(
@@ -31,7 +31,7 @@ func newCSVPipe(
logger zerolog.Logger,
flushSize int,
flushRows int,
- newWriter func(numRowsCh chan int) io.WriteCloser,
+ newWriter func(numRowsCh chan int) (io.WriteCloser, error),
) *csvPipe {
return &csvPipe{
in: in,
@@ -60,7 +60,9 @@ func (p *csvPipe) Pipe(tn dbtable.Name) error {
}
return err
}
- p.maybeInitWriter()
+ if err := p.maybeInitWriter(); err != nil {
+ return err
+ }
p.currRows++
p.numRows++
m.Inc()
@@ -105,9 +107,14 @@ func (p *csvPipe) flush() error {
return nil
}
-func (p *csvPipe) maybeInitWriter() {
+func (p *csvPipe) maybeInitWriter() error {
if p.csvWriter == nil {
- p.out = p.newWriter(p.numRowsCh)
+ out, err := p.newWriter(p.numRowsCh)
+ if err != nil {
+ return err
+ }
+ p.out = out
p.csvWriter = csv.NewWriter(p.out)
}
+ return nil
}
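
The hunks above make the writer factory fallible and defer the failure to the first row that needs a writer, so a failed writer creation now surfaces as an error from Pipe. A compact, self-contained sketch of this lazy, fallible initialization (the pipe type and names below are hypothetical, not the repository's):

package main

import (
	"encoding/csv"
	"errors"
	"fmt"
	"io"
)

// pipe is a hypothetical stand-in for csvPipe, reduced to the lazy-init logic.
type pipe struct {
	csvWriter *csv.Writer
	out       io.WriteCloser
	newWriter func() (io.WriteCloser, error)
}

// maybeInitWriter creates the underlying writer on first use and now
// surfaces a creation failure to the caller instead of dropping it.
func (p *pipe) maybeInitWriter() error {
	if p.csvWriter == nil {
		out, err := p.newWriter()
		if err != nil {
			return err
		}
		p.out = out
		p.csvWriter = csv.NewWriter(p.out)
	}
	return nil
}

func main() {
	p := &pipe{newWriter: func() (io.WriteCloser, error) {
		return nil, errors.New("could not create target writer")
	}}
	// The row loop (Pipe in the real code) checks this error and returns early.
	if err := p.maybeInitWriter(); err != nil {
		fmt.Println("pipe exits with:", err)
	}
}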
diff --git a/fetch/datablobstorage/gcp.go b/fetch/datablobstorage/gcp.go
index 2c6357d..59559a7 100644
--- a/fetch/datablobstorage/gcp.go
+++ b/fetch/datablobstorage/gcp.go
@@ -51,14 +51,36 @@ func (s *gcpStore) CreateFromReader(
s.logger.Debug().Str("file", key).Msgf("creating new file")
wc := s.client.Bucket(s.bucket).Object(key).NewWriter(ctx)
+
+ rows := <-numRows
+
+ // If we error here, i.e. before io.Copy returns, the
+ // error is propagated to the goroutine in exportTable(),
+ // triggering forwardRead.CloseWithError(), which makes p.out.Close() in
+ // csvPipe.flush() return the same error.
+ //if true {
+ // return nil, errors.Newf("forced error before io.Copy")
+ //}
+
+ // io.Copy starts making progress ONLY after p.csvWriter.Flush() pushes data into the pipe.
if _, err := io.Copy(wc, r); err != nil {
return nil, err
}
+ // Once io.Copy finishes without error, p.csvWriter.Flush() and p.out.Close()
+ // will return without error.
+
+ // If we error here, i.e. after io.Copy returns, the error still triggers
+ // forwardRead.CloseWithError() in the exportTable() goroutine, but it
+ // results in a logged "error closing write goroutine", since the pipe has
+ // already been closed via p.out.Close().
+ //if true {
+ // return nil, errors.Newf("forced error after io.Copy")
+ //}
+
if err := wc.Close(); err != nil {
return nil, err
}
- rows := <-numRows
s.logger.Debug().Str("file", key).Int("rows", rows).Msgf("gcp file creation complete complete")
return &gcpResource{
store: s,
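
The comments in this hunk lean on io.Pipe's error propagation. A minimal, self-contained sketch of that behaviour (package main and the uploadErr name are illustrative only, not part of the diff): once the read half is closed with CloseWithError(err), a pending or later Write on the write half returns that same err instead of blocking.

package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	forwardRead, forwardWrite := io.Pipe()
	uploadErr := errors.New("forced error before io.Copy")

	// Consumer side: stands in for CreateFromReader failing before it ever
	// reaches io.Copy. Closing the read half records the error on the pipe.
	go func() {
		_ = forwardRead.CloseWithError(uploadErr)
	}()

	// Producer side: stands in for csvPipe flushing rows into p.out.
	// Instead of blocking for a reader, the write fails with uploadErr.
	if _, err := forwardWrite.Write([]byte("a,b,c\n")); err != nil {
		fmt.Println("write returned:", err) // prints the forced error
	}
}

This is the same propagation path the comment describes: the goroutine in exportTable() sets the error via forwardRead.CloseWithError, and the csvPipe side sees it when it next flushes to p.out.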
diff --git a/fetch/export_table.go b/fetch/export_table.go
index 7a5a9bc..f8b6509 100644
--- a/fetch/export_table.go
+++ b/fetch/export_table.go
@@ -3,7 +3,6 @@ package fetch
import (
"context"
"io"
- "sync"
"time"
"github.com/cockroachdb/errors"
@@ -70,20 +69,28 @@ func exportTable(
)
})
- var resourceWG sync.WaitGroup
+ oriCtx := ctx
+ resourceWG, ctx := errgroup.WithContext(context.Background())
+ resourceWG.SetLimit(1)
+
itNum := 0
// Errors must be buffered, as pipe can exit without taking the error channel.
writerErrCh := make(chan error, 1)
- pipe := newCSVPipe(sqlRead, logger, cfg.FlushSize, cfg.FlushRows, func(numRowsCh chan int) io.WriteCloser {
- resourceWG.Wait()
+ pipe := newCSVPipe(sqlRead, logger, cfg.FlushSize, cfg.FlushRows, func(numRowsCh chan int) (io.WriteCloser, error) {
+ if err := resourceWG.Wait(); err != nil {
+ // Check whether the previous iteration hit an error while creating the
+ // resource from the reader. If so, bail out of the current iteration.
+ // Otherwise, with the previous iteration's error still sitting in writerErrCh,
+ // the current iteration, upon hitting the same error, would block forever at
+ // writerErrCh <- err.
+ return nil, err
+ }
forwardRead, forwardWrite := io.Pipe()
wrappedWriter := getWriter(forwardWrite, cfg.Compression)
- resourceWG.Add(1)
- go func() {
- defer resourceWG.Done()
+ resourceWG.Go(func() error {
itNum++
if err := func() error {
- resource, err := datasource.CreateFromReader(ctx, forwardRead, table, itNum, importFileExt, numRowsCh)
+ resource, err := datasource.CreateFromReader(oriCtx, forwardRead, table, itNum, importFileExt, numRowsCh)
if err != nil {
return err
}
@@ -91,19 +98,25 @@ func exportTable(
return nil
}(); err != nil {
logger.Err(err).Msgf("error during data store write")
- if err := forwardRead.CloseWithError(err); err != nil {
- logger.Err(err).Msgf("error closing write goroutine")
+ if closeReadErr := forwardRead.CloseWithError(err); closeReadErr != nil {
+ logger.Err(closeReadErr).Msgf("error closing write goroutine")
}
writerErrCh <- err
+ return err
}
- }()
- return wrappedWriter
+ return nil
+ })
+ return wrappedWriter, nil
})
err := pipe.Pipe(table.Name)
// Wait for the resource wait group to complete. It may output an error
// that is not captured in the pipe.
- resourceWG.Wait()
+ // TODO(jane): I don't fully understand why the error was handled this way
+ // originally, so this needs more investigation.
+ if dataStoreWriteErr := resourceWG.Wait(); dataStoreWriteErr != nil {
+ return ret, dataStoreWriteErr
+ }
// Check any errors are not left behind - this can happen if the data source
// creation fails, but the COPY is already done.
select {
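
For reference, a minimal sketch of the errgroup pattern this hunk adopts in place of the sync.WaitGroup (fakeUpload and the loop below are hypothetical; only the errgroup calls mirror the diff): Go starts at most one in-flight upload thanks to SetLimit(1), and Wait returns the first error, which the writer callback checks before starting the next flush.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fakeUpload is a hypothetical stand-in for datasource.CreateFromReader.
func fakeUpload(ctx context.Context, part int) error {
	if part == 2 {
		return fmt.Errorf("upload of part %d failed", part)
	}
	return nil
}

func main() {
	oriCtx := context.Background()
	// The context returned by WithContext is cancelled once any goroutine
	// errors (or after Wait), which is presumably why exportTable keeps the
	// original context (oriCtx) for the uploads themselves.
	g, _ := errgroup.WithContext(oriCtx)
	g.SetLimit(1) // at most one in-flight upload, like the old sync.WaitGroup

	for part := 1; part <= 3; part++ {
		// Mirrors the start of the newCSVPipe writer callback: surface the
		// previous upload's error before starting the next flush.
		if err := g.Wait(); err != nil {
			fmt.Println("stopping early:", err)
			return
		}
		part := part
		g.Go(func() error { return fakeUpload(oriCtx, part) })
	}
	// Mirrors the resourceWG.Wait() after pipe.Pipe returns.
	if err := g.Wait(); err != nil {
		fmt.Println("export failed:", err)
	}
}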