@ -6,10 +6,12 @@ import (
pbMsg "Open_IM/pkg/proto/chat"
pbMsg "Open_IM/pkg/proto/chat"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"Open_IM/pkg/utils"
"context"
"errors"
"errors"
"github.com/garyburd/redigo/redis"
//"github.com/garyburd/redigo/redis"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/proto"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2/bson"
"strconv"
"strconv"
"time"
"time"
)
)
@ -34,42 +36,97 @@ type GroupMember_x struct {
}
}
func ( d * DataBases ) GetMinSeqFromMongo ( uid string ) ( MinSeq uint32 , err error ) {
func ( d * DataBases ) GetMinSeqFromMongo ( uid string ) ( MinSeq uint32 , err error ) {
var i , NB uint32
return 1 , nil
var seqUid string
//var i, NB uint32
//var seqUid string
//session := d.mgoSession.Clone()
//if session == nil {
// return MinSeq, errors.New("session == nil")
//}
//defer session.Close()
//c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
//MaxSeq, err := d.GetUserMaxSeq(uid)
//if err != nil && err != redis.ErrNil {
// return MinSeq, err
//}
//NB = uint32(MaxSeq / singleGocMsgNum)
//for i = 0; i <= NB; i++ {
// seqUid = indexGen(uid, i)
// n, err := c.Find(bson.M{"uid": seqUid}).Count()
// if err == nil && n != 0 {
// if i == 0 {
// MinSeq = 1
// } else {
// MinSeq = uint32(i * singleGocMsgNum)
// }
// break
// }
//}
//return MinSeq, nil
}
func ( d * DataBases ) GetMsgBySeqList ( uid string , seqList [ ] uint32 , operationID string ) ( seqMsg [ ] * open_im_sdk . MsgData , err error ) {
var hasSeqList [ ] uint32
singleCount := 0
session := d . mgoSession . Clone ( )
session := d . mgoSession . Clone ( )
if session == nil {
if session == nil {
return MinSeq , errors . New ( "session == nil" )
return nil , errors . New ( "session == nil" )
}
}
defer session . Close ( )
defer session . Close ( )
c := session . DB ( config . Config . Mongo . DBDatabase ) . C ( cChat )
c := session . DB ( config . Config . Mongo . DBDatabase ) . C ( cChat )
MaxSeq , err := d . GetUserMaxSeq ( uid )
m := func ( uid string , seqList [ ] uint32 ) map [ string ] [ ] uint32 {
if err != nil && err != redis . ErrNil {
t := make ( map [ string ] [ ] uint32 )
return MinSeq , err
for i := 0 ; i < len ( seqList ) ; i ++ {
}
seqUid := getSeqUid ( uid , seqList [ i ] )
NB = uint32 ( MaxSeq / singleGocMsgNum )
if value , ok := t [ seqUid ] ; ! ok {
for i = 0 ; i <= NB ; i ++ {
var temp [ ] uint32
seqUid = indexGen ( uid , i )
t [ seqUid ] = append ( temp , seqList [ i ] )
n , err := c . Find ( bson . M { "uid" : seqUid } ) . Count ( )
if err == nil && n != 0 {
if i == 0 {
MinSeq = 1
} else {
} else {
MinSeq = uint32 ( i * singleGocMsgNum )
t [ seqUid ] = append ( value , seqList [ i ] )
}
}
break
}
}
return t
} ( uid , seqList )
sChat := UserChat { }
for seqUid , value := range m {
if err = c . Find ( bson . M { "uid" : seqUid } ) . One ( & sChat ) ; err != nil {
log . NewError ( operationID , "not find seqUid" , seqUid , value , uid , seqList , err . Error ( ) )
continue
}
singleCount = 0
for i := 0 ; i < len ( sChat . Msg ) ; i ++ {
msg := new ( open_im_sdk . MsgData )
if err = proto . Unmarshal ( sChat . Msg [ i ] . Msg , msg ) ; err != nil {
log . NewError ( operationID , "Unmarshal err" , seqUid , value , uid , seqList , err . Error ( ) )
return nil , err
}
if isContainInt32 ( msg . Seq , value ) {
seqMsg = append ( seqMsg , msg )
hasSeqList = append ( hasSeqList , msg . Seq )
singleCount ++
if singleCount == len ( value ) {
break
}
}
}
}
if len ( hasSeqList ) != len ( seqList ) {
var diff [ ] uint32
diff = utils . Difference ( hasSeqList , seqList )
exceptionMSg := genExceptionMessageBySeqList ( diff )
seqMsg = append ( seqMsg , exceptionMSg ... )
}
}
return MinSeq , nil
return seqMsg , nil
}
}
func ( d * DataBases ) GetMsgBySeqList ( uid string , seqList [ ] uint32 , operationID string ) ( seqMsg [ ] * open_im_sdk . MsgData , err error ) {
func ( d * DataBases ) GetMsgBySeqListMongo2 ( uid string , seqList [ ] uint32 , operationID string ) ( seqMsg [ ] * open_im_sdk . MsgData , err error ) {
var hasSeqList [ ] uint32
var hasSeqList [ ] uint32
singleCount := 0
singleCount := 0
session := d . mgoSession . Clone ( )
ctx , _ := context . WithTimeout ( context . Background ( ) , time . Duration ( config . Config . Mongo . DBTimeout ) * time . Second )
if session == nil {
c := d . mongoClient . Database ( config . Config . Mongo . DBDatabase ) . Collection ( cChat )
return nil , errors . New ( "session == nil" )
}
defer session . Close ( )
c := session . DB ( config . Config . Mongo . DBDatabase ) . C ( cChat )
m := func ( uid string , seqList [ ] uint32 ) map [ string ] [ ] uint32 {
m := func ( uid string , seqList [ ] uint32 ) map [ string ] [ ] uint32 {
t := make ( map [ string ] [ ] uint32 )
t := make ( map [ string ] [ ] uint32 )
for i := 0 ; i < len ( seqList ) ; i ++ {
for i := 0 ; i < len ( seqList ) ; i ++ {
@ -85,7 +142,7 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID st
} ( uid , seqList )
} ( uid , seqList )
sChat := UserChat { }
sChat := UserChat { }
for seqUid , value := range m {
for seqUid , value := range m {
if err = c . Find ( bson . M { "uid" : seqUid } ) . On e( & sChat ) ; err != nil {
if err = c . Find One ( ctx , bson . M { "uid" : seqUid } ) . Decod e( & sChat ) ; err != nil {
log . NewError ( operationID , "not find seqUid" , seqUid , value , uid , seqList , err . Error ( ) )
log . NewError ( operationID , "not find seqUid" , seqUid , value , uid , seqList , err . Error ( ) )
continue
continue
}
}
@ -115,6 +172,8 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID st
}
}
return seqMsg , nil
return seqMsg , nil
}
}
func genExceptionMessageBySeqList ( seqList [ ] uint32 ) ( exceptionMsg [ ] * open_im_sdk . MsgData ) {
func genExceptionMessageBySeqList ( seqList [ ] uint32 ) ( exceptionMsg [ ] * open_im_sdk . MsgData ) {
for _ , v := range seqList {
for _ , v := range seqList {
msg := new ( open_im_sdk . MsgData )
msg := new ( open_im_sdk . MsgData )
@ -124,6 +183,37 @@ func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk
return exceptionMsg
return exceptionMsg
}
}
func ( d * DataBases ) SaveUserChatMongo2 ( uid string , sendTime int64 , m * pbMsg . MsgDataToDB ) error {
ctx , _ := context . WithTimeout ( context . Background ( ) , time . Duration ( config . Config . Mongo . DBTimeout ) * time . Second )
c := d . mongoClient . Database ( config . Config . Mongo . DBDatabase ) . Collection ( cChat )
newTime := getCurrentTimestampByMill ( )
operationID := ""
seqUid := getSeqUid ( uid , m . MsgData . Seq )
filter := bson . M { "uid" : seqUid }
var err error
sMsg := MsgInfo { }
sMsg . SendTime = sendTime
if sMsg . Msg , err = proto . Marshal ( m . MsgData ) ; err != nil {
return utils . Wrap ( err , "" )
}
err = c . FindOneAndUpdate ( ctx , filter , bson . M { "$push" : bson . M { "msg" : sMsg } } ) . Err ( )
log . NewDebug ( operationID , "get mgoSession cost time" , getCurrentTimestampByMill ( ) - newTime )
if err != nil {
sChat := UserChat { }
sChat . UID = seqUid
sChat . Msg = append ( sChat . Msg , sMsg )
if _ , err = c . InsertOne ( ctx , & sChat ) ; err != nil {
log . NewDebug ( operationID , "InsertOne failed" , filter )
return utils . Wrap ( err , "" )
}
} else {
log . NewDebug ( operationID , "FindOneAndUpdate ok" , filter )
}
log . NewDebug ( operationID , "find mgo uid cost time" , getCurrentTimestampByMill ( ) - newTime )
return nil
}
func ( d * DataBases ) SaveUserChat ( uid string , sendTime int64 , m * pbMsg . MsgDataToDB ) error {
func ( d * DataBases ) SaveUserChat ( uid string , sendTime int64 , m * pbMsg . MsgDataToDB ) error {
var seqUid string
var seqUid string
newTime := getCurrentTimestampByMill ( )
newTime := getCurrentTimestampByMill ( )
@ -163,115 +253,137 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToD
return nil
return nil
}
}
func ( d * DataBases ) DelUserChat ( uid string ) error {
func ( d * DataBases ) DelUserChat ( uid string ) error {
session := d . mgoSession . Clone ( )
return nil
if session == nil {
//session := d.mgoSession.Clone()
return errors . New ( "session == nil" )
//if session == nil {
}
// return errors.New("session == nil")
defer session . Close ( )
//}
//defer session.Close()
//
//c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
//
//delTime := time.Now().Unix() - int64(config.Config.Mongo.DBRetainChatRecords)*24*3600
//if err := c.Update(bson.M{"uid": uid}, bson.M{"$pull": bson.M{"msg": bson.M{"sendtime": bson.M{"$lte": delTime}}}}); err != nil {
// return err
//}
//
//return nil
}
c := session . DB ( config . Config . Mongo . DBDatabase ) . C ( cChat )
func ( d * DataBases ) DelUserChatMongo2 ( uid string ) error {
ctx , _ := context . WithTimeout ( context . Background ( ) , time . Duration ( config . Config . Mongo . DBTimeout ) * time . Second )
c := d . mongoClient . Database ( config . Config . Mongo . DBDatabase ) . Collection ( cChat )
filter := bson . M { "uid" : uid }
delTime := time . Now ( ) . Unix ( ) - int64 ( config . Config . Mongo . DBRetainChatRecords ) * 24 * 3600
delTime := time . Now ( ) . Unix ( ) - int64 ( config . Config . Mongo . DBRetainChatRecords ) * 24 * 3600
if err := c . Update ( bson . M { "uid" : uid } , bson . M { "$pull" : bson . M { "msg" : bson . M { "sendtime" : bson . M { "$lte" : delTime } } } } ) ; err != nil {
if _, err := c . UpdateOne ( ctx , filter , bson . M { "$pull" : bson . M { "msg" : bson . M { "sendtime" : bson . M { "$lte" : delTime } } } } ) ; err != nil {
return err
return utils. Wrap ( err, "" )
}
}
return nil
return nil
}
}
func ( d * DataBases ) MgoUserCount ( ) ( int , error ) {
session := d . mgoSession . Clone ( )
if session == nil {
return 0 , errors . New ( "session == nil" )
}
defer session . Close ( )
c := session . DB ( config . Config . Mongo . DBDatabase ) . C ( cChat )
return c . Find ( nil ) . Count ( )
func ( d * DataBases ) MgoUserCount ( ) ( int , error ) {
return 0 , nil
//session := d.mgoSession.Clone()
//if session == nil {
// return 0, errors.New("session == nil")
//}
//defer session.Close()
//
//c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
//
//return c.Find(nil).Count()
}
}
func ( d * DataBases ) MgoSkipUID ( count int ) ( string , error ) {
func ( d * DataBases ) MgoSkipUID ( count int ) ( string , error ) {
session := d . mgoSession . Clone ( )
return "" , nil
if session == nil {
//session := d.mgoSession.Clone()
return "" , errors . New ( "session == nil" )
//if session == nil {
}
// return "", errors.New("session == nil")
defer session . Close ( )
//}
//defer session.Close()
c := session . DB ( config . Config . Mongo . DBDatabase ) . C ( cChat )
//
//c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
sChat := UserChat { }
//
c . Find ( nil ) . Skip ( count ) . Limit ( 1 ) . One ( & sChat )
//sChat := UserChat{}
return sChat . UID , nil
//c.Find(nil).Skip(count).Limit(1).One(&sChat)
//return sChat.UID, nil
}
}
func ( d * DataBases ) GetGroupMember ( groupID string ) [ ] string {
func ( d * DataBases ) GetGroupMember ( groupID string ) [ ] string {
groupInfo := GroupMember_x { }
return nil
groupInfo . GroupID = groupID
//groupInfo := GroupMember_x{}
groupInfo . UIDList = make ( [ ] string , 0 )
//groupInfo.GroupID = groupID
//groupInfo.UIDList = make([]string, 0)
session := d . mgoSession . Clone ( )
//
if session == nil {
//session := d.mgoSession.Clone()
return groupInfo . UIDList
//if session == nil {
}
// return groupInfo.UIDList
defer session . Close ( )
//}
//defer session.Close()
c := session . DB ( config . Config . Mongo . DBDatabase ) . C ( cGroup )
//
//c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup)
if err := c . Find ( bson . M { "groupid" : groupInfo . GroupID } ) . One ( & groupInfo ) ; err != nil {
//
return groupInfo . UIDList
//if err := c.Find(bson.M{"groupid": groupInfo.GroupID}).One(&groupInfo); err != nil {
}
// return groupInfo.UIDList
//}
return groupInfo . UIDList
//
//return groupInfo.UIDList
}
}
func ( d * DataBases ) AddGroupMember ( groupID , uid string ) error {
func ( d * DataBases ) AddGroupMember ( groupID , uid string ) error {
session := d . mgoSession . Clone ( )
if session == nil {
return errors . New ( "session == nil" )
}
defer session . Close ( )
c := session . DB ( config . Config . Mongo . DBDatabase ) . C ( cGroup )
n , err := c . Find ( bson . M { "groupid" : groupID } ) . Count ( )
if err != nil {
return err
}
if n == 0 {
groupInfo := GroupMember_x { }
groupInfo . GroupID = groupID
groupInfo . UIDList = append ( groupInfo . UIDList , uid )
err = c . Insert ( & groupInfo )
if err != nil {
return err
}
} else {
err = c . Update ( bson . M { "groupid" : groupID } , bson . M { "$addToSet" : bson . M { "uidlist" : uid } } )
if err != nil {
return err
}
}
return nil
return nil
//session := d.mgoSession.Clone()
//if session == nil {
// return errors.New("session == nil")
//}
//defer session.Close()
//
//c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup)
//
//n, err := c.Find(bson.M{"groupid": groupID}).Count()
//if err != nil {
// return err
//}
//
//if n == 0 {
// groupInfo := GroupMember_x{}
// groupInfo.GroupID = groupID
// groupInfo.UIDList = append(groupInfo.UIDList, uid)
// err = c.Insert(&groupInfo)
// if err != nil {
// return err
// }
//} else {
// err = c.Update(bson.M{"groupid": groupID}, bson.M{"$addToSet": bson.M{"uidlist": uid}})
// if err != nil {
// return err
// }
//}
//
//return nil
}
}
func ( d * DataBases ) DelGroupMember ( groupID , uid string ) error {
func ( d * DataBases ) DelGroupMember ( groupID , uid string ) error {
session := d . mgoSession . Clone ( )
if session == nil {
return errors . New ( "session == nil" )
}
defer session . Close ( )
c := session . DB ( config . Config . Mongo . DBDatabase ) . C ( cGroup )
if err := c . Update ( bson . M { "groupid" : groupID } , bson . M { "$pull" : bson . M { "uidlist" : uid } } ) ; err != nil {
return err
}
return nil
return nil
//session := d.mgoSession.Clone()
//if session == nil {
// return errors.New("session == nil")
//}
//defer session.Close()
//
//c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup)
//
//if err := c.Update(bson.M{"groupid": groupID}, bson.M{"$pull": bson.M{"uidlist": uid}}); err != nil {
// return err
//}
//
//return nil
}
}
func getCurrentTimestampByMill ( ) int64 {
func getCurrentTimestampByMill ( ) int64 {