mirror of
https://github.com/httprunner/httprunner.git
synced 2026-06-26 10:01:28 +08:00
fix: websocket multiple instance conflict
This commit is contained in:
@@ -410,6 +410,7 @@ func (r *CaseRunner) NewSession() *SessionRunner {
|
||||
summary: newSummary(),
|
||||
|
||||
transactions: make(map[string]map[transactionType]time.Time),
|
||||
ws: newWSSession(),
|
||||
}
|
||||
return sessionRunner
|
||||
}
|
||||
@@ -425,6 +426,9 @@ type SessionRunner struct {
|
||||
// transactions stores transaction timing info.
|
||||
// key is transaction name, value is map of transaction type and time, e.g. start time and end time.
|
||||
transactions map[string]map[transactionType]time.Time
|
||||
|
||||
// websocket session
|
||||
ws *wsSession
|
||||
}
|
||||
|
||||
// Start runs the test steps in sequential order.
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
"unsafe"
|
||||
@@ -17,17 +16,18 @@ import (
|
||||
"github.com/httprunner/httprunner/v4/hrp/internal/json"
|
||||
)
|
||||
|
||||
var (
|
||||
wsMutex sync.Mutex
|
||||
func newWSSession() *wsSession {
|
||||
return &wsSession{
|
||||
wsConnMap: make(map[string]*websocket.Conn),
|
||||
pongResponseChan: make(chan string, 1),
|
||||
closeResponseChan: make(chan *wsCloseRespObject, 1),
|
||||
}
|
||||
}
|
||||
|
||||
type wsSession struct {
|
||||
wsConnMap map[string]*websocket.Conn // save all websocket connections
|
||||
pongResponseChan chan string // channel used to receive pong response message
|
||||
closeResponseChan chan *wsCloseRespObject // channel used to receive close response message
|
||||
)
|
||||
|
||||
func init() {
|
||||
wsConnMap = make(map[string]*websocket.Conn)
|
||||
pongResponseChan = make(chan string, 1)
|
||||
closeResponseChan = make(chan *wsCloseRespObject, 1)
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -321,7 +321,7 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
|
||||
case wsOpen:
|
||||
log.Info().Int64("timeout(ms)", step.WebSocket.GetTimeout()).Str("url", parsedURL).Msg("open websocket connection")
|
||||
// use the current websocket connection if existed
|
||||
if getWsClient(parsedURL) != nil {
|
||||
if getWsClient(r, parsedURL) != nil {
|
||||
break
|
||||
}
|
||||
resp, err = openWithTimeout(parsedURL, parsedHeader, r, step)
|
||||
@@ -341,7 +341,7 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
|
||||
case <-timer.C:
|
||||
timer.Stop()
|
||||
log.Warn().Msg("pong timeout")
|
||||
case pongResponse := <-pongResponseChan:
|
||||
case pongResponse := <-r.ws.pongResponseChan:
|
||||
resp = pongResponse
|
||||
log.Info().Msg("pong message arrives")
|
||||
}
|
||||
@@ -433,8 +433,8 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
|
||||
return stepResult, nil
|
||||
}
|
||||
|
||||
func getWsClient(url string) *websocket.Conn {
|
||||
if client, ok := wsConnMap[url]; ok {
|
||||
func getWsClient(r *SessionRunner, url string) *websocket.Conn {
|
||||
if client, ok := r.ws.wsConnMap[url]; ok {
|
||||
return client
|
||||
}
|
||||
|
||||
@@ -485,14 +485,14 @@ func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner,
|
||||
|
||||
// the following handlers handle and transport control message from server
|
||||
conn.SetPongHandler(func(appData string) error {
|
||||
pongResponseChan <- appData
|
||||
r.ws.pongResponseChan <- appData
|
||||
return nil
|
||||
})
|
||||
conn.SetCloseHandler(func(code int, text string) error {
|
||||
message := websocket.FormatCloseMessage(code, "")
|
||||
conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(defaultWriteWait))
|
||||
select {
|
||||
case closeResponseChan <- &wsCloseRespObject{
|
||||
case r.ws.closeResponseChan <- &wsCloseRespObject{
|
||||
StatusCode: code,
|
||||
Text: text,
|
||||
}:
|
||||
@@ -502,10 +502,8 @@ func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner,
|
||||
|
||||
return nil
|
||||
})
|
||||
wsMutex.Lock()
|
||||
wsConnMap[urlStr] = conn
|
||||
wsMutex.Unlock()
|
||||
|
||||
r.ws.wsConnMap[urlStr] = conn
|
||||
openResponseChan <- resp
|
||||
}()
|
||||
|
||||
@@ -522,7 +520,7 @@ func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner,
|
||||
}
|
||||
|
||||
func readMessageWithTimeout(urlString string, r *SessionRunner, step *TStep) (*wsReadRespObject, error) {
|
||||
wsConn := getWsClient(urlString)
|
||||
wsConn := getWsClient(r, urlString)
|
||||
if wsConn == nil {
|
||||
return nil, errors.New("try to use existing connection, but there is no connection")
|
||||
}
|
||||
@@ -552,7 +550,7 @@ func readMessageWithTimeout(urlString string, r *SessionRunner, step *TStep) (*w
|
||||
}
|
||||
|
||||
func writeWebSocket(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) error {
|
||||
wsConn := getWsClient(urlString)
|
||||
wsConn := getWsClient(r, urlString)
|
||||
if wsConn == nil {
|
||||
return errors.New("try to use existing connection, but there is no connection")
|
||||
}
|
||||
@@ -618,7 +616,7 @@ func writeWithAction(c *websocket.Conn, step *TStep, messageType int, message []
|
||||
}
|
||||
|
||||
func closeWithTimeout(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) (*wsCloseRespObject, error) {
|
||||
wsConn := getWsClient(urlString)
|
||||
wsConn := getWsClient(r, urlString)
|
||||
if wsConn == nil {
|
||||
return nil, errors.New("no connection needs to be closed")
|
||||
}
|
||||
@@ -657,7 +655,7 @@ func closeWithTimeout(urlString string, r *SessionRunner, step *TStep, stepVaria
|
||||
return nil, errors.New("close timeout")
|
||||
case err := <-errorChan:
|
||||
return nil, err
|
||||
case closeResult := <-closeResponseChan:
|
||||
case closeResult := <-r.ws.closeResponseChan:
|
||||
return closeResult, nil
|
||||
}
|
||||
}
|
||||
@@ -695,7 +693,7 @@ func getContentSize(resp interface{}) int64 {
|
||||
// releaseResources releases resources used by session runner
|
||||
func (r *SessionRunner) releaseResources() {
|
||||
// close websocket connections
|
||||
for _, wsConn := range wsConnMap {
|
||||
for _, wsConn := range r.ws.wsConnMap {
|
||||
if wsConn != nil {
|
||||
log.Info().Str("testcase", r.caseRunner.Config.Name).Msg("websocket disconnected")
|
||||
err := wsConn.Close()
|
||||
|
||||
Reference in New Issue
Block a user