Created
January 25, 2023 01:36
-
-
Save 1ort/9bcdd1ebc4a803354a0c9e193c794d5d to your computer and use it in GitHub Desktop.
go pipeline boilerplate using generics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipeline | |
import "errors" | |
/* | |
Usage: | |
myPipeline := NewPipeline( | |
NewMyEmitter() | |
) | |
.Plug( | |
NewMyStage() | |
) | |
.Plug( | |
NewAnotherMyStage() | |
) | |
out := myPipeline.Run() | |
*/ | |
type PipelineStage[T any] interface { | |
Run(in chan<- T, done chan<- struct{}) chan<- T | |
} | |
type PipelineEmitter[T any] interface { | |
Run(done chan<- struct{}) chan<- T | |
} | |
type Pipeline[T any] struct { | |
done chan<- struct{} | |
emitter PipelineEmitter[T] | |
stages []PipelineStage[T] | |
} | |
func NewPipeline[T any](emitter PipelineEmitter[T]) *Pipeline[T] { | |
return &Pipeline[T]{ | |
done: nil, | |
emitter: emitter, | |
stages: make([]PipelineStage[T], 0), | |
} | |
} | |
func (p *Pipeline[T]) Plug(stage PipelineStage[T]) *Pipeline[T] { | |
p.stages = append(p.stages, stage) | |
return p | |
} | |
func (p *Pipeline[T]) Run() chan<- T { | |
p.done = make(chan struct{}) | |
out := p.emitter.Run(p.done) | |
for _, stage := range p.stages { | |
out = stage.Run(out, p.done) | |
} | |
return out | |
} | |
func (p *Pipeline[T]) Stop() error { | |
if p.done == nil { | |
return errors.New("Only a running pipeline can be stopped") | |
} | |
close(p.done) | |
p.done = nil | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment