Last active
August 24, 2023 20:50
-
-
Save ZhouXing19/5bd2f1939ced0c3f88b92f273a1de26e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/internal/frontend/frontend.go b/internal/frontend/frontend.go | |
index 3df0069..eb02d1c 100644 | |
--- a/internal/frontend/frontend.go | |
+++ b/internal/frontend/frontend.go | |
@@ -66,6 +66,7 @@ type Frontend interface { | |
ListenForTraffic() Traffic | |
// HandleTraffic tells the frontend to handle the traffic sent. | |
HandleTraffic(ctx context.Context, traffic any) error | |
+ CheckIfTrafficCutover(traffic any) (bool, error) | |
PrepareResult(ctx context.Context, result Result) (any, error) | |
ShadowExecutor() shadow.ShadowExecutor | |
diff --git a/internal/proxyql/proxyql.go b/internal/proxyql/proxyql.go | |
index fde4b75..256e53f 100644 | |
--- a/internal/proxyql/proxyql.go | |
+++ b/internal/proxyql/proxyql.go | |
@@ -50,7 +50,8 @@ type CutoverRequest struct { | |
// It determines how long to wait for all connections to become txn/query | |
// free. When the timeout is reached, abort the cutover attempt and resume | |
// the traffic. | |
- BeginTimeout time.Duration | |
+ BeginTimeout time.Duration | |
+ silentIfNoOngoingCutover bool | |
} | |
func (c *CutoverRequest) Format(b *strings.Builder) { | |
@@ -112,6 +113,15 @@ func HasCommandPrefix(s string) bool { | |
return i == len(compareStartWord) | |
} | |
+func CheckIfStringIsCutover(s string) (bool, error) { | |
+ c, err := Parse(s) | |
+ if err != nil { | |
+ return false, err | |
+ } | |
+ _, ok := c.(*CutoverRequest) | |
+ return ok, nil | |
+} | |
+ | |
diff --git a/internal/proxyserver/connection.go b/internal/proxyserver/connection.go | |
index 86b17e5..84e3527 100644 | |
--- a/internal/proxyserver/connection.go | |
+++ b/internal/proxyserver/connection.go | |
@@ -24,6 +24,13 @@ type connection struct { | |
frontend.Frontend | |
key string | |
+ // isForCutover is true if the connection has issued a CONSISTENT CUTOVER | |
+ // BEGIN request, which triggers the traffic to pause; all user connections | |
+ // and the proxy server then wait for a follow-up request from this | |
+ // connection for the next step. The proxy server monitors the liveness of | |
+ // this connection, and once it dies, the server will inform all user | |
+ // connections to AUTOCOMPLETE the ongoing cutover and resume the traffic. | |
+ isForCutover bool | |
// trafficCh outputs 1 read packet. when traffic is read. | |
trafficCh chan frontend.Traffic | |
// readyForTrafficCh signifies that we should attempt to read another | |
diff --git a/internal/proxyserver/proxyserver.go b/internal/proxyserver/proxyserver.go | |
index 10477f8..8e03821 100644 | |
--- a/internal/proxyserver/proxyserver.go | |
+++ b/internal/proxyserver/proxyserver.go | |
@@ -478,6 +478,9 @@ func (s *Server) handleConnection( | |
ctx context.Context, conn *connection, logger zerolog.Logger, | |
) (retErr error) { | |
defer func() { | |
+ if conn.isForCutover { | |
+ // TODO: send artificial traffic to AUTOCOMPLETE the cutover. | |
+ } | |
s.conns.Delete(conn.key) | |
retErr = errors.CombineErrors(retErr, conn.Close(ctx)) | |
}() | |
@@ -513,6 +516,11 @@ trafficLoop: | |
return errors.AssertionFailedf("unexpected followup cutover followup request") | |
} | |
case traffic, ok := <-conn.TrafficChannel(): | |
+ isForCutover, err := conn.CheckIfTrafficCutover(traffic) | |
+ if err != nil { | |
+ return err | |
+ } | |
+ conn.isForCutover = isForCutover | |
if err := handleTraffic(ctx, conn, traffic, ok); err != nil { | |
return err | |
} | |
diff --git a/mysqlproxy/internal/mysqlfrontend/frontend.go b/mysqlproxy/internal/mysqlfrontend/frontend.go | |
index ca6728d..019c781 100644 | |
--- a/mysqlproxy/internal/mysqlfrontend/frontend.go | |
+++ b/mysqlproxy/internal/mysqlfrontend/frontend.go | |
@@ -26,6 +26,7 @@ import ( | |
"github.com/cockroachlabs/crdb-proxy/internal/frontend" | |
"github.com/cockroachlabs/crdb-proxy/internal/go-mysql/mysql" | |
mysqlserver "github.com/cockroachlabs/crdb-proxy/internal/go-mysql/server" | |
+ "github.com/cockroachlabs/crdb-proxy/internal/proxyql" | |
"github.com/cockroachlabs/crdb-proxy/internal/proxyqlhandler" | |
"github.com/cockroachlabs/crdb-proxy/internal/shadow" | |
"github.com/rs/zerolog" | |
@@ -89,6 +90,19 @@ func (f *Frontend) ListenForTraffic() frontend.Traffic { | |
return frontend.Traffic{Data: data, Err: err} | |
} | |
+// CheckIfTrafficCutover reports whether traffic is a COM_QUERY packet whose | |
+// query text is recognized by proxyql.CheckIfStringIsCutover. | |
+// | |
+// traffic is expected to be a []byte whose first byte is the command and whose | |
+// remaining bytes are the command payload. Traffic that is not a []byte, is | |
+// empty, or carries any command other than COM_QUERY is reported as | |
+// (false, nil) rather than panicking. Errors from | |
+// proxyql.CheckIfStringIsCutover are returned unchanged. | |
+func (f *Frontend) CheckIfTrafficCutover(traffic any) (bool, error) { | |
+	packet, isBytes := traffic.([]byte) | |
+	// An empty (or nil) packet has no command byte; indexing it would panic. | |
+	if !isBytes || len(packet) == 0 { | |
+		return false, nil | |
+	} | |
+ | |
+	switch packet[0] { | |
+	case mysql.COM_QUERY: | |
+		return proxyql.CheckIfStringIsCutover(string(packet[1:])) | |
+	default: | |
+		return false, nil | |
+	} | |
+} | |
+ | |
func (f *Frontend) HandleTraffic(ctx context.Context, traffic any) error { | |
// Second half of mysql.HandleCommand - handle the packet (using the handler) | |
c := f.Conn | |
diff --git a/pgproxy/internal/pgfrontend/frontend.go b/pgproxy/internal/pgfrontend/frontend.go | |
index 765f4c2..f2eb8c9 100644 | |
--- a/pgproxy/internal/pgfrontend/frontend.go | |
+++ b/pgproxy/internal/pgfrontend/frontend.go | |
@@ -125,6 +125,17 @@ func (f *Frontend) PrepareResult(ctx context.Context, result frontend.Result) (a | |
return nil, f.pgBackend.Flush() | |
} | |
+// CheckIfTrafficCutover reports whether traffic is a *pgproto3.Query whose | |
+// query string is recognized by proxyql.CheckIfStringIsCutover. | |
+// | |
+// Any other value, including nil, is reported as (false, nil). Errors from | |
+// proxyql.CheckIfStringIsCutover are returned unchanged. | |
+func (f *Frontend) CheckIfTrafficCutover(traffic any) (bool, error) { | |
+	// Switch on traffic directly: a nil or unexpected value falls through to | |
+	// default instead of panicking in an unchecked type assertion. | |
+	switch msg := traffic.(type) { | |
+	case *pgproto3.Query: | |
+		if msg == nil { | |
+			// A typed-nil query has no string to inspect. | |
+			return false, nil | |
+		} | |
+		return proxyql.CheckIfStringIsCutover(msg.String) | |
+	default: | |
+		return false, nil | |
+	} | |
+} | |
+ | |
func (f *Frontend) HandleTraffic(ctx context.Context, traffic any) error { | |
msg := traffic.(pgproto3.FrontendMessage) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment