Last active
September 8, 2022 13:41
-
-
Save cideM/a6ae37054dccbf7a19f163e6cf9b979e to your computer and use it in GitHub Desktop.
The new step function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func step[In any, Out any]( | |
ctx context.Context, | |
inputChannel <-chan In, | |
fn func(In) (Out, error), | |
) (chan Out, chan error) { | |
outputChannel := make(chan Out) | |
errorChannel := make(chan error) | |
limit := int64(2) | |
// Use all CPU cores to maximize efficiency. We'll set the limit to 2 so you | |
// can see the values being processed in batches of 2 at a time, in parallel | |
// limit := int64(runtime.NumCPU()) | |
sem1 := semaphore.NewWeighted(limit) | |
go func() { | |
defer close(outputChannel) | |
defer close(errorChannel) | |
for { | |
select { | |
case <-ctx.Done(): | |
break | |
case s, ok := <-inputChannel: | |
if ok { | |
if err := sem1.Acquire(ctx, 1); err != nil { | |
log.Printf("Failed to acquire semaphore: %v", err) | |
break | |
} | |
go func(s In) { | |
defer sem1.Release(1) | |
time.Sleep(time.Second * 3) | |
result, err := fn(s) | |
if err != nil { | |
errorChannel <- err | |
} else { | |
outputChannel <- result | |
} | |
}(s) | |
} else { | |
if err := sem1.Acquire(ctx, limit); err != nil { | |
log.Printf("Failed to acquire semaphore: %v", err) | |
} | |
return | |
} | |
} | |
} | |
}() | |
return outputChannel, errorChannel | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment