Skip to content

Instantly share code, notes, and snippets.

@dbravender
Created November 24, 2009 21:10
Show Gist options
  • Save dbravender/242217 to your computer and use it in GitHub Desktop.
Save dbravender/242217 to your computer and use it in GitHub Desktop.
package mapreduce
// MapReduce runs mapper over every item received from input and feeds the
// mapped values to reducer in input order, returning the single value the
// reducer sends back.
//
// Callback contract:
//   - mapper is started in its own goroutine once per item and must send
//     exactly one value on the channel it is handed. The collector receives
//     exactly one value per item, so a mapper that never sends deadlocks the
//     pipeline.
//   - reducer receives the mapped values on its first channel, which is
//     closed after the last value, and must send exactly one result on its
//     second channel. A reducer that sends before draining its input leaves
//     the collector goroutine blocked forever.
//
// pool_size is the buffer size of the queue of pending mapper result
// channels. The dispatcher blocks once that many are queued behind the one
// being collected, which limits how far mapping runs ahead of reduction.
// Negative values are treated as 0 (an unbuffered queue) instead of making
// make panic.
//
// MapReduce blocks until the reducer sends its result, which requires input
// to be closed eventually.
func MapReduce(mapper func(interface{}, chan interface{}),
	reducer func(chan interface{}, chan interface{}),
	input chan interface{},
	pool_size int) interface{} {
	if pool_size < 0 {
		pool_size = 0
	}

	reduceInput := make(chan interface{})
	reduceOutput := make(chan interface{})
	// Queue the per-item result channels rather than the results themselves:
	// draining them in submission order keeps reducer input in input order
	// even though mappers finish in any order.
	pending := make(chan chan interface{}, pool_size)

	go reducer(reduceInput, reduceOutput)

	// Collector: forward exactly one value per mapper, in submission order,
	// then signal the reducer that no more values are coming.
	go func() {
		for result := range pending {
			reduceInput <- <-result
		}
		close(reduceInput)
	}()

	// Dispatcher: start one mapper per input item. Each result channel has
	// room for one value so a finished mapper can exit without waiting for
	// the collector to reach it.
	go func() {
		for item := range input {
			result := make(chan interface{}, 1)
			go mapper(item, result)
			pending <- result
		}
		close(pending)
	}()

	return <-reduceOutput
}
package main
import "fmt";
// main demonstrates three forms of range: over an array with one iteration
// variable (yields indices), over an array with two (yields elements), and
// over a channel (yields each sent value until the channel is closed).
func main() {
	values := [...]int{10, 9, 8}

	// One iteration variable: the index of each element.
	for idx := range values {
		fmt.Println(idx)
	}

	// Two iteration variables: discard the index, keep the element.
	for _, v := range values {
		fmt.Println(v)
	}

	// A producer goroutine sends every element and then closes the channel,
	// which is what ends the receiving range loop below.
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range values {
			ch <- v
		}
	}()
	for v := range ch {
		fmt.Println(v)
	}
}
// Prints:
// 0
// 1
// 2
// 10
// 9
// 8
// 10
// 9
// 8
package main
import (
"os";
"fmt";
"regexp";
"io";
"mapreduce";
)
// find_files starts a recursive walk of dirname on a new goroutine and
// returns the channel the walk reports on. Each path _find_files emits
// arrives as a string inside an interface{}. The channel is closed when the
// walk returns. Because the channel is unbuffered, the walk only advances as
// fast as the caller receives.
func find_files(dirname string) chan interface{} {
	paths := make(chan interface{})
	walk := func() {
		_find_files(dirname, paths)
		close(paths)
	}
	go walk()
	return paths
}
// _find_files recursively walks dirname and sends the path of every
// non-directory entry on output. Paths are built as dirname + "/" + name, so
// a walk started at "." yields paths such as "./sub/file.txt". Entries are
// visited in the order Readdirnames returns them, which is not sorted.
//
// The walk is best-effort, because the signature has no way to report
// errors:
//   - a directory that cannot be opened is skipped;
//   - a listing that fails part-way still processes the names read so far;
//   - an entry that cannot be stat'ed (for example a dangling symlink, or a
//     file removed mid-walk) is skipped.
//
// NOTE(review): os.Stat follows symlinks, so a symlink pointing at an
// ancestor directory makes this recurse without bound. Consider os.Lstat if
// such trees are possible.
func _find_files(dirname string, output chan interface{}) {
	dir, err := os.Open(dirname)
	if err != nil {
		return // unreadable directory: nothing to report from it
	}
	// With n <= 0, Readdirnames returns whatever it read before any error,
	// so the error is deliberately ignored and the partial list is used.
	names, _ := dir.Readdirnames(-1)
	// Close before recursing so open descriptors don't pile up with depth.
	dir.Close()

	for _, name := range names {
		fullpath := dirname + "/" + name
		info, err := os.Stat(fullpath)
		if err != nil {
			continue // vanished or dangling entry: skip instead of panicking
		}
		if info.IsDir() {
			_find_files(fullpath, output)
		} else {
			output <- fullpath
		}
	}
}
// wordsRE matches one run of ASCII letters, digits and underscores. It uses
// "+" rather than "*" so that the empty matches between separators are not
// counted as the word "". It is compiled once instead of on every call.
var wordsRE = regexp.MustCompile(`[A-Za-z0-9_]+`)

// wordcount is a MapReduce mapper. filename must hold a string path. It
// counts each word (as matched by wordsRE) in that file and sends exactly one
// map[string]int of word -> occurrences on output.
//
// A file that cannot be opened or fully read contributes an empty map. A
// result is still sent because the MapReduce collector waits for exactly one
// value per input item.
func wordcount(filename interface{}, output chan interface{}) {
	results := map[string]int{}
	if contents, err := readWholeFile(filename.(string)); err == nil {
		for _, word := range wordsRE.FindAllString(string(contents), -1) {
			results[word]++ // missing keys start at the zero value
		}
	}
	output <- results
}

// readWholeFile returns the full contents of the named file, closing it
// before returning.
func readWholeFile(name string) ([]byte, error) {
	f, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	return io.ReadAll(f)
}
// reducer is the reduce step for word counts. It merges every map[string]int
// received on input by summing the counts per word. Once input is closed, it
// sends the combined map on output. Any other value type on input panics.
func reducer(input chan interface{}, output chan interface{}) {
	totals := make(map[string]int)
	for partial := range input {
		counts := partial.(map[string]int)
		for word, n := range counts {
			// A word not yet seen reads as 0, so += both inserts and adds.
			totals[word] += n
		}
	}
	output <- totals
}
// main counts the words in every file under the current directory with
// MapReduce and prints the combined word -> count map.
func main() {
	// MapReduce requires a pool size (the original call omitted it and did
	// not compile). It is the buffer size of MapReduce's queue of pending
	// mapper results. 8 is a judgment call for this I/O-bound job, not a
	// tuned value.
	const poolSize = 8
	fmt.Println(mapreduce.MapReduce(wordcount, reducer, find_files("."), poolSize))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment