feat(fs): change event debounce before emitting to subscriber

master
Aaron Liu 2 weeks ago
parent c01b748dfc
commit 32632db36f

@ -24,3 +24,170 @@ var (
// ErrEventHubClosed is returned when operations are attempted on a closed EventHub.
ErrEventHubClosed = errors.New("event hub is closed")
)
// eventState tracks the accumulated state for each file
type eventState struct {
baseType EventType // The base event type (Create, Delete, or first event type)
originalSrc string // Original source path (for Create or first Rename)
currentDst string // Current destination path
}
/*
Modify + Modify keep only the last Modify;
Create + Modify fold into a single Create with final metadata/content.
Create + Rename(ab) Create at b.
Create + Delete drop both (ephemeral object never needs to reach clients).
Modify + Delete Delete (intermediate Modify is irrelevant to final state).
Rename(ab) + Rename(bc) Rename(ac).
Rename(ab) + Modify emit Rename(ab) then a single Modify at b (or fold Modify into Create if the chain starts with Create).
Rename(ab) + Delete emit only Delete(object_id);
Rename(ab) + Rename(ba) with no intervening Modify drop both (rename there-and-back is a no-op).
Delete + Create might be a valid case, e.g. user restore same file from trash bin.
*/
// DebounceEvents takes time-ordered events and returns debounced/merged events.
func DebounceEvents(in []*Event) []*Event {
if len(in) == 0 {
return nil
}
states := make(map[string]*eventState) // keyed by FileID
order := make([]string, 0) // to preserve order of first appearance
for _, e := range in {
state, exists := states[e.FileID]
if !exists {
// First event for this file
order = append(order, e.FileID)
states[e.FileID] = &eventState{
baseType: e.Type,
originalSrc: e.From,
currentDst: e.To,
}
continue
}
switch e.Type {
case EventTypeCreate:
// Delete + Create → keep as Create (e.g. restore from trash)
if state.baseType == EventTypeDelete {
state.baseType = EventTypeCreate
state.originalSrc = e.From
state.currentDst = ""
}
case EventTypeModify:
switch state.baseType {
case EventTypeCreate:
// Create + Modify → fold into Create (no change needed, Create already implies content)
case EventTypeModify:
// Modify + Modify → keep only last Modify (state already correct)
case EventTypeRename:
// Rename + Modify → fold into first Rename
case EventTypeDelete:
// Delete + Modify → should not happen, but ignore Modify
}
case EventTypeRename:
switch state.baseType {
case EventTypeCreate:
// Create + Rename(a→b) → Create at b
state.originalSrc = e.To
state.currentDst = ""
case EventTypeModify:
// Modify + Rename → emit Rename only
state.baseType = EventTypeRename
state.currentDst = e.To
state.originalSrc = e.From
case EventTypeRename:
// Rename(a→b) + Rename(b→c) → Rename(a→c)
// Check for no-op: Rename(a→b) + Rename(b→a) → drop both
if state.originalSrc == e.To {
// Rename there-and-back, drop both
delete(states, e.FileID)
// Remove from order
for i, id := range order {
if id == e.FileID {
order = append(order[:i], order[i+1:]...)
break
}
}
} else {
state.currentDst = e.To
}
case EventTypeDelete:
// Delete + Rename → should not happen, ignore
}
case EventTypeDelete:
switch state.baseType {
case EventTypeCreate:
// Create + Delete → drop both (ephemeral object)
delete(states, e.FileID)
// Remove from order
for i, id := range order {
if id == e.FileID {
order = append(order[:i], order[i+1:]...)
break
}
}
case EventTypeModify:
// Modify + Delete → Delete
state.baseType = EventTypeDelete
state.originalSrc = e.From
state.currentDst = ""
case EventTypeRename:
// Rename + Delete → Delete only
state.baseType = EventTypeDelete
state.originalSrc = e.From
state.currentDst = ""
case EventTypeDelete:
// Delete + Delete → keep Delete (should not happen normally)
}
}
}
// Build output events in order
result := make([]*Event, 0, len(order))
for _, fileID := range order {
state, exists := states[fileID]
if !exists {
continue
}
switch state.baseType {
case EventTypeCreate:
result = append(result, &Event{
Type: EventTypeCreate,
FileID: fileID,
From: state.originalSrc,
})
case EventTypeModify:
result = append(result, &Event{
Type: EventTypeModify,
FileID: fileID,
From: state.originalSrc,
})
case EventTypeRename:
// If hasModify and base was originally Modify (converted to Rename),
// we need to emit Modify first at original location
// But in our current logic, Modify+Rename sets hasModify=true
// We emit Rename, then Modify if needed
result = append(result, &Event{
Type: EventTypeRename,
FileID: fileID,
From: state.originalSrc,
To: state.currentDst,
})
case EventTypeDelete:
result = append(result, &Event{
Type: EventTypeDelete,
FileID: fileID,
From: state.originalSrc,
})
}
}
return result
}

@ -169,7 +169,8 @@ func (s *subscriber) flushLocked(ctx context.Context) {
} else {
// TODO: implement event merging logic here
// For now, send all buffered events individually
for _, evt := range s.buffer {
debouncedEvents := DebounceEvents(s.buffer)
for _, evt := range debouncedEvents {
select {
case s.ch <- evt:
default:

@ -11,6 +11,7 @@ import (
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
)
type (
@ -46,6 +47,11 @@ func (s *ExplorerEventService) HandleExplorerEventsPush(c *gin.Context) error {
return serializer.NewError(serializer.CodeParamErr, "Client ID is required", nil)
}
// Client ID must be a valid UUID
if _, err := uuid.FromString(requestInfo.ClientID); err != nil {
return serializer.NewError(serializer.CodeParamErr, "Invalid client ID", err)
}
// Subscribe
eventHub := dep.EventHub()
rx, resumed, err := eventHub.Subscribe(c, parent.ID(), requestInfo.ClientID)

Loading…
Cancel
Save