chore: limit the maximum number of message hashes to request per query #1190

Merged: 2 commits, Aug 10, 2024
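In short, the PR caps store queries at maxMsgHashesPerRequest (50) hashes: fetchMessagesBatch now splits missingHashes into fixed-size batches and issues one QueryByHash per batch in parallel goroutines, and message_check.go lowers DefaultMaxHashQueryLength from 100 to 50 to match. The batching itself is ordinary slice chunking; a minimal standalone sketch of that pattern (the chunkHashes name and MessageHash alias below are illustrative, not taken from the PR):

package main

import "fmt"

// MessageHash stands in for pb.MessageHash; any element type works the same way.
type MessageHash [32]byte

// chunkHashes splits hashes into consecutive slices of at most batchSize elements.
func chunkHashes(hashes []MessageHash, batchSize int) [][]MessageHash {
	var batches [][]MessageHash
	for i := 0; i < len(hashes); i += batchSize {
		j := i + batchSize
		if j > len(hashes) {
			j = len(hashes)
		}
		batches = append(batches, hashes[i:j])
	}
	return batches
}

func main() {
	hashes := make([]MessageHash, 120)
	for _, batch := range chunkHashes(hashes, 50) {
		fmt.Println(len(batch)) // prints 50, 50, 20
	}
}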
70 changes: 44 additions & 26 deletions waku/v2/api/missing/missing_messages.go
@@ -20,6 +20,7 @@ import (
 )
 
 const maxContentTopicsPerRequest = 10
+const maxMsgHashesPerRequest = 50
 
 // MessageTracker should keep track of messages it has seen before and
 // provide a way to determine whether a message exists or not. This
@@ -247,38 +248,55 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 		return nil
 	}
 
-	result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
-		return m.store.QueryByHash(ctx, missingHashes, store.WithPeer(interest.peerID), store.WithPaging(false, 100))
-	}, logger, "retrieving missing messages")
-	if err != nil {
-		if !errors.Is(err, context.Canceled) {
-			logger.Error("storenode not available", zap.Error(err))
-		}
-		return err
-	}
-
-	for !result.IsComplete() {
-		for _, mkv := range result.Messages() {
-			select {
-			case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()):
-			default:
-				m.logger.Warn("subscriber is too slow!")
-			}
-		}
-
-		result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
-			if err = result.Next(ctx); err != nil {
-				return nil, err
-			}
-			return result, nil
-		}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page")
-		if err != nil {
-			if !errors.Is(err, context.Canceled) {
-				logger.Error("storenode not available", zap.Error(err))
-			}
-			return err
-		}
-	}
+	wg := sync.WaitGroup{}
+	// Split into batches
+	for i := 0; i < len(missingHashes); i += maxMsgHashesPerRequest {
+		j := i + maxMsgHashesPerRequest
+		if j > len(missingHashes) {
+			j = len(missingHashes)
+		}
+
+		wg.Add(1)
+		go func(messageHashes []pb.MessageHash) {
+			defer wg.Wait()
+
+			result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
+				return m.store.QueryByHash(ctx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest))
+			}, logger, "retrieving missing messages")
+			if err != nil {
+				if !errors.Is(err, context.Canceled) {
+					logger.Error("storenode not available", zap.Error(err))
+				}
+				return
+			}
+
+			for !result.IsComplete() {
+				for _, mkv := range result.Messages() {
+					select {
+					case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()):
+					default:
+						m.logger.Warn("subscriber is too slow!")
+					}
+				}
+
+				result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
+					if err = result.Next(ctx); err != nil {
+						return nil, err
+					}
+					return result, nil
+				}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page")
+				if err != nil {
+					if !errors.Is(err, context.Canceled) {
+						logger.Error("storenode not available", zap.Error(err))
+					}
+					return
+				}
+			}
+		}(missingHashes[i:j])
+	}
+
+	wg.Wait()
 
 	return nil
 }
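A note for readers of the hunk above: inside the worker goroutine, defer wg.Wait() blocks on the same WaitGroup that goroutine is expected to release, so the counter never reaches zero and the final wg.Wait() in fetchMessagesBatch cannot return; the conventional fan-out shape is defer wg.Done() in each worker and a single wg.Wait() in the caller. The added code also assigns the shared result and err captured from the enclosing scope in every goroutine, so declaring them locally in the worker avoids a data race. A minimal sketch of that shape under the same batching idea (queryBatch and the hash type are illustrative placeholders, not the PR's API):

package main

import (
	"fmt"
	"sync"
)

const maxPerRequest = 50

// queryBatch is a stand-in for a per-batch store query.
func queryBatch(batch []string) {
	fmt.Println("querying", len(batch), "hashes")
}

func fetchAll(hashes []string) {
	var wg sync.WaitGroup
	for i := 0; i < len(hashes); i += maxPerRequest {
		j := i + maxPerRequest
		if j > len(hashes) {
			j = len(hashes)
		}

		wg.Add(1)
		go func(batch []string) {
			defer wg.Done() // release the counter; wg.Wait() belongs in the caller
			queryBatch(batch)
		}(hashes[i:j])
	}
	wg.Wait() // block until every batch worker has finished
}

func main() {
	fetchAll(make([]string, 120))
}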
8 changes: 4 additions & 4 deletions waku/v2/api/publish/message_check.go
@@ -16,7 +16,7 @@ import (
 	"go.uber.org/zap"
 )
 
-const DefaultMaxHashQueryLength = 100
+const DefaultMaxHashQueryLength = 50
 const DefaultHashQueryInterval = 3 * time.Second
 const DefaultMessageSentPeriod = 3     // in seconds
 const DefaultMessageExpiredPerid = 10  // in seconds
@@ -216,7 +216,7 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
 		messageHashes[i] = pb.ToMessageHash(hash.Bytes())
 	}
 
-	m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes))
+	m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
 
 	result, err := m.store.QueryByHash(ctx, messageHashes, opts...)
 	if err != nil {
@@ -248,8 +248,8 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
 		}
 	}
 
-	m.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes))
-	m.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes))
+	m.logger.Debug("ack message hashes", zap.Stringers("ackHashes", ackHashes))
+	m.logger.Debug("missed message hashes", zap.Stringers("missedHashes", missedHashes))
 
 	return append(ackHashes, missedHashes...)
 }
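The logging change above swaps zap.Any for zap.Stringers, so slices of hashes are rendered through their String() methods rather than through reflection. A minimal sketch of the difference, assuming a hash type that implements fmt.Stringer (the hash type and values below are illustrative, not from the PR):

package main

import (
	"encoding/hex"

	"go.uber.org/zap"
)

// hash is a stand-in for types such as pb.MessageHash or common.Hash.
type hash [4]byte

// String implements fmt.Stringer, which is what zap.Stringers relies on.
func (h hash) String() string { return "0x" + hex.EncodeToString(h[:]) }

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	hashes := []hash{{0xde, 0xad, 0xbe, 0xef}, {0x01, 0x02, 0x03, 0x04}}

	// zap.Any falls back to reflection-based encoding of the byte arrays.
	logger.Debug("with zap.Any", zap.Any("hashes", hashes))
	// zap.Stringers calls String() on each element, giving readable hex output.
	logger.Debug("with zap.Stringers", zap.Stringers("hashes", hashes))
}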