Very simple bulk pool for elastic/Elasticsearch.
package bulk

import (
	"errors"
	"sync"
	"time"

	"gopkg.in/olivere/elastic.v3"
)

const (
	// NumRetries is the default number of retries for a bulk request.
	NumRetries = 5

	// SleepMillis is the number of milliseconds to wait between retries (initially).
	// The sleep interval gets larger the more retries fail.
	SleepMillis = 1000
)
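// With the defaults above, a failing bulk request is attempted up to six
// times in total: the sleep interval doubles after every failure, so a
// worker sleeps 1s, 2s, 4s, 8s and 16s between attempts before giving up
// (see commit below).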
// Pool is a pool of workers performing bulk requests to Elasticsearch.
// A pool ensures that Elasticsearch doesn't get overloaded with many
// individual bulk requests.
//
// There is typically just one pool of index workers for the whole application.
//
// Example:
//
//    // Set up a pool of 5 workers
//    pool, err := bulk.NewPool(5)
//    if err != nil { ... }
//
//    // Create a response channel for requests
//    responseCh := make(chan error)
//    defer close(responseCh)
//
//    // Create a bulk service from Elasticsearch
//    svc := env.ES.Bulk().Index(...).Type(...)
//
//    for {
//        // Break on some condition
//        if stop {
//            break
//        }
//
//        // Fill svc with bulk requests here
//        indexReq := elastic.NewBulkIndexRequest().Id(...).Type(...).Doc(...)
//        svc.Add(indexReq)
//
//        // Commit every 1000 docs
//        if svc.NumberOfActions() >= 1000 {
//            // Enqueue a request
//            pool.Commit(svc, responseCh)
//
//            // Wait for response, either success or failure
//            err = <-responseCh
//            if err != nil { ... }
//
//            // Set up a new bulk service
//            svc = env.ES.Bulk().Index(...).Type(...)
//        }
//    }
//
//    // Final flush
//    if svc.NumberOfActions() > 0 {
//        // Enqueue a request
//        pool.Commit(svc, responseCh)
//
//        // Wait for response, either success or failure
//        err = <-responseCh
//        if err != nil { ... }
//    }
type Pool struct {
	sync.Mutex
	workers  int
	requests chan *request
	done     chan bool
	running  bool
}

// request encapsulates a single bulk insert request.
type request struct {
	service     *elastic.BulkService // bulk service
	retries     int                  // number of retries before giving up
	sleepMillis int                  // time to sleep between retries (initial)

	// Response channel that emits nil on success and an error on failure.
	Response chan error
}
// NewPool sets up a new pool for bulk insertion into Elasticsearch.
func NewPool(workers int) (*Pool, error) {
	if workers <= 0 {
		return nil, errors.New("invalid number of workers")
	}
	p := &Pool{
		workers:  workers,
		requests: make(chan *request),
		done:     make(chan bool),
	}
	for i := 0; i < p.workers; i++ {
		go p.worker(p.requests, p.done)
	}
	p.Lock()
	p.running = true
	p.Unlock()
	return p, nil
}
// Close will close the pool, stop all underlying workers and close all
// channels. The pool is unusable after calling Close; you need to set up
// a new pool.
func (p *Pool) Close() error {
	p.Lock()
	defer p.Unlock()
	if !p.running {
		return nil
	}
	// Signal every worker to stop, then wait for each acknowledgement.
	// Sends and receives balance out, so this completes even if one
	// worker consumes another worker's acknowledgement.
	for i := 0; i < p.workers; i++ {
		p.done <- true
	}
	for i := 0; i < p.workers; i++ {
		<-p.done
	}
	close(p.requests)
	close(p.done)
	p.running = false
	return nil
}
// isRunning indicates whether the pool is running.
func (p *Pool) isRunning() bool {
	p.Lock()
	defer p.Unlock()
	return p.running
}
// Commit enqueues a single bulk request to Elasticsearch. It blocks until
// a worker picks up the request; the outcome is then delivered as nil
// (success) or an error (failure) on responseCh. Commit must not be called
// after Close, as sending on the closed requests channel would panic.
func (p *Pool) Commit(service *elastic.BulkService, responseCh chan error) {
	p.requests <- &request{
		service:     service,
		retries:     NumRetries,
		sleepMillis: SleepMillis,
		Response:    responseCh,
	}
}
// worker is an individual worker that consumes requests and commits them
// to Elasticsearch.
func (p *Pool) worker(requests chan *request, done chan bool) {
	for {
		select {
		case r := <-requests:
			p.commit(r)
		case <-done:
			// Acknowledge shutdown to Close, then exit.
			done <- true
			return
		}
	}
}
// commit sends the bulk request to Elasticsearch. It retries automatically
// in case of errors, doubling the sleep interval after each failed attempt
// (exponential backoff).
func (p *Pool) commit(r *request) {
	sleepMillis := r.sleepMillis
	for {
		_, err := r.service.Do()
		if err != nil {
			r.retries--
			if r.retries < 0 {
				// Out of retries: report the last error.
				r.Response <- err
				return
			}
			time.Sleep(time.Duration(sleepMillis) * time.Millisecond)
			sleepMillis += sleepMillis // double the interval
		} else {
			break
		}
	}
	r.Response <- nil
}
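// exampleUsage is a minimal, hedged sketch of wiring the pool to a live
// cluster, following the package example above. It is not part of the
// original gist: the client URL, the "articles" index, the "article" type
// and the generated documents are illustrative assumptions.
func exampleUsage() error {
	client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
	if err != nil {
		return err
	}

	pool, err := NewPool(5)
	if err != nil {
		return err
	}
	defer pool.Close()

	responseCh := make(chan error)
	defer close(responseCh)

	// Helper to start a fresh bulk service after each commit.
	newSvc := func() *elastic.BulkService {
		return client.Bulk().Index("articles").Type("article")
	}

	svc := newSvc()
	for i := 0; i < 10000; i++ {
		// Document IDs are auto-generated here; real code would set Id(...).
		doc := map[string]interface{}{"n": i}
		svc.Add(elastic.NewBulkIndexRequest().Doc(doc))

		// Commit every 1000 docs and wait for the worker's response.
		if svc.NumberOfActions() >= 1000 {
			pool.Commit(svc, responseCh)
			if err := <-responseCh; err != nil {
				return err
			}
			svc = newSvc()
		}
	}

	// Final flush for any remaining documents.
	if svc.NumberOfActions() > 0 {
		pool.Commit(svc, responseCh)
		if err := <-responseCh; err != nil {
			return err
		}
	}
	return nil
}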