-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: retain message, max delivery, empty header #181
base: master
Are you sure you want to change the base?
fix: retain message, max delivery, empty header #181
Conversation
WalkthroughThe changes introduce a new boolean field Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (4)
natsjobs/driver.go (1)
410-413
: Consider potential race conditionThe atomic load of
c.listeners
and the check ofc.retainStreamMessages
are not performed atomically. While this might not cause issues in practice due to the Stop method's usage pattern, consider using a mutex to ensure thread safety.-if atomic.LoadUint32(&c.listeners) > 0 && !c.retainStreamMessages { +c.consumerLock.RLock() +shouldPurge := atomic.LoadUint32(&c.listeners) > 0 && !c.retainStreamMessages +c.consumerLock.RUnlock() +if shouldPurge { err := c.stream.Purge(ctx) if err != nil { c.log.Error("drain error", zap.Error(err)) } c.stopCh <- struct{}{} }natsjobs/item.go (3)
6-6
: File is not formatted with goimports.Please run
goimports
to format the code and ensure imports are correctly grouped and ordered.🧰 Tools
🪛 golangci-lint
6-6: File is not
goimports
-ed(goimports)
113-137
: Use appropriate logging levels for heartbeat messages.Currently, the heartbeat is logged at the
Error
level, which may not be appropriate for regular operation. Consider usinglog.Info
orlog.Debug
for heartbeat messages to reduce noise in the logs and reservelog.Error
for actual error conditions.Apply this diff to adjust the logging level:
- log.Error("heartbeat") + log.Info("heartbeat")
118-118
: Consider making the heartbeat interval configurable.The heartbeat interval is hardcoded to 20 seconds. To provide flexibility and accommodate different NATS timeout settings, consider making the heartbeat interval a configurable parameter.
You can modify the code as follows:
Add a new field in the
Options
struct:+ HeartbeatInterval time.Duration
Update the
startHeartbeat
method to use the configurable interval:- ticker := time.NewTicker(20 * time.Second) + ticker := time.NewTicker(i.Options.HeartbeatInterval)Ensure that
HeartbeatInterval
is set during initialization, possibly with a default value if not specified.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
go.work.sum
is excluded by!**/*.sum
📒 Files selected for processing (5)
natsjobs/config.go
(1 hunks)natsjobs/driver.go
(4 hunks)natsjobs/item.go
(4 hunks)natsjobs/listener.go
(1 hunks)schema.json
(1 hunks)
🧰 Additional context used
🪛 golangci-lint
natsjobs/item.go
6-6: File is not goimports
-ed
(goimports)
🔇 Additional comments (10)
natsjobs/config.go (2)
32-33
: Clarify interaction between DeleteStreamOnStop and RetainStreamMessages
These two boolean flags appear to have overlapping or potentially conflicting behaviors:
DeleteStreamOnStop
: Deletes the stream when stoppingRetainStreamMessages
: Retains messages in the stream
Please clarify how these flags interact:
- What happens if both are true?
- Should they be mutually exclusive?
- Which flag takes precedence?
Consider adding validation logic to prevent conflicting configurations.
#!/bin/bash
# Look for any validation or flag interaction handling
rg -A 5 "DeleteStreamOnStop|RetainStreamMessages" --type go
17-17
: Verify configuration key naming
The PR objectives mention reserve_stream_on_stop
as the configuration key, but the code implements retain_stream_messages
. This inconsistency could lead to confusion for users following the PR documentation.
natsjobs/listener.go (3)
70-72
: Verify interaction between heartbeat and message retention.
The PR adds message retention functionality, but we need to ensure the heartbeat mechanism properly interacts with it.
Let's verify the retention implementation:
#!/bin/bash
# Description: Check retention implementation and shutdown handling
# Expected: Find proper cleanup of heartbeat on shutdown
# Search for retention configuration
rg 'retain.*stream.*message'
# Check shutdown handling
ast-grep --pattern 'func ($_ *Driver) Stop() {
$$$
}'
Consider these architectural points:
- Ensure heartbeat is properly stopped when the driver is stopped
- Document the interaction between message retention and heartbeat in comments
- Add metrics/logging to track heartbeat status for retained messages
70-72
: 🛠️ Refactor suggestion
Verify heartbeat implementation and consider adding timeout configuration.
The heartbeat mechanism is a good addition to prevent duplicate tasks due to message timeouts. However, there are a few considerations:
- The heartbeat is started before context propagation, which means it might continue even if the context is canceled.
- There's no configurable timeout for the heartbeat interval.
Let's verify the heartbeat implementation in the Item struct:
#!/bin/bash
# Description: Check the heartbeat implementation details
# Expected: Find the heartbeat implementation with proper context handling
# Search for the heartbeat implementation
ast-grep --pattern 'func (i *Item) startHeartbeat($_) {
$$$
}'
# Check for context usage in heartbeat
rg -A 10 'startHeartbeat'
Consider these improvements:
- Start the heartbeat after context propagation to ensure proper cancellation
- Add configurable heartbeat interval:
-item.startHeartbeat(c.log)
+ctx := c.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers))
+ctx, span := c.tracer.Tracer(tracerName).Start(ctx, "nats_listener")
+item.startHeartbeat(ctx, c.log)
70-71
: Consider adding error handling for heartbeat initialization.
While simplifying error handling is good, we should ensure failures are properly handled to prevent message redelivery issues.
Let's check the error handling in the codebase:
Consider adding basic error handling:
-item.Options.inProgressFunc = m.InProgress
-item.startHeartbeat(c.log)
+item.Options.inProgressFunc = m.InProgress
+if err := item.startHeartbeat(c.log); err != nil {
+ c.log.Error("failed to start heartbeat", zap.Error(err))
+ // Consider if we should Nak the message here
+}
natsjobs/driver.go (5)
57-65
: LGTM: Driver struct changes are well-organized
The new retainStreamMessages
field is appropriately placed in the config section and follows Go naming conventions.
410-413
: LGTM: Stream retention logic is correctly implemented
The conditional stream purge based on retainStreamMessages
aligns with the PR objectives.
227-236
: Verify intentional difference in default values
The retainStreamMessages
default value is set to true
in FromPipeline
but might be different in FromConfig
. This could lead to inconsistent behavior depending on how the driver is initialized.
#!/bin/bash
# Description: Compare default values between config and pipeline initialization
# Expected: Find default value in config initialization
# Search for default value initialization in config
rg "RetainStreamMessages.*default"
145-154
: Verify config parsing implementation
The initialization of retainStreamMessages
looks correct. Let's verify the config struct implementation.
✅ Verification successful
Config parsing implementation is correctly implemented
The retainStreamMessages
field is properly defined in the config struct with the correct mapstructure tag retain_stream_messages
. The implementation is consistent across:
- Config struct definition in
natsjobs/config.go
- Schema definition in
schema.json
- Driver initialization using the config value
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify config struct implementation for retain_stream_messages
# Expected: Find the config struct definition with the new field
# Search for config struct definition
rg -A 10 "type\s+config\s+struct"
Length of output: 662
Script:
#!/bin/bash
# Let's search for the complete config struct definition to find retainStreamMessages
rg -A 20 "type\s+config\s+struct"
# Also search for any references to retain_stream_messages in the codebase
rg "retain_stream_messages" -i
Length of output: 1384
Line range hint 1-1
: Missing heartbeat implementation
The PR objectives mention adding a heartbeat mechanism to handle the 30-second NATS timeout, but the implementation for this feature is missing in the changes.
Would you like help implementing the heartbeat mechanism mentioned in the PR objectives?
natsjobs/item.go
Outdated
@@ -107,7 +110,41 @@ func (i *Item) Context() ([]byte, error) { | |||
return ctx, nil | |||
} | |||
|
|||
func (i *Item) startHeartbeat(log *zap.Logger) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, refer to the NATS docs about hearbeats.
I set |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (2)
natsjobs/item.go (1)
31-34
: Consider header access consistency and thread safetyThe introduction of
JsonItem
with a publicHeaders
field alongsideItem
's privateheaders
creates potential issues:
- Inconsistent access patterns (direct vs method-based)
- Possible race conditions with concurrent modifications
- Risk of header state divergence
Consider these alternatives:
- Use JSON struct tags on the private
headers
field- Implement custom MarshalJSON/UnmarshalJSON methods
- Add synchronization if concurrent access is needed
🧰 Tools
🪛 golangci-lint
[warning] 31-31: var-naming: type JsonItem should be JSONItem
(revive)
natsjobs/driver.go (1)
Implementation does not match PR objectives
The codebase analysis confirms that the message retention configuration and heartbeat functionality are not implemented:
- The configuration struct in
natsjobs/config.go
does not contain any fields for message retention settings.- Stream configurations in
natsjobs/driver.go
andnatsjobs/listener.go
do not include retention-related parameters.- While
MaxDeliver: 1
is set innatsjobs/listener.go
, there are no heartbeat-related configurations or implementations.Please implement:
- Add the proposed
retain_stream_messages
configuration to the config struct- Implement the retention logic in the stream configuration
- Document why heartbeat is not needed if that's the intended approach
🔗 Analysis chain
Line range hint
1-1
: Implementation does not match PR objectivesThe PR objectives mention adding message retention configuration (
retain_stream_messages
) and heartbeat functionality, but these changes are not visible in the implementation. While the author's comment suggests that heartbeat isn't needed withMaxDeliver = 1
, the message retention feature is still missing.Let's verify if these features exist elsewhere:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Look for retention and heartbeat related code # Test: Search for retention configuration rg -i 'retain.*stream.*message' # Test: Search for heartbeat related code rg -i 'heartbeat' # Test: Search for MaxDeliver configuration ast-grep --pattern 'MaxDeliver.*=.*1'Length of output: 91
Script:
#!/bin/bash # Let's try a broader search for retention and configuration related code # Search for retention-related terms rg -i 'retention|retain' # Search for stream configuration rg -i 'stream.*config|config.*stream' # Search for MaxDeliver in any form rg -i 'maxdeliver' # Look for configuration structs ast-grep --pattern 'type $_ struct { $$$ }'Length of output: 6049
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
natsjobs/driver.go
(2 hunks)natsjobs/item.go
(1 hunks)natsjobs/unpack.go
(2 hunks)
🧰 Additional context used
🪛 golangci-lint
natsjobs/item.go
[warning] 31-31: var-naming: type JsonItem should be JSONItem
(revive)
🔇 Additional comments (4)
natsjobs/unpack.go (2)
Line range hint 1-30
: Verify alignment with PR objectives.
While the code changes improve message handling, I don't see direct implementation of the message retention functionality mentioned in the PR objectives (retain_stream_messages: true
). Could you clarify how this file's changes contribute to the message retention feature?
#!/bin/bash
# Description: Look for retention-related configuration and implementation
# Search for retention-related configuration
rg -i 'retain.*stream|stream.*retain' --type go --type yaml
# Search for JetStream configuration
ast-grep --pattern 'jetstream.Config{
$$$
}'
15-16
: LGTM! Consider verifying performance impact.
The introduction of the auxiliary JsonItem structure improves the separation of concerns and makes the code more maintainable. However, since this is in the message processing path, it's worth verifying that the additional allocation doesn't impact performance in high-throughput scenarios.
✅ Verification successful
Based on the analysis of the codebase, I can now generate the final response:
Performance impact is negligible, change is safe
The JsonItem auxiliary structure is used consistently throughout the codebase and follows a common pattern for JSON unmarshaling in Go. The implementation is lightweight with minimal overhead:
- The struct only adds a single Headers field to the base Item
- Used only during unmarshaling, not kept in memory
- No evidence of high-throughput performance requirements in tests
- Similar unmarshaling pattern used elsewhere in the codebase
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Look for benchmark tests that could be affected by this change
# Search for benchmark tests related to message processing
rg -l 'func.*Benchmark.*' --type go
# Search for any performance-related comments or documentation
rg -i '(performance|benchmark|throughput)' --type go
Length of output: 274
Script:
#!/bin/bash
# Let's try a different approach to find performance-related code and tests
# Look for benchmark tests
fd -e go -x grep -l "func.*Benchmark" {}
# Look for any test files that might contain performance tests
fd "_test.go$"
# Look for any performance-related comments
rg -i "performance|throughput" --type go
# Check if there are any existing JsonItem usages or similar patterns
rg "type.*JsonItem" --type go
rg "json.Unmarshal.*aux" --type go -A 2
Length of output: 419
Script:
#!/bin/bash
# Let's examine the test files and JsonItem implementation for better context
# Check the JsonItem struct definition
cat natsjobs/item.go
# Look at the test files to understand usage patterns and any performance considerations
cat tests/jobs_nats_test.go
cat tests/jobs_nats_durability_test.go
# Check for any other similar JSON unmarshaling patterns in the codebase
ast-grep --pattern 'json.Unmarshal($_, $_)'
Length of output: 33813
natsjobs/item.go (1)
31-34
: Verify alignment with PR objectives
The introduction of JsonItem
appears to be supporting infrastructure, but the connection to the PR's main objectives (message retention and heartbeat mechanism) isn't clear. Could you clarify:
- How does this change support message retention after pipeline stop?
- Is this serialization change related to the heartbeat implementation?
Let's check for related changes in other files:
🧰 Tools
🪛 golangci-lint
[warning] 31-31: var-naming: type JsonItem should be JSONItem
(revive)
natsjobs/driver.go (1)
254-257
: Verify consumer-side compatibility with the new message format
The message format has been changed to include both item and headers in a single JSON object. While this is a good practice for data cohesion, we need to ensure that all consumers are updated to handle this new format.
Let's verify the consumer-side implementation:
✅ Verification successful
Message format change is properly handled
The verification shows that the message format change is safe and properly implemented:
JsonItem
is well-defined innatsjobs/item.go
with proper JSON tags- The unmarshalling is handled correctly in
unpack()
method which properly deserializes both the item and headers - The same
JsonItem
structure is used consistently in both publishing (Push
) and requeuing operations - The change maintains backward compatibility as the unmarshalling gracefully handles the payload even when it doesn't match the expected format (falls back to creating a new Item with raw payload)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for JsonItem struct definition and its usage in consumer code
# Test: Look for JsonItem struct definition
ast-grep --pattern 'type JsonItem struct {
$$$
}'
# Test: Look for JsonItem unmarshalling in consumer code
rg -A 5 'json\.Unmarshal.*JsonItem'
Length of output: 249
Script:
#!/bin/bash
# Let's check how the JsonItem is used in consumer/subscriber code
rg -A 10 "Subscribe|QueueSubscribe|Pull" natsjobs/
# Check for any unmarshalling of messages in the codebase
rg -A 5 "json\.Unmarshal.*\(" natsjobs/
# Look for any message handling or processing code
ast-grep --pattern 'func ($_) Handle($$$) {
$$$
}'
Length of output: 438
Script:
#!/bin/bash
# Let's check the implementation of message processing in the driver
rg -A 15 "func.*Process" natsjobs/
# Check how messages are consumed/pulled
rg -A 15 "Pull.*Message|Fetch.*Message" natsjobs/
# Look at the implementation of JsonItem usage
rg -C 10 "JsonItem" natsjobs/
Length of output: 3993
@@ -28,6 +28,11 @@ type Item struct { | |||
Options *Options `json:"options,omitempty"` | |||
} | |||
|
|||
type JsonItem struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found an another bug where header fields are lost during NATS message passing. The issue occurs because the Item struct has an unexported headers field that won't be included in JSON marshaling:
type Item struct {
Job string `json:"job"`
Ident string `json:"id"`
Payload []byte `json:"payload"`
headers map[string][]string // unexported, gets dropped during marshaling
Options *Options `json:"options,omitempty"`
}
Interestingly, the Context() method already handles headers correctly through a custom JSON struct, but this wasn't implemented for the main message passing to nats. To fix this, we can add:
type JsonItem struct {
*Item
Headers map[string][]string `json:"headers"`
}
This simple solution ensures headers are properly preserved when passing messages through NATS.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a simple solution, not a solution at all. Headers should be passed via NATS message headers and then populated via unpack
method. I agree, that atm, headers are not properly passed, but they should not be passed like this.
The simple solution is the following: pass headers to the NATS message in the Push
method. Get them in the listener m.Headers()
and pass into the Item
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like this: #182
This is not a good practice to hide some behavior. If you wanted to expose this option - it should be exposed via configuration, but not hidden like this. |
I understand your perspective, but implementing this option would introduce unnecessary complexity without significant practical benefits. While adding this limitation would align with other job drivers' behavior, it goes against Roadrunner's core strength - providing a clean abstraction layer that elegantly encapsulates implementation details. The power of Roadrunner lies precisely in its ability to simplify complex workflows through unified abstractions. |
Yes, simplify complex workflows, you're right. But this is not the same - introduce some hidden behavior. For you, for example, max_deliver: 1 is sufficient. But imagine cases, when the user wants to re-deliver the message until it'd be successfully processed (Acked). In this case, you introduced a breaking change. This is a good option, but should not be hardcoded. It should be moved to the configuration + added docs for it + added to the schema.json. |
Reason for This PR
purge
will cause Message deleted, so removepurge
, if you do not want to persist message, use onlydelete_stream_on_stop
The default ack timeout for nats is 30 seconds. If there is no ack after 30 seconds, the message will be delivered again, this is a bug, resulting in a duplicate task, so
MaxDeliver = 1
option shoud be add. refer to this document.Description of Changes
License Acceptance
By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.
PR Checklist
[Author TODO: Meet these criteria.]
[Reviewer TODO: Verify that these criteria are met. Request changes if not]
git commit -s
).CHANGELOG.md
.Summary by CodeRabbit
New Features
JsonItem
type for improved serialization of job items with headers.Bug Fixes
Documentation