You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

379 lines
8.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package models
import (
"context"
"encoding/json"
"fmt"
"ginchat/utils"
"net"
"net/http"
"strconv"
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/gorilla/websocket"
"github.com/spf13/viper"
"gopkg.in/fatih/set.v0"
"gorm.io/gorm"
)
//消息
type Message struct {
gorm.Model
UserId int64 //发送者
TargetId int64 //接受者
Type int //发送类型 1私聊 2群聊 3心跳
Media int //消息类型 1文字 2表情包 3语音 4图片 /表情包
Content string //消息内容
CreateTime uint64 //创建时间
ReadTime uint64 //读取时间
Pic string
Url string
Desc string
Amount int //其他数字统计
}
func (table *Message) TableName() string {
return "message"
}
// const (
// HeartbeatMaxTime = 1 * 60
// )
type Node struct {
Conn *websocket.Conn //连接
Addr string //客户端地址
FirstTime uint64 //首次连接时间
HeartbeatTime uint64 //心跳时间
LoginTime uint64 //登录时间
DataQueue chan []byte //消息
GroupSets set.Interface //好友 / 群
}
//映射关系
var clientMap map[int64]*Node = make(map[int64]*Node, 0)
//读写锁
var rwLocker sync.RWMutex
// 需要 发送者ID 接受者ID ,消息类型,发送的内容,发送类型
func Chat(writer http.ResponseWriter, request *http.Request) {
//1. 获取参数 并 检验 token 等合法性
//token := query.Get("token")
query := request.URL.Query()
Id := query.Get("userId")
userId, _ := strconv.ParseInt(Id, 10, 64)
//msgType := query.Get("type")
//targetId := query.Get("targetId")
// context := query.Get("context")
isvalida := true //checkToke() 待.........
conn, err := (&websocket.Upgrader{
//token 校验
CheckOrigin: func(r *http.Request) bool {
return isvalida
},
}).Upgrade(writer, request, nil)
if err != nil {
fmt.Println(err)
return
}
//2.获取conn
currentTime := uint64(time.Now().Unix())
node := &Node{
Conn: conn,
Addr: conn.RemoteAddr().String(), //客户端地址
HeartbeatTime: currentTime, //心跳时间
LoginTime: currentTime, //登录时间
DataQueue: make(chan []byte, 50),
GroupSets: set.New(set.ThreadSafe),
}
//3. 用户关系
//4. userid 跟 node绑定 并加锁
rwLocker.Lock()
clientMap[userId] = node
rwLocker.Unlock()
//5.完成发送逻辑
go sendProc(node)
//6.完成接受逻辑
go recvProc(node)
//7.加入在线用户到缓存
SetUserOnlineInfo("online_"+Id, []byte(node.Addr), time.Duration(viper.GetInt("timeout.RedisOnlineTime"))*time.Hour)
//sendMsg(userId, []byte("欢迎进入聊天系统"))
}
func sendProc(node *Node) {
for {
select {
case data := <-node.DataQueue:
fmt.Println("[ws]sendProc >>>> msg :", string(data))
err := node.Conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
fmt.Println(err)
return
}
}
}
}
func recvProc(node *Node) {
for {
_, data, err := node.Conn.ReadMessage()
if err != nil {
fmt.Println(err)
return
}
msg := Message{}
err = json.Unmarshal(data, &msg)
if err != nil {
fmt.Println(err)
}
//心跳检测 msg.Media == -1 || msg.Type == 3
if msg.Type == 3 {
currentTime := uint64(time.Now().Unix())
node.Heartbeat(currentTime)
} else {
dispatch(data)
broadMsg(data) //todo 将消息广播到局域网
fmt.Println("[ws] recvProc <<<<< ", string(data))
}
}
}
var udpsendChan chan []byte = make(chan []byte, 1024)
func broadMsg(data []byte) {
udpsendChan <- data
}
func init() {
go udpSendProc()
go udpRecvProc()
fmt.Println("init goroutine ")
}
//完成udp数据发送协程
func udpSendProc() {
con, err := net.DialUDP("udp", nil, &net.UDPAddr{
IP: net.IPv4(192, 168, 0, 255),
Port: viper.GetInt("port.udp"),
})
defer con.Close()
if err != nil {
fmt.Println(err)
}
for {
select {
case data := <-udpsendChan:
fmt.Println("udpSendProc data :", string(data))
_, err := con.Write(data)
if err != nil {
fmt.Println(err)
return
}
}
}
}
//完成udp数据接收协程
func udpRecvProc() {
con, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4zero,
Port: viper.GetInt("port.udp"),
})
if err != nil {
fmt.Println(err)
}
defer con.Close()
for {
var buf [512]byte
n, err := con.Read(buf[0:])
if err != nil {
fmt.Println(err)
return
}
fmt.Println("udpRecvProc data :", string(buf[0:n]))
dispatch(buf[0:n])
}
}
//后端调度逻辑处理
func dispatch(data []byte) {
msg := Message{}
msg.CreateTime = uint64(time.Now().Unix())
err := json.Unmarshal(data, &msg)
if err != nil {
fmt.Println(err)
return
}
switch msg.Type {
case 1: //私信
fmt.Println("dispatch data :", string(data))
sendMsg(msg.TargetId, data)
case 2: //群发
sendGroupMsg(msg.TargetId, data) //发送的群ID ,消息内容
// case 4: // 心跳
// node.Heartbeat()
//case 4:
//
}
}
func sendGroupMsg(targetId int64, msg []byte) {
fmt.Println("开始群发消息")
userIds := SearchUserByGroupId(uint(targetId))
for i := 0; i < len(userIds); i++ {
//排除给自己的
if targetId != int64(userIds[i]) {
sendMsg(int64(userIds[i]), msg)
}
}
}
func JoinGroup(userId uint, comId string) (int, string) {
contact := Contact{}
contact.OwnerId = userId
//contact.TargetId = comId
contact.Type = 2
community := Community{}
utils.DB.Where("id=? or name=?", comId, comId).Find(&community)
if community.Name == "" {
return -1, "没有找到群"
}
utils.DB.Where("owner_id=? and target_id=? and type =2 ", userId, comId).Find(&contact)
if !contact.CreatedAt.IsZero() {
return -1, "已加过此群"
} else {
contact.TargetId = community.ID
utils.DB.Create(&contact)
return 0, "加群成功"
}
}
func sendMsg(userId int64, msg []byte) {
rwLocker.RLock()
node, ok := clientMap[userId]
rwLocker.RUnlock()
jsonMsg := Message{}
json.Unmarshal(msg, &jsonMsg)
ctx := context.Background()
targetIdStr := strconv.Itoa(int(userId))
userIdStr := strconv.Itoa(int(jsonMsg.UserId))
jsonMsg.CreateTime = uint64(time.Now().Unix())
r, err := utils.Red.Get(ctx, "online_"+userIdStr).Result()
if err != nil {
fmt.Println(err)
}
if r != "" {
if ok {
fmt.Println("sendMsg >>> userID: ", userId, " msg:", string(msg))
node.DataQueue <- msg
}
}
var key string
if userId > jsonMsg.UserId {
key = "msg_" + userIdStr + "_" + targetIdStr
} else {
key = "msg_" + targetIdStr + "_" + userIdStr
}
res, err := utils.Red.ZRevRange(ctx, key, 0, -1).Result()
if err != nil {
fmt.Println(err)
}
score := float64(cap(res)) + 1
ress, e := utils.Red.ZAdd(ctx, key, &redis.Z{score, msg}).Result() //jsonMsg
//res, e := utils.Red.Do(ctx, "zadd", key, 1, jsonMsg).Result() //备用 后续拓展 记录完整msg
if e != nil {
fmt.Println(e)
}
fmt.Println(ress)
}
//需要重写此方法才能完整的msg转byte[]
func (msg Message) MarshalBinary() ([]byte, error) {
return json.Marshal(msg)
}
//获取缓存里面的消息
func RedisMsg(userIdA int64, userIdB int64, start int64, end int64, isRev bool) []string {
rwLocker.RLock()
//node, ok := clientMap[userIdA]
rwLocker.RUnlock()
//jsonMsg := Message{}
//json.Unmarshal(msg, &jsonMsg)
ctx := context.Background()
userIdStr := strconv.Itoa(int(userIdA))
targetIdStr := strconv.Itoa(int(userIdB))
var key string
if userIdA > userIdB {
key = "msg_" + targetIdStr + "_" + userIdStr
} else {
key = "msg_" + userIdStr + "_" + targetIdStr
}
//key = "msg_" + userIdStr + "_" + targetIdStr
//rels, err := utils.Red.ZRevRange(ctx, key, 0, 10).Result() //根据score倒叙
var rels []string
var err error
if isRev {
rels, err = utils.Red.ZRange(ctx, key, start, end).Result()
} else {
rels, err = utils.Red.ZRevRange(ctx, key, start, end).Result()
}
if err != nil {
fmt.Println(err) //没有找到
}
// 发送推送消息
/**
// 后台通过websoket 推送消息
for _, val := range rels {
fmt.Println("sendMsg >>> userID: ", userIdA, " msg:", val)
node.DataQueue <- []byte(val)
}**/
return rels
}
//更新用户心跳
func (node *Node) Heartbeat(currentTime uint64) {
node.HeartbeatTime = currentTime
return
}
//清理超时连接
func CleanConnection(param interface{}) (result bool) {
result = true
defer func() {
if r := recover(); r != nil {
fmt.Println("cleanConnection err", r)
}
}()
//fmt.Println("定时任务,清理超时连接 ", param)
//node.IsHeartbeatTimeOut()
currentTime := uint64(time.Now().Unix())
for i := range clientMap {
node := clientMap[i]
if node.IsHeartbeatTimeOut(currentTime) {
fmt.Println("心跳超时..... 关闭连接:", node)
node.Conn.Close()
}
}
return result
}
//用户心跳是否超时
func (node *Node) IsHeartbeatTimeOut(currentTime uint64) (timeout bool) {
if node.HeartbeatTime+viper.GetUint64("timeout.HeartbeatMaxTime") <= currentTime {
fmt.Println("心跳超时。。。自动下线", node)
timeout = true
}
return
}