Commit b597f4a1 by Ford

修改了WorldStart支持高并发,加了协程。

parent a1710d95
......@@ -44,6 +44,8 @@
"app_secret": "cdd08ede-b836-44d2-8c10-b6abc0ac57aa",
"exhibition_mode_app_id": 1,
"voice_set_model_url": "http://127.0.0.1:17666",
"translate_prop_url": "http://52.83.116.11:13679/MindEpoch/Resonate",
"draw_ai_url": "http://52.83.116.11:13679",
"wechat_url": "https://api.weixin.qq.com/sns/jscode2session?appid=%s&secret=%s&js_code=%s&grant_type=authorization_code",
"wechat_app_id": "wx3a8c6fc90ee21b81",
......
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// 服务器的 WebSocket URL 和 token
var wsBaseURL = "ws://localhost:8099/WorldChats/ws?worldId=93&AuthId=fc9058b44f7850b4438afb11033891ab"
var token1 = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3Mjk1MjI2MzksImxldmVsIjowLCJ1c2VyX2lkIjo1ODV9.rqUCi2qpJ8knLlaguvn9k5ES_fWREfKJjqhiYeNNaiM"
// WebSocket 连接的客户端函数
func connectToWebSocket(wg *sync.WaitGroup, uid int) {
defer wg.Done()
// 构造 WebSocket URL,使用不同的 uid
wsURL := fmt.Sprintf("%s&uid=%d", wsBaseURL, uid)
// 设置请求头,包括 Token
header := http.Header{}
header.Add("Token", token1)
// 建立 WebSocket 连接
conn, _, err := websocket.DefaultDialer.Dial(wsURL, header)
if err != nil {
log.Printf("用户 %d: 无法连接到服务器: %v\n", uid, err)
return
}
defer conn.Close()
// 向服务器发送测试消息
message := fmt.Sprintf("Hello from client %d", uid)
err = conn.WriteMessage(websocket.TextMessage, []byte(message))
if err != nil {
log.Printf("用户 %d: 发送消息失败: %v\n", uid, err)
return
}
// 读取服务器的回复
_, reply, err := conn.ReadMessage()
if err != nil {
log.Printf("用户 %d: 读取消息失败: %v\n", uid, err)
return
}
log.Printf("用户 %d: 收到服务器回复: %s\n", uid, string(reply))
}
func main() {
// 并发连接的用户 uid 从 634 到 741
startUID := 634
endUID := 741
var wg sync.WaitGroup
// 启动并发的 WebSocket 连接
for uid := startUID; uid <= endUID; uid++ {
wg.Add(1)
go connectToWebSocket(&wg, uid)
time.Sleep(10 * time.Millisecond) // 避免瞬时过多请求,可以根据需要调整或去掉
}
// 等待所有连接完成
wg.Wait()
log.Printf("所有用户 (%d 到 %d) 的 WebSocket 连接测试已完成\n", startUID, endUID)
}
......@@ -3,14 +3,12 @@ package main
import (
"WorldEpcho/src/config"
"WorldEpcho/src/datasource"
"WorldEpcho/src/https_auth"
"WorldEpcho/src/routers"
"WorldEpcho/src/service"
"WorldEpcho/src/utils"
"fmt"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"net/http"
"time"
)
......@@ -101,20 +99,20 @@ func main() {
//service.ExampleSend()
router.GET("/https_test", func(c *gin.Context) {
fmt.Println(c.Request.Host)
c.JSON(http.StatusOK, gin.H{
"code": http.StatusOK,
"result": "测试成功",
/* router.GET("/https_test", func(c *gin.Context) {
fmt.Println(c.Request.Host)
c.JSON(http.StatusOK, gin.H{
"code": http.StatusOK,
"result": "测试成功",
})
})
})
router.Use(https_auth.HttpsHandler()) //https对应的中间件
path := "https_auth/CA/" //证书的路径
router.RunTLS(":8099", path+"admin.mindepoch.com.pem", path+"admin.mindepoch.com.key") //开启HTTPS服务
router.Use(https_auth.HttpsHandler()) //https对应的中间件
path := "https_auth/CA/" //证书的路径
router.RunTLS(":8099", path+"admin.mindepoch.com.pem", path+"admin.mindepoch.com.key") //开启HTTPS服务*/
//初始化redis配置
//utils.InitRedisStore()
//Run("里面不指定端口号默认为8080")
//router.Run(":8099")
router.Run(":8099")
}
......@@ -108,7 +108,7 @@ func AiDraw(c *gin.Context) {
/*
判断用户是否登录
*/
id, isLogin := IsUserLoggedIn(c)
id, isLogin := IsUserLoggedIn1(c)
if !isLogin {
log.Println("用户未登录")
c.JSON(http.StatusOK, gin.H{"code": 0, "message": "用户未登录"})
......@@ -217,7 +217,7 @@ func AiDraw(c *gin.Context) {
var requestData map[string]interface{}
err = json.Unmarshal([]byte(jsonString), &requestData)
if err != nil {
log.Printf("Error occurred during unmarshaling. Error: %s", err.Error())
log.Fatalf("Error occurred during unmarshaling. Error: %s", err.Error())
c.JSON(http.StatusOK, gin.H{
"code": 0,
"message": "解析请求体为JSON字符串出错",
......@@ -228,7 +228,7 @@ func AiDraw(c *gin.Context) {
// 使用 json.Marshal 将 map 转换回 JSON 字符串,以发送 HTTP 请求
requestBytes, err := json.Marshal(requestData)
if err != nil {
log.Printf("Error occurred during marshaling. Error: %s", err.Error())
log.Fatalf("Error occurred during marshaling. Error: %s", err.Error())
c.JSON(http.StatusOK, gin.H{"code": 0, "message": err.Error()})
return
}
......@@ -237,7 +237,7 @@ func AiDraw(c *gin.Context) {
resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBytes))
fmt.Println("发送请求url: ", url, "clientID: ", Rcd5)
if err != nil {
log.Printf("Error occurred during POST request. Error: %s", err.Error())
log.Fatalf("Error occurred during POST request. Error: %s", err.Error())
c.JSON(http.StatusOK, gin.H{"code": 0, "message": err.Error()})
return
}
......@@ -245,7 +245,7 @@ func AiDraw(c *gin.Context) {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("Error occurred during reading response body. Error: %s", err.Error())
log.Fatalf("Error occurred during reading response body. Error: %s", err.Error())
c.JSON(http.StatusOK, gin.H{"code": 0, "message": err.Error()})
return
}
......@@ -253,7 +253,7 @@ func AiDraw(c *gin.Context) {
var response DrawAIResponse
err = json.Unmarshal(body, &response)
if err != nil {
log.Printf("Error occurred during unmarshaling response body. Error: %s", err.Error())
log.Fatalf("Error occurred during unmarshaling response body. Error: %s", err.Error())
c.JSON(http.StatusOK, gin.H{"code": 0, "message": err.Error()})
return
}
......@@ -261,7 +261,7 @@ func AiDraw(c *gin.Context) {
fmt.Println("response: ==> ", string(body))
if resp.StatusCode != 200 || response.Errors != nil {
log.Printf("Server returned error. Status: %d, Errors: %v", resp.StatusCode, response.Errors)
log.Fatalf("Server returned error. Status: %d, Errors: %v", resp.StatusCode, response.Errors)
c.JSON(http.StatusOK, gin.H{"code": 0, "message": err.Error()})
return
}
......
......@@ -79,6 +79,50 @@ func IsUserLoggedIn(ctx *gin.Context) (int64, bool) {
return 0, false
}
func IsUserLoggedIn1(ctx *gin.Context) (int64, bool) {
session := sessions.Default(ctx)
userId := session.Get("WorldUserID")
var uid int64
var err error
var sessionValid bool
// 先从session中获取用户信息
if userId != nil {
user_id := utils.Strval(userId)
uid, err = strconv.ParseInt(user_id, 10, 64)
if err == nil {
_, sessionValid = config.WorldLoginCacheCode.Get(user_id)
}
}
// 如果 session 有效,直接返回
if sessionValid {
return uid, true
}
// 如果 session 不存在或无效,从请求头中获取 JWT token
tokenString := ctx.GetHeader("Token")
if tokenString != "" {
isValid, err := IsValidMiGuToken(tokenString)
if err == nil && isValid {
// 解析并验证 JWT token
token, isValid, err := ParseMIGUToken(tokenString)
if err == nil && isValid {
// 提取 Token 中的声明(Claims)
if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
if userID, ok := claims["user_id"].(float64); ok {
return int64(userID), true
}
}
}
}
}
// 如果所有方法都失败,返回未授权的访问错误
ctx.JSON(http.StatusOK, gin.H{"code": e.UnauthorizedStatus, "data": nil, "message": "未授权的访问或无效的身份验证"})
return 0, false
}
func MiGuIsUserLoggedIn(ctx *gin.Context) (int64, bool) {
session := sessions.Default(ctx)
userId := session.Get("WorldUserID")
......
......@@ -45,7 +45,7 @@ type WorldReplyMsg struct {
// 转发AI接口意识流响应给前端
type WorldSoulReplyMsg struct {
Code int `json:"code"`
Code int `json:"code"` // 5 ==>
WObj map[string]interface{} `json:"WObj"`
ISLIU string `json:"ISLIU"`
WorldName string `json:"WorldName"`
......@@ -117,10 +117,12 @@ type WorldErrorMessage struct {
Message string `json:"message"`
}
type WorldEchoResponseAndErrorMsg struct {
/*
type WorldEchoResponseAndErrorMsg struct {
SoulReplyMsg WorldSoulReplyMsg //WorldSoulReplyMsg
ErrorMessage WorldErrorMessage `json:"ErrorMessage,omitempty"`
}
}
*/
// Wrapper 结构体仅包含一个字符串字段,用于存储 JSON 字符串
type Wrapper struct {
......
......@@ -13,7 +13,7 @@ import (
"log"
)
func (manager *WorldClientManager) WorldWebSocketStart() {
func (manager *WorldClientManager) WorldWebSocketStart_bak() {
for {
log.Println("<---监听管道通信--->")
select {
......@@ -29,6 +29,7 @@ func (manager *WorldClientManager) WorldWebSocketStart() {
Code: e.WebsocketSuccess,
Content: "已连接至服务器",
}
fmt.Println("WorldManager.Client: ", WorldManager.Client)
msg, err := json.Marshal(replyMsg)
if err != nil {
......@@ -70,3 +71,92 @@ func (manager *WorldClientManager) WorldWebSocketStart() {
}
}
}
func (manager *WorldClientManager) WorldWebSocketStart() {
for {
log.Println("<---监听管道通信--->")
select {
case conn := <-WorldManager.Register: // 建立连接
log.Printf("建立新连接: %v", conn.WorldConversations.Uid)
if conn == nil || conn.Socket == nil {
log.Printf("无效的连接或socket为空")
continue // 如果连接或socket为空,跳过这次循环
}
// 将客户端添加到管理器
WorldManager.Client[utils.Strval(conn.WorldConversations.Uid)] = conn
// 异步启动一个 Goroutine 处理 WebSocket 消息发送,提升并发性能
go func(conn *WorldClient) {
replyMsg := &WorldReplyMsg{
StatusCode: e.SUCCESS,
Code: e.WebsocketSuccess,
Content: "已连接至服务器",
}
// 序列化消息
msg, err := json.Marshal(replyMsg)
if err != nil {
log.Printf("消息序列化失败: %v", err)
return
}
// 在发送消息之前,检查conn.Socket是否为nil
if conn.Socket == nil {
log.Printf("Socket为空,无法发送消息: %v", conn.WorldConversations.Uid)
return
}
// 发送消息给客户端
err = conn.Socket.WriteMessage(websocket.TextMessage, msg)
if err != nil {
log.Printf("消息发送失败: %v", err)
return
}
log.Printf("消息发送成功: %v", conn.WorldConversations.Uid)
}(conn) // 将当前连接传递到 Goroutine
case conn := <-WorldManager.Unregister: // 断开连接
log.Printf("连接断开: %v", conn.WorldConversations.Uid)
if conn == nil || conn.Socket == nil {
log.Printf("无效的连接或socket为空")
continue // 如果连接或socket为空,跳过这次循环
}
// 如果连接存在于客户端列表中,执行断开逻辑
if _, ok := WorldManager.Client[utils.Strval(conn.WorldConversations.Uid)]; ok {
go func(conn *WorldClient) {
replyMsg := &WorldReplyMsg{
Code: e.WebsocketEnd,
Content: "连接已断开",
}
// 序列化消息
msg, err := json.Marshal(replyMsg)
if err != nil {
log.Printf("消息序列化失败: %v", err)
return
}
// 在发送消息之前,检查conn.Socket是否为nil
if conn.Socket == nil {
log.Printf("Socket为空,无法发送断开消息: %v", conn.WorldConversations.Uid)
return
}
// 发送消息给客户端
err = conn.Socket.WriteMessage(websocket.TextMessage, msg)
if err != nil {
log.Printf("消息发送失败: %v", err)
return
}
log.Printf("断开消息发送成功: %v", conn.WorldConversations.Uid)
if conn.Send != nil {
close(conn.Send)
}
delete(WorldManager.Client, utils.Strval(conn.WorldConversations.Uid))
}(conn)
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment