From 2bc600f3536b73077b7aea9ffc4a8985121bbd41 Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Fri, 15 May 2026 10:59:03 +0800 Subject: [PATCH] redpacket eth indexer poll error --- config/openim-rpc-redpacket.yml | 2 + internal/rpc/redpacket/chain/indexer.go | 66 ++++++++++++++++++------- internal/rpc/redpacket/redpacket.go | 2 +- internal/rpc/rtc/signal.go | 8 +++ pkg/common/config/config.go | 3 ++ 5 files changed, 63 insertions(+), 18 deletions(-) diff --git a/config/openim-rpc-redpacket.yml b/config/openim-rpc-redpacket.yml index 58bed2edf..0af851ba2 100644 --- a/config/openim-rpc-redpacket.yml +++ b/config/openim-rpc-redpacket.yml @@ -29,3 +29,5 @@ tron: # Indexer polling interval (in seconds). Used by both EVM and TRON event indexers. indexer: pollInterval: 5 + # EVM only: max block span per eth_getLogs request (0 = default 2000). Increase if your node allows larger ranges. + maxBlocksPerPoll: 2000 diff --git a/internal/rpc/redpacket/chain/indexer.go b/internal/rpc/redpacket/chain/indexer.go index 590b6049d..9cd7a3981 100644 --- a/internal/rpc/redpacket/chain/indexer.go +++ b/internal/rpc/redpacket/chain/indexer.go @@ -14,27 +14,50 @@ import ( "github.com/openimsdk/tools/log" ) +const defaultIndexerMaxBlocksPerPoll uint64 = 2000 + type Indexer struct { - client *ChainClient - db controller.RedPacketDatabase - pollInterval time.Duration - lastBlock uint64 - contractAddr common.Address + client *ChainClient + db controller.RedPacketDatabase + pollInterval time.Duration + lastBlock uint64 + contractAddr common.Address + maxBlocksPerPoll uint64 // 0 => defaultIndexerMaxBlocksPerPoll } -func NewIndexer(client *ChainClient, db controller.RedPacketDatabase, pollInterval int, startBlock uint64) *Indexer { +func NewIndexer(client *ChainClient, db controller.RedPacketDatabase, pollInterval int, startBlock uint64, maxBlocksPerPoll int) *Indexer { if pollInterval <= 0 { pollInterval = 5 } + var maxB uint64 + if maxBlocksPerPoll > 0 { + maxB = uint64(maxBlocksPerPoll) + } return &Indexer{ - client: client, - db: db, - pollInterval: time.Duration(pollInterval) * time.Second, - lastBlock: startBlock, - contractAddr: client.contractAddr, + client: client, + db: db, + pollInterval: time.Duration(pollInterval) * time.Second, + lastBlock: startBlock, + contractAddr: client.contractAddr, + maxBlocksPerPoll: maxB, } } +func (i *Indexer) chunkEndBlock(chainTip uint64) uint64 { + maxSpan := i.maxBlocksPerPoll + if maxSpan == 0 { + maxSpan = defaultIndexerMaxBlocksPerPoll + } + if chainTip <= i.lastBlock { + return i.lastBlock + } + span := chainTip - i.lastBlock + if span > maxSpan { + return i.lastBlock + maxSpan + } + return chainTip +} + func (i *Indexer) Start(ctx context.Context) { log.ZInfo(ctx, "starting RedPacket ETH event indexer") @@ -105,20 +128,25 @@ func (i *Indexer) poll(ctx context.Context) error { return fmt.Errorf("get header failed: %w", err) } - currentBlock := header.Number.Uint64() - if currentBlock <= i.lastBlock { + chainTip := header.Number.Uint64() + if chainTip <= i.lastBlock { + return nil + } + + toBlock := i.chunkEndBlock(chainTip) + if toBlock <= i.lastBlock { return nil } query := ethereum.FilterQuery{ FromBlock: big.NewInt(int64(i.lastBlock + 1)), - ToBlock: big.NewInt(int64(currentBlock)), + ToBlock: big.NewInt(int64(toBlock)), Addresses: []common.Address{i.contractAddr}, } logs, err := i.client.client.FilterLogs(ctx, query) if err != nil { - return fmt.Errorf("filter logs failed: %w", err) + return fmt.Errorf("filter logs failed (blocks %d-%d): %w", i.lastBlock+1, toBlock, err) } logPtrs := make([]*types.Log, len(logs)) @@ -137,8 +165,12 @@ func (i *Indexer) poll(ctx context.Context) error { } } - i.lastBlock = currentBlock - log.ZInfo(ctx, "redpacket eth indexed", "block", currentBlock, "events", len(events)) + i.lastBlock = toBlock + if toBlock < chainTip { + log.ZDebug(ctx, "redpacket eth indexer chunk done, catching up", "indexedTo", toBlock, "chainTip", chainTip, "events", len(events)) + } else { + log.ZInfo(ctx, "redpacket eth indexed", "block", toBlock, "events", len(events)) + } return nil } diff --git a/internal/rpc/redpacket/redpacket.go b/internal/rpc/redpacket/redpacket.go index 15b9b1139..4609fc488 100644 --- a/internal/rpc/redpacket/redpacket.go +++ b/internal/rpc/redpacket/redpacket.go @@ -138,7 +138,7 @@ func Start(ctx context.Context, conf *Config, registry discovery.SvcDiscoveryReg pbredpacket.RegisterRedPacketServer(server, srv) if chainClient != nil { - ethIndexer := chain.NewIndexer(chainClient, repo, conf.RpcConfig.Indexer.PollInterval, 0) + ethIndexer := chain.NewIndexer(chainClient, repo, conf.RpcConfig.Indexer.PollInterval, 0, conf.RpcConfig.Indexer.MaxBlocksPerPoll) ethIndexer.Start(ctx) } if tronClient != nil { diff --git a/internal/rpc/rtc/signal.go b/internal/rpc/rtc/signal.go index be4b8ea50..bb2f71c1a 100644 --- a/internal/rpc/rtc/signal.go +++ b/internal/rpc/rtc/signal.go @@ -97,6 +97,10 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, inv.InviterUserID = req.UserID inv.InitiateTime = time.Now().UnixMilli() + if len(inv.InviteeUserIDList) == 0 { + return nil, errs.ErrArgs.WrapMsg("no invitees", "inviteeUserIDList", inv.InviteeUserIDList) + } + for _, inviteeID := range inv.InviteeUserIDList { allowed, err := s.isCallAllowed(ctx, req.UserID, inviteeID) if err != nil { @@ -119,6 +123,10 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, } inv.BusyLineUserIDList = busyUserIDs + if len(inv.InviteeUserIDList) == len(busyUserIDs) { + return nil, errs.ErrNoPermission.WrapMsg("all invitees are busy", "inviteeUserIDList", inv.InviteeUserIDList) + } + // 从主叫用户资料获取铃声 URL,注入到邀请信息中,被叫方收到后播放主叫方铃声 if inviterInfo, err := s.userClient.GetUserInfo(ctx, req.UserID); err == nil && inviterInfo.CallRingtoneURL != "" { inv.CallerRingtoneURL = inviterInfo.CallRingtoneURL diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index a37fc14d8..34673884a 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -532,6 +532,9 @@ type RedPacketTron struct { type RedPacketIndexer struct { PollInterval int `mapstructure:"pollInterval"` + // MaxBlocksPerPoll limits each eth_getLogs range (lastBlock+1 .. toBlock). Many public RPCs + // reject large ranges; 0 means use default 2000. + MaxBlocksPerPoll int `mapstructure:"maxBlocksPerPoll"` } // FullConfig stores all configurations for before and after events