Last active
March 1, 2023 21:30
-
-
Save ahmetb/1872db1f14e74aa3e7ca6484673e9443 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"errors" | |
"fmt" | |
"log" | |
"sync" | |
maelstrom "github.com/jepsen-io/maelstrom/demo/go" | |
) | |
type emptyMsg struct { | |
Type string `json:"type"` | |
} | |
type readResp struct { | |
Type string `json:"type"` | |
Value int `json:"value"` | |
} | |
type addReq struct { | |
Type string `json:"type"` | |
Value int `json:"value"` | |
} | |
type versionedVal struct { | |
Generation int `json:"generation"` | |
Value int `json:"value"` | |
} | |
func main() { | |
const key = "key" | |
node := maelstrom.NewNode() | |
n.Handle("read", func(msg maelstrom.Message) error { | |
seqConsistentKV := maelstrom.NewSeqKV(node) | |
v, err := seqConsistentKV.Read(context.TODO(), key) | |
if err != nil { | |
if isKeyNotExists(err) { | |
return n.Reply(msg, readResp{Type: "read_ok", Value: 0}) | |
} | |
log.Println(err) | |
return err | |
} | |
rv := v.(versionedVal) | |
return n.Reply(msg, readResp{Type: "read_ok", Value: rv.Value}) | |
}) | |
n.Handle("add", func(msg maelstrom.Message) error { | |
seqConsistentKV := maelstrom.NewSeqKV(node) | |
var req addReq | |
if err := json.Unmarshal(msg.Body, &req); err != nil { | |
return err | |
} | |
ctx := context.Background() | |
for { | |
v, err := seqConsistentKV.Read(ctx, key) | |
if err != nil { | |
if !isKeyNotExists(err) { | |
return fmt.Errorf("read err: %w", err) | |
} | |
if err := seqConsistentKV.Write(ctx, key, versionedVal{Generation: 1, Value: 0}); err != nil { | |
return fmt.Errorf("initial write err: %w", err) | |
} | |
continue // empty key started, try again | |
} | |
readVal := v.(versionedVal) | |
err = seqConsistentKV.CompareAndSwap(ctx, key, | |
readVal, | |
versionedVal{ | |
Generation: readVal.Generation + 1, | |
Value: readVal.Value + req.Value}, false) | |
if err == nil { | |
return n.Reply(msg, emptyMsg{Type: "add_ok"}) | |
} | |
var kvErr *maelstrom.RPCError | |
if errors.As(err, &kvErr) && kvErr.Code == maelstrom.PreconditionFailed { | |
continue // write contention, try again | |
} else { | |
log.Println("cas err", err) | |
return fmt.Errorf("cas err: %w", err) | |
} | |
} | |
}) | |
if err := n.Run(); err != nil { | |
log.Fatal(err) | |
} | |
} | |
func isKeyNotExists(err error) bool { | |
var rpcErr *maelstrom.RPCError | |
if !errors.As(err, &rpcErr) { | |
return false | |
} | |
return rpcErr.Code == maelstrom.KeyDoesNotExist | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment