|
|
|
|
@ -108,6 +108,10 @@ func (s *subscriber) Owner() (*ent.User, error) {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
return s.ownerLocked()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *subscriber) ownerLocked() (*ent.User, error) {
|
|
|
|
|
if time.Since(s.cachedAt) > userCacheTTL || s.ownerCached == nil {
|
|
|
|
|
user, err := s.userClient.GetLoginUserByID(context.Background(), s.uid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
@ -165,7 +169,11 @@ func (s *subscriber) flushLocked(ctx context.Context) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !s.online {
|
|
|
|
|
_ = s.fsEventClient.Create(ctx, s.ownerCached.ID, uuid.FromStringOrNil(s.id), lo.Map(s.buffer, func(item *Event, index int) string {
|
|
|
|
|
owner, err := s.ownerLocked()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
_ = s.fsEventClient.Create(ctx, owner.ID, uuid.FromStringOrNil(s.id), lo.Map(s.buffer, func(item *Event, index int) string {
|
|
|
|
|
res, _ := json.Marshal(item)
|
|
|
|
|
return string(res)
|
|
|
|
|
})...)
|
|
|
|
|
|