optimize websocket step

This commit is contained in:
buyuxiang
2022-07-11 18:25:43 +08:00
parent 28d5f414d0
commit aa5de4ec83
6 changed files with 278 additions and 119 deletions

View File

@@ -128,40 +128,45 @@ func (s *StepWebSocket) Run(r *SessionRunner) (*StepResult, error) {
return runStepWebSocket(r, s.step)
}
func (s *StepWebSocket) OpenConnection(url string) *StepWebSocket {
func (s *StepWebSocket) withUrl(url ...string) *StepWebSocket {
if len(url) == 0 {
return s
}
if len(url) > 1 {
log.Warn().Msg("too many WebSocket step URL specified, using first URL")
}
s.step.WebSocket.URL = url[0]
return s
}
func (s *StepWebSocket) OpenConnection(url ...string) *StepWebSocket {
s.step.WebSocket.Type = wsOpen
s.step.WebSocket.URL = url
return s
return s.withUrl(url...)
}
func (s *StepWebSocket) PingPong(url string) *StepWebSocket {
func (s *StepWebSocket) PingPong(url ...string) *StepWebSocket {
s.step.WebSocket.Type = wsPing
s.step.WebSocket.URL = url
return s
return s.withUrl(url...)
}
func (s *StepWebSocket) WriteAndRead(url string) *StepWebSocket {
func (s *StepWebSocket) WriteAndRead(url ...string) *StepWebSocket {
s.step.WebSocket.Type = wsWriteAndRead
s.step.WebSocket.URL = url
return s
return s.withUrl(url...)
}
func (s *StepWebSocket) Read(url string) *StepWebSocket {
func (s *StepWebSocket) Read(url ...string) *StepWebSocket {
s.step.WebSocket.Type = wsRead
s.step.WebSocket.URL = url
return s
return s.withUrl(url...)
}
func (s *StepWebSocket) Write(url string) *StepWebSocket {
func (s *StepWebSocket) Write(url ...string) *StepWebSocket {
s.step.WebSocket.Type = wsWrite
s.step.WebSocket.URL = url
return s
return s.withUrl(url...)
}
func (s *StepWebSocket) CloseConnection(url string) *StepWebSocket {
func (s *StepWebSocket) CloseConnection(url ...string) *StepWebSocket {
s.step.WebSocket.Type = wsClose
s.step.WebSocket.URL = url
return s
return s.withUrl(url...)
}
func (s *StepWebSocket) WithParams(params map[string]interface{}) *StepWebSocket {
@@ -226,6 +231,23 @@ type WebSocketAction struct {
Timeout int64 `json:"timeout,omitempty" yaml:"timeout,omitempty"`
}
func initWebSocket(testcase *TestCase) {
tCase := testcase.ToTCase()
for _, step := range tCase.TestSteps {
if step.WebSocket == nil {
continue
}
// init websocket action parameters
if step.WebSocket.Timeout <= 0 {
step.WebSocket.Timeout = defaultTimeout
}
// close status code range: [1000, 4999]. ref: https://datatracker.ietf.org/doc/html/rfc6455#section-11.7
if step.WebSocket.CloseStatusCode < 1000 || step.WebSocket.CloseStatusCode > 4999 {
step.WebSocket.CloseStatusCode = defaultCloseStatus
}
}
}
func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, err error) {
stepResult = &StepResult{
Name: step.Name,
@@ -259,9 +281,30 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
sessionData := newSessionData()
parser := r.GetParser()
config := r.GetConfig()
dummyReq := &Request{
URL: step.WebSocket.URL,
Params: step.WebSocket.Params,
Headers: step.WebSocket.Headers,
}
rb := newRequestBuilder(parser, config, dummyReq)
err = rb.prepareUrlParams(stepVariables)
if err != nil {
return
}
err = rb.prepareHeaders(stepVariables)
if err != nil {
return
}
parsedURL := rb.req.URL.String()
parsedHeader := rb.req.Header
// add request object to step variables, could be used in setup hooks
stepVariables["hrp_step_name"] = step.Name
stepVariables["hrp_step_request"] = rb.requestMap
// deal with setup hooks
for _, setupHook := range step.SetupHooks {
@@ -271,8 +314,6 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
}
}
// init websocket frame parameters
initStepWebsocket(step.WebSocket)
var resp interface{}
start := time.Now()
@@ -284,19 +325,16 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
case wsOpen:
log.Info().Int64("timeout(ms)", step.WebSocket.Timeout).Msg("open websocket connection")
// use the current websocket connection if existed
if r.wsConn != nil {
if r.wsConnMap[parsedURL] != nil {
break
}
resp, err = openWithTimeout(sessionData, r, step, stepVariables)
resp, err = openWithTimeout(parsedURL, parsedHeader, r, step)
if err != nil {
return stepResult, errors.Wrap(err, "open connection failed")
}
case wsPing:
log.Info().Int64("timeout(ms)", step.WebSocket.Timeout).Msg("send ping and expect pong")
if r.wsConn == nil {
return stepResult, errors.Errorf("try to use existing connection, but there is no connection")
}
err = writeWebSocket(r, step, stepVariables)
err = writeWebSocket(parsedURL, r, step, stepVariables)
if err != nil {
return stepResult, errors.Wrap(err, "send ping message failed")
}
@@ -314,29 +352,29 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
}()
case wsWriteAndRead:
log.Info().Int64("timeout(ms)", step.WebSocket.Timeout).Msg("write a message and read response")
err = writeWebSocket(r, step, stepVariables)
err = writeWebSocket(parsedURL, r, step, stepVariables)
if err != nil {
return stepResult, errors.Wrap(err, "write message failed")
}
resp, err = readMessageWithTimeout(r, step)
resp, err = readMessageWithTimeout(parsedURL, r, step)
if err != nil {
return stepResult, errors.Wrap(err, "read message failed")
}
case wsRead:
log.Info().Int64("timeout(ms)", step.WebSocket.Timeout).Msg("read only")
resp, err = readMessageWithTimeout(r, step)
resp, err = readMessageWithTimeout(parsedURL, r, step)
if err != nil {
return stepResult, errors.Wrap(err, "read message failed")
}
case wsWrite:
log.Info().Msg("write only")
err = writeWebSocket(r, step, stepVariables)
err = writeWebSocket(parsedURL, r, step, stepVariables)
if err != nil {
return stepResult, errors.Wrap(err, "write message failed")
}
case wsClose:
log.Info().Int64("timeout(ms)", step.WebSocket.Timeout).Msg("close webSocket connection")
resp, err = closeWithTimeout(r, step, stepVariables)
resp, err = closeWithTimeout(parsedURL, r, step, stepVariables)
if err != nil {
return stepResult, errors.Wrap(err, "close connection failed")
}
@@ -371,6 +409,7 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
}
if respObj != nil {
sessionData.ReqResps.Request = rb.requestMap
sessionData.ReqResps.Response = builtin.FormatResponse(respObj.respObjMeta)
// extract variables from response
@@ -428,56 +467,11 @@ func printWebSocketResponse(resp interface{}) error {
return nil
}
func initStepWebsocket(stepWebSocket *WebSocketAction) {
if stepWebSocket == nil {
return
}
if stepWebSocket.Timeout <= 0 {
stepWebSocket.Timeout = defaultTimeout
}
// close status code range: [1000, 4999]. ref: https://datatracker.ietf.org/doc/html/rfc6455#section-11.7
if stepWebSocket.CloseStatusCode < 1000 || stepWebSocket.CloseStatusCode > 4999 {
stepWebSocket.CloseStatusCode = defaultCloseStatus
}
}
// prepareDialInfo prepares url and headers before opening connection
func prepareDialInfo(r *SessionRunner, step *TStep, stepVariables map[string]interface{}) (*requestBuilder, error) {
parser := r.GetParser()
config := r.GetConfig()
dummyReq := &Request{
URL: step.WebSocket.URL,
Params: step.WebSocket.Params,
Headers: step.WebSocket.Headers,
}
dummyBuilder := newRequestBuilder(parser, config, dummyReq)
err := dummyBuilder.prepareUrlParams(stepVariables)
if err != nil {
return nil, err
}
err = dummyBuilder.prepareHeaders(stepVariables)
if err != nil {
return nil, err
}
return dummyBuilder, nil
}
func openWithTimeout(d *SessionData, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) (*http.Response, error) {
func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner, step *TStep) (*http.Response, error) {
openResponseChan := make(chan *http.Response)
errorChan := make(chan error)
go func() {
// prepare request and dial
rb, err := prepareDialInfo(r, step, stepVariables)
if err != nil {
errorChan <- errors.Wrap(err, "prepare dail info failed")
return
}
d.ReqResps.Request = rb.requestMap
conn, resp, err := r.hrpRunner.wsDialer.Dial(rb.req.URL.String(), rb.req.Header)
conn, resp, err := r.hrpRunner.wsDialer.Dial(urlStr, requestHeader)
if err != nil {
errorChan <- errors.Wrap(err, "dial tcp failed")
return
@@ -499,7 +493,7 @@ func openWithTimeout(d *SessionData, r *SessionRunner, step *TStep, stepVariable
}
return nil
})
r.wsConn = conn
r.wsConnMap[urlStr] = conn
openResponseChan <- resp
}()
@@ -515,14 +509,15 @@ func openWithTimeout(d *SessionData, r *SessionRunner, step *TStep, stepVariable
}
}
func readMessageWithTimeout(r *SessionRunner, step *TStep) (*wsReadRespObject, error) {
if r.wsConn == nil {
func readMessageWithTimeout(urlString string, r *SessionRunner, step *TStep) (*wsReadRespObject, error) {
wsConn := r.wsConnMap[urlString]
if wsConn == nil {
return nil, errors.New("try to use existing connection, but there is no connection")
}
readResponseChan := make(chan *wsReadRespObject)
errorChan := make(chan error)
go func() {
messageType, message, err := r.wsConn.ReadMessage()
messageType, message, err := wsConn.ReadMessage()
if err != nil {
errorChan <- err
} else {
@@ -544,18 +539,18 @@ func readMessageWithTimeout(r *SessionRunner, step *TStep) (*wsReadRespObject, e
}
}
func writeWebSocket(r *SessionRunner, step *TStep, stepVariables map[string]interface{}) error {
if r.wsConn == nil {
func writeWebSocket(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) error {
wsConn := r.wsConnMap[urlString]
if wsConn == nil {
return errors.New("try to use existing connection, but there is no connection")
}
// TODO: only support writing one kind of message each step here?
// check priority: text message > binary message
if step.WebSocket.TextMessage != nil {
parsedMessage, parseErr := r.parser.Parse(step.WebSocket.TextMessage, stepVariables)
if parseErr != nil {
return parseErr
}
writeErr := writeWithType(r.wsConn, step, websocket.TextMessage, parsedMessage)
writeErr := writeWithType(wsConn, step, websocket.TextMessage, parsedMessage)
if writeErr != nil {
return writeErr
}
@@ -564,13 +559,13 @@ func writeWebSocket(r *SessionRunner, step *TStep, stepVariables map[string]inte
if parseErr != nil {
return parseErr
}
writeErr := writeWithType(r.wsConn, step, websocket.BinaryMessage, parsedMessage)
writeErr := writeWithType(wsConn, step, websocket.BinaryMessage, parsedMessage)
if writeErr != nil {
return writeErr
}
} else {
log.Info().Msg("step with empty message")
err := writeWithAction(r.wsConn, step, websocket.BinaryMessage, []byte{})
err := writeWithAction(wsConn, step, websocket.BinaryMessage, []byte{})
if err != nil {
return err
}
@@ -610,13 +605,14 @@ func writeWithAction(c *websocket.Conn, step *TStep, messageType int, message []
}
}
func closeWithTimeout(r *SessionRunner, step *TStep, stepVariables map[string]interface{}) (*wsCloseRespObject, error) {
if r.wsConn == nil {
func closeWithTimeout(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) (*wsCloseRespObject, error) {
wsConn := r.wsConnMap[urlString]
if wsConn == nil {
return nil, errors.New("no connection needs to be closed")
}
errorChan := make(chan error)
go func() {
err := writeWebSocket(r, step, stepVariables)
err := writeWebSocket(urlString, r, step, stepVariables)
if err != nil {
errorChan <- errors.Wrap(err, "send close message failed")
return
@@ -626,7 +622,7 @@ func closeWithTimeout(r *SessionRunner, step *TStep, stepVariables map[string]in
var message []byte
var readErr error
for readErr == nil {
mt, message, readErr = r.wsConn.ReadMessage()
mt, message, readErr = wsConn.ReadMessage()
if readErr == nil {
log.Info().
Str("type", MessageType(mt).toString()).