Merge remote-tracking branch 'upstream/main'

# Conflicts:
#	internal/api/user.go
#	pkg/common/db/s3/minio/minio.go
pull/636/head
BanTanger 2 years ago
commit 47d126d02e

@ -83,9 +83,6 @@ func (s *Server) GetUsersOnlineStatus(
ctx context.Context, ctx context.Context,
req *msggateway.GetUsersOnlineStatusReq, req *msggateway.GetUsersOnlineStatusReq,
) (*msggateway.GetUsersOnlineStatusResp, error) { ) (*msggateway.GetUsersOnlineStatusResp, error) {
if !tokenverify.IsAppManagerUid(ctx) {
return nil, errs.ErrNoPermission.Wrap("only app manager")
}
var resp msggateway.GetUsersOnlineStatusResp var resp msggateway.GetUsersOnlineStatusResp
for _, userID := range req.UserIDs { for _, userID := range req.UserIDs {
clients, ok := s.LongConnServer.GetUserAllCons(userID) clients, ok := s.LongConnServer.GetUserAllCons(userID)

@ -65,7 +65,13 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
default: default:
err = c.pusher.Push2User(ctx, []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}, pbData.MsgData) var pushUserIDs []string
if pbData.MsgData.SendID != pbData.MsgData.RecvID {
pushUserIDs = []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}
} else {
pushUserIDs = []string{pbData.MsgData.SendID}
}
err = c.pusher.Push2User(ctx, pushUserIDs, pbData.MsgData)
} }
if err != nil { if err != nil {
if err == errNoOfflinePusher { if err == errNoOfflinePusher {

@ -42,7 +42,6 @@ type (
friend *rpcclient.FriendRpcClient friend *rpcclient.FriendRpcClient
GroupLocalCache *localcache.GroupLocalCache GroupLocalCache *localcache.GroupLocalCache
ConversationLocalCache *localcache.ConversationLocalCache ConversationLocalCache *localcache.ConversationLocalCache
MessageLocker MessageLocker
Handlers MessageInterceptorChain Handlers MessageInterceptorChain
notificationSender *rpcclient.NotificationSender notificationSender *rpcclient.NotificationSender
} }
@ -91,7 +90,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient),
ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient),
friend: &friendRpcClient, friend: &friendRpcClient,
MessageLocker: NewLockerMessage(cacheModel),
} }
s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg))
s.addInterceptorHandler(MessageHasReadEnabled) s.addInterceptorHandler(MessageHasReadEnabled)

@ -95,6 +95,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
} }
conversationIDs = append(conversationIDs, utils.GetSelfNotificationConversationID(req.UserID))
log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs) log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs)
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
if err != nil { if err != nil {

@ -67,7 +67,7 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)") return errors.New("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
} }
for k, v := range config.Config.Manager.UserID { for k, v := range config.Config.Manager.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k]}) users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
} }
userDB := relation.NewUserGorm(db) userDB := relation.NewUserGorm(db)
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()) cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())

@ -33,6 +33,7 @@ func (c *MsgTool) ConversationsDestructMsgs() {
} }
log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations)) log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations))
for _, conversation := range conversations { for _, conversation := range conversations {
ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug( log.ZDebug(
ctx, ctx,
"UserMsgsDestruct", "UserMsgsDestruct",
@ -45,16 +46,17 @@ func (c *MsgTool) ConversationsDestructMsgs() {
"lastMsgDestructTime", "lastMsgDestructTime",
conversation.LatestMsgDestructTime, conversation.LatestMsgDestructTime,
) )
now := time.Now()
seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil { if err != nil {
log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue continue
} }
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": time.Now()}); err != nil { if len(seqs) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil {
log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue continue
} }
if len(seqs) > 0 {
if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil { if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
} }

@ -63,7 +63,7 @@ type CommonMsgDatabase interface {
// 删除会话消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) // 删除会话消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
// 用户标记删除过期消息返回标记删除的seq列表 // 用户标记删除过期消息返回标记删除的seq列表
UserMsgsDestruct(cte context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error)
// 用户根据seq删除消息 // 用户根据seq删除消息
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
@ -641,7 +641,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
if err != nil || msgDocModel.DocID == "" { if err != nil || msgDocModel.DocID == "" {
if err != nil { if err != nil {
if err == unrelation.ErrMsgListNotExist { if err == unrelation.ErrMsgListNotExist {
log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index) log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index)
} else { } else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
} }
@ -652,25 +652,38 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
index++ index++
//&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli() //&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
if len(msgDocModel.Msg) > 0 { if len(msgDocModel.Msg) > 0 {
i := 0
var over bool
for _, msg := range msgDocModel.Msg { for _, msg := range msgDocModel.Msg {
i++
if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() {
if msg.Msg.SendTime > lastMsgDestructTime.UnixMilli() && !utils.Contain(userID, msg.DelList...) { if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !utils.Contain(userID, msg.DelList...) {
seqs = append(seqs, msg.Msg.Seq) seqs = append(seqs, msg.Msg.Seq)
} }
} else { } else {
log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index) log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i)
over = true
break break
} }
} }
if over {
break
}
} }
} }
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
if len(seqs) > 0 { if len(seqs) > 0 {
latestSeq := seqs[len(seqs)-1] userMinSeq := seqs[len(seqs)-1] + 1
if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, latestSeq); err != nil { currentUserMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err return nil, err
} }
if currentUserMinSeq < userMinSeq {
if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err
}
}
} }
return seqs, nil return seqs, nil
} }

@ -207,7 +207,7 @@ func (c *ConversationGorm) GetConversationIDsNeedDestruct(
) (conversations []*relation.ConversationModel, err error) { ) (conversations []*relation.ConversationModel, err error) {
return conversations, utils.Wrap( return conversations, utils.Wrap(
c.db(ctx). c.db(ctx).
Where("is_msg_destruct = 1 && UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) && msg_destruct_time != 0"). Where("is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)").
Find(&conversations). Find(&conversations).
Error, Error,
"", "",

@ -22,6 +22,7 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
@ -56,21 +57,20 @@ func NewMinio() (s3.Interface, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
exists, err := client.BucketExists(context.Background(), conf.Bucket) m := &Minio{
if err != nil {
return nil, err
}
if !exists {
if err := client.MakeBucket(context.Background(), conf.Bucket, minio.MakeBucketOptions{}); err != nil {
return nil, err
}
}
return &Minio{
bucket: conf.Bucket, bucket: conf.Bucket,
bucketURL: conf.Endpoint + "/" + conf.Bucket + "/", bucketURL: conf.Endpoint + "/" + conf.Bucket + "/",
opts: opts, opts: opts,
core: &minio.Core{Client: client}, core: &minio.Core{Client: client},
}, nil lock: &sync.Mutex{},
init: false,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := m.initMinio(ctx); err != nil {
fmt.Println("init minio error:", err)
}
return m, nil
} }
type Minio struct { type Minio struct {
@ -78,6 +78,30 @@ type Minio struct {
bucketURL string bucketURL string
opts *minio.Options opts *minio.Options
core *minio.Core core *minio.Core
lock sync.Locker
init bool
}
func (m *Minio) initMinio(ctx context.Context) error {
if m.init {
return nil
}
m.lock.Lock()
defer m.lock.Unlock()
if m.init {
return nil
}
exists, err := m.core.Client.BucketExists(ctx, config.Config.Object.Minio.Bucket)
if err != nil {
return fmt.Errorf("check bucket exists error: %w", err)
}
if !exists {
if err := m.core.Client.MakeBucket(ctx, config.Config.Object.Minio.Bucket, minio.MakeBucketOptions{}); err != nil {
return fmt.Errorf("make bucket error: %w", err)
}
}
m.init = true
return nil
} }
func (m *Minio) Engine() string { func (m *Minio) Engine() string {
@ -93,6 +117,9 @@ func (m *Minio) PartLimit() *s3.PartLimit {
} }
func (m *Minio) InitiateMultipartUpload(ctx context.Context, name string) (*s3.InitiateMultipartUploadResult, error) { func (m *Minio) InitiateMultipartUpload(ctx context.Context, name string) (*s3.InitiateMultipartUploadResult, error) {
if err := m.initMinio(ctx); err != nil {
return nil, err
}
uploadID, err := m.core.NewMultipartUpload(ctx, m.bucket, name, minio.PutObjectOptions{}) uploadID, err := m.core.NewMultipartUpload(ctx, m.bucket, name, minio.PutObjectOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
@ -105,6 +132,9 @@ func (m *Minio) InitiateMultipartUpload(ctx context.Context, name string) (*s3.I
} }
func (m *Minio) CompleteMultipartUpload(ctx context.Context, uploadID string, name string, parts []s3.Part) (*s3.CompleteMultipartUploadResult, error) { func (m *Minio) CompleteMultipartUpload(ctx context.Context, uploadID string, name string, parts []s3.Part) (*s3.CompleteMultipartUploadResult, error) {
if err := m.initMinio(ctx); err != nil {
return nil, err
}
minioParts := make([]minio.CompletePart, len(parts)) minioParts := make([]minio.CompletePart, len(parts))
for i, part := range parts { for i, part := range parts {
minioParts[i] = minio.CompletePart{ minioParts[i] = minio.CompletePart{
@ -142,6 +172,9 @@ func (m *Minio) PartSize(ctx context.Context, size int64) (int64, error) {
} }
func (m *Minio) AuthSign(ctx context.Context, uploadID string, name string, expire time.Duration, partNumbers []int) (*s3.AuthSignResult, error) { func (m *Minio) AuthSign(ctx context.Context, uploadID string, name string, expire time.Duration, partNumbers []int) (*s3.AuthSignResult, error) {
if err := m.initMinio(ctx); err != nil {
return nil, err
}
creds, err := m.opts.Creds.Get() creds, err := m.opts.Creds.Get()
if err != nil { if err != nil {
return nil, err return nil, err
@ -170,6 +203,9 @@ func (m *Minio) AuthSign(ctx context.Context, uploadID string, name string, expi
} }
func (m *Minio) PresignedPutObject(ctx context.Context, name string, expire time.Duration) (string, error) { func (m *Minio) PresignedPutObject(ctx context.Context, name string, expire time.Duration) (string, error) {
if err := m.initMinio(ctx); err != nil {
return "", err
}
rawURL, err := m.core.Client.PresignedPutObject(ctx, m.bucket, name, expire) rawURL, err := m.core.Client.PresignedPutObject(ctx, m.bucket, name, expire)
if err != nil { if err != nil {
return "", err return "", err
@ -178,10 +214,16 @@ func (m *Minio) PresignedPutObject(ctx context.Context, name string, expire time
} }
func (m *Minio) DeleteObject(ctx context.Context, name string) error { func (m *Minio) DeleteObject(ctx context.Context, name string) error {
if err := m.initMinio(ctx); err != nil {
return err
}
return m.core.Client.RemoveObject(ctx, m.bucket, name, minio.RemoveObjectOptions{}) return m.core.Client.RemoveObject(ctx, m.bucket, name, minio.RemoveObjectOptions{})
} }
func (m *Minio) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) { func (m *Minio) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) {
if err := m.initMinio(ctx); err != nil {
return nil, err
}
info, err := m.core.Client.StatObject(ctx, m.bucket, name, minio.StatObjectOptions{}) info, err := m.core.Client.StatObject(ctx, m.bucket, name, minio.StatObjectOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
@ -195,6 +237,9 @@ func (m *Minio) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, er
} }
func (m *Minio) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) { func (m *Minio) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
if err := m.initMinio(ctx); err != nil {
return nil, err
}
result, err := m.core.Client.CopyObject(ctx, minio.CopyDestOptions{ result, err := m.core.Client.CopyObject(ctx, minio.CopyDestOptions{
Bucket: m.bucket, Bucket: m.bucket,
Object: dst, Object: dst,
@ -226,10 +271,16 @@ func (m *Minio) IsNotFound(err error) bool {
} }
func (m *Minio) AbortMultipartUpload(ctx context.Context, uploadID string, name string) error { func (m *Minio) AbortMultipartUpload(ctx context.Context, uploadID string, name string) error {
if err := m.initMinio(ctx); err != nil {
return err
}
return m.core.AbortMultipartUpload(ctx, m.bucket, name, uploadID) return m.core.AbortMultipartUpload(ctx, m.bucket, name, uploadID)
} }
func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name string, partNumberMarker int, maxParts int) (*s3.ListUploadedPartsResult, error) { func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name string, partNumberMarker int, maxParts int) (*s3.ListUploadedPartsResult, error) {
if err := m.initMinio(ctx); err != nil {
return nil, err
}
result, err := m.core.ListObjectParts(ctx, m.bucket, name, uploadID, partNumberMarker, maxParts) result, err := m.core.ListObjectParts(ctx, m.bucket, name, uploadID, partNumberMarker, maxParts)
if err != nil { if err != nil {
return nil, err return nil, err
@ -253,6 +304,9 @@ func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name str
} }
func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) { func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {
if err := m.initMinio(ctx); err != nil {
return "", err
}
reqParams := make(url.Values) reqParams := make(url.Values)
if opt != nil { if opt != nil {
if opt.ContentType != "" { if opt.ContentType != "" {

@ -29,7 +29,7 @@ type UserModel struct {
FaceURL string `gorm:"column:face_url;size:255"` FaceURL string `gorm:"column:face_url;size:255"`
Ex string `gorm:"column:ex;size:1024"` Ex string `gorm:"column:ex;size:1024"`
CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"` CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"`
AppMangerLevel int32 `gorm:"column:app_manger_level;default:18"` AppMangerLevel int32 `gorm:"column:app_manger_level;default:1"`
GlobalRecvMsgOpt int32 `gorm:"column:global_recv_msg_opt"` GlobalRecvMsgOpt int32 `gorm:"column:global_recv_msg_opt"`
} }

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package msg package locker
import ( import (
"context" "context"

@ -40,7 +40,17 @@ func (s *ZkClient) watch() {
event := <-s.eventChan event := <-s.eventChan
switch event.Type { switch event.Type {
case zk.EventSession: case zk.EventSession:
if event.State == zk.StateHasSession && s.isRegistered {
s.logger.Printf("zk session event stateHasSession: %+v, client prepare to create new temp node", event)
node, err := s.CreateTempNode(s.rpcRegisterName, s.rpcRegisterAddr)
if err != nil {
s.logger.Printf("zk session event stateHasSession: %+v, create temp node error: %v", event, err)
} else {
s.node = node
}
} else {
s.logger.Printf("zk session event: %+v", event) s.logger.Printf("zk session event: %+v", event)
}
case zk.EventNodeChildrenChanged: case zk.EventNodeChildrenChanged:
s.logger.Printf("zk event: %s", event.Path) s.logger.Printf("zk event: %s", event.Path)
l := strings.Split(event.Path, "/") l := strings.Split(event.Path, "/")

@ -30,6 +30,14 @@ func (s *ZkClient) CreateRpcRootNodes(serviceNames []string) error {
return nil return nil
} }
func (s *ZkClient) CreateTempNode(rpcRegisterName, addr string) (node string, err error) {
return s.conn.CreateProtectedEphemeralSequential(
s.getPath(rpcRegisterName)+"/"+addr+"_",
[]byte(addr),
zk.WorldACL(zk.PermAll),
)
}
func (s *ZkClient) Register(rpcRegisterName, host string, port int, opts ...grpc.DialOption) error { func (s *ZkClient) Register(rpcRegisterName, host string, port int, opts ...grpc.DialOption) error {
if err := s.ensureName(rpcRegisterName); err != nil { if err := s.ensureName(rpcRegisterName); err != nil {
return err return err
@ -39,15 +47,14 @@ func (s *ZkClient) Register(rpcRegisterName, host string, port int, opts ...grpc
if err != nil { if err != nil {
return err return err
} }
node, err := s.conn.CreateProtectedEphemeralSequential( node, err := s.CreateTempNode(rpcRegisterName, addr)
s.getPath(rpcRegisterName)+"/"+addr+"_",
[]byte(addr),
zk.WorldACL(zk.PermAll),
)
if err != nil { if err != nil {
return err return err
} }
s.rpcRegisterName = rpcRegisterName
s.rpcRegisterAddr = addr
s.node = node s.node = node
s.isRegistered = true
return nil return nil
} }
@ -60,6 +67,9 @@ func (s *ZkClient) UnRegister() error {
} }
time.Sleep(time.Second) time.Sleep(time.Second)
s.node = "" s.node = ""
s.rpcRegisterName = ""
s.rpcRegisterAddr = ""
s.isRegistered = false
s.localConns = make(map[string][]grpc.ClientConnInterface) s.localConns = make(map[string][]grpc.ClientConnInterface)
s.resolvers = make(map[string]*Resolver) s.resolvers = make(map[string]*Resolver)
return nil return nil

@ -73,7 +73,11 @@ func (s *ZkClient) Build(
opts resolver.BuildOptions, opts resolver.BuildOptions,
) (resolver.Resolver, error) { ) (resolver.Resolver, error) {
s.logger.Printf("build resolver: %+v, cc: %+v", target, cc.UpdateState) s.logger.Printf("build resolver: %+v, cc: %+v", target, cc.UpdateState)
// log.ZDebug(context.Background(), "build resolver start", "target", target, "cc", cc.UpdateState) serviceName := strings.TrimLeft(target.URL.Path, "/")
if oldResolver, ok := s.resolvers[serviceName]; ok {
s.logger.Printf("rpc resolver exist: %+v, cc: %+v, key: %s", target, cc.UpdateState, serviceName)
return oldResolver, nil
}
r := &Resolver{} r := &Resolver{}
r.target = target r.target = target
r.cc = cc r.cc = cc
@ -81,11 +85,8 @@ func (s *ZkClient) Build(
r.ResolveNowZK(resolver.ResolveNowOptions{}) r.ResolveNowZK(resolver.ResolveNowOptions{})
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
serviceName := strings.TrimLeft(target.URL.Path, "/")
s.resolvers[serviceName] = r s.resolvers[serviceName] = r
s.logger.Printf("build resolver finished: %+v, cc: %+v, key: %s", target, cc.UpdateState, serviceName) s.logger.Printf("build resolver finished: %+v, cc: %+v, key: %s", target, cc.UpdateState, serviceName)
// log.ZDebug(context.Background(), "build resolver finished", "target", target, "cc", cc.UpdateState,
// "serviceName", serviceName)
return r, nil return r, nil
} }

@ -40,6 +40,9 @@ type ZkClient struct {
userName string userName string
password string password string
rpcRegisterName string
rpcRegisterAddr string
isRegistered bool
scheme string scheme string
timeout int timeout int
@ -136,6 +139,7 @@ func NewClient(zkServers []string, zkRoot string, options ...ZkOption) (*ZkClien
resolver.Register(client) resolver.Register(client)
go client.refresh() go client.refresh()
go client.watch() go client.watch()
time.Sleep(time.Millisecond * 50)
return client, nil return client, nil
} }

@ -14,13 +14,16 @@
package auth package auth
import "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
)
func (x *UserTokenReq) Check() error { func (x *UserTokenReq) Check() error {
if x.UserID == "" { if x.UserID == "" {
return errs.ErrArgs.Wrap("userID is empty") return errs.ErrArgs.Wrap("userID is empty")
} }
if x.PlatformID > 9 || x.PlatformID < 1 { if x.PlatformID > constant.AdminPlatformID || x.PlatformID < constant.IOSPlatformID {
return errs.ErrArgs.Wrap("platform is invalidate") return errs.ErrArgs.Wrap("platform is invalidate")
} }
return nil return nil
@ -30,7 +33,7 @@ func (x *ForceLogoutReq) Check() error {
if x.UserID == "" { if x.UserID == "" {
return errs.ErrArgs.Wrap("userID is empty") return errs.ErrArgs.Wrap("userID is empty")
} }
if x.PlatformID > 9 || x.PlatformID < 1 { if x.PlatformID > constant.AdminPlatformID || x.PlatformID < constant.IOSPlatformID {
return errs.ErrArgs.Wrap("platformID is invalidate") return errs.ErrArgs.Wrap("platformID is invalidate")
} }
return nil return nil

@ -22,14 +22,13 @@ package auth
import ( import (
context "context" context "context"
reflect "reflect"
sync "sync"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
) )
const ( const (

@ -22,16 +22,14 @@ package conversation
import ( import (
context "context" context "context"
reflect "reflect" wrapperspb "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb"
sync "sync"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
wrapperspb "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb" sync "sync"
) )
const ( const (

@ -21,11 +21,10 @@
package errinfo package errinfo
import ( import (
reflect "reflect"
sync "sync"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
) )
const ( const (

@ -152,9 +152,6 @@ func (x *SetFriendRemarkReq) Check() error {
if x.FriendUserID == "" { if x.FriendUserID == "" {
return errs.ErrArgs.Wrap("fromUserID is empty") return errs.ErrArgs.Wrap("fromUserID is empty")
} }
if x.Remark == "" {
return errs.ErrArgs.Wrap("remark is empty")
}
return nil return nil
} }

@ -22,16 +22,14 @@ package friend
import ( import (
context "context" context "context"
reflect "reflect" sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
sync "sync"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" sync "sync"
) )
const ( const (

@ -22,17 +22,15 @@ package group
import ( import (
context "context" context "context"
reflect "reflect" sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
sync "sync" wrapperspb "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" sync "sync"
wrapperspb "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb"
) )
const ( const (

@ -22,16 +22,14 @@ package msg
import ( import (
context "context" context "context"
reflect "reflect" sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
sync "sync"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" sync "sync"
) )
const ( const (

@ -22,16 +22,14 @@ package msggateway
import ( import (
context "context" context "context"
reflect "reflect" sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
sync "sync"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" sync "sync"
) )
const ( const (

@ -22,16 +22,14 @@ package push
import ( import (
context "context" context "context"
reflect "reflect" sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
sync "sync"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" sync "sync"
) )
const ( const (

@ -21,13 +21,11 @@
package sdkws package sdkws
import ( import (
reflect "reflect" wrapperspb "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb"
sync "sync"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
wrapperspb "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb" sync "sync"
) )
const ( const (

@ -21,10 +21,9 @@
package statistics package statistics
import ( import (
reflect "reflect"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
) )
const ( const (

@ -22,14 +22,13 @@ package third
import ( import (
context "context" context "context"
reflect "reflect"
sync "sync"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
) )
const ( const (

@ -137,9 +137,6 @@ func (x *GetPaginationUsersReq) Check() error {
} }
func (x *UserRegisterReq) Check() error { func (x *UserRegisterReq) Check() error {
if x.Secret == "" {
return errs.ErrArgs.Wrap("Secret is empty")
}
if x.Users == nil { if x.Users == nil {
return errs.ErrArgs.Wrap("Users is empty") return errs.ErrArgs.Wrap("Users is empty")
} }

@ -22,17 +22,15 @@ package user
import ( import (
context "context" context "context"
reflect "reflect" conversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
sync "sync" sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
conversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" sync "sync"
sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
) )
const ( const (

@ -21,11 +21,10 @@
package wrapperspb package wrapperspb
import ( import (
reflect "reflect"
sync "sync"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
) )
const ( const (

@ -73,6 +73,7 @@ func newContentTypeConf() map[int32]config.NotificationConf {
// msg // msg
constant.MsgRevokeNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg}, constant.MsgRevokeNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
constant.HasReadReceipt: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg}, constant.HasReadReceipt: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
constant.DeleteMsgsNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
} }
} }
@ -116,7 +117,7 @@ func newSessionTypeConf() map[int32]int32 {
constant.ConversationUnreadNotification: constant.SingleChatType, constant.ConversationUnreadNotification: constant.SingleChatType,
constant.ConversationPrivateChatNotification: constant.SingleChatType, constant.ConversationPrivateChatNotification: constant.SingleChatType,
// delete // delete
constant.MsgDeleteNotification: constant.SingleChatType, constant.DeleteMsgsNotification: constant.SingleChatType,
} }
} }

@ -36,7 +36,7 @@ func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context,
ConversationID: conversationID, ConversationID: conversationID,
Seqs: seqs, Seqs: seqs,
} }
return m.Notification(ctx, userID, userID, constant.MsgDeleteNotification, &tips) return m.Notification(ctx, userID, userID, constant.DeleteMsgsNotification, &tips)
} }
func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conversationID string, sesstionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error { func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conversationID string, sesstionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error {

@ -367,6 +367,10 @@ func GetNotificationConversationIDByConversationID(conversationID string) string
return "" return ""
} }
func GetSelfNotificationConversationID(userID string) string {
return "n_" + userID + "_" + userID
}
func GetSeqsBeginEnd(seqs []int64) (int64, int64) { func GetSeqsBeginEnd(seqs []int64) (int64, int64) {
if len(seqs) == 0 { if len(seqs) == 0 {
return 0, 0 return 0, 0

@ -0,0 +1,11 @@
go build -o api.exe ../cmd/openim-api/main.go
go build -o auth.exe ../cmd/openim-rpc/openim-rpc-auth/main.go
go build -o conversation.exe ../cmd/openim-rpc/openim-rpc-conversation/main.go
go build -o friend.exe ../cmd/openim-rpc/openim-rpc-friend/main.go
go build -o group.exe ../cmd/openim-rpc/openim-rpc-group/main.go
go build -o msg.exe ../cmd/openim-rpc/openim-rpc-msg/main.go
go build -o third.exe ../cmd/openim-rpc/openim-rpc-third/main.go
go build -o user.exe ../cmd/openim-rpc/openim-rpc-user/main.go
go build -o push.exe ../cmd/openim-push/main.go
go build -o msgtransfer.exe ../cmd/openim-msgtransfer/main.go
go build -o msggateway.exe ../cmd/openim-msggateway/main.go
Loading…
Cancel
Save