
Conversation

@julienrbrt
Member

Closes: #2890

Add submission strategies.

@github-actions
Contributor

github-actions bot commented Jan 13, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed · Format: ⏩ skipped · Lint: ✅ passed · Breaking: ✅ passed · Updated (UTC): Jan 13, 2026, 4:22 PM

@claude
Contributor

claude bot commented Jan 13, 2026

Claude finished @julienrbrt's task in 2m 16s


Code Review Complete

I've completed a comprehensive code review of PR #2973. Overall, this is a well-designed and thoroughly tested implementation of batching strategies for DA submissions. The code demonstrates strong Go idioms, good test coverage, and thoughtful architecture. Below are my findings organized by focus area.


1. Code Quality & Architecture ⭐

Strengths:

  • Clean abstraction: The BatchingStrategy interface is well-designed with a clear single responsibility
  • Factory pattern: BatchingStrategyFactory provides clean instantiation logic
  • Good separation of concerns: Strategy logic is isolated from submission logic
  • Helper functions: estimateBatchSize, optimizeBatchSize, and calculateBatchMetrics are well-factored utilities

Issues:

Critical: Concurrency Safety Issue in daSubmissionLoop

Location: submitter.go:184-233 and submitter.go:242-292

The batching decision logic fetches headers/data inside goroutines spawned after TryLock(). This creates a race condition:

if s.headerSubmissionMtx.TryLock() {
    go func() {
        defer s.headerSubmissionMtx.Unlock()
        
        // Get pending headers to estimate size
        headers, err := s.cache.GetPendingHeaders(s.ctx)  // ⚠️ Race: cache may change between check and fetch

Problem: Between checking headersNb > 0 (line 178) and fetching headers inside the goroutine (line 189), the cache state can change. The count may be stale, leading to unnecessary goroutine spawns or missed submissions.

Recommendation: Move the batching decision logic before spawning the goroutine, or refactor to avoid the race:

if headersNb > 0 && s.headerSubmissionMtx.TryLock() {
    // Fetch headers before spawning goroutine to avoid race
    headers, err := s.cache.GetPendingHeaders(s.ctx)
    if err != nil {
        s.headerSubmissionMtx.Unlock()
        s.logger.Error().Err(err).Msg("failed to get pending headers")
        continue
    }
    
    timeSinceLastSubmit := time.Since(s.lastHeaderSubmit)
    totalSize := estimateBatchSize(marshaled)
    shouldSubmit := s.batchingStrategy.ShouldSubmit(...)
    
    if shouldSubmit {
        go func() {
            defer s.headerSubmissionMtx.Unlock()
            // Perform submission with already-fetched headers
        }()
    } else {
        s.headerSubmissionMtx.Unlock()
    }
}



⚠️ Medium: Duplicate Size Estimation

Location: submitter.go:196-202 and submitter.go:253-260

Headers and data are marshaled twice: once for size estimation and again inside SubmitHeaders/SubmitData. This is inefficient.

Recommendation: Consider refactoring to pass pre-marshaled data to submission methods or cache marshaled results.


⚠️ Medium: Missing Error Handling in Goroutines

Location: submitter.go:198-201

Marshaling errors are silently ignored when estimating batch size:

data, err := h.MarshalBinary()
if err == nil {  // ⚠️ Silent failure if marshaling fails
    totalSize += len(data)
}

Impact: Incorrect size estimates could lead to oversized submissions or poor batching decisions.

Recommendation: Log marshaling errors and consider failing the batching decision if critical items can't be marshaled.



ℹ️ Minor: Magic Number for Check Interval

Location: submitter.go:165-168

The check interval calculation uses hardcoded values:

checkInterval := s.config.DA.BlockTime.Duration / 4
if checkInterval < 100*time.Millisecond {
    checkInterval = 100 * time.Millisecond
}

Recommendation: Extract these to named constants with comments explaining the rationale:

const (
    // minCheckInterval prevents excessive polling when DA block time is very short
    minCheckInterval = 100 * time.Millisecond
    // checkIntervalDivisor: check 4x per DA block to balance responsiveness vs overhead
    checkIntervalDivisor = 4
)

2. Error Handling & Resilience ✅

Strengths:

  • Excellent retry logic in setFinalWithRetry with exponential backoff
  • Proper critical error handling for ErrOversizedItem preventing livelock
  • Defensive programming with zero-value checks in strategy constructors

Issues:

⚠️ Medium: Silent Fallback in Strategy Initialization

Location: submitter.go:92-96

When strategy creation fails, the code falls back to a default without propagating the error:

strategy, err := BatchingStrategyFactory(config.DA)
if err != nil {
    submitterLogger.Warn().Err(err).Msg("failed to create batching strategy, using time-based default")
    strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 1)
}

Issue: Invalid configuration goes unnoticed. Users might think they're using "adaptive" but actually get "time-based".

Recommendation: Make this a hard error during initialization to fail fast on misconfiguration.



3. Security 🔒

Strengths:

  • No hardcoded secrets or credentials
  • Proper context propagation for cancellation
  • Safe concurrent access via mutexes

No critical security issues found.

Minor observations:

  • Consider adding rate limiting to prevent DA layer abuse if batching strategies are misconfigured
  • The sendCriticalError channel is non-blocking (line 413), which is good defensive design

4. Performance & Resource Efficiency ⚡

Strengths:

  • Adaptive batching can significantly reduce DA submission costs
  • Good use of buffered channels and non-blocking operations
  • Lock-free atomic operations for daIncludedHeight

Issues:

⚠️ Medium: Potential Goroutine Leak

Location: submitter.go:185-233

If context is cancelled while a submission goroutine is marshaling data, the goroutine may not exit cleanly.

Recommendation: Add context checks inside long-running goroutines:

go func() {
    defer s.headerSubmissionMtx.Unlock()
    
    select {
    case <-s.ctx.Done():
        return
    default:
    }
    
    // ... continue with submission logic
}()

ℹ️ Minor: Repeated time.Since Calls

Location: submitter.go:180 and submitter.go:240

time.Since() is called for every ticker iteration even when no items are pending.

Recommendation: Move inside the if headersNb > 0 block to avoid unnecessary work.


5. Testing & Reliability 🧪

Strengths:

  • Excellent test coverage: Comprehensive table-driven tests for all strategies
  • Edge case testing: Tests cover zero values, boundary conditions, and concurrent scenarios
  • Integration tests: TestSubmitter_daSubmissionLoop validates end-to-end flow
  • Clear test naming: Tests follow Go conventions with descriptive names

Issues:

ℹ️ Minor: Test Flakiness Potential

Location: batching_strategy_test.go:563-593

TestBatchingStrategiesComparison doesn't test the actual submission behavior, only the decision logic. Consider adding an integration test that verifies actual submission timing.


⚠️ Medium: Missing Test for Concurrent Submissions

Observation: No test validates behavior when both header and data submissions occur simultaneously.

Recommendation: Add a test verifying that concurrent header/data submissions don't interfere:

func TestSubmitter_ConcurrentHeaderAndDataSubmission(t *testing.T) {
    // Verify headerSubmissionMtx and dataSubmissionMtx work independently
}

6. Documentation & Developer Experience 📚

Strengths:

  • Clear inline comments explaining strategy behavior
  • Good function/parameter naming (self-documenting code)
  • Configuration flags have detailed descriptions in config.go

Issues:

⚠️ Medium: Missing Package-Level Documentation

Location: batching_strategy.go:1

The file lacks a package comment explaining the overall batching strategy system.

Recommendation: Add package doc:

// Package submitting implements DA (Data Availability) submission logic for the ev-node.
//
// Batching Strategies:
// - immediate: Submit as soon as items are available (lowest latency, highest cost)
// - size: Wait until batch reaches a size threshold (optimizes blob utilization)
// - time: Submit after a time interval (predictable latency)
// - adaptive: Balance between size and time constraints (recommended for production)
//
// See BatchingStrategy interface for implementation details.
package submitting

ℹ️ Minor: Configuration Documentation

Location: config.go:196-199

The batching strategy comments are good, but could benefit from examples:

BatchingStrategy string `comment:"...Options: 'immediate', 'size', 'time', 'adaptive'. Example: For cost optimization with Celestia, use 'adaptive' with BatchSizeThreshold=0.8"`

7. Additional Observations

Good: Max Blob Size Update

Location: common/consts.go:3

The update from 2MB to 8MB correctly reflects Celestia's current limits. Good alignment with upstream changes.


Good: Metric Logging

Location: submitter.go:212-218

Debug logging includes all relevant context (timestamp, count, size, duration, strategy). This will be invaluable for debugging batching behavior in production.


⚠️ Medium: Default Strategy Selection

Location: defaults.go:81

The default strategy is "time" rather than "adaptive". Given that adaptive is described as balancing both concerns, consider if this should be the default:

BatchingStrategy:   "adaptive",  // Better default for most use cases?

Rationale: Adaptive provides the best of both worlds. Users seeking immediate submission can explicitly opt-in.


Summary

This PR introduces a valuable feature with solid design and implementation. The batching strategies will significantly improve DA submission efficiency. However, there are a few critical issues that should be addressed:

Must Fix:

  1. 🔴 Concurrency race in daSubmissionLoop - Critical for correctness
  2. 🟡 Silent strategy fallback - Should fail fast on misconfiguration
  3. 🟡 Missing error logging in marshaling - Could hide critical failures

Nice to Have:

  • Extract magic numbers to constants
  • Add package-level documentation
  • Add concurrent submission test
  • Consider adaptive as default strategy

Test Coverage: ⭐⭐⭐⭐⭐ (Excellent)
Code Quality: ⭐⭐⭐⭐☆ (Very Good, with noted concurrency issue)
Documentation: ⭐⭐⭐⭐☆ (Good, could use package docs)


@gemini-code-assist
Contributor

Summary of Changes

Hello @julienrbrt, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the Data Availability (DA) layer submission process by introducing a robust and configurable batching system. Instead of immediate submission, the system can now intelligently decide when to send data based on various strategies, such as accumulating a certain amount of data, waiting for a specific time interval, or an adaptive combination of both. This enhancement aims to improve the efficiency and cost-effectiveness of data submissions, particularly for DA layers like Celestia, by better utilizing blob capacity and reducing transaction overhead. The maximum blob size has also been updated to reflect current DA layer capabilities.

Highlights

  • Flexible DA Submission Strategies: Introduced an interface and four concrete implementations for batching strategies: Immediate, Size-Based, Time-Based, and Adaptive. This allows for configurable logic to determine when to submit data to the Data Availability layer, optimizing for latency, throughput, or cost.
  • Increased Default Max Blob Size: The DefaultMaxBlobSize has been increased from 2MB to 8MB across the codebase, aligning with Celestia's current blob size limits. This change allows for larger batches of data to be submitted in a single transaction.
  • Configurable Batching Parameters: New configuration flags and fields have been added to DAConfig to control the chosen batching strategy, size thresholds, maximum delays, and minimum item counts, providing fine-grained control over DA submission behavior.
  • Integrated into Submitter Loop: The DA submission loop in submitter.go has been refactored to utilize the new batching strategies. It now periodically checks the strategy to decide whether to submit pending headers and data, improving efficiency by avoiding premature or undersized submissions.


@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces configurable batching strategies for DA submissions, a valuable feature for optimizing costs and latency. The implementation is well-structured, adding immediate, size, time, and adaptive strategies, along with comprehensive tests. My review focuses on improving the efficiency and robustness of the new logic in the submitter, particularly around size estimation and data fetching.

// Wait if current utilization is below minimum threshold
// Use epsilon for floating point comparison
const epsilon = 0.001
currentUtilization := float64(currentSize) / float64(maxBlobSize)

medium

There's a potential for division by zero if maxBlobSize is 0. In Go, floating-point division by zero results in +Inf or NaN rather than a panic, but this can lead to unexpected behavior in the comparison that follows. It would be safer to add a guard against this, similar to the pattern used in calculateBatchMetrics.
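A minimal sketch of such a guard (the helper name is hypothetical; the real check lives inline in the strategy):

```go
package main

import "fmt"

// utilization returns the fraction of the blob-size budget consumed,
// guarding against a zero maxBlobSize so the caller never compares
// against +Inf or NaN.
func utilization(currentSize, maxBlobSize uint64) float64 {
	if maxBlobSize == 0 {
		return 0 // treat an unset limit as zero utilization
	}
	return float64(currentSize) / float64(maxBlobSize)
}

func main() {
	fmt.Println(utilization(4, 8), utilization(4, 0)) // 0.5 0
}
```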

Comment on lines 184 to 185
if s.headerSubmissionMtx.TryLock() {
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress")
go func() {

medium

The current implementation fetches pending headers from the cache (s.cache.GetPendingHeaders) inside this goroutine to estimate the batch size. Then, if the strategy decides to submit, s.daSubmitter.SubmitHeaders fetches the same headers again. This double-fetching can be inefficient, especially if there are many pending headers.

Consider refactoring daSubmitter.SubmitHeaders to accept the list of headers directly, avoiding the second fetch. This same feedback applies to the data submission logic below.
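A sketch of the suggested refactor, with a stand-in Header type and function signature (the PR's real types and method receivers differ):

```go
package main

import "fmt"

// Header stands in for the node's header type; all names here are
// illustrative, not the PR's actual API.
type Header struct{ Height uint64 }

// Before (per the review): SubmitHeaders re-fetched pending headers from
// the cache internally. After: the caller passes the slice it already
// fetched for size estimation, so the cache is read exactly once per tick.
func SubmitHeaders(headers []Header) error {
	fmt.Printf("submitting %d headers\n", len(headers))
	return nil
}

func main() {
	pending := []Header{{Height: 1}, {Height: 2}} // fetched once by the loop
	_ = SubmitHeaders(pending)                    // reused for submission
}
```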

Comment on lines +198 to +201
data, err := h.MarshalBinary()
if err == nil {
totalSize += len(data)
}

medium

Errors from h.MarshalBinary() are silently ignored. This could lead to an inaccurate totalSize, causing the batching strategy to make a suboptimal decision (e.g., delaying a submission). It's better to log these errors for visibility.

Additionally, this size estimation logic is duplicated for data submission. Consider extracting it into a shared helper function to improve maintainability.

              data, err := h.MarshalBinary()
              if err != nil {
                s.logger.Warn().Err(err).Msg("failed to marshal header for size estimation")
                continue
              }
              totalSize += len(data)

Comment on lines +256 to +259
data, err := d.MarshalBinary()
if err == nil {
totalSize += len(data)
}

medium

Similar to the header submission logic, errors from d.MarshalBinary() are silently ignored here. This can lead to inaccurate size estimations and suboptimal batching. It's better to log these errors for improved diagnostics and robustness.

              data, err := d.MarshalBinary()
              if err != nil {
                s.logger.Warn().Err(err).Msg("failed to marshal data for size estimation")
                continue
              }
              totalSize += len(data)



Development

Successfully merging this pull request may close these issues.

[FEATURE] bump blob size to 8mb
