Skip to content

Instantly share code, notes, and snippets.

@ZhouXing19
Last active August 24, 2023 20:50
Show Gist options
  • Save ZhouXing19/5bd2f1939ced0c3f88b92f273a1de26e to your computer and use it in GitHub Desktop.
Save ZhouXing19/5bd2f1939ced0c3f88b92f273a1de26e to your computer and use it in GitHub Desktop.
diff --git a/internal/frontend/frontend.go b/internal/frontend/frontend.go
index 3df0069..eb02d1c 100644
--- a/internal/frontend/frontend.go
+++ b/internal/frontend/frontend.go
@@ -66,6 +66,7 @@ type Frontend interface {
ListenForTraffic() Traffic
// HandleTraffic tells the frontend to handle the traffic sent.
HandleTraffic(ctx context.Context, traffic any) error
+ CheckIfTrafficCutover(traffic any) (bool, error)
PrepareResult(ctx context.Context, result Result) (any, error)
ShadowExecutor() shadow.ShadowExecutor
diff --git a/internal/proxyql/proxyql.go b/internal/proxyql/proxyql.go
index fde4b75..256e53f 100644
--- a/internal/proxyql/proxyql.go
+++ b/internal/proxyql/proxyql.go
@@ -50,7 +50,8 @@ type CutoverRequest struct {
// It determines the limit of wait for all connections to become tnx/query
// free. When the timeout is reached, abort the cutover attempt and resume
// the traffic.
- BeginTimeout time.Duration
+ BeginTimeout time.Duration
+ silentIfNoOngoingCutover bool
}
func (c *CutoverRequest) Format(b *strings.Builder) {
@@ -112,6 +113,15 @@ func HasCommandPrefix(s string) bool {
return i == len(compareStartWord)
}
+func CheckIfStringIsCutover(s string) (bool, error) {
+ c, err := Parse(s)
+ if err != nil {
+ return false, err
+ }
+ _, ok := c.(*CutoverRequest)
+ return ok, nil
+}
+
diff --git a/internal/proxyserver/connection.go b/internal/proxyserver/connection.go
index 86b17e5..84e3527 100644
--- a/internal/proxyserver/connection.go
+++ b/internal/proxyserver/connection.go
@@ -24,6 +24,13 @@ type connection struct {
frontend.Frontend
key string
+ // isForCutover if the connection has issued a CONSISTENT CUTOVER BEGIN
+ // request, which trigger the traffic to pause, and all user connection and
+ // the proxy server will wait for followup request from this connection for
+ // next step. The proxy server listens to the aliveness of this connection,
+ // and once it dies, the server will inform all user connections to
+ // AUTOCOMPLETE the ongoing cutover and resume the traffic.
+ isForCutover bool
// trafficCh outputs 1 read packet. when traffic is read.
trafficCh chan frontend.Traffic
// readyForTrafficCh signifies that we should attempt to read another
diff --git a/internal/proxyserver/proxyserver.go b/internal/proxyserver/proxyserver.go
index 10477f8..8e03821 100644
--- a/internal/proxyserver/proxyserver.go
+++ b/internal/proxyserver/proxyserver.go
@@ -478,6 +478,9 @@ func (s *Server) handleConnection(
ctx context.Context, conn *connection, logger zerolog.Logger,
) (retErr error) {
defer func() {
+ if conn.isForCutover {
+ //TODO: make an artifical traffic to AUTOCOMPLETE the cutover.
+ }
s.conns.Delete(conn.key)
retErr = errors.CombineErrors(retErr, conn.Close(ctx))
}()
@@ -513,6 +516,11 @@ trafficLoop:
return errors.AssertionFailedf("unexpected followup cutover followup request")
}
case traffic, ok := <-conn.TrafficChannel():
+ isForCutover, err := conn.CheckIfTrafficCutover(traffic)
+ if err != nil {
+ return err
+ }
+ conn.isForCutover = isForCutover
if err := handleTraffic(ctx, conn, traffic, ok); err != nil {
return err
}
diff --git a/mysqlproxy/internal/mysqlfrontend/frontend.go b/mysqlproxy/internal/mysqlfrontend/frontend.go
index ca6728d..019c781 100644
--- a/mysqlproxy/internal/mysqlfrontend/frontend.go
+++ b/mysqlproxy/internal/mysqlfrontend/frontend.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachlabs/crdb-proxy/internal/frontend"
"github.com/cockroachlabs/crdb-proxy/internal/go-mysql/mysql"
mysqlserver "github.com/cockroachlabs/crdb-proxy/internal/go-mysql/server"
+ "github.com/cockroachlabs/crdb-proxy/internal/proxyql"
"github.com/cockroachlabs/crdb-proxy/internal/proxyqlhandler"
"github.com/cockroachlabs/crdb-proxy/internal/shadow"
"github.com/rs/zerolog"
@@ -89,6 +90,19 @@ func (f *Frontend) ListenForTraffic() frontend.Traffic {
return frontend.Traffic{Data: data, Err: err}
}
+func (f *Frontend) CheckIfTrafficCutover(traffic any) (bool, error) {
+ trafficData := traffic.([]byte)
+ cmd := trafficData[0]
+ data := trafficData[1:]
+
+ switch cmd {
+ case mysql.COM_QUERY:
+ return proxyql.CheckIfStringIsCutover(string(data))
+ default:
+ return false, nil
+ }
+}
+
func (f *Frontend) HandleTraffic(ctx context.Context, traffic any) error {
// Second half of mysql.HandleCommand - handle the packet (using the handler)
c := f.Conn
diff --git a/pgproxy/internal/pgfrontend/frontend.go b/pgproxy/internal/pgfrontend/frontend.go
index 765f4c2..f2eb8c9 100644
--- a/pgproxy/internal/pgfrontend/frontend.go
+++ b/pgproxy/internal/pgfrontend/frontend.go
@@ -125,6 +125,17 @@ func (f *Frontend) PrepareResult(ctx context.Context, result frontend.Result) (a
return nil, f.pgBackend.Flush()
}
+func (f *Frontend) CheckIfTrafficCutover(traffic any) (bool, error) {
+ msg := traffic.(pgproto3.FrontendMessage)
+ switch msg := msg.(type) {
+ case *pgproto3.Query:
+ query := msg.String
+ return proxyql.CheckIfStringIsCutover(query)
+ default:
+ return false, nil
+ }
+}
+
func (f *Frontend) HandleTraffic(ctx context.Context, traffic any) error {
msg := traffic.(pgproto3.FrontendMessage)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment