Skip to content

Instantly share code, notes, and snippets.

@spindi
Last active August 8, 2019 21:03
Show Gist options
  • Save spindi/b54c66d5633c4651223fc2de89b9cd53 to your computer and use it in GitHub Desktop.
Save spindi/b54c66d5633c4651223fc2de89b9cd53 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"compress/gzip"
"fmt"
"github.com/pkg/errors"
"local/ripley/chucker/config"
"log"
"os"
"sync"
"time"
)
// ContentGet streams the gzip-compressed file filename into *lines, one
// string per line of decompressed text.
//
// For every line sent, wg is incremented by one; the receiver is expected to
// call wg.Done once it has finished handling that line. *lines is closed
// before ContentGet returns, on success and on failure, so a receiver ranging
// over the channel always terminates.
//
// It returns a wrapped error if the file cannot be opened, is not valid gzip,
// cannot be read, or if closing either reader fails.
func ContentGet(filename string, lines *chan string, wg *sync.WaitGroup) (err error) {
	// This function is the only sender, so it owns closing the channel.
	// Deferring the close covers the error paths as well; otherwise a
	// consumer blocked in `range` would never wake up after a failed read.
	defer close(*lines)

	// file reader
	f, err := os.Open(filename)
	if err != nil {
		return errors.Wrap(err, "unable to open file reader")
	}
	defer func() {
		// Report a close failure only when nothing else already went wrong.
		if cerr := f.Close(); cerr != nil && err == nil {
			err = errors.Wrap(cerr, "unable to close file reader")
		}
	}()

	// gzip reader
	gr, err := gzip.NewReader(f)
	if err != nil {
		return errors.Wrap(err, "unable to open gzip reader")
	}
	// Deferred calls run last-in-first-out, so the gzip reader is closed
	// before the file it wraps.
	defer func() {
		if cerr := gr.Close(); cerr != nil && err == nil {
			err = errors.Wrap(cerr, "unable to close gzip reader")
		}
	}()

	// read file into channel
	scanner := bufio.NewScanner(gr)
	for scanner.Scan() {
		// Add must happen before the send: once the line is in the channel
		// a consumer may call wg.Done at any moment, and a Done that runs
		// ahead of its Add drives the counter negative and panics.
		wg.Add(1)
		*lines <- scanner.Text()
	}
	if scanErr := scanner.Err(); scanErr != nil {
		return errors.Wrap(scanErr, "unable to read file")
	}
	return nil
}
// SomeLongProcess stands in for slow per-line work: it prints line, blocks
// for one second, then prints "DONE".
func SomeLongProcess(line string) {
	const workDuration = 1 * time.Second

	fmt.Println(line)
	time.Sleep(workDuration)
	fmt.Println("DONE")
}
// ContentPush receives lines from *lines until the channel is closed and
// hands each one to SomeLongProcess in its own goroutine.
//
// Every goroutine calls wg.Done when it finishes, so the sender must have
// called wg.Add(1) for each line it puts on the channel.
//
// NOTE(review): one goroutine is started per line with no upper bound;
// consider a fixed worker pool if inputs can be large.
func ContentPush(lines *chan string, wg *sync.WaitGroup) {
	for line := range *lines {
		// Pass the line as an argument rather than capturing the loop
		// variable: before Go 1.22 all iterations share one variable, so a
		// closure could observe a later line than the one it was started for.
		go func(l string) {
			defer wg.Done()
			SomeLongProcess(l)
		}(line)
	}
}
// main wires the pipeline together: a background ContentPush consumer drains
// the channel while ContentGet fills it from config.InputFilename, and the
// program exits once every queued line has been marked done.
func main() {
	var (
		queue   = make(chan string, config.ChannelBuffer)
		pending sync.WaitGroup
	)

	// Start the consumer first so sends do not stall once the buffer fills.
	go ContentPush(&queue, &pending)

	// Feed the file into the channel; a read failure aborts the program.
	if err := ContentGet(config.InputFilename, &queue, &pending); err != nil {
		log.Fatal(err)
	}

	// Block until every in-flight line has been processed.
	pending.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment