Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 166 additions & 134 deletions bindings/cpp/examples/example.cpp

Large diffs are not rendered by default.

139 changes: 113 additions & 26 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,17 @@ struct NamedGetters {
private:
const Derived& Self() const { return static_cast<const Derived&>(*this); }
};

struct ScanData {
ffi::ScanResultInner* raw;
ColumnMap columns;

ScanData(ffi::ScanResultInner* r, ColumnMap cols) : raw(r), columns(std::move(cols)) {}
~ScanData();

ScanData(const ScanData&) = delete;
ScanData& operator=(const ScanData&) = delete;
};
} // namespace detail

class GenericRow {
Expand Down Expand Up @@ -623,9 +634,8 @@ class RowView : public detail::NamedGetters<RowView> {
friend struct detail::NamedGetters<RowView>;

public:
RowView(std::shared_ptr<const ffi::ScanResultInner> inner, size_t record_idx,
std::shared_ptr<const detail::ColumnMap> column_map)
: inner_(std::move(inner)), record_idx_(record_idx), column_map_(std::move(column_map)) {}
RowView(std::shared_ptr<const detail::ScanData> data, size_t bucket_idx, size_t rec_idx)
: data_(std::move(data)), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {}

// ── Index-based getters ──────────────────────────────────────────
size_t FieldCount() const;
Expand Down Expand Up @@ -660,29 +670,90 @@ class RowView : public detail::NamedGetters<RowView> {

private:
size_t Resolve(const std::string& name) const {
if (!column_map_) {
if (!data_) {
throw std::runtime_error("RowView: name-based access not available");
}
return detail::ResolveColumn(*column_map_, name);
return detail::ResolveColumn(data_->columns, name);
}
std::shared_ptr<const detail::ScanData> data_;
size_t bucket_idx_;
size_t rec_idx_;
};

/// Identifies a specific bucket, optionally within a partition.
struct TableBucket {
int64_t table_id;
int32_t bucket_id;
std::optional<int64_t> partition_id;

bool operator==(const TableBucket& other) const {
return table_id == other.table_id && bucket_id == other.bucket_id &&
partition_id == other.partition_id;
}
std::shared_ptr<const ffi::ScanResultInner> inner_;
size_t record_idx_;
std::shared_ptr<const detail::ColumnMap> column_map_;

bool operator!=(const TableBucket& other) const { return !(*this == other); }
};

/// A single scan record. Contains metadata and a RowView for field access.
///
/// ScanRecord is a value type that can be freely copied, stored, and
/// accumulated across multiple Poll() calls.
struct ScanRecord {
int32_t bucket_id;
std::optional<int64_t> partition_id;
int64_t offset;
int64_t timestamp;
ChangeType change_type;
RowView row;
};

/// A view into a subset of scan results for a single bucket.
///
/// BucketView is a value type — it shares ownership of the underlying scan data
/// via reference counting, so it can safely outlive the ScanRecords that produced it.
class BucketView {
public:
BucketView(std::shared_ptr<const detail::ScanData> data, TableBucket bucket, size_t bucket_idx,
size_t count)
: data_(std::move(data)),
bucket_(std::move(bucket)),
bucket_idx_(bucket_idx),
count_(count) {}

/// The bucket these records belong to.
const TableBucket& Bucket() const { return bucket_; }

/// Number of records in this bucket.
size_t Size() const { return count_; }
bool Empty() const { return count_ == 0; }

/// Access a record by its position within this bucket (0-based).
ScanRecord operator[](size_t idx) const;

class Iterator {
public:
ScanRecord operator*() const;
Iterator& operator++() {
++idx_;
return *this;
}
bool operator!=(const Iterator& other) const { return idx_ != other.idx_; }

private:
friend class BucketView;
Iterator(const BucketView* owner, size_t idx) : owner_(owner), idx_(idx) {}
const BucketView* owner_;
size_t idx_;
};

Iterator begin() const { return Iterator(this, 0); }
Iterator end() const { return Iterator(this, count_); }

private:
std::shared_ptr<const detail::ScanData> data_;
TableBucket bucket_;
size_t bucket_idx_;
size_t count_;
};

class ScanRecords {
public:
ScanRecords() noexcept = default;
Expand All @@ -693,36 +764,52 @@ class ScanRecords {
ScanRecords(ScanRecords&&) noexcept = default;
ScanRecords& operator=(ScanRecords&&) noexcept = default;

size_t Size() const;
bool Empty() const;
ScanRecord operator[](size_t idx) const;
/// Total number of records across all buckets.
size_t Count() const;
bool IsEmpty() const;

/// Number of distinct buckets with records.
size_t BucketCount() const;

/// List of distinct buckets that have records.
std::vector<TableBucket> Buckets() const;

/// Get a view of records for a specific bucket.
///
/// Returns an empty BucketView if the bucket is not present (matches Rust/Java).
/// Note: O(B) linear scan. For iteration over all buckets, prefer BucketAt(idx).
BucketView Records(const TableBucket& bucket) const;

/// Get a view of records by bucket index (0-based). O(1).
///
/// Throws std::out_of_range if idx >= BucketCount().
BucketView BucketAt(size_t idx) const;

/// Flat iterator over all records across all buckets (matches Java Iterable<ScanRecord>).
class Iterator {
public:
ScanRecord operator*() const;
Iterator& operator++() {
++idx_;
return *this;
Iterator& operator++();
bool operator!=(const Iterator& other) const {
return bucket_idx_ != other.bucket_idx_ || rec_idx_ != other.rec_idx_;
}
bool operator!=(const Iterator& other) const { return idx_ != other.idx_; }

private:
friend class ScanRecords;
Iterator(const ScanRecords* owner, size_t idx) : owner_(owner), idx_(idx) {}
Iterator(const ScanRecords* owner, size_t bucket_idx, size_t rec_idx)
: owner_(owner), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {}
const ScanRecords* owner_;
size_t idx_;
size_t bucket_idx_;
size_t rec_idx_;
};

Iterator begin() const { return Iterator(this, 0); }
Iterator end() const { return Iterator(this, Size()); }
Iterator begin() const;
Iterator end() const { return Iterator(this, BucketCount(), 0); }

private:
/// Returns the column name-to-index map (lazy-built, cached).
const std::shared_ptr<detail::ColumnMap>& GetColumnMap() const;
friend class LogScanner;
void BuildColumnMap() const;
std::shared_ptr<ffi::ScanResultInner> inner_;
mutable std::shared_ptr<detail::ColumnMap> column_map_;
ScanRecord RecordAt(size_t bucket, size_t rec_idx) const;
std::shared_ptr<const detail::ScanData> data_;
};

class ArrowRecordBatch {
Expand Down
Loading
Loading