
Conversation

@zhaohaidao
Contributor

Purpose

Linked issue: close #141

Brief change log

Tests

API and Format

Documentation

@zhaohaidao
Contributor Author

@luoyuxia Hi, PTAL if you have time.


Copilot AI left a comment

Pull request overview

This pull request improves error handling in the read path of the Fluss Rust client, focusing on fetch and scan operations. The changes add robust error handling for API errors, corrupt messages, and transient failures during log fetching and scanning.

Changes:

  • Enhanced error handling in list_offsets.rs to properly convert API error codes to Fluss errors with descriptive messages
  • Implemented comprehensive error handling in the scanner with proper metadata refresh scheduling for transient errors
  • Added error propagation through the LogFetchBuffer with new ErrorCompletedFetch type to handle errors at the fetch level
  • Improved projection validation in arrow.rs with bounds checking and better error messages
  • Added extensive test coverage for new error paths, including tests for wakeup errors, API errors, and metadata refresh scheduling

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

Summary per file:

  • crates/fluss/src/error.rs: Added new WakeupError variant for handling wakeup interruptions
  • crates/fluss/src/rpc/message/list_offsets.rs: Improved error handling to properly convert error codes to API errors with descriptive messages
  • crates/fluss/src/client/table/scanner.rs: Added MetadataRefreshScheduler for automatic metadata refresh on transient errors, enhanced error handling throughout fetch operations
  • crates/fluss/src/client/table/log_fetch_buffer.rs: Implemented ErrorCompletedFetch type, added error propagation methods, improved corrupt record handling
  • crates/fluss/src/record/arrow.rs: Added projection validation with bounds checking, improved error messages for projection failures
  • crates/fluss/src/util/mod.rs: Added tests for FairBucketStatusMap
  • crates/fluss/src/row/column.rs: Added tests for ColumnarRow
  • crates/fluss/src/record/mod.rs: Added tests for ChangeType, ScanRecords, and ScanRecord
  • crates/fluss/src/client/write/sender.rs: Added HashMap import

Comment on lines 835 to 840
    );
}
FlussError::UnknownTableOrBucketException => {
    warn!(
        "Received unknown table or bucket error in fetch for bucket {table_bucket}"
    );

Copilot AI Jan 11, 2026

In the handle_fetch_response function, errors like NotLeaderOrFollower, LogStorageException, KvStorageException, StorageException, FencedLeaderEpochException, and UnknownTableOrBucketException are logged but do not trigger metadata refresh. However, in the initialize_fetch function (lines 1071-1085), these same errors do trigger metadata refresh via schedule_metadata_update. This inconsistency could lead to delayed recovery from transient errors. Consider adding metadata refresh scheduling for these error cases in handle_fetch_response as well.

Suggested change
-    );
-}
-FlussError::UnknownTableOrBucketException => {
-    warn!(
-        "Received unknown table or bucket error in fetch for bucket {table_bucket}"
-    );
+    );
+    self.schedule_metadata_update();
+}
+FlussError::UnknownTableOrBucketException => {
+    warn!(
+        "Received unknown table or bucket error in fetch for bucket {table_bucket}"
+    );
+    self.schedule_metadata_update();

Comment on lines 500 to 543
tokio::spawn(async move {
    loop {
        let (delay, error_for_log) = {
            let mut guard = state.lock();
            if !guard.pending {
                guard.running = false;
                return;
            }
            guard.pending = false;

            let now = Instant::now();
            let delay = match guard.last_refresh {
                Some(last) => {
                    let earliest = last + min_interval;
                    if now < earliest {
                        earliest - now
                    } else {
                        Duration::from_millis(0)
                    }
                }
                None => Duration::from_millis(0),
            };
            (delay, guard.last_error.take())
        };

        if !delay.is_zero() {
            sleep(delay).await;
        }

        if let Err(e) = (refresh)().await {
            if let Some(error) = error_for_log {
                warn!(
                    "Failed to update metadata for {table_path} after fetch error {error:?}: {e:?}"
                );
            } else {
                warn!("Failed to update metadata for {table_path}: {e:?}");
            }
        }

        let mut guard = state.lock();
        guard.last_refresh = Some(Instant::now());
    }
});
}

Copilot AI Jan 11, 2026

The MetadataRefreshScheduler spawns a background tokio task (line 500) that is not explicitly tracked or canceled. When the LogFetcher (and its metadata_refresh field) is dropped, this background task will continue running until it naturally exits, potentially holding references to shared state longer than necessary. Consider using JoinHandle to track the spawned task and implement a cancellation mechanism (e.g., via CancellationToken from tokio_util) to ensure clean shutdown when the LogFetcher is dropped.
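For illustration, a minimal sketch of that idea, assuming tokio_util is available as a dependency; the struct shape and field names here are hypothetical, not the actual LogFetcher types:

// Hypothetical sketch: track the spawned task and cancel it on drop.
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

struct MetadataRefreshScheduler {
    cancel: CancellationToken,
    handle: Option<JoinHandle<()>>,
}

impl MetadataRefreshScheduler {
    fn start(&mut self) {
        let cancel = self.cancel.clone();
        self.handle = Some(tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Exit promptly once the owner is dropped.
                    _ = cancel.cancelled() => return,
                    _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
                        // ... perform the scheduled metadata refresh here ...
                    }
                }
            }
        }));
    }
}

impl Drop for MetadataRefreshScheduler {
    fn drop(&mut self) {
        // Signal the background task to stop; abort it if it is still running.
        self.cancel.cancel();
        if let Some(handle) = self.handle.take() {
            handle.abort();
        }
    }
}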

fn fetch_error(&self) -> Error {
    let mut message = format!(
        "Received exception when fetching the next record from {table_bucket}. If needed, please back to past the record to continue scanning.",

Copilot AI Jan 11, 2026

The phrase "back to past the record" is grammatically incorrect. It should be "go back past the record" or "skip past the record" to be clear and correct.

Suggested change
"Received exception when fetching the next record from {table_bucket}. If needed, please back to past the record to continue scanning.",
"Received exception when fetching the next record from {table_bucket}. If needed, please skip past the record to continue scanning.",

Contributor

@luoyuxia luoyuxia left a comment

@zhaohaidao Thanks for the PR. Left some comments. PTAL.

corrupt_last_record: bool,
}

impl DefaultCompletedFetch {
Contributor

Actually, we don't need to make it return Result:

pub fn new(
        table_bucket: TableBucket,
        log_record_batch: LogRecordsBatches,
        size_in_bytes: usize,
        read_context: ReadContext,
        fetch_offset: i64,
        high_watermark: i64,
    ) -> Self {
        Self {
            table_bucket,
            log_record_batch,
            read_context,
            next_fetch_offset: fetch_offset,
            high_watermark,
            size_in_bytes,
            consumed: false,
            initialized: false,
            records_read: 0,
            current_record_iterator: None,
            current_record_batch: None,
            last_record: None,
            cached_record_error: None,
            corrupt_last_record: false,
        }
    }

completed_to_push.push(completed);
has_completed = true;
}
Err(e) => {
Contributor

If we meet any error, shouldn't it still mark has_completed as false and retry again?
Otherwise, the failed pending fetch will be ignored and we'll start to read the next fetch, in which case we miss the data in the pending fetch and we'll never know it.

Contributor Author

If I understand correctly, the fetch request is driven by the user's poll function, rather than automatically retrying in the background?

And some errors, such as outdated metadata, require explicit metadata updates, not automatic retries.

I looked at the Java code, and it doesn't seem to have any retry logic; it just completes the fetch request directly?

Please feel free to correct me if I've misunderstood anything. @luoyuxia

scan_records.push(record);
} else {
break;
if self.cached_record_error.is_none() {
Contributor

I'm wondering whether the following code is clearer:

for _ in 0..max_records {
            if self.cached_record_error.is_some() {
                break;
            }
            
            self.corrupt_last_record = true;
            match self.next_fetched_record() {
                Ok(Some(record)) => {
                    self.corrupt_last_record = false;
                    self.next_fetch_offset = record.offset() + 1;
                    self.records_read += 1;
                    scan_records.push(record);
                }
                Ok(None) => {
                    self.corrupt_last_record = false;
                    self.last_record = None;
                    break;
                }
                Err(e) => {
                    self.cached_record_error = Some(e);
                    break;
                }
            }
        }

Contributor Author

From your suggestion, I see there's no logic related to last_record.take. last_record is used to preserve the "partial success followed by error" behavior (return the records already read first, then report the error on the next attempt).
This logic also keeps consistency with the Java side.
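For illustration, a rough sketch of that behavior; names are taken from the snippets above, the control flow is illustrative and omits the last_record bookkeeping, and it is not the actual implementation:

// Illustrative sketch: a cached error is surfaced on the next call, so records
// that were already read in this call are returned first ("partial success").
fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
    // If the previous call ended in an error, report it now before reading further.
    if let Some(e) = self.cached_record_error.take() {
        return Err(e);
    }

    let mut scan_records = Vec::new();
    for _ in 0..max_records {
        match self.next_fetched_record() {
            Ok(Some(record)) => scan_records.push(record),
            Ok(None) => break,
            Err(e) => {
                // Partial success: keep what was read and defer the error.
                self.cached_record_error = Some(e);
                break;
            }
        }
    }
    Ok(scan_records)
}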

.update_offset(&table_bucket, next_fetch_offset);
}

if next_in_line_fetch.is_consumed() && next_in_line_fetch.records_read() > 0 {
Contributor

Seems there's no such logic on the Java side, right?

Contributor

But it reminds me that next_in_line_fetch.drain() should moveBucketToEnd just like Java.
The current drain won't move the bucket to the end.

Contributor Author

Rust's CompletedFetch does not have a LogScannerStatus reference, so the move_to_end is implemented in fetch_records_from_fetch.
Do you have any suggestions?

if result.is_empty() {
return Err(e);
}
if !next_fetch.is_consumed() {
Contributor

I found it duplicated with https://github.com/apache/fluss-rust/pull/143/files#diff-540a310c6f8139ea4592281917afecdd364792ffc3655a65221307465cabd09eR1041
and I think this piece of code is error-prone. I'm wondering whether the following code would be more straightforward and more consistent with the Java side, to avoid potential bugs:

let collect_result: Result<()> = {
            while records_remaining > 0 {
                // Get the next in line fetch, or get a new one from buffer
                let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
                if next_in_line.is_none() || next_in_line.as_ref().unwrap().is_consumed() {
                    // Get a new fetch from buffer
                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
                        // Initialize the fetch if not already initialized
                        if !completed_fetch.is_initialized() {
                            let size_in_bytes = completed_fetch.size_in_bytes();
                            match self.initialize_fetch(completed_fetch) {
                                Ok(initialized) => {
                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
                                    continue;
                                }
                                Err(e) => {
                                    if result.is_empty() && size_in_bytes == 0 {
                                        // todo: add back completed_fetch to log_fetch_buffer, 
                                        // otherwise, we will skip this fetch error by mistake, which cause
                                        // data loss but user never have chance to know it. 
                                        
                                    }
                                    return Err(e);
                                }
                            }
                        } else {
                            self.log_fetch_buffer
                                .set_next_in_line_fetch(Some(completed_fetch));
                        }
                    } else {
                        // No more fetches available
                        break;
                    }
                } else {
                    // Fetch records from next_in_line
                    if let Some(mut next_fetch) = next_in_line {
                        let records =
                            self.fetch_records_from_fetch(&mut next_fetch, records_remaining)?;

                        if !records.is_empty() {
                            let table_bucket = next_fetch.table_bucket().clone();
                            // Merge with existing records for this bucket
                            let existing = result.entry(table_bucket).or_default();
                            let records_count = records.len();
                            existing.extend(records);

                            records_remaining = records_remaining.saturating_sub(records_count);
                        }

                        // If the fetch is not fully consumed, put it back for the next round
                        if !next_fetch.is_consumed() {
                            self.log_fetch_buffer
                                .set_next_in_line_fetch(Some(next_fetch));
                        }
                        // If consumed, next_fetch will be dropped here (which is correct)
                    }
                }
            }
            Ok(())
        };

        match collect_result {
            Ok(_) => Ok(result),
            Err(e) => {
                // If we have already collected records, suppress the exception 
                // and return the partial results to the user first.
                if result.is_empty() {
                    Err(e)
                } else {
                    // Swallow the exception and return the currently accumulated result.
                    // The error will likely be re-triggered during the next poll attempt.
                    Ok(result)
                }
            }
        }

Contributor Author

makes sense

Contributor Author

Currently, when fetch_records_from_fetch returns an Err on the Rust side, if next_fetch has not been consumed it is put back as next_in_line_fetch so that it can be continued next time. The version you provided uses ? to return the Err directly, which does not put the fetch back and may lose unconsumed fetches.
I changed it to a match here: when an Err occurs, if next_fetch has not been consumed, it is first put back into next_in_line_fetch before returning Err(e).
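For illustration, a short sketch of the handling described here, reusing names from the snippet in the previous comment; this is an assumed shape, not the final code:

// Sketch: on error, put an unconsumed fetch back before propagating the error,
// so its remaining records are not lost.
match self.fetch_records_from_fetch(&mut next_fetch, records_remaining) {
    Ok(records) => {
        // merge records into the result as in the suggestion above
    }
    Err(e) => {
        if !next_fetch.is_consumed() {
            // Keep the partially consumed fetch so the next poll can continue it.
            self.log_fetch_buffer
                .set_next_in_line_fetch(Some(next_fetch));
        }
        return Err(e);
    }
}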

@zhaohaidao
Contributor Author

@luoyuxia Thank you so much for your thorough and patient review. I have addressed the clear comments, but there are still some unclear points that need your further consideration. PTAL if you have time.

Development

Successfully merging this pull request may close these issues.

Improve Read Path Fault Tolerance Abilities
