diff --git a/.claude/skills/ev-node-explainer/SKILL.md b/.claude/skills/ev-node-explainer/SKILL.md new file mode 100644 index 000000000..3fea68503 --- /dev/null +++ b/.claude/skills/ev-node-explainer/SKILL.md @@ -0,0 +1,240 @@ +--- +name: ev-node-explainer +description: Explains ev-node architecture, components, and internal workings. Use when the user asks how ev-node works, wants to understand the block package, DA layer, sequencing, namespaces, or needs architecture explanations. Covers block production, syncing, DA submission, forced inclusion, single vs based sequencer, and censorship resistance. +--- + +# ev-node Architecture Explainer + +ev-node is a sovereign rollup framework that allows building rollups on any Data Availability (DA) layer. It follows a modular architecture where components can be swapped. + +**Reference files:** +- [block-architecture.md](block-architecture.md) - Block package deep dive +- [da-sequencing.md](da-sequencing.md) - DA and sequencing deep dive + +## Core Principles + +1. **Zero-dependency core** - `core/` contains only interfaces, no external deps +2. **Modular components** - Executor, Sequencer, DA are pluggable +3. **Two operating modes** - Aggregator (produces blocks) and Sync-only (follows chain) +4. **Separation of concerns** - Block production, syncing, and DA submission are independent + +## Package Overview + +| Package | Responsibility | +|---------|---------------| +| `core/` | Interfaces only (Executor, Sequencer) | +| `types/` | Data structures (Header, Data, State, SignedHeader) | +| `block/` | Block lifecycle management | +| `execution/` | Execution layer implementations (EVM, ABCI) | +| `node/` | Node initialization and orchestration | +| `pkg/p2p/` | libp2p-based networking | +| `pkg/store/` | Persistent storage | +| `pkg/da/` | DA layer abstraction | + +## Block Package Deep Dive + +The block package is the most complex part of ev-node. See [block-architecture.md](block-architecture.md) for the complete breakdown. + +### Component Summary + +``` +Components struct: +├── Executor - Block production (Aggregator only) +├── Reaper - Transaction scraping (Aggregator only) +├── Syncer - Block synchronization +├── Submitter - DA submission and inclusion +└── Cache - Unified state caching +``` + +### Entry Points + +- `NewAggregatorComponents()` - Full node that produces and syncs blocks +- `NewSyncComponents()` - Non-aggregator that only syncs + +### Key Data Types + +**Header** - Block metadata (height, time, hashes, proposer) +**Data** - Transaction list with metadata +**SignedHeader** - Header with proposer signature +**State** - Chain state (last block, app hash, DA height) + +## Block Production Flow (Aggregator) + +``` +Sequencer.GetNextBatch() + │ + ▼ +Executor.ExecuteTxs() + │ + ├──► SignedHeader + Data + │ + ├──► P2P Broadcast + │ + └──► Submitter Queue + │ + ▼ + DA Layer +``` + +## Block Sync Flow (Non-Aggregator) + +``` +┌─────────────────────────────────────┐ +│ Syncer │ +├─────────────┬─────────────┬─────────┤ +│ DA Worker │ P2P Worker │ Forced │ +│ │ │ Incl. │ +└──────┬──────┴──────┬──────┴────┬────┘ + │ │ │ + └─────────────┴───────────┘ + │ + ▼ + processHeightEvent() + │ + ▼ + ExecuteTxs → Update State +``` + +## Data Availability Layer + +The DA layer abstracts blob storage. ev-node uses Celestia but the interface is pluggable. See [da-sequencing.md](da-sequencing.md) for full details. + +### Namespaces + +DA uses 29-byte namespaces (1 byte version + 28 byte ID). Three namespaces are used: + +| Namespace | Purpose | +|-----------|---------| +| Header | Block headers | +| Data | Transaction data (optional, can share with header) | +| Forced Inclusion | User-submitted txs for censorship resistance | + +### DA Client Interface + +```go +type Client interface { + Submit(ctx, data [][]byte, gasPrice, namespace, options) ResultSubmit + Retrieve(ctx, height uint64, namespace) ResultRetrieve + Get(ctx, ids []ID, namespace) ([]Blob, error) +} +``` + +### Key Files + +| File | Purpose | +|------|---------| +| `pkg/da/types/types.go` | Core types (Blob, ID, Commitment) | +| `pkg/da/types/namespace.go` | Namespace handling | +| `block/internal/da/client.go` | DA client wrapper | +| `block/internal/da/forced_inclusion_retriever.go` | Forced tx retrieval | + +--- + +## Sequencing + +Sequencers order transactions for block production. See [da-sequencing.md](da-sequencing.md) for full details. + +### Two Modes + +| Mode | Mempool | Forced Inclusion | Use Case | +|------|---------|------------------|----------| +| **Single** | Yes | Yes | Traditional rollup | +| **Based** | No | Only source | High liveness guarantee | + +### Sequencer Interface + +```go +type Sequencer interface { + SubmitBatchTxs(ctx, req) (*SubmitBatchTxsResponse, error) + GetNextBatch(ctx, req) (*GetNextBatchResponse, error) + VerifyBatch(ctx, req) (*VerifyBatchResponse, error) + SetDAHeight(height uint64) + GetDAHeight() uint64 +} +``` + +### ForceIncludedMask + +Batches include a mask distinguishing tx sources: + +```go +type Batch struct { + Transactions [][]byte + ForceIncludedMask []bool // true = from DA (must validate) +} +``` + +This allows the execution layer to skip validation for already-validated mempool txs. + +### Key Files + +| File | Purpose | +|------|---------| +| `core/sequencer/sequencing.go` | Core interface | +| `pkg/sequencers/single/sequencer.go` | Hybrid sequencer | +| `pkg/sequencers/based/sequencer.go` | Pure DA sequencer | +| `pkg/sequencers/common/checkpoint.go` | Shared checkpoint logic | + +--- + +## Forced Inclusion + +Forced inclusion prevents sequencer censorship: + +1. User submits tx directly to DA layer +2. Syncer detects tx in forced-inclusion namespace +3. Grace period starts (adjusts based on block fullness) +4. If not included by sequencer within grace period → sequencer marked malicious +5. Tx gets included regardless + +## Key Files + +| File | Purpose | +|------|---------| +| `block/public.go` | Exported types and factories | +| `block/components.go` | Component creation | +| `block/internal/executing/executor.go` | Block production | +| `block/internal/syncing/syncer.go` | Sync orchestration | +| `block/internal/submitting/submitter.go` | DA submission | +| `block/internal/cache/manager.go` | Unified cache | + +## Common Questions + +### How does block production work? + +The Executor runs `executionLoop()`: +1. Wait for block time or new transactions +2. Get batch from sequencer +3. Execute via execution layer +4. Create SignedHeader + Data +5. Broadcast to P2P +6. Queue for DA submission + +### How does syncing work? + +The Syncer coordinates three workers: +- **DA Worker** - Fetches confirmed blocks from DA +- **P2P Worker** - Receives gossiped blocks +- **Forced Inclusion** - Monitors for censored txs + +All feed into `processHeightEvent()` which validates and executes. + +### What happens if DA submission fails? + +Submitter has retry logic with exponential backoff. Status codes: +- `TooBig` - Splits blob into chunks +- `AlreadyInMempool` - Skips (duplicate) +- `NotIncludedInBlock` - Retries with backoff +- `ContextCanceled` - Request canceled + +### How is state recovered after crash? + +The Replayer syncs execution layer from disk: +1. Load last committed height from store +2. Check execution layer height +3. Replay any missing blocks +4. Ensure consistency before starting + +## Architecture Diagrams + +For detailed component diagrams and state machines, see [block-architecture.md](block-architecture.md). diff --git a/.claude/skills/ev-node-explainer/block-architecture.md b/.claude/skills/ev-node-explainer/block-architecture.md new file mode 100644 index 000000000..8d77e8711 --- /dev/null +++ b/.claude/skills/ev-node-explainer/block-architecture.md @@ -0,0 +1,639 @@ +# Block Package Architecture + +Complete technical reference for the ev-node block package. + +## Directory Structure + +``` +block/ +├── public.go # Exported types, DA client factory +├── components.go # Component creation and lifecycle +└── internal/ + ├── common/ + │ ├── errors.go # Error definitions + │ ├── event.go # DAHeightEvent, event types + │ ├── metrics.go # Prometheus metrics + │ ├── options.go # BlockOptions configuration + │ ├── expected_interfaces.go + │ └── replay.go # Replayer for crash recovery + ├── executing/ + │ └── executor.go # Block production loop + ├── syncing/ + │ ├── syncer.go # Main sync orchestration + │ ├── da_retriever.go # DA block fetching + │ └── p2p_handler.go # P2P block coordination + ├── submitting/ + │ ├── submitter.go # Main submission loop + │ └── da_submitter.go # DA submission with retries + ├── reaping/ + │ └── reaper.go # Transaction scraping + ├── cache/ + │ ├── manager.go # Unified cache interface + │ ├── generic_cache.go # Generic cache impl + │ ├── pending_headers.go # Header tracking + │ └── pending_data.go # Data tracking + └── da/ + ├── client.go # DA client wrapper + ├── interface.go # DA interfaces + ├── async_block_retriever.go + └── forced_inclusion_retriever.go +``` + +## Component Lifecycle + +All components implement: + +```go +type Component interface { + Start(ctx context.Context) error + Stop() error +} +``` + +Startup order: +1. Cache Manager (loads persisted state) +2. Syncer (begins sync workers) +3. Executor (begins production loop) - Aggregator only +4. Reaper (begins tx scraping) - Aggregator only +5. Submitter (begins DA submission) + +## Executor (`internal/executing/executor.go`) + +Block production for aggregator nodes. + +### State + +```go +type Executor struct { + lastState *atomic.Pointer[types.State] + sequencer Sequencer + exec Executor + broadcaster Broadcaster + submitter Submitter + cache Cache + + blockTime time.Duration + lazyMode bool + maxPending uint64 +} +``` + +### Main Loop + +```go +func (e *Executor) executionLoop(ctx context.Context) { + timer := time.NewTimer(e.blockTime) + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + e.produceBlock(ctx) + timer.Reset(e.blockTime) + case <-e.txNotifyCh: + // New txs arrived, produce immediately if not lazy + if !e.lazyMode { + e.produceBlock(ctx) + timer.Reset(e.blockTime) + } + } + } +} +``` + +### Block Production + +```go +func (e *Executor) produceBlock(ctx context.Context) error { + // 1. Check backpressure + if e.cache.PendingCount() >= e.maxPending { + return ErrTooManyPending + } + + // 2. Get batch from sequencer + batch, err := e.sequencer.GetNextBatch(ctx) + + // 3. Execute transactions + stateRoot, gasUsed, err := e.exec.ExecuteTxs(ctx, batch.Txs, ...) + + // 4. Create header + header := &types.Header{ + Height: lastState.LastBlockHeight + 1, + Time: time.Now().UnixNano(), + LastHeaderHash: lastState.LastHeaderHash, + DataHash: batch.Txs.Hash(), + AppHash: stateRoot, + ProposerAddress: e.proposer, + } + + // 5. Sign header + signedHeader, err := e.signer.SignHeader(header) + + // 6. Create data + data := &types.Data{Txs: batch.Txs} + + // 7. Update state + newState := lastState.NextState(header, stateRoot) + e.lastState.Store(newState) + + // 8. Broadcast to P2P + e.broadcaster.BroadcastHeader(ctx, signedHeader) + e.broadcaster.BroadcastData(ctx, data) + + // 9. Queue for DA submission + e.submitter.AddPending(signedHeader, data) + + return nil +} +``` + +## Syncer (`internal/syncing/syncer.go`) + +Coordinates block synchronization from multiple sources. + +### Workers + +```go +func (s *Syncer) startSyncWorkers(ctx context.Context) { + go s.daWorkerLoop(ctx) // DA retrieval + go s.pendingWorkerLoop(ctx) // Pending events + go s.p2pWorkerLoop(ctx) // P2P blocks +} +``` + +### DA Worker + +```go +func (s *Syncer) daWorkerLoop(ctx context.Context) { + for { + // Get next DA height to retrieve + height := s.daRetrieverHeight.Load() + + // Retrieve blocks at this DA height + events, err := s.daRetriever.Retrieve(ctx, height) + + // Send to processing channel + for _, event := range events { + s.heightInCh <- event + } + + // Advance DA height + s.daRetrieverHeight.Add(1) + } +} +``` + +### P2P Worker + +```go +func (s *Syncer) p2pWorkerLoop(ctx context.Context) { + for { + select { + case header := <-s.p2pHandler.HeaderCh(): + s.p2pHandler.HandleHeader(header) + case data := <-s.p2pHandler.DataCh(): + s.p2pHandler.HandleData(data) + case event := <-s.p2pHandler.EventCh(): + // Complete header+data pair received + s.heightInCh <- event + } + } +} +``` + +### Process Loop + +```go +func (s *Syncer) processLoop(ctx context.Context) { + for { + select { + case event := <-s.heightInCh: + if err := s.processHeightEvent(ctx, event); err != nil { + // Log error, continue + } + case <-ctx.Done(): + return + } + } +} + +func (s *Syncer) processHeightEvent(ctx context.Context, event DAHeightEvent) error { + // 1. Validate header signature + if err := s.verifyHeader(event.SignedHeader); err != nil { + return err + } + + // 2. Validate data hash matches header + if event.SignedHeader.DataHash != event.Data.Hash() { + return ErrDataHashMismatch + } + + // 3. Execute transactions + stateRoot, _, err := s.exec.ExecuteTxs(ctx, event.Data.Txs, ...) + + // 4. Verify state root + if stateRoot != event.SignedHeader.AppHash { + return ErrStateRootMismatch + } + + // 5. Update state + newState := s.lastState.NextState(event.SignedHeader.Header, stateRoot) + s.lastState.Store(newState) + + // 6. Persist to store + s.store.SaveBlock(event.SignedHeader, event.Data, newState) + + return nil +} +``` + +## Submitter (`internal/submitting/submitter.go`) + +Manages DA submission with retries and inclusion tracking. + +### Two Loops + +```go +func (s *Submitter) Start(ctx context.Context) error { + go s.daSubmissionLoop(ctx) // Submit to DA + go s.inclusionProcessingLoop(ctx) // Track inclusion + return nil +} +``` + +### DA Submission Loop + +```go +func (s *Submitter) daSubmissionLoop(ctx context.Context) { + for { + // Get pending headers + headers := s.cache.GetPendingHeaders() + if len(headers) > 0 { + if err := s.submitHeaders(ctx, headers); err != nil { + s.handleSubmitError(err) + continue + } + } + + // Get pending data + data := s.cache.GetPendingData() + if len(data) > 0 { + if err := s.submitData(ctx, data); err != nil { + s.handleSubmitError(err) + continue + } + } + + time.Sleep(s.submitInterval) + } +} +``` + +### Retry Policy + +```go +type DASubmitter struct { + maxRetries int + initialBackoff time.Duration + maxBackoff time.Duration +} + +func (d *DASubmitter) Submit(ctx context.Context, blob []byte) error { + backoff := d.initialBackoff + + for attempt := 0; attempt < d.maxRetries; attempt++ { + status, err := d.client.Submit(ctx, blob) + + switch status { + case StatusSuccess: + return nil + case StatusTooBig: + return d.splitAndSubmit(ctx, blob) + case StatusAlreadyInMempool: + return nil // Already submitted + case StatusNotIncludedInBlock: + time.Sleep(backoff) + backoff = min(backoff*2, d.maxBackoff) + continue + default: + return err + } + } + + return ErrMaxRetriesExceeded +} +``` + +## Forced Inclusion (`internal/da/forced_inclusion_retriever.go`) + +Prevents sequencer censorship. + +### Grace Period Calculation + +```go +func (r *ForcedInclusionRetriever) calculateGracePeriod() uint64 { + // Base period: 1 epoch + basePeriod := r.epochLength + + // Adjust based on block fullness + // Higher fullness = longer grace period (congestion tolerance) + ema := r.blockFullnessEMA.Load() + + if ema > 0.8 { + // High congestion, extend grace period + return basePeriod * 2 + } + + return basePeriod +} +``` + +### Pending TX Tracking + +```go +type PendingForcedTx struct { + Tx types.Tx + DAHeight uint64 // When tx appeared in DA + GraceDeadline uint64 // DA height deadline for inclusion +} + +func (r *ForcedInclusionRetriever) checkPending(currentDAHeight uint64) { + for _, pending := range r.pendingTxs { + if currentDAHeight > pending.GraceDeadline { + // Sequencer failed to include tx + r.markSequencerMalicious(pending) + // Force include the tx + r.forceInclude(pending.Tx) + } + } +} +``` + +## Cache Manager (`internal/cache/manager.go`) + +Unified cache for headers, data, and transactions. + +### Structure + +```go +type Manager struct { + headerCache *GenericCache[types.Hash, HeaderEntry] + dataCache *GenericCache[types.Hash, DataEntry] + txCache *GenericCache[types.Hash, TxEntry] + pendingEvents map[uint64]*DAHeightEvent + + cleanupTicker *time.Ticker + retentionTime time.Duration +} +``` + +### Key Operations + +```go +// Header tracking +func (m *Manager) IsHeaderSeen(hash types.Hash) bool +func (m *Manager) SetHeaderSeen(hash types.Hash, height uint64) +func (m *Manager) GetHeaderDAIncluded(hash types.Hash) (uint64, bool) +func (m *Manager) SetHeaderDAIncluded(hash types.Hash, daHeight uint64) + +// Transaction deduplication +func (m *Manager) IsTxSeen(hash types.Hash) bool +func (m *Manager) SetTxSeen(hash types.Hash) + +// Pending management +func (m *Manager) GetPendingHeaders() []*types.SignedHeader +func (m *Manager) GetPendingData() []*types.Data +func (m *Manager) PendingCount() uint64 +``` + +### Disk Persistence + +```go +func (m *Manager) SaveToDisk(path string) error { + state := &CacheState{ + Headers: m.headerCache.Entries(), + Data: m.dataCache.Entries(), + Pending: m.pendingEvents, + } + return json.WriteFile(path, state) +} + +func (m *Manager) LoadFromDisk(path string) error { + state, err := json.ReadFile(path) + // Restore caches from state +} +``` + +## Replayer (`internal/common/replay.go`) + +Syncs execution layer after crash. + +```go +func (r *Replayer) Replay(ctx context.Context) error { + // Get heights + storeHeight := r.store.GetLastHeight() + execHeight := r.exec.GetHeight() + + if execHeight >= storeHeight { + return nil // Already synced + } + + // Replay missing blocks + for height := execHeight + 1; height <= storeHeight; height++ { + header, data, err := r.store.GetBlock(height) + if err != nil { + return err + } + + _, _, err = r.exec.ExecuteTxs(ctx, data.Txs, ...) + if err != nil { + return err + } + } + + return nil +} +``` + +## Metrics (`internal/common/metrics.go`) + +```go +var ( + Height = prometheus.NewGauge(...) + NumTxs = prometheus.NewGauge(...) + BlockSizeBytes = prometheus.NewHistogram(...) + CommittedHeight = prometheus.NewGauge(...) + TxsPerBlock = prometheus.NewHistogram(...) + OperationDuration = prometheus.NewHistogramVec(...) + + // DA metrics + DASubmitterFailures = prometheus.NewCounterVec(...) + DASubmitterLastFailure = prometheus.NewGauge(...) + DASubmitterPendingBlobs = prometheus.NewGauge(...) + DARetrievalAttempts = prometheus.NewCounter(...) + DARetrievalSuccesses = prometheus.NewCounter(...) + DARetrievalFailures = prometheus.NewCounter(...) + DAInclusionHeight = prometheus.NewGauge(...) + + // Cache metrics + PendingHeadersCount = prometheus.NewGauge(...) + PendingDataCount = prometheus.NewGauge(...) + + // Forced inclusion + ForcedInclusionTxsInGracePeriod = prometheus.NewGauge(...) + ForcedInclusionTxsMalicious = prometheus.NewCounter(...) +) +``` + +## Configuration + +Key options in `BlockOptions`: + +```go +type BlockOptions struct { + BlockTime time.Duration // Block interval + LazyBlockInterval time.Duration // Lazy mode timeout + MaxPendingHeadersAndData uint64 // Backpressure limit + BasedSequencer bool // No DA submissions + DABlockTime time.Duration // DA block interval + ScrapeInterval time.Duration // Tx reaping frequency + + // Namespaces + HeaderNamespace []byte + DataNamespace []byte + ForcedInclusionNamespace []byte +} +``` + +## Error Types + +```go +var ( + ErrNoHeader = errors.New("no header found") + ErrNoData = errors.New("no data found") + ErrDataHashMismatch = errors.New("data hash does not match header") + ErrStateRootMismatch = errors.New("state root mismatch after execution") + ErrInvalidSignature = errors.New("invalid header signature") + ErrTooManyPending = errors.New("too many pending submissions") + ErrMaxRetriesExceeded = errors.New("max DA submission retries exceeded") + ErrSequencerMalicious = errors.New("sequencer failed to include forced tx") +) +``` + +## State Machines + +### Executor State Machine + +``` +┌──────────────┐ +│ IDLE │ +└──────┬───────┘ + │ BlockTime elapsed OR TxNotify + ▼ +┌──────────────┐ +│ CHECK_PENDING│──── Too many? ───► Wait +└──────┬───────┘ + │ OK + ▼ +┌──────────────┐ +│ GET_BATCH │ +└──────┬───────┘ + │ + ▼ +┌──────────────┐ +│ EXECUTE_TXS │ +└──────┬───────┘ + │ + ▼ +┌──────────────┐ +│ CREATE_BLOCK │ +└──────┬───────┘ + │ + ▼ +┌──────────────┐ +│ BROADCAST │ +└──────┬───────┘ + │ + ▼ +┌──────────────┐ +│ QUEUE_SUBMIT │───► Back to IDLE +└──────────────┘ +``` + +### Syncer State Machine + +``` +┌─────────────────────────────────────────┐ +│ START │ +└──────────────────┬──────────────────────┘ + │ + ┌──────────────┼──────────────┐ + │ │ │ + ▼ ▼ ▼ +┌───────┐ ┌───────┐ ┌───────────┐ +│ DA │ │ P2P │ │ FORCED │ +│WORKER │ │WORKER │ │ INCLUSION │ +└───┬───┘ └───┬───┘ └─────┬─────┘ + │ │ │ + └────────────┴──────────────┘ + │ + ▼ + ┌──────────────┐ + │ PROCESS_LOOP │ + └──────┬───────┘ + │ + ▼ + ┌──────────────┐ + │ VALIDATE │──── Invalid? ───► Log, skip + └──────┬───────┘ + │ Valid + ▼ + ┌──────────────┐ + │ EXECUTE │ + └──────┬───────┘ + │ + ▼ + ┌──────────────┐ + │ UPDATE_STATE│ + └──────┬───────┘ + │ + ▼ + ┌──────────────┐ + │ PERSIST │───► Back to PROCESS_LOOP + └──────────────┘ +``` + +### Submitter State Machine + +``` +┌──────────────┐ +│ START │ +└──────┬───────┘ + │ + ├─────────────────────┐ + │ │ + ▼ ▼ +┌──────────────┐ ┌──────────────────┐ +│ SUBMIT_LOOP │ │ INCLUSION_LOOP │ +└──────┬───────┘ └────────┬─────────┘ + │ │ + ▼ ▼ +┌──────────────┐ ┌──────────────────┐ +│ GET_PENDING │ │ CHECK_DA_HEIGHT │ +└──────┬───────┘ └────────┬─────────┘ + │ │ + ▼ │ Included? +┌──────────────┐ │ +│ SUBMIT │ ▼ +└──────┬───────┘ ┌──────────────────┐ + │ │ RESET_STATE │ + │ Failed? └────────┬─────────┘ + ▼ │ +┌──────────────┐ │ +│ RETRY │ │ +│ (backoff) │ │ +└──────┬───────┘ │ + │ │ + └─────────────────────┘ +``` diff --git a/.claude/skills/ev-node-explainer/da-sequencing.md b/.claude/skills/ev-node-explainer/da-sequencing.md new file mode 100644 index 000000000..57b2e94b4 --- /dev/null +++ b/.claude/skills/ev-node-explainer/da-sequencing.md @@ -0,0 +1,534 @@ +# Data Availability & Sequencing Architecture + +Deep dive into ev-node's DA layer and sequencing system. + +## Data Availability Layer + +### Overview + +The DA layer abstracts blob storage and retrieval. ev-node uses Celestia as the primary DA implementation but the interface is pluggable. + +### Directory Structure + +``` +pkg/da/ +├── types/ +│ ├── types.go # Core DA types (Blob, ID, Commitment, Proof) +│ ├── namespace.go # Namespace handling (29 bytes: version + ID) +│ └── errors.go # Error definitions +├── selector.go # Round-robin address selection +└── jsonrpc/ # Celestia JSON-RPC client + +block/internal/da/ +├── client.go # DA client wrapper +├── interface.go # Client + Verifier interfaces +├── forced_inclusion_retriever.go # Epoch-based forced tx retrieval +└── async_block_retriever.go # Background prefetching +``` + +### Core Types + +```go +// Status codes for DA operations +const ( + StatusSuccess + StatusNotFound + StatusNotIncludedInBlock + StatusAlreadyInMempool + StatusTooBig + StatusContextDeadline + StatusError + StatusIncorrectAccountSequence + StatusContextCanceled + StatusHeightFromFuture +) + +// Blob primitives +type Blob = []byte // Data submitted to DA +type ID = []byte // Height + commitment to locate blob +type Commitment = []byte // Cryptographic commitment +type Proof = []byte // Inclusion proof +``` + +### Namespace Format + +Namespaces are 29 bytes: +- **Version** (1 byte): Protocol version (max 255) +- **ID** (28 bytes): Namespace identifier + +Version 0 rules: +- First 18 bytes of ID must be zero +- Leaves 10 bytes for user data + +```go +func NewNamespaceV0(id []byte) (Namespace, error) { + if len(id) > 10 { + return Namespace{}, ErrInvalidNamespaceLength + } + ns := Namespace{Version: 0} + copy(ns.ID[28-len(id):], id) // Right-pad zeros + return ns, nil +} +``` + +### DA Client Interface + +```go +type Client interface { + // Submit blobs to DA layer + Submit(ctx context.Context, data [][]byte, gasPrice float64, + namespace []byte, options []byte) ResultSubmit + + // Retrieve all blobs at height for namespace + Retrieve(ctx context.Context, height uint64, + namespace []byte) ResultRetrieve + + // Get specific blobs by ID + Get(ctx context.Context, ids []ID, namespace []byte) ([]Blob, error) + + // Namespace accessors + GetHeaderNamespace() []byte + GetDataNamespace() []byte + GetForcedInclusionNamespace() []byte + HasForcedInclusionNamespace() bool +} + +type Verifier interface { + GetProofs(ctx context.Context, ids []ID, namespace []byte) ([]Proof, error) + Validate(ctx context.Context, ids []ID, proofs []Proof, + namespace []byte) ([]bool, error) +} + +type FullClient interface { + Client + Verifier +} +``` + +### Submit Flow + +```go +func (c *Client) Submit(ctx, data, gasPrice, namespace, options) ResultSubmit { + // 1. Validate blob size + for _, blob := range data { + if len(blob) > DefaultMaxBlobSize { + return ResultSubmit{Code: StatusTooBig} + } + } + + // 2. Create Celestia blobs with namespace + blobs := make([]*blob.Blob, len(data)) + for i, d := range data { + blobs[i], _ = blob.NewBlobV0(namespace, d) + } + + // 3. Submit via RPC + height, err := c.blobRPC.Submit(ctx, blobs, submitOptions) + + // 4. Return result with IDs + return ResultSubmit{ + Code: StatusSuccess, + Height: height, + IDs: createIDs(height, blobs), + } +} +``` + +### Retrieve Flow + +```go +func (c *Client) Retrieve(ctx, height, namespace) ResultRetrieve { + // 1. Fetch all blobs at height + blobs, err := c.blobRPC.GetAll(ctx, height, []Namespace{namespace}) + + // 2. Handle errors + if errors.Is(err, ErrBlobNotFound) { + return ResultRetrieve{Code: StatusNotFound} + } + if errors.Is(err, ErrHeightFromFuture) { + return ResultRetrieve{Code: StatusHeightFromFuture} + } + + // 3. Get timestamp from DA header + header, _ := c.headerRPC.GetByHeight(ctx, height) + + // 4. Extract blob data + data := make([][]byte, len(blobs)) + for i, b := range blobs { + data[i] = b.Data() + } + + return ResultRetrieve{ + Code: StatusSuccess, + Height: height, + Timestamp: header.Time().UnixNano(), + Data: data, + } +} +``` + +### Address Selection + +For Cosmos SDK compatibility (preventing sequence mismatches): + +```go +type RoundRobinSelector struct { + addresses []string + counter atomic.Uint64 +} + +func (s *RoundRobinSelector) Next() string { + idx := s.counter.Add(1) % uint64(len(s.addresses)) + return s.addresses[idx] +} +``` + +--- + +## Sequencing System + +### Overview + +Sequencers order transactions for block production. ev-node supports two modes: +- **Single Sequencer**: Hybrid (mempool + forced inclusion) +- **Based Sequencer**: Pure DA (only forced inclusion) + +### Directory Structure + +``` +core/sequencer/ +├── sequencing.go # Core interface +└── dummy.go # Test implementation + +pkg/sequencers/ +├── single/ +│ ├── sequencer.go # Hybrid sequencer +│ └── queue.go # Persistent batch queue +├── based/ +│ └── sequencer.go # Pure DA sequencer +└── common/ + └── checkpoint.go # Shared checkpoint logic +``` + +### Core Interface + +```go +type Sequencer interface { + // Submit transactions from reaper to sequencer + SubmitBatchTxs(ctx, req SubmitBatchTxsRequest) (*SubmitBatchTxsResponse, error) + + // Get next batch for block production + GetNextBatch(ctx, req GetNextBatchRequest) (*GetNextBatchResponse, error) + + // Verify batch was included in DA + VerifyBatch(ctx, req VerifyBatchRequest) (*VerifyBatchResponse, error) + + // DA height tracking for forced inclusion + SetDAHeight(height uint64) + GetDAHeight() uint64 +} +``` + +### Batch Structure + +```go +type Batch struct { + Transactions [][]byte + + // ForceIncludedMask[i] == true: From DA (MUST validate) + // ForceIncludedMask[i] == false: From mempool (already validated) + // nil: Backward compatibility (validate all) + ForceIncludedMask []bool +} +``` + +### Single Sequencer (Hybrid) + +Accepts both mempool transactions and forced inclusion from DA. + +**Components:** + +1. **BatchQueue** - Persistent mempool storage + ```go + type BatchQueue struct { + db DB + maxSize uint64 + nextSeq uint64 // Starts at 0x8000000000000000 + } + + func (q *BatchQueue) AddBatch(batch [][]byte) error + func (q *BatchQueue) Next() ([][]byte, error) + func (q *BatchQueue) Prepend(batch [][]byte) error // Return unused txs + ``` + +2. **Checkpoint** - Tracks position in DA epoch + ```go + type Checkpoint struct { + DAHeight uint64 // Current DA height being processed + TxIndex uint64 // Position within epoch + } + ``` + +**GetNextBatch Flow:** + +```go +func (s *SingleSequencer) GetNextBatch(ctx, req) (*Response, error) { + // 1. Check if need next DA epoch + if s.checkpoint.DAHeight > 0 && len(s.cachedForcedTxs) == 0 { + s.fetchNextDAEpoch(ctx) + } + + // 2. Process forced txs from checkpoint + forcedTxs, forcedBytes := s.processForcedTxs(req.MaxBytes) + + // 3. Get mempool txs (remaining space) + mempoolTxs := s.queue.Next() + mempoolTxs = truncateToSize(mempoolTxs, req.MaxBytes - forcedBytes) + + // 4. Return unused mempool txs to queue + s.queue.Prepend(unusedTxs) + + // 5. Combine batches + batch := &Batch{ + Transactions: append(forcedTxs, mempoolTxs...), + ForceIncludedMask: makeMask(len(forcedTxs), len(mempoolTxs)), + } + + // 6. Update and persist checkpoint + s.updateCheckpoint() + + return &Response{Batch: batch, Timestamp: s.timestamp} +} +``` + +### Based Sequencer (Pure DA) + +Only processes forced inclusion transactions. No mempool. + +**Key Differences:** + +```go +func (s *BasedSequencer) SubmitBatchTxs(ctx, req) (*Response, error) { + // No-op: Ignores mempool transactions + return &SubmitBatchTxsResponse{}, nil +} + +func (s *BasedSequencer) GetNextBatch(ctx, req) (*Response, error) { + // Only returns forced inclusion txs + txs := s.fetchForcedInclusion(ctx) + + // Timestamp spread: prevents duplicate timestamps + // timestamp = DAEpochEndTime - (remainingTxs * 1ms) + timestamp := s.calculateSpreadTimestamp() + + return &Response{ + Batch: &Batch{Transactions: txs}, + Timestamp: timestamp, + } +} + +func (s *BasedSequencer) VerifyBatch(ctx, req) (*Response, error) { + // Always true: All txs come from DA (already verified) + return &VerifyBatchResponse{Status: true}, nil +} +``` + +### Forced Inclusion Flow + +``` +User submits tx to DA forced-inclusion namespace + │ + ▼ +DA stores tx at height H + │ + ▼ +Sequencer detects epoch boundary + │ + ▼ +ForcedInclusionRetriever.Retrieve(epochStart, epochEnd) + │ + ├── AsyncBlockRetriever checks cache + │ │ + │ ├── Cache hit: Return cached block + │ │ + │ └── Cache miss: Sync fetch from DA + │ + ▼ +Return ForcedInclusionEvent{Txs, Timestamp} + │ + ▼ +Sequencer caches txs, updates checkpoint + │ + ▼ +GetNextBatch returns txs with ForceIncludedMask[i]=true + │ + ▼ +Executor passes mask to execution layer + │ + ▼ +Execution layer validates forced txs (skips mempool validation) +``` + +### Async Block Retriever + +Background prefetching reduces latency: + +```go +type AsyncBlockRetriever struct { + client DAClient + cache map[uint64]*Block // In-memory cache + currentHeight atomic.Uint64 + prefetchSize uint64 // 2x epoch size + pollInterval time.Duration // DA block time +} + +func (r *AsyncBlockRetriever) Start(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(r.pollInterval): + r.prefetch() + } + } + }() +} + +func (r *AsyncBlockRetriever) prefetch() { + current := r.currentHeight.Load() + end := current + r.prefetchSize + + for h := current; h < end; h++ { + if _, exists := r.cache[h]; !exists { + block, _ := r.client.Retrieve(ctx, h, namespace) + r.cache[h] = block + } + } + + // Cleanup old entries + r.cleanupBefore(current - r.prefetchSize) +} +``` + +--- + +## Integration with Block Package + +### Executor Integration + +```go +func (e *Executor) initializeState() error { + state := e.store.GetState() + + // Sync sequencer DA height with stored state + e.sequencer.SetDAHeight(state.DAHeight) + + return nil +} + +func (e *Executor) produceBlock(ctx context.Context) error { + // 1. Get batch from sequencer + resp, _ := e.sequencer.GetNextBatch(ctx, GetNextBatchRequest{ + Id: e.genesis.ChainID, + MaxBytes: DefaultMaxBlobSize, + }) + + // 2. Pass ForceIncludedMask to execution layer + ctx = WithForceIncludedMask(ctx, resp.Batch.ForceIncludedMask) + + // 3. Execute transactions + stateRoot, _ := e.exec.ExecuteTxs(ctx, resp.Batch.Transactions, ...) + + // 4. Update state with new DA height + newState := &State{ + DAHeight: e.sequencer.GetDAHeight(), + // ... + } + + // 5. Create and broadcast block + // ... +} +``` + +### Configuration + +```go +type DAConfig struct { + Address string // Celestia RPC endpoint + AuthToken string // Auth token + Namespace string // Header namespace + DataNamespace string // Data namespace (optional) + ForcedInclusionNamespace string // Forced inclusion namespace + BlockTime Duration // DA block time + SubmitOptions string // JSON gas settings + SigningAddresses []string // Round-robin addresses + MaxSubmitAttempts int // Retry limit + RequestTimeout Duration // Per-request timeout +} + +type NodeConfig struct { + Aggregator bool // Enable block production + BasedSequencer bool // Use based sequencer (requires Aggregator) + BlockTime Duration // App block time + LazyMode bool // Only produce on txs +} +``` + +### Genesis Configuration + +```go +type Genesis struct { + DAStartHeight uint64 // First DA height (0 at genesis) + DAEpochForcedInclusion uint64 // Epoch size (default 50) +} +``` + +--- + +## Key Design Decisions + +### 1. ForceIncludedMask Optimization + +Distinguishes DA-sourced (untrusted) from mempool (trusted) transactions: +- Execution layer validates forced txs +- Skips redundant validation for mempool txs +- Significant performance improvement + +### 2. Epoch-Based Processing + +Only retrieves forced inclusion at epoch boundaries: +- Reduces DA queries +- Enables batching +- Checkpoint ensures resumable processing + +### 3. Async Prefetching + +Background goroutine prefetches 2x epoch size ahead: +- Reduces latency when sequencer needs txs +- Cache misses fall back to sync fetch +- Bounded memory via cleanup + +### 4. Namespace Strategy + +Three separate namespaces: +- **Header**: Block headers (required) +- **Data**: Transaction data (optional, can share with header) +- **Forced Inclusion**: User-submitted txs for censorship resistance + +### 5. Crash Recovery + +Both sequencers persist state: +- **Checkpoint**: DAHeight + TxIndex position +- **Queue**: Pending mempool batches +- Protobuf serialization to DB + +### 6. Single vs Based Mode + +| Aspect | Single | Based | +|--------|--------|-------| +| Mempool | Yes | No | +| Forced Inclusion | Yes | Yes (only source) | +| SubmitBatchTxs | Stores in queue | No-op | +| VerifyBatch | Validates proofs | Always true | +| Use Case | Traditional rollup | High liveness | diff --git a/.github/workflows/update-onboarding-skill.yml b/.github/workflows/update-onboarding-skill.yml new file mode 100644 index 000000000..be023f1c0 --- /dev/null +++ b/.github/workflows/update-onboarding-skill.yml @@ -0,0 +1,126 @@ +--- +name: Update Onboarding Skill + +"on": + schedule: + - cron: '0 9 * * 6' # Run every Saturday at 09:00 UTC + workflow_dispatch: # Allow manual trigger + +jobs: + update-skill: + runs-on: ubuntu-latest + permissions: + contents: write + pull-requests: write + id-token: write + steps: + - name: Checkout repository + uses: actions/checkout@v6 + with: + fetch-depth: 0 # Full history for commit analysis + + - name: Get commits from last week + id: commits + env: + LOOKBACK_DAYS: "7" + run: | + SINCE=$(date -d "${LOOKBACK_DAYS} days ago" --iso-8601) + echo "since=${SINCE}" >> "$GITHUB_OUTPUT" + + # Get commit log for relevant paths + COMMIT_LOG=$(git log --since="${SINCE}" --oneline --no-merges \ + -- '*.go' 'block/' 'core/' 'types/' 'pkg/' 2>/dev/null || echo "") + + if [ -z "$COMMIT_LOG" ]; then + echo "No relevant commits in the last week" + echo "has_commits=false" >> "$GITHUB_OUTPUT" + else + echo "has_commits=true" >> "$GITHUB_OUTPUT" + # Save to file for Claude to read + echo "${COMMIT_LOG}" > /tmp/weekly_commits.txt + echo "Found $(echo "${COMMIT_LOG}" | wc -l) commits" + fi + + - name: Run Claude to update skill + if: steps.commits.outputs.has_commits == 'true' + id: claude + uses: anthropics/claude-code-action@v1 + env: + SINCE_DATE: ${{ steps.commits.outputs.since }} + with: + claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + direct_prompt: | + You are updating the ev-node-explainer skill based on recent code changes. + + ## Task + 1. Read the commits from the last week using the SINCE_DATE environment variable + 2. For significant changes, read the actual diffs in block/, core/, types/ + 3. Analyze what changed in the block package, core interfaces, or types + 4. Update `.claude/skills/ev-node-explainer/SKILL.md` and/or `block-architecture.md` if needed + + ## Guidelines + - Only update if there are meaningful architectural changes + - Keep documentation accurate to the current codebase + - Add new sections for new components + - Update state machines if flows changed + - Update file lists if new files were added + - Do NOT make changes if commits are just bug fixes or minor tweaks + + ## Output + - If updates were made, commit with message "docs: update ev-node-explainer skill" + - If no meaningful updates needed, output "No skill updates required" + + allowed_tools: | + Bash(git:*) + Bash(date:*) + Read + Edit + Write + Glob + Grep + + - name: Create Pull Request + if: steps.commits.outputs.has_commits == 'true' + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + # Check if there are changes + if git diff --quiet HEAD; then + echo "No changes to commit" + exit 0 + fi + + # Configure git + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + + BRANCH="docs/update-onboarding-skill-$(date +%Y%m%d)" + git checkout -b "${BRANCH}" 2>/dev/null || git checkout "${BRANCH}" + + # Stage and commit any changes Claude made + git add -A + git commit -m "docs: update ev-node-explainer skill based on weekly changes + + Co-Authored-By: Claude " || { + echo "Nothing to commit" + exit 0 + } + + git push -u origin "${BRANCH}" + + # Create PR + gh pr create \ + --title "docs: weekly update to ev-node-explainer skill" \ + --body "## Summary + Automated weekly update to the ev-node-explainer skill based on code changes from the past week. + + ## Changes + This PR updates the onboarding documentation to reflect recent architectural changes. + + ## Review + Please review the documentation changes for accuracy. + + --- + Generated automatically by the weekly skill update workflow" \ + --base main \ + --head "${BRANCH}" diff --git a/.gitignore b/.gitignore index 08d1d2bfd..5ccf28f52 100644 --- a/.gitignore +++ b/.gitignore @@ -28,7 +28,6 @@ docs/.vitepress/cache .temp .vite_opt_cache .vscode -.claude .gocache .gomodcache /.cache diff --git a/README.md b/README.md index 5ce13f579..013fc7360 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,25 @@ TestApp serves as a reference implementation to help you get started with your o Check out our tutorials on our [website][docs]. +## Onboarding with Claude Code + +Use [Claude Code](https://claude.ai/code) to explore the codebase: + +```bash +cd ev-node && claude +``` + +Example prompts: + +| Goal | Prompt | +|------|--------| +| Overview | "How does ev-node work?" | +| Block package | "Explain block production flow" | +| DA layer | "How does the DA layer work?" | +| Sequencing | "Explain single vs based sequencer" | + +The `.claude/skills/ev-node-explainer/` skill provides architecture docs for block, DA, and sequencing systems. + ## Contributing We welcome your contributions! Everyone is welcome to contribute, whether it's