diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index ea966d8e..d86ee5cd 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -168,58 +168,75 @@ int main() { fluss::ScanRecords records; check("poll", scanner.Poll(5000, records)); - std::cout << "Scanned records: " << records.Size() << std::endl; + // Flat iteration over all records (regardless of bucket) + std::cout << "Scanned records: " << records.Count() << " across " << records.BucketCount() + << " buckets" << std::endl; + for (const auto& rec : records) { + std::cout << " offset=" << rec.offset << " timestamp=" << rec.timestamp << std::endl; + } + + // Per-bucket access (with type verification) bool scan_ok = true; bool found_null_row = false; - for (const auto& rec : records) { - // Check if this is the all-null row (matches Rust: is_null_at for every column) - if (rec.row.IsNull(0)) { - found_null_row = true; - for (size_t i = 0; i < rec.row.FieldCount(); ++i) { - if (!rec.row.IsNull(i)) { - std::cerr << "ERROR: column " << i << " should be null" << std::endl; - scan_ok = false; + for (const auto& tb : records.Buckets()) { + auto view = records.Records(tb); + std::cout << " Bucket " << tb.bucket_id; + if (tb.partition_id.has_value()) { + std::cout << " (partition=" << *tb.partition_id << ")"; + } + std::cout << ": " << view.Size() << " records" << std::endl; + for (const auto& rec : view) { + // Check if this is the all-null row + if (rec.row.IsNull(0)) { + found_null_row = true; + for (size_t i = 0; i < rec.row.FieldCount(); ++i) { + if (!rec.row.IsNull(i)) { + std::cerr << "ERROR: column " << i << " should be null" << std::endl; + scan_ok = false; + } } + std::cout << " [null row] all " << rec.row.FieldCount() << " fields are null" + << std::endl; + continue; } - std::cout << " [null row] all " << rec.row.FieldCount() << " fields are null" - << std::endl; - continue; - } - - // Non-null rows: verify types - if (rec.row.GetType(4) != 
fluss::TypeId::Date) { - std::cerr << "ERROR: field 4 expected Date, got " - << static_cast(rec.row.GetType(4)) << std::endl; - scan_ok = false; - } - if (rec.row.GetType(5) != fluss::TypeId::Time) { - std::cerr << "ERROR: field 5 expected Time, got " - << static_cast(rec.row.GetType(5)) << std::endl; - scan_ok = false; - } - if (rec.row.GetType(6) != fluss::TypeId::Timestamp) { - std::cerr << "ERROR: field 6 expected Timestamp, got " - << static_cast(rec.row.GetType(6)) << std::endl; - scan_ok = false; - } - if (rec.row.GetType(7) != fluss::TypeId::TimestampLtz) { - std::cerr << "ERROR: field 7 expected TimestampLtz, got " - << static_cast(rec.row.GetType(7)) << std::endl; - scan_ok = false; - } - // Name-based getters (equivalent to index-based above) - auto date = rec.row.GetDate("event_date"); - auto time = rec.row.GetTime("event_time"); - auto ts_ntz = rec.row.GetTimestamp("created_at"); - auto ts_ltz = rec.row.GetTimestamp("updated_at"); + // Non-null rows: verify types + if (rec.row.GetType(4) != fluss::TypeId::Date) { + std::cerr << "ERROR: field 4 expected Date, got " + << static_cast(rec.row.GetType(4)) << std::endl; + scan_ok = false; + } + if (rec.row.GetType(5) != fluss::TypeId::Time) { + std::cerr << "ERROR: field 5 expected Time, got " + << static_cast(rec.row.GetType(5)) << std::endl; + scan_ok = false; + } + if (rec.row.GetType(6) != fluss::TypeId::Timestamp) { + std::cerr << "ERROR: field 6 expected Timestamp, got " + << static_cast(rec.row.GetType(6)) << std::endl; + scan_ok = false; + } + if (rec.row.GetType(7) != fluss::TypeId::TimestampLtz) { + std::cerr << "ERROR: field 7 expected TimestampLtz, got " + << static_cast(rec.row.GetType(7)) << std::endl; + scan_ok = false; + } - std::cout << " id=" << rec.row.GetInt32("id") << " name=" << rec.row.GetString("name") - << " score=" << rec.row.GetFloat32("score") << " age=" << rec.row.GetInt32("age") - << " date=" << date.Year() << "-" << date.Month() << "-" << date.Day() - << " time=" << time.Hour() 
<< ":" << time.Minute() << ":" << time.Second() - << " ts_ntz=" << ts_ntz.epoch_millis << " ts_ltz=" << ts_ltz.epoch_millis << "+" - << ts_ltz.nano_of_millisecond << "ns" << std::endl; + // Name-based getters + auto date = rec.row.GetDate("event_date"); + auto time = rec.row.GetTime("event_time"); + auto ts_ntz = rec.row.GetTimestamp("created_at"); + auto ts_ltz = rec.row.GetTimestamp("updated_at"); + + std::cout << " id=" << rec.row.GetInt32("id") + << " name=" << rec.row.GetString("name") + << " score=" << rec.row.GetFloat32("score") + << " age=" << rec.row.GetInt32("age") << " date=" << date.Year() << "-" + << date.Month() << "-" << date.Day() << " time=" << time.Hour() << ":" + << time.Minute() << ":" << time.Second() << " ts_ntz=" << ts_ntz.epoch_millis + << " ts_ltz=" << ts_ltz.epoch_millis << "+" << ts_ltz.nano_of_millisecond + << "ns" << std::endl; + } } if (!found_null_row) { @@ -246,32 +263,34 @@ int main() { fluss::ScanRecords projected_records; check("poll_projected", projected_scanner.Poll(5000, projected_records)); - std::cout << "Projected records: " << projected_records.Size() << std::endl; - for (const auto& rec : projected_records) { - if (rec.row.FieldCount() != 2) { - std::cerr << "ERROR: expected 2 fields, got " << rec.row.FieldCount() << std::endl; - scan_ok = false; - continue; - } - // Skip the all-null row - if (rec.row.IsNull(0)) { - std::cout << " [null row] skipped" << std::endl; - continue; - } - if (rec.row.GetType(0) != fluss::TypeId::Int) { - std::cerr << "ERROR: projected field 0 expected Int, got " - << static_cast(rec.row.GetType(0)) << std::endl; - scan_ok = false; - } - if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) { - std::cerr << "ERROR: projected field 1 expected TimestampLtz, got " - << static_cast(rec.row.GetType(1)) << std::endl; - scan_ok = false; - } + std::cout << "Projected records: " << projected_records.Count() << std::endl; + for (const auto& tb : projected_records.Buckets()) { + for (const auto& rec : 
projected_records.Records(tb)) { + if (rec.row.FieldCount() != 2) { + std::cerr << "ERROR: expected 2 fields, got " << rec.row.FieldCount() << std::endl; + scan_ok = false; + continue; + } + // Skip the all-null row + if (rec.row.IsNull(0)) { + std::cout << " [null row] skipped" << std::endl; + continue; + } + if (rec.row.GetType(0) != fluss::TypeId::Int) { + std::cerr << "ERROR: projected field 0 expected Int, got " + << static_cast(rec.row.GetType(0)) << std::endl; + scan_ok = false; + } + if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) { + std::cerr << "ERROR: projected field 1 expected TimestampLtz, got " + << static_cast(rec.row.GetType(1)) << std::endl; + scan_ok = false; + } - auto ts = rec.row.GetTimestamp(1); - std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" << ts.epoch_millis << "+" - << ts.nano_of_millisecond << "ns" << std::endl; + auto ts = rec.row.GetTimestamp(1); + std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" << ts.epoch_millis << "+" + << ts.nano_of_millisecond << "ns" << std::endl; + } } // 7b) Projected scan by column names — same columns as above but using names @@ -287,32 +306,34 @@ int main() { fluss::ScanRecords name_projected_records; check("poll_name_projected", name_projected_scanner.Poll(5000, name_projected_records)); - std::cout << "Name-projected records: " << name_projected_records.Size() << std::endl; - for (const auto& rec : name_projected_records) { - if (rec.row.FieldCount() != 2) { - std::cerr << "ERROR: expected 2 fields, got " << rec.row.FieldCount() << std::endl; - scan_ok = false; - continue; - } - // Skip the all-null row - if (rec.row.IsNull(0)) { - std::cout << " [null row] skipped" << std::endl; - continue; - } - if (rec.row.GetType(0) != fluss::TypeId::Int) { - std::cerr << "ERROR: name-projected field 0 expected Int, got " - << static_cast(rec.row.GetType(0)) << std::endl; - scan_ok = false; - } - if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) { - std::cerr << "ERROR: 
name-projected field 1 expected TimestampLtz, got " - << static_cast(rec.row.GetType(1)) << std::endl; - scan_ok = false; - } + std::cout << "Name-projected records: " << name_projected_records.Count() << std::endl; + for (const auto& tb : name_projected_records.Buckets()) { + for (const auto& rec : name_projected_records.Records(tb)) { + if (rec.row.FieldCount() != 2) { + std::cerr << "ERROR: expected 2 fields, got " << rec.row.FieldCount() << std::endl; + scan_ok = false; + continue; + } + // Skip the all-null row + if (rec.row.IsNull(0)) { + std::cout << " [null row] skipped" << std::endl; + continue; + } + if (rec.row.GetType(0) != fluss::TypeId::Int) { + std::cerr << "ERROR: name-projected field 0 expected Int, got " + << static_cast(rec.row.GetType(0)) << std::endl; + scan_ok = false; + } + if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) { + std::cerr << "ERROR: name-projected field 1 expected TimestampLtz, got " + << static_cast(rec.row.GetType(1)) << std::endl; + scan_ok = false; + } - auto ts = rec.row.GetTimestamp(1); - std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" << ts.epoch_millis << "+" - << ts.nano_of_millisecond << "ns" << std::endl; + auto ts = rec.row.GetTimestamp(1); + std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" << ts.epoch_millis << "+" + << ts.nano_of_millisecond << "ns" << std::endl; + } } if (scan_ok) { @@ -356,8 +377,8 @@ int main() { std::unordered_map timestamp_offsets; check("list_timestamp_offsets", - admin.ListOffsets(table_path, all_bucket_ids, - fluss::OffsetSpec::Timestamp(timestamp_ms), timestamp_offsets)); + admin.ListOffsets(table_path, all_bucket_ids, fluss::OffsetSpec::Timestamp(timestamp_ms), + timestamp_offsets)); std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl; for (const auto& [bucket_id, offset] : timestamp_offsets) { std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; @@ -381,15 +402,21 @@ int main() { fluss::ScanRecords 
batch_records; check("poll_batch", batch_scanner.Poll(5000, batch_records)); - std::cout << "Scanned " << batch_records.Size() << " records from batch subscription" + std::cout << "Scanned " << batch_records.Count() << " records from batch subscription" << std::endl; - for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) { - const auto& rec = batch_records[i]; - std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id - << ", offset=" << rec.offset << ", timestamp=" << rec.timestamp << std::endl; - } - if (batch_records.Size() > 5) { - std::cout << " ... and " << (batch_records.Size() - 5) << " more records" << std::endl; + for (const auto& tb : batch_records.Buckets()) { + size_t shown = 0; + for (const auto& rec : batch_records.Records(tb)) { + if (shown < 5) { + std::cout << " bucket_id=" << tb.bucket_id << ", offset=" << rec.offset + << ", timestamp=" << rec.timestamp << std::endl; + } + ++shown; + } + if (shown > 5) { + std::cout << " ... and " << (shown - 5) << " more records in bucket " << tb.bucket_id + << std::endl; + } } // 9.1) Unsubscribe from a bucket @@ -520,11 +547,13 @@ int main() { fluss::ScanRecords arrow_write_records; check("poll_arrow_write", arrow_write_scanner.Poll(5000, arrow_write_records)); - std::cout << "Scanned " << arrow_write_records.Size() + std::cout << "Scanned " << arrow_write_records.Count() << " records written via AppendArrowBatch:" << std::endl; - for (const auto& rec : arrow_write_records) { - std::cout << " id=" << rec.row.GetInt32(0) << " name=" << rec.row.GetString(1) - << " score=" << rec.row.GetFloat32(2) << std::endl; + for (const auto& tb : arrow_write_records.Buckets()) { + for (const auto& rec : arrow_write_records.Records(tb)) { + std::cout << " id=" << rec.row.GetInt32(0) << " name=" << rec.row.GetString(1) + << " score=" << rec.row.GetFloat32(2) << std::endl; + } } } @@ -591,11 +620,13 @@ int main() { fluss::ScanRecords decimal_records; check("poll_decimal", decimal_scanner.Poll(5000, 
decimal_records)); - std::cout << "Scanned decimal records: " << decimal_records.Size() << std::endl; - for (const auto& rec : decimal_records) { - std::cout << " id=" << rec.row.GetInt32(0) << " price=" << rec.row.GetDecimalString(1) - << " amount=" << rec.row.GetDecimalString(2) - << " is_decimal=" << rec.row.IsDecimal(1) << std::endl; + std::cout << "Scanned decimal records: " << decimal_records.Count() << std::endl; + for (const auto& tb : decimal_records.Buckets()) { + for (const auto& rec : decimal_records.Records(tb)) { + std::cout << " id=" << rec.row.GetInt32(0) << " price=" << rec.row.GetDecimalString(1) + << " amount=" << rec.row.GetDecimalString(2) + << " is_decimal=" << rec.row.IsDecimal(1) << std::endl; + } } // 14) Partitioned table example @@ -690,14 +721,15 @@ int main() { fluss::ScanRecords partition_records; check("poll_partitioned", partition_scanner.Poll(5000, partition_records)); - std::cout << "Scanned " << partition_records.Size() << " records from partitioned table" + std::cout << "Scanned " << partition_records.Count() << " records from partitioned table" << std::endl; - for (size_t i = 0; i < partition_records.Size(); ++i) { - const auto& rec = partition_records[i]; - std::cout << " Record " << i << ": partition_id=" - << (rec.partition_id.has_value() ? std::to_string(*rec.partition_id) : "none") - << ", id=" << rec.row.GetInt32(0) << ", region=" << rec.row.GetString(1) - << ", value=" << rec.row.GetInt64(2) << std::endl; + for (const auto& tb : partition_records.Buckets()) { + for (const auto& rec : partition_records.Records(tb)) { + std::cout << " partition_id=" + << (tb.partition_id.has_value() ? 
std::to_string(*tb.partition_id) : "none") + << ", id=" << rec.row.GetInt32(0) << ", region=" << rec.row.GetString(1) + << ", value=" << rec.row.GetInt64(2) << std::endl; + } } // 14.2) subscribe_partition_buckets: batch subscribe to all partitions at once @@ -717,13 +749,13 @@ int main() { fluss::ScanRecords partition_batch_records; check("poll_partition_batch", partition_batch_scanner.Poll(5000, partition_batch_records)); - std::cout << "Scanned " << partition_batch_records.Size() + std::cout << "Scanned " << partition_batch_records.Count() << " records from batch partition subscription" << std::endl; - for (size_t i = 0; i < partition_batch_records.Size(); ++i) { - const auto& rec = partition_batch_records[i]; - std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0) - << ", region=" << rec.row.GetString(1) << ", value=" << rec.row.GetInt64(2) - << std::endl; + for (const auto& tb : partition_batch_records.Buckets()) { + for (const auto& rec : partition_batch_records.Records(tb)) { + std::cout << " id=" << rec.row.GetInt32(0) << ", region=" << rec.row.GetString(1) + << ", value=" << rec.row.GetInt64(2) << std::endl; + } } // 14.3) UnsubscribePartition: unsubscribe from one partition, verify remaining @@ -743,12 +775,12 @@ int main() { fluss::ScanRecords unsub_records; check("poll_after_unsub", unsub_partition_scanner.Poll(5000, unsub_records)); - std::cout << "After unsubscribe, scanned " << unsub_records.Size() << " records" << std::endl; - for (size_t i = 0; i < unsub_records.Size(); ++i) { - const auto& rec = unsub_records[i]; - std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0) - << ", region=" << rec.row.GetString(1) << ", value=" << rec.row.GetInt64(2) - << std::endl; + std::cout << "After unsubscribe, scanned " << unsub_records.Count() << " records" << std::endl; + for (const auto& tb : unsub_records.Buckets()) { + for (const auto& rec : unsub_records.Records(tb)) { + std::cout << " id=" << rec.row.GetInt32(0) << ", region=" << 
rec.row.GetString(1) + << ", value=" << rec.row.GetInt64(2) << std::endl; + } } // Cleanup diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 27d1fcb9..9ea7e416 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -507,6 +507,17 @@ struct NamedGetters { private: const Derived& Self() const { return static_cast(*this); } }; + +struct ScanData { + ffi::ScanResultInner* raw; + ColumnMap columns; + + ScanData(ffi::ScanResultInner* r, ColumnMap cols) : raw(r), columns(std::move(cols)) {} + ~ScanData(); + + ScanData(const ScanData&) = delete; + ScanData& operator=(const ScanData&) = delete; +}; } // namespace detail class GenericRow { @@ -623,9 +634,8 @@ class RowView : public detail::NamedGetters { friend struct detail::NamedGetters; public: - RowView(std::shared_ptr inner, size_t record_idx, - std::shared_ptr column_map) - : inner_(std::move(inner)), record_idx_(record_idx), column_map_(std::move(column_map)) {} + RowView(std::shared_ptr data, size_t bucket_idx, size_t rec_idx) + : data_(std::move(data)), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {} // ── Index-based getters ────────────────────────────────────────── size_t FieldCount() const; @@ -660,14 +670,28 @@ class RowView : public detail::NamedGetters { private: size_t Resolve(const std::string& name) const { - if (!column_map_) { + if (!data_) { throw std::runtime_error("RowView: name-based access not available"); } - return detail::ResolveColumn(*column_map_, name); + return detail::ResolveColumn(data_->columns, name); + } + std::shared_ptr data_; + size_t bucket_idx_; + size_t rec_idx_; +}; + +/// Identifies a specific bucket, optionally within a partition. 
+struct TableBucket { + int64_t table_id; + int32_t bucket_id; + std::optional partition_id; + + bool operator==(const TableBucket& other) const { + return table_id == other.table_id && bucket_id == other.bucket_id && + partition_id == other.partition_id; } - std::shared_ptr inner_; - size_t record_idx_; - std::shared_ptr column_map_; + + bool operator!=(const TableBucket& other) const { return !(*this == other); } }; /// A single scan record. Contains metadata and a RowView for field access. @@ -675,14 +699,61 @@ class RowView : public detail::NamedGetters { /// ScanRecord is a value type that can be freely copied, stored, and /// accumulated across multiple Poll() calls. struct ScanRecord { - int32_t bucket_id; - std::optional partition_id; int64_t offset; int64_t timestamp; ChangeType change_type; RowView row; }; +/// A view into a subset of scan results for a single bucket. +/// +/// BucketView is a value type — it shares ownership of the underlying scan data +/// via reference counting, so it can safely outlive the ScanRecords that produced it. +class BucketView { + public: + BucketView(std::shared_ptr data, TableBucket bucket, size_t bucket_idx, + size_t count) + : data_(std::move(data)), + bucket_(std::move(bucket)), + bucket_idx_(bucket_idx), + count_(count) {} + + /// The bucket these records belong to. + const TableBucket& Bucket() const { return bucket_; } + + /// Number of records in this bucket. + size_t Size() const { return count_; } + bool Empty() const { return count_ == 0; } + + /// Access a record by its position within this bucket (0-based). 
+ ScanRecord operator[](size_t idx) const; + + class Iterator { + public: + ScanRecord operator*() const; + Iterator& operator++() { + ++idx_; + return *this; + } + bool operator!=(const Iterator& other) const { return idx_ != other.idx_; } + + private: + friend class BucketView; + Iterator(const BucketView* owner, size_t idx) : owner_(owner), idx_(idx) {} + const BucketView* owner_; + size_t idx_; + }; + + Iterator begin() const { return Iterator(this, 0); } + Iterator end() const { return Iterator(this, count_); } + + private: + std::shared_ptr data_; + TableBucket bucket_; + size_t bucket_idx_; + size_t count_; +}; + class ScanRecords { public: ScanRecords() noexcept = default; @@ -693,36 +764,52 @@ class ScanRecords { ScanRecords(ScanRecords&&) noexcept = default; ScanRecords& operator=(ScanRecords&&) noexcept = default; - size_t Size() const; - bool Empty() const; - ScanRecord operator[](size_t idx) const; + /// Total number of records across all buckets. + size_t Count() const; + bool IsEmpty() const; + + /// Number of distinct buckets with records. + size_t BucketCount() const; + + /// List of distinct buckets that have records. + std::vector Buckets() const; + + /// Get a view of records for a specific bucket. + /// + /// Returns an empty BucketView if the bucket is not present (matches Rust/Java). + /// Note: O(B) linear scan. For iteration over all buckets, prefer BucketAt(idx). + BucketView Records(const TableBucket& bucket) const; + /// Get a view of records by bucket index (0-based). O(1). + /// + /// Throws std::out_of_range if idx >= BucketCount(). + BucketView BucketAt(size_t idx) const; + + /// Flat iterator over all records across all buckets (matches Java Iterable). 
class Iterator { public: ScanRecord operator*() const; - Iterator& operator++() { - ++idx_; - return *this; + Iterator& operator++(); + bool operator!=(const Iterator& other) const { + return bucket_idx_ != other.bucket_idx_ || rec_idx_ != other.rec_idx_; } - bool operator!=(const Iterator& other) const { return idx_ != other.idx_; } private: friend class ScanRecords; - Iterator(const ScanRecords* owner, size_t idx) : owner_(owner), idx_(idx) {} + Iterator(const ScanRecords* owner, size_t bucket_idx, size_t rec_idx) + : owner_(owner), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {} const ScanRecords* owner_; - size_t idx_; + size_t bucket_idx_; + size_t rec_idx_; }; - Iterator begin() const { return Iterator(this, 0); } - Iterator end() const { return Iterator(this, Size()); } + Iterator begin() const; + Iterator end() const { return Iterator(this, BucketCount(), 0); } private: - /// Returns the column name-to-index map (lazy-built, cached). - const std::shared_ptr& GetColumnMap() const; friend class LogScanner; - void BuildColumnMap() const; - std::shared_ptr inner_; - mutable std::shared_ptr column_map_; + ScanRecord RecordAt(size_t bucket, size_t rec_idx) const; + std::shared_ptr data_; }; class ArrowRecordBatch { diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index cb29882d..9f987b94 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -141,6 +141,14 @@ mod ffi { timestamp: i64, } + struct FfiBucketInfo { + table_id: i64, + bucket_id: i32, + has_partition_id: bool, + partition_id: i64, + record_count: usize, + } + struct FfiBucketSubscription { bucket_id: i32, offset: i64, @@ -420,27 +428,96 @@ mod ffi { fn sv_column_count(self: &ScanResultInner) -> usize; fn sv_column_name(self: &ScanResultInner, field: usize) -> Result<&str>; fn sv_column_type(self: &ScanResultInner, field: usize) -> Result; - fn sv_bucket_id(self: &ScanResultInner, rec: usize) -> i32; - fn sv_has_partition_id(self: &ScanResultInner, rec: usize) -> bool; - fn 
sv_partition_id(self: &ScanResultInner, rec: usize) -> i64; - fn sv_offset(self: &ScanResultInner, rec: usize) -> i64; - fn sv_timestamp(self: &ScanResultInner, rec: usize) -> i64; - fn sv_change_type(self: &ScanResultInner, rec: usize) -> i32; + fn sv_offset(self: &ScanResultInner, bucket: usize, rec: usize) -> i64; + fn sv_timestamp(self: &ScanResultInner, bucket: usize, rec: usize) -> i64; + fn sv_change_type(self: &ScanResultInner, bucket: usize, rec: usize) -> i32; fn sv_field_count(self: &ScanResultInner) -> usize; - fn sv_is_null(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_bool(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_i32(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_i64(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_f32(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_f64(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_str(self: &ScanResultInner, rec: usize, field: usize) -> Result<&str>; - fn sv_get_bytes(self: &ScanResultInner, rec: usize, field: usize) -> Result<&[u8]>; - fn sv_get_date_days(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_time_millis(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_ts_millis(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_ts_nanos(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_is_ts_ltz(self: &ScanResultInner, rec: usize, field: usize) -> Result; - fn sv_get_decimal_str(self: &ScanResultInner, rec: usize, field: usize) -> Result; + fn sv_is_null( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_bool( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_i32( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_i64( + self: 
&ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_f32( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_f64( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_str( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result<&str>; + fn sv_get_bytes( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result<&[u8]>; + fn sv_get_date_days( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_time_millis( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_ts_millis( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_ts_nanos( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_is_ts_ltz( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + fn sv_get_decimal_str( + self: &ScanResultInner, + bucket: usize, + rec: usize, + field: usize, + ) -> Result; + + fn sv_bucket_infos(self: &ScanResultInner) -> &Vec; } } @@ -1487,24 +1564,27 @@ impl LogScanner { match result { Ok(records) => { let columns = self.projected_columns.clone(); - // Flatten ScanRecords into a Vec — moves Arc, zero copy - let mut flat = Vec::with_capacity(records.count()); + let mut total_count = 0usize; + let mut buckets = Vec::new(); + let mut bucket_infos = Vec::new(); for (table_bucket, bucket_records) in records.into_records_by_buckets() { - let bucket_id = table_bucket.bucket_id(); - let partition = table_bucket.partition_id(); - for record in bucket_records { - flat.push(FlatScanRecord { - bucket_id, - has_partition_id: partition.is_some(), - partition_id: partition.unwrap_or(0), - record, - }); - } + let count = bucket_records.len(); + total_count += count; + 
bucket_infos.push(ffi::FfiBucketInfo { + table_id: table_bucket.table_id(), + bucket_id: table_bucket.bucket_id(), + has_partition_id: table_bucket.partition_id().is_some(), + partition_id: table_bucket.partition_id().unwrap_or(0), + record_count: count, + }); + buckets.push((table_bucket, bucket_records)); } Box::new(ScanResultInner { error: None, - records: flat, + buckets, columns, + bucket_infos, + total_count, }) } Err(e) => { @@ -1917,28 +1997,29 @@ mod row_reader { // Opaque types: ScanResultInner (scan read path) // ============================================================================ -struct FlatScanRecord { - bucket_id: i32, - has_partition_id: bool, - partition_id: i64, - record: fcore::record::ScanRecord, -} - pub struct ScanResultInner { error: Option<(i32, String)>, - records: Vec, + buckets: Vec<(fcore::metadata::TableBucket, Vec)>, columns: Vec, + bucket_infos: Vec, + total_count: usize, } impl ScanResultInner { fn from_error(code: i32, msg: String) -> Self { Self { error: Some((code, msg)), - records: Vec::new(), + buckets: Vec::new(), columns: Vec::new(), + bucket_infos: Vec::new(), + total_count: 0, } } + fn resolve(&self, bucket: usize, rec: usize) -> &fcore::record::ScanRecord { + &self.buckets[bucket].1[rec] + } + fn sv_has_error(&self) -> bool { self.error.is_some() } @@ -1952,7 +2033,7 @@ impl ScanResultInner { } fn sv_record_count(&self) -> usize { - self.records.len() + self.total_count } fn sv_column_count(&self) -> usize { @@ -1965,71 +2046,70 @@ impl ScanResultInner { row_reader::column_type(&self.columns, field) } - // Metadata accessors — C++ validates rec in operator[] before calling these. 
- fn sv_bucket_id(&self, rec: usize) -> i32 { - self.records[rec].bucket_id + fn sv_offset(&self, bucket: usize, rec: usize) -> i64 { + self.resolve(bucket, rec).offset() } - fn sv_has_partition_id(&self, rec: usize) -> bool { - self.records[rec].has_partition_id + fn sv_timestamp(&self, bucket: usize, rec: usize) -> i64 { + self.resolve(bucket, rec).timestamp() } - fn sv_partition_id(&self, rec: usize) -> i64 { - self.records[rec].partition_id - } - fn sv_offset(&self, rec: usize) -> i64 { - self.records[rec].record.offset() - } - fn sv_timestamp(&self, rec: usize) -> i64 { - self.records[rec].record.timestamp() - } - fn sv_change_type(&self, rec: usize) -> i32 { - self.records[rec].record.change_type().to_byte_value() as i32 + fn sv_change_type(&self, bucket: usize, rec: usize) -> i32 { + self.resolve(bucket, rec).change_type().to_byte_value() as i32 } fn sv_field_count(&self) -> usize { self.columns.len() } - // Field accessors — C++ validates rec in operator[], validate() checks field. - fn sv_is_null(&self, rec: usize, field: usize) -> Result { - row_reader::is_null(self.records[rec].record.row(), &self.columns, field) + // Field accessors — C++ validates bounds in BucketView/RecordAt, validate() checks field. 
+ fn sv_is_null(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::is_null(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_bool(&self, rec: usize, field: usize) -> Result { - row_reader::get_bool(self.records[rec].record.row(), &self.columns, field) + fn sv_get_bool(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::get_bool(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_i32(&self, rec: usize, field: usize) -> Result { - row_reader::get_i32(self.records[rec].record.row(), &self.columns, field) + fn sv_get_i32(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::get_i32(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_i64(&self, rec: usize, field: usize) -> Result { - row_reader::get_i64(self.records[rec].record.row(), &self.columns, field) + fn sv_get_i64(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::get_i64(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_f32(&self, rec: usize, field: usize) -> Result { - row_reader::get_f32(self.records[rec].record.row(), &self.columns, field) + fn sv_get_f32(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::get_f32(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_f64(&self, rec: usize, field: usize) -> Result { - row_reader::get_f64(self.records[rec].record.row(), &self.columns, field) + fn sv_get_f64(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::get_f64(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_str(&self, rec: usize, field: usize) -> Result<&str, String> { - row_reader::get_str(self.records[rec].record.row(), &self.columns, field) + fn sv_get_str(&self, bucket: usize, rec: usize, field: usize) -> Result<&str, String> { + row_reader::get_str(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_bytes(&self, rec: usize, field: usize) -> 
Result<&[u8], String> { - row_reader::get_bytes(self.records[rec].record.row(), &self.columns, field) + fn sv_get_bytes(&self, bucket: usize, rec: usize, field: usize) -> Result<&[u8], String> { + row_reader::get_bytes(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_date_days(&self, rec: usize, field: usize) -> Result { - row_reader::get_date_days(self.records[rec].record.row(), &self.columns, field) + fn sv_get_date_days(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::get_date_days(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_time_millis(&self, rec: usize, field: usize) -> Result { - row_reader::get_time_millis(self.records[rec].record.row(), &self.columns, field) + fn sv_get_time_millis(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::get_time_millis(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_ts_millis(&self, rec: usize, field: usize) -> Result { - row_reader::get_ts_millis(self.records[rec].record.row(), &self.columns, field) + fn sv_get_ts_millis(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::get_ts_millis(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_get_ts_nanos(&self, rec: usize, field: usize) -> Result { - row_reader::get_ts_nanos(self.records[rec].record.row(), &self.columns, field) + fn sv_get_ts_nanos(&self, bucket: usize, rec: usize, field: usize) -> Result { + row_reader::get_ts_nanos(self.resolve(bucket, rec).row(), &self.columns, field) } - fn sv_is_ts_ltz(&self, _rec: usize, field: usize) -> Result { + fn sv_is_ts_ltz(&self, _bucket: usize, _rec: usize, field: usize) -> Result { row_reader::is_ts_ltz(&self.columns, field) } - fn sv_get_decimal_str(&self, rec: usize, field: usize) -> Result { - row_reader::get_decimal_str(self.records[rec].record.row(), &self.columns, field) + fn sv_get_decimal_str( + &self, + bucket: usize, + rec: usize, + field: usize, + ) -> Result { + 
row_reader::get_decimal_str(self.resolve(bucket, rec).row(), &self.columns, field) + } + + fn sv_bucket_infos(&self) -> &Vec<FfiBucketInfo> { + &self.bucket_infos } } diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp index a697cea0..73035bb9 100644 --- a/bindings/cpp/src/table.cpp +++ b/bindings/cpp/src/table.cpp @@ -191,75 +191,91 @@ void GenericRow::SetDecimal(size_t idx, const std::string& value) { inner_->gr_set_decimal_str(idx, value); } +// ============================================================================ +// ScanData — destructor must live in .cpp where rust::Box is visible +// ============================================================================ + +detail::ScanData::~ScanData() { + if (raw) { + rust::Box<ffi::ScanResultInner>::from_raw(raw); + } +} + // ============================================================================ // RowView — zero-copy read-only row view for scan results // ============================================================================ -size_t RowView::FieldCount() const { return inner_ ? inner_->sv_field_count() : 0; } +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +#define CHECK_DATA(name) \ do { \ if (!data_) throw std::logic_error(name ": not available (moved-from or null)"); \ } while (0) + +size_t RowView::FieldCount() const { return data_ ? 
data_->raw->sv_field_count() : 0; } TypeId RowView::GetType(size_t idx) const { - CHECK_INNER("RowView"); - return static_cast<TypeId>(inner_->sv_column_type(idx)); + CHECK_DATA("RowView"); + return static_cast<TypeId>(data_->raw->sv_column_type(idx)); } bool RowView::IsNull(size_t idx) const { - CHECK_INNER("RowView"); - return inner_->sv_is_null(record_idx_, idx); + CHECK_DATA("RowView"); + return data_->raw->sv_is_null(bucket_idx_, rec_idx_, idx); } bool RowView::GetBool(size_t idx) const { - CHECK_INNER("RowView"); - return inner_->sv_get_bool(record_idx_, idx); + CHECK_DATA("RowView"); + return data_->raw->sv_get_bool(bucket_idx_, rec_idx_, idx); } int32_t RowView::GetInt32(size_t idx) const { - CHECK_INNER("RowView"); - return inner_->sv_get_i32(record_idx_, idx); + CHECK_DATA("RowView"); + return data_->raw->sv_get_i32(bucket_idx_, rec_idx_, idx); } int64_t RowView::GetInt64(size_t idx) const { - CHECK_INNER("RowView"); - return inner_->sv_get_i64(record_idx_, idx); + CHECK_DATA("RowView"); + return data_->raw->sv_get_i64(bucket_idx_, rec_idx_, idx); } float RowView::GetFloat32(size_t idx) const { - CHECK_INNER("RowView"); - return inner_->sv_get_f32(record_idx_, idx); + CHECK_DATA("RowView"); + return data_->raw->sv_get_f32(bucket_idx_, rec_idx_, idx); } double RowView::GetFloat64(size_t idx) const { - CHECK_INNER("RowView"); - return inner_->sv_get_f64(record_idx_, idx); + CHECK_DATA("RowView"); + return data_->raw->sv_get_f64(bucket_idx_, rec_idx_, idx); } std::string_view RowView::GetString(size_t idx) const { - CHECK_INNER("RowView"); - auto s = inner_->sv_get_str(record_idx_, idx); + CHECK_DATA("RowView"); + auto s = data_->raw->sv_get_str(bucket_idx_, rec_idx_, idx); return std::string_view(s.data(), s.size()); } std::pair<const uint8_t*, size_t> RowView::GetBytes(size_t idx) const { - CHECK_INNER("RowView"); - auto bytes = inner_->sv_get_bytes(record_idx_, idx); + CHECK_DATA("RowView"); + auto bytes = data_->raw->sv_get_bytes(bucket_idx_, rec_idx_, idx); return {bytes.data(), 
bytes.size()}; } Date RowView::GetDate(size_t idx) const { - CHECK_INNER("RowView"); - return Date{inner_->sv_get_date_days(record_idx_, idx)}; + CHECK_DATA("RowView"); + return Date{data_->raw->sv_get_date_days(bucket_idx_, rec_idx_, idx)}; } Time RowView::GetTime(size_t idx) const { - CHECK_INNER("RowView"); - return Time{inner_->sv_get_time_millis(record_idx_, idx)}; + CHECK_DATA("RowView"); + return Time{data_->raw->sv_get_time_millis(bucket_idx_, rec_idx_, idx)}; } Timestamp RowView::GetTimestamp(size_t idx) const { - CHECK_INNER("RowView"); - return Timestamp{inner_->sv_get_ts_millis(record_idx_, idx), - inner_->sv_get_ts_nanos(record_idx_, idx)}; + CHECK_DATA("RowView"); + return Timestamp{data_->raw->sv_get_ts_millis(bucket_idx_, rec_idx_, idx), + data_->raw->sv_get_ts_nanos(bucket_idx_, rec_idx_, idx)}; } bool RowView::IsDecimal(size_t idx) const { return GetType(idx) == TypeId::Decimal; } std::string RowView::GetDecimalString(size_t idx) const { - CHECK_INNER("RowView"); - return std::string(inner_->sv_get_decimal_str(record_idx_, idx)); + CHECK_DATA("RowView"); + return std::string(data_->raw->sv_get_decimal_str(bucket_idx_, rec_idx_, idx)); } // ============================================================================ @@ -268,48 +284,94 @@ std::string RowView::GetDecimalString(size_t idx) const { // ScanRecords constructor, destructor, move operations are all defaulted in the header. -size_t ScanRecords::Size() const { return inner_ ? inner_->sv_record_count() : 0; } +size_t ScanRecords::Count() const { return data_ ? 
data_->raw->sv_record_count() : 0; } -bool ScanRecords::Empty() const { return Size() == 0; } +bool ScanRecords::IsEmpty() const { return Count() == 0; } -void ScanRecords::BuildColumnMap() const { - if (!inner_) return; - auto map = std::make_shared<ColumnMap>(); - auto count = inner_->sv_column_count(); - for (size_t i = 0; i < count; ++i) { - auto name = inner_->sv_column_name(i); - (*map)[std::string(name.data(), name.size())] = { - i, static_cast<TypeId>(inner_->sv_column_type(i))}; +ScanRecord ScanRecords::RecordAt(size_t bucket, size_t rec_idx) const { + if (!data_) { + throw std::logic_error("ScanRecords: not available (moved-from or null)"); } - column_map_ = std::move(map); + return ScanRecord{data_->raw->sv_offset(bucket, rec_idx), + data_->raw->sv_timestamp(bucket, rec_idx), + static_cast<ChangeType>(data_->raw->sv_change_type(bucket, rec_idx)), + RowView(data_, bucket, rec_idx)}; +} + +static TableBucket to_table_bucket(const ffi::FfiBucketInfo& g) { + return TableBucket{g.table_id, g.bucket_id, + g.has_partition_id ? std::optional(g.partition_id) : std::nullopt}; +} + +size_t ScanRecords::BucketCount() const { return data_ ? 
data_->raw->sv_bucket_infos().size() : 0; } + +ScanRecord ScanRecords::Iterator::operator*() const { + return owner_->RecordAt(bucket_idx_, rec_idx_); } -const std::shared_ptr& ScanRecords::GetColumnMap() const { - if (!column_map_) { - BuildColumnMap(); +ScanRecords::Iterator ScanRecords::begin() const { return Iterator(this, 0, 0); } + +ScanRecords::Iterator& ScanRecords::Iterator::operator++() { + ++rec_idx_; + if (owner_->data_) { + const auto& infos = owner_->data_->raw->sv_bucket_infos(); + while (bucket_idx_ < infos.size() && rec_idx_ >= infos[bucket_idx_].record_count) { + rec_idx_ = 0; + ++bucket_idx_; + } } - return column_map_; + return *this; +} + +std::vector ScanRecords::Buckets() const { + std::vector result; + if (!data_) return result; + const auto& infos = data_->raw->sv_bucket_infos(); + result.reserve(infos.size()); + for (const auto& g : infos) { + result.push_back(to_table_bucket(g)); + } + return result; } -ScanRecord ScanRecords::operator[](size_t idx) const { - if (!inner_) { +BucketView ScanRecords::Records(const TableBucket& bucket) const { + if (!data_) { + return BucketView({}, bucket, 0, 0); + } + const auto& infos = data_->raw->sv_bucket_infos(); + for (size_t i = 0; i < infos.size(); ++i) { + TableBucket tb = to_table_bucket(infos[i]); + if (tb == bucket) { + return BucketView(data_, std::move(tb), i, infos[i].record_count); + } + } + return BucketView({}, bucket, 0, 0); +} + +BucketView ScanRecords::BucketAt(size_t idx) const { + if (!data_) { throw std::logic_error("ScanRecords: not available (moved-from or null)"); } - if (idx >= inner_->sv_record_count()) { - throw std::out_of_range("ScanRecords: index " + std::to_string(idx) + " out of range (" + - std::to_string(inner_->sv_record_count()) + " records)"); + const auto& infos = data_->raw->sv_bucket_infos(); + if (idx >= infos.size()) { + throw std::out_of_range("ScanRecords::BucketAt: index " + std::to_string(idx) + + " out of range (" + std::to_string(infos.size()) + " 
buckets)"); } - return ScanRecord{inner_->sv_bucket_id(idx), - inner_->sv_has_partition_id(idx) - ? std::optional(inner_->sv_partition_id(idx)) - : std::nullopt, - inner_->sv_offset(idx), - inner_->sv_timestamp(idx), - static_cast<ChangeType>(inner_->sv_change_type(idx)), - RowView(inner_, idx, GetColumnMap())}; + return BucketView(data_, to_table_bucket(infos[idx]), idx, infos[idx].record_count); } -ScanRecord ScanRecords::Iterator::operator*() const { return owner_->operator[](idx_); } +ScanRecord BucketView::operator[](size_t idx) const { + if (idx >= count_) { + throw std::out_of_range("BucketView: index " + std::to_string(idx) + " out of range (" + + std::to_string(count_) + " records)"); + } + return ScanRecord{data_->raw->sv_offset(bucket_idx_, idx), + data_->raw->sv_timestamp(bucket_idx_, idx), + static_cast<ChangeType>(data_->raw->sv_change_type(bucket_idx_, idx)), + RowView(data_, bucket_idx_, idx)}; +} + +ScanRecord BucketView::Iterator::operator*() const { return owner_->operator[](idx_); } // ============================================================================ // LookupResult — backed by opaque Rust LookupResultInner @@ -1082,10 +1144,16 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) { std::string(result_box->sv_error_message())); } - out.column_map_.reset(); - out.inner_ = std::shared_ptr<ffi::ScanResultInner>( - result_box.into_raw(), - [](ffi::ScanResultInner* p) { rust::Box<ffi::ScanResultInner>::from_raw(p); }); + // Wrap raw pointer in ScanData immediately so it's never leaked on exception. + auto data = std::make_shared<detail::ScanData>(result_box.into_raw(), detail::ColumnMap{}); + // Build column map eagerly — shared by all RowViews/BucketViews. 
+ auto col_count = data->raw->sv_column_count(); + for (size_t i = 0; i < col_count; ++i) { + auto name = data->raw->sv_column_name(i); + data->columns[std::string(name.data(), name.size())] = { + i, static_cast(data->raw->sv_column_type(i))}; + } + out.data_ = std::move(data); return utils::make_ok(); } diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 804e1bbc..9cf20e3d 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -37,3 +37,4 @@ arrow-array = "57.0.0" pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] } jiff = { workspace = true } bigdecimal = "0.4" +indexmap = "2" diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 9c2b7e30..3564d91c 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -351,21 +351,26 @@ async def main(): record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - # Poll returns List[ScanRecord] with per-record metadata + # Poll returns ScanRecords — records grouped by bucket print("\n--- Testing poll() method (record-by-record) ---") try: - records = record_scanner.poll(5000) - print(f"Number of records: {len(records)}") - - # Show first few records with metadata - for i, record in enumerate(records[:5]): - print(f" Record {i}: offset={record.offset}, " - f"timestamp={record.timestamp}, " - f"change_type={record.change_type}, " - f"row={record.row}") - - if len(records) > 5: - print(f" ... 
and {len(records) - 5} more records") + scan_records = record_scanner.poll(5000) + print(f"Total records: {scan_records.count()}, buckets: {len(scan_records.buckets())}") + + # Flat iteration over all records (regardless of bucket) + print(f" Flat iteration: {scan_records.count()} records") + for record in scan_records: + print(f" offset={record.offset}, timestamp={record.timestamp}") + + # Per-bucket access + for bucket in scan_records.buckets(): + bucket_recs = scan_records.records(bucket) + print(f" Bucket {bucket}: {len(bucket_recs)} records") + for record in bucket_recs[:3]: + print(f" offset={record.offset}, " + f"timestamp={record.timestamp}, " + f"change_type={record.change_type}, " + f"row={record.row}") except Exception as e: print(f"Error during poll: {e}") diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 47eeb808..4b7fa4e8 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -19,7 +19,7 @@ from enum import IntEnum from types import TracebackType -from typing import Dict, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union, overload import pandas as pd import pyarrow as pa @@ -43,12 +43,12 @@ class ChangeType(IntEnum): ... class ScanRecord: - """Represents a single scan record with metadata.""" + """Represents a single scan record with metadata. + + The bucket is the key in ScanRecords, not on the individual record + (matches Rust/Java). + """ - @property - def bucket(self) -> TableBucket: - """The bucket this record belongs to.""" - ... @property def offset(self) -> int: """The position of this record in the log.""" @@ -90,6 +90,47 @@ class RecordBatch: def __str__(self) -> str: ... def __repr__(self) -> str: ... +class ScanRecords: + """A collection of scan records grouped by bucket. + + Returned by ``LogScanner.poll()``. Supports flat iteration + (``for rec in records``) and per-bucket access (``records.records(bucket)``). 
+ """ + + def buckets(self) -> List[TableBucket]: + """List of distinct buckets that have records.""" + ... + def records(self, bucket: TableBucket) -> List[ScanRecord]: + """Get records for a specific bucket. Returns empty list if bucket not present.""" + ... + def count(self) -> int: + """Total number of records across all buckets.""" + ... + def is_empty(self) -> bool: + """Whether the result set is empty.""" + ... + def keys(self) -> List[TableBucket]: + """Mapping protocol: alias for ``buckets()``.""" + ... + def values(self) -> Iterator[List[ScanRecord]]: + """Mapping protocol: lazy iterator over record lists, one per bucket.""" + ... + def items(self) -> Iterator[Tuple[TableBucket, List[ScanRecord]]]: + """Mapping protocol: lazy iterator over ``(bucket, records)`` pairs.""" + ... + def __len__(self) -> int: ... + @overload + def __getitem__(self, index: int) -> ScanRecord: ... + @overload + def __getitem__(self, index: slice) -> List[ScanRecord]: ... + @overload + def __getitem__(self, bucket: TableBucket) -> List[ScanRecord]: ... + def __getitem__(self, key: Union[int, slice, TableBucket]) -> Union[ScanRecord, List[ScanRecord]]: ... + def __contains__(self, bucket: TableBucket) -> bool: ... + def __iter__(self) -> Iterator[ScanRecord]: ... + def __str__(self) -> str: ... + def __repr__(self) -> str: ... + class Config: def __init__(self, properties: Optional[Dict[str, str]] = None) -> None: ... @property @@ -590,7 +631,7 @@ class LogScanner: bucket_id: The bucket ID within the partition """ ... - def poll(self, timeout_ms: int) -> List[ScanRecord]: + def poll(self, timeout_ms: int) -> ScanRecords: """Poll for individual records with metadata. Requires a record-based scanner (created with new_scan().create_log_scanner()). @@ -599,11 +640,12 @@ class LogScanner: timeout_ms: Timeout in milliseconds to wait for records. Returns: - List of ScanRecord objects, each containing bucket, offset, timestamp, - change_type, and row data as a dictionary. 
+ ScanRecords grouped by bucket. Supports flat iteration + (``for rec in records``) and per-bucket access + (``records.buckets()``, ``records.records(bucket)``). Note: - Returns an empty list if no records are available or timeout expires. + Returns an empty ScanRecords if no records are available or timeout expires. """ ... def poll_record_batch(self, timeout_ms: int) -> List[RecordBatch]: diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 553c8a92..ebc0d54c 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -122,6 +122,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index c3ea248e..bc2e956c 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -22,7 +22,14 @@ use arrow_pyarrow::{FromPyArrow, ToPyArrow}; use arrow_schema::SchemaRef; use fluss::record::to_arrow_schema; use fluss::rpc::message::OffsetSpec; -use pyo3::types::IntoPyDict; +use indexmap::IndexMap; +use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyTypeError}; +use pyo3::sync::PyOnceLock; +use pyo3::types::{ + IntoPyDict, PyBool, PyByteArray, PyBytes, PyDate, PyDateAccess, PyDateTime, PyDelta, + PyDeltaAccess, PyDict, PyList, PySequence, PySlice, PyTime, PyTimeAccess, PyTuple, PyType, + PyTzInfo, +}; use pyo3_async_runtimes::tokio::future_into_py; use std::collections::HashMap; use std::sync::Arc; @@ -38,11 +45,12 @@ const MICROS_PER_DAY: i64 = 86_400_000_000; const NANOS_PER_MILLI: i64 = 1_000_000; const NANOS_PER_MICRO: i64 = 1_000; -/// Represents a single scan record with metadata +/// Represents a single scan record with metadata. +/// +/// Matches Rust/Java: offset, timestamp, change_type, row. +/// The bucket is the key in ScanRecords, not on the individual record. 
#[pyclass] pub struct ScanRecord { - #[pyo3(get)] - bucket: TableBucket, #[pyo3(get)] offset: i64, #[pyo3(get)] @@ -50,21 +58,20 @@ pub struct ScanRecord { #[pyo3(get)] change_type: ChangeType, /// Store row as a Python dict directly - row_dict: Py, + row_dict: Py, } #[pymethods] impl ScanRecord { /// Get the row data as a dictionary #[getter] - pub fn row(&self, py: Python) -> Py { + pub fn row(&self, py: Python) -> Py { self.row_dict.clone_ref(py) } fn __str__(&self) -> String { format!( - "ScanRecord(bucket={}, offset={}, timestamp={}, change_type={})", - self.bucket.__str__(), + "ScanRecord(offset={}, timestamp={}, change_type={})", self.offset, self.timestamp, self.change_type.short_string() @@ -80,13 +87,12 @@ impl ScanRecord { /// Create a ScanRecord from core types pub fn from_core( py: Python, - bucket: &fcore::metadata::TableBucket, record: &fcore::record::ScanRecord, row_type: &fcore::metadata::RowType, ) -> PyResult { let fields = row_type.fields(); let row = record.row(); - let dict = pyo3::types::PyDict::new(py); + let dict = PyDict::new(py); for (pos, field) in fields.iter().enumerate() { let value = datum_to_python_value(py, row, pos, field.data_type())?; @@ -94,7 +100,6 @@ impl ScanRecord { } Ok(ScanRecord { - bucket: TableBucket::from_core(bucket.clone()), offset: record.offset(), timestamp: record.timestamp(), change_type: ChangeType::from_core(*record.change_type()), @@ -155,6 +160,247 @@ impl RecordBatch { } } +/// A collection of scan records grouped by bucket. +/// +/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`. +#[pyclass] +pub struct ScanRecords { + records_by_bucket: IndexMap>>, + total_count: usize, +} + +#[pymethods] +impl ScanRecords { + /// List of distinct buckets that have records in this result. + pub fn buckets(&self) -> Vec { + self.records_by_bucket.keys().cloned().collect() + } + + /// Get records for a specific bucket. 
+ /// + /// Returns an empty list if the bucket is not present (matches Rust/Java behavior). + pub fn records(&self, py: Python, bucket: &TableBucket) -> Vec> { + self.records_by_bucket + .get(bucket) + .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect()) + .unwrap_or_default() + } + + /// Total number of records across all buckets. + pub fn count(&self) -> usize { + self.total_count + } + + /// Whether the result set is empty. + pub fn is_empty(&self) -> bool { + self.total_count == 0 + } + + fn __len__(&self) -> usize { + self.total_count + } + + /// Type-dispatched indexing: + /// records[0] → ScanRecord (flat index) + /// records[-1] → ScanRecord (negative index) + /// records[1:3] → list[ScanRecord] (slice) + /// records[bucket] → list[ScanRecord] (by bucket) + fn __getitem__(&self, py: Python, key: &Bound<'_, PyAny>) -> PyResult> { + // Try integer index first + if let Ok(mut idx) = key.extract::() { + let len = self.total_count as isize; + if idx < 0 { + idx += len; + } + if idx < 0 || idx >= len { + return Err(PyIndexError::new_err(format!( + "index {idx} out of range for ScanRecords of size {len}" + ))); + } + let idx = idx as usize; + let mut offset = 0; + for recs in self.records_by_bucket.values() { + if idx < offset + recs.len() { + return Ok(recs[idx - offset].clone_ref(py).into_any()); + } + offset += recs.len(); + } + return Err(PyRuntimeError::new_err( + "internal error: total_count out of sync with records", + )); + } + // Try slice + if let Ok(slice) = key.downcast::() { + let indices = slice.indices(self.total_count as isize)?; + let mut result: Vec> = Vec::new(); + let mut i = indices.start; + while (indices.step > 0 && i < indices.stop) || (indices.step < 0 && i > indices.stop) { + let idx = i as usize; + let mut offset = 0; + for recs in self.records_by_bucket.values() { + if idx < offset + recs.len() { + result.push(recs[idx - offset].clone_ref(py)); + break; + } + offset += recs.len(); + } + i += indices.step; + } + return 
Ok(result.into_pyobject(py).unwrap().into_any().unbind()); + } + // Try TableBucket + if let Ok(bucket) = key.extract::() { + let recs = self.records(py, &bucket); + return Ok(recs.into_pyobject(py).unwrap().into_any().unbind()); + } + Err(PyTypeError::new_err( + "index must be int, slice, or TableBucket", + )) + } + + /// Support `bucket in records`. + fn __contains__(&self, bucket: &TableBucket) -> bool { + self.records_by_bucket.contains_key(bucket) + } + + /// Mapping protocol: alias for `buckets()`. + pub fn keys(&self) -> Vec { + self.buckets() + } + + /// Mapping protocol: lazy iterator over record lists, one per bucket. + pub fn values(slf: Bound<'_, Self>) -> ScanRecordsBucketIter { + let this = slf.borrow(); + let bucket_keys: Vec = this.records_by_bucket.keys().cloned().collect(); + drop(this); + ScanRecordsBucketIter { + owner: slf.unbind(), + bucket_keys, + bucket_idx: 0, + with_keys: false, + } + } + + /// Mapping protocol: lazy iterator over `(TableBucket, list[ScanRecord])` pairs. + pub fn items(slf: Bound<'_, Self>) -> ScanRecordsBucketIter { + let this = slf.borrow(); + let bucket_keys: Vec = this.records_by_bucket.keys().cloned().collect(); + drop(this); + ScanRecordsBucketIter { + owner: slf.unbind(), + bucket_keys, + bucket_idx: 0, + with_keys: true, + } + } + + fn __str__(&self) -> String { + format!( + "ScanRecords(records={}, buckets={})", + self.total_count, + self.records_by_bucket.len() + ) + } + + fn __repr__(&self) -> String { + self.__str__() + } + + /// Flat iterator over all records across all buckets (matches Java/Rust). 
+ fn __iter__(slf: Bound<'_, Self>) -> ScanRecordsIter { + let this = slf.borrow(); + let bucket_keys: Vec = this.records_by_bucket.keys().cloned().collect(); + drop(this); + ScanRecordsIter { + owner: slf.unbind(), + bucket_keys, + bucket_idx: 0, + rec_idx: 0, + } + } +} + +#[pyclass] +struct ScanRecordsIter { + owner: Py, + bucket_keys: Vec, + bucket_idx: usize, + rec_idx: usize, +} + +#[pymethods] +impl ScanRecordsIter { + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + + fn __next__(&mut self, py: Python) -> Option> { + let owner = self.owner.borrow(py); + loop { + if self.bucket_idx >= self.bucket_keys.len() { + return None; + } + let bucket = &self.bucket_keys[self.bucket_idx]; + if let Some(recs) = owner.records_by_bucket.get(bucket) { + if self.rec_idx < recs.len() { + let rec = recs[self.rec_idx].clone_ref(py); + self.rec_idx += 1; + return Some(rec); + } + } + self.bucket_idx += 1; + self.rec_idx = 0; + } + } +} + +/// Lazy iterator for `ScanRecords.items()` and `ScanRecords.values()`. +/// +/// Yields one bucket at a time: `(TableBucket, list[ScanRecord])` for items, +/// or `list[ScanRecord]` for values. Only materializes records for the +/// current bucket on each `__next__` call. 
+#[pyclass] +pub struct ScanRecordsBucketIter { + owner: Py, + bucket_keys: Vec, + bucket_idx: usize, + with_keys: bool, +} + +#[pymethods] +impl ScanRecordsBucketIter { + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + + fn __next__(&mut self, py: Python) -> Option> { + if self.bucket_idx >= self.bucket_keys.len() { + return None; + } + let bucket = &self.bucket_keys[self.bucket_idx]; + let owner = self.owner.borrow(py); + let recs = owner + .records_by_bucket + .get(bucket) + .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect::>()) + .unwrap_or_default(); + let bucket = bucket.clone(); + self.bucket_idx += 1; + + if self.with_keys { + Some( + (bucket, recs) + .into_pyobject(py) + .unwrap() + .into_any() + .unbind(), + ) + } else { + Some(recs.into_pyobject(py).unwrap().into_any().unbind()) + } + } +} + /// Represents a Fluss table for data operations #[pyclass] pub struct FlussTable { @@ -763,9 +1009,9 @@ impl AppendWriter { /// Represents different input shapes for a row #[derive(FromPyObject)] enum RowInput<'py> { - Dict(Bound<'py, pyo3::types::PyDict>), - Tuple(Bound<'py, pyo3::types::PyTuple>), - List(Bound<'py, pyo3::types::PyList>), + Dict(Bound<'py, PyDict>), + Tuple(Bound<'py, PyTuple>), + List(Bound<'py, PyList>), } /// Convert Python row (dict/list/tuple) to GenericRow requiring all schema columns. @@ -779,7 +1025,7 @@ pub fn python_to_generic_row( /// Process a Python sequence (list or tuple) into datums at the target column positions. fn process_sequence( - seq: &Bound, + seq: &Bound, target_indices: &[usize], fields: &[fcore::metadata::DataField], datums: &mut [fcore::row::Datum<'static>], @@ -924,7 +1170,7 @@ fn python_value_to_datum( } fcore::metadata::DataType::TinyInt(_) => { // Strict type checking: reject bool for int columns - if value.is_instance_of::() { + if value.is_instance_of::() { return Err(FlussError::new_err( "Expected int for TinyInt column, got bool. 
Use 0 or 1 explicitly.".to_string(), )); @@ -933,7 +1179,7 @@ fn python_value_to_datum( Ok(Datum::Int8(v)) } fcore::metadata::DataType::SmallInt(_) => { - if value.is_instance_of::() { + if value.is_instance_of::() { return Err(FlussError::new_err( "Expected int for SmallInt column, got bool. Use 0 or 1 explicitly." .to_string(), @@ -943,7 +1189,7 @@ fn python_value_to_datum( Ok(Datum::Int16(v)) } fcore::metadata::DataType::Int(_) => { - if value.is_instance_of::() { + if value.is_instance_of::() { return Err(FlussError::new_err( "Expected int for Int column, got bool. Use 0 or 1 explicitly.".to_string(), )); @@ -952,7 +1198,7 @@ fn python_value_to_datum( Ok(Datum::Int32(v)) } fcore::metadata::DataType::BigInt(_) => { - if value.is_instance_of::() { + if value.is_instance_of::() { return Err(FlussError::new_err( "Expected int for BigInt column, got bool. Use 0 or 1 explicitly.".to_string(), )); @@ -975,9 +1221,9 @@ fn python_value_to_datum( fcore::metadata::DataType::Bytes(_) | fcore::metadata::DataType::Binary(_) => { // Efficient extraction: downcast to specific type and use bulk copy. // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk copies of the underlying data. 
- if let Ok(bytes) = value.downcast::() { + if let Ok(bytes) = value.downcast::() { Ok(bytes.as_bytes().to_vec().into()) - } else if let Ok(bytearray) = value.downcast::() { + } else if let Ok(bytearray) = value.downcast::() { Ok(bytearray.to_vec().into()) } else { Err(FlussError::new_err(format!( @@ -1067,11 +1313,11 @@ pub fn datum_to_python_value( } DataType::Bytes(_) => { let b = row.get_bytes(pos); - Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind()) + Ok(PyBytes::new(py, b).into_any().unbind()) } DataType::Binary(binary_type) => { let b = row.get_binary(pos, binary_type.length()); - Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind()) + Ok(PyBytes::new(py, b).into_any().unbind()) } DataType::Decimal(decimal_type) => { let decimal = row.get_decimal( @@ -1113,8 +1359,6 @@ fn rust_decimal_to_python(py: Python, decimal: &fcore::row::Decimal) -> PyResult /// Convert Rust Date (days since epoch) to Python datetime.date fn rust_date_to_python(py: Python, date: fcore::row::Date) -> PyResult> { - use pyo3::types::PyDate; - let days_since_epoch = date.get_inner(); let epoch = jiff::civil::date(1970, 1, 1); let civil_date = epoch + jiff::Span::new().days(days_since_epoch as i64); @@ -1130,8 +1374,6 @@ fn rust_date_to_python(py: Python, date: fcore::row::Date) -> PyResult /// Convert Rust Time (millis since midnight) to Python datetime.time fn rust_time_to_python(py: Python, time: fcore::row::Time) -> PyResult> { - use pyo3::types::PyTime; - let millis = time.get_inner() as i64; let hours = millis / MILLIS_PER_HOUR; let minutes = (millis % MILLIS_PER_HOUR) / MILLIS_PER_MINUTE; @@ -1151,8 +1393,6 @@ fn rust_time_to_python(py: Python, time: fcore::row::Time) -> PyResult /// Convert Rust TimestampNtz to Python naive datetime fn rust_timestamp_ntz_to_python(py: Python, ts: fcore::row::TimestampNtz) -> PyResult> { - use pyo3::types::PyDateTime; - let millis = ts.get_millisecond(); let nanos = ts.get_nano_of_millisecond(); let total_micros = millis * MICROS_PER_MILLI 
+ (nanos as i64 / NANOS_PER_MICRO); @@ -1178,8 +1418,6 @@ fn rust_timestamp_ntz_to_python(py: Python, ts: fcore::row::TimestampNtz) -> PyR /// Convert Rust TimestampLtz to Python timezone-aware datetime (UTC) fn rust_timestamp_ltz_to_python(py: Python, ts: fcore::row::TimestampLtz) -> PyResult> { - use pyo3::types::PyDateTime; - let millis = ts.get_epoch_millisecond(); let nanos = ts.get_nano_of_millisecond(); let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 / NANOS_PER_MICRO); @@ -1212,7 +1450,7 @@ pub fn internal_row_to_dict( ) -> PyResult> { let row_type = table_info.row_type(); let fields = row_type.fields(); - let dict = pyo3::types::PyDict::new(py); + let dict = PyDict::new(py); for (pos, field) in fields.iter().enumerate() { let value = datum_to_python_value(py, row, pos, field.data_type())?; @@ -1224,29 +1462,26 @@ pub fn internal_row_to_dict( /// Cached decimal.Decimal type /// Uses PyOnceLock for thread-safety and subinterpreter compatibility. -static DECIMAL_TYPE: pyo3::sync::PyOnceLock> = - pyo3::sync::PyOnceLock::new(); +static DECIMAL_TYPE: PyOnceLock> = PyOnceLock::new(); /// Cached UTC timezone -static UTC_TIMEZONE: pyo3::sync::PyOnceLock> = pyo3::sync::PyOnceLock::new(); +static UTC_TIMEZONE: PyOnceLock> = PyOnceLock::new(); /// Cached UTC epoch type -static UTC_EPOCH: pyo3::sync::PyOnceLock> = pyo3::sync::PyOnceLock::new(); +static UTC_EPOCH: PyOnceLock> = PyOnceLock::new(); /// Get the cached decimal.Decimal type, importing it once per interpreter. -fn get_decimal_type(py: Python) -> PyResult> { +fn get_decimal_type(py: Python) -> PyResult> { let ty = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> { let decimal_mod = py.import("decimal")?; - let decimal_ty = decimal_mod - .getattr("Decimal")? - .downcast_into::()?; + let decimal_ty = decimal_mod.getattr("Decimal")?.downcast_into::()?; Ok(decimal_ty.unbind()) })?; Ok(ty.bind(py).clone()) } /// Get the cached UTC timezone (datetime.timezone.utc), creating it once per interpreter. 
-fn get_utc_timezone(py: Python) -> PyResult> { +fn get_utc_timezone(py: Python) -> PyResult> { let tz = UTC_TIMEZONE.get_or_try_init(py, || -> PyResult<_> { let datetime_mod = py.import("datetime")?; let timezone = datetime_mod.getattr("timezone")?; @@ -1254,10 +1489,7 @@ fn get_utc_timezone(py: Python) -> PyResult> { Ok(utc.unbind()) })?; // Downcast to PyTzInfo for use with PyDateTime::new() - Ok(tz - .bind(py) - .clone() - .downcast_into::()?) + Ok(tz.bind(py).clone().downcast_into::()?) } /// Get the cached UTC epoch datetime, creating it once per interpreter. @@ -1313,8 +1545,6 @@ fn python_decimal_to_datum( /// Convert Python datetime.date to Datum::Date. fn python_date_to_datum(value: &Bound) -> PyResult> { - use pyo3::types::{PyDate, PyDateAccess, PyDateTime}; - // Reject datetime.datetime (subclass of date) - use timestamp columns for those if value.downcast::().is_ok() { return Err(FlussError::new_err( @@ -1351,8 +1581,6 @@ fn python_date_to_datum(value: &Bound) -> PyResult) -> PyResult> { - use pyo3::types::{PyTime, PyTimeAccess}; - let time = value.downcast::().map_err(|_| { FlussError::new_err(format!( "Expected datetime.time, got {}", @@ -1411,8 +1639,6 @@ fn python_datetime_to_timestamp_ltz(value: &Bound) -> PyResult) -> PyResult<(i64, i32)> { - use pyo3::types::PyDateTime; - // Try PyDateTime first if let Ok(dt) = value.downcast::() { // Reject tz-aware datetime for NTZ - it's ambiguous what the user wants @@ -1465,8 +1691,6 @@ fn extract_datetime_components_ntz(value: &Bound) -> PyResult<(i64, i32)> /// Extract epoch milliseconds for TimestampLtz (instant in time, UTC-based). /// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC. 
fn extract_datetime_components_ltz(value: &Bound) -> PyResult<(i64, i32)> { - use pyo3::types::PyDateTime; - // Try PyDateTime first if let Ok(dt) = value.downcast::() { // Check if timezone-aware @@ -1506,11 +1730,7 @@ fn extract_datetime_components_ltz(value: &Bound) -> PyResult<(i64, i32)> } /// Convert datetime components to epoch milliseconds treating them as UTC -fn datetime_to_epoch_millis_as_utc( - dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>, -) -> PyResult<(i64, i32)> { - use pyo3::types::{PyDateAccess, PyTimeAccess}; - +fn datetime_to_epoch_millis_as_utc(dt: &Bound<'_, PyDateTime>) -> PyResult<(i64, i32)> { let year = dt.get_year(); let month = dt.get_month(); let day = dt.get_day(); @@ -1541,11 +1761,7 @@ fn datetime_to_epoch_millis_as_utc( /// Convert timezone-aware datetime to epoch milliseconds using Python's timedelta. /// This correctly handles timezone conversions by computing (dt - UTC_EPOCH). /// The UTC epoch is cached for performance. -fn datetime_to_epoch_millis_utc_aware( - dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>, -) -> PyResult<(i64, i32)> { - use pyo3::types::{PyDelta, PyDeltaAccess}; - +fn datetime_to_epoch_millis_utc_aware(dt: &Bound<'_, PyDateTime>) -> PyResult<(i64, i32)> { let py = dt.py(); let epoch = get_utc_epoch(py)?; @@ -1777,14 +1993,15 @@ impl LogScanner { /// timeout_ms: Timeout in milliseconds to wait for records /// /// Returns: - /// List of ScanRecord objects, each containing bucket, offset, timestamp, - /// change_type, and row data as a dictionary. + /// ScanRecords grouped by bucket. Supports flat iteration + /// (`for rec in records`) and per-bucket access (`records.buckets()`, + /// `records.records(bucket)`, `records[bucket]`). 
/// /// Note: /// - Requires a record-based scanner (created with new_scan().create_log_scanner()) - /// - Returns an empty list if no records are available - /// - When timeout expires, returns an empty list (NOT an error) - fn poll(&self, py: Python, timeout_ms: i64) -> PyResult> { + /// - Returns an empty ScanRecords if no records are available + /// - When timeout expires, returns an empty ScanRecords (NOT an error) + fn poll(&self, py: Python, timeout_ms: i64) -> PyResult { let scanner = self.scanner.as_record()?; if timeout_ms < 0 { @@ -1798,19 +2015,26 @@ impl LogScanner { .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await })) .map_err(|e| FlussError::from_core_error(&e))?; - // Convert ScanRecords to Python ScanRecord list - // Use projected_row_type to handle column projection correctly + // Convert core ScanRecords to Python ScanRecords grouped by bucket let row_type = &self.projected_row_type; - let mut result = Vec::new(); + let mut records_by_bucket = IndexMap::new(); + let mut total_count = 0usize; for (bucket, records) in scan_records.into_records_by_buckets() { - for record in records { - let scan_record = ScanRecord::from_core(py, &bucket, &record, row_type)?; - result.push(scan_record); + let py_bucket = TableBucket::from_core(bucket); + let mut py_records = Vec::with_capacity(records.len()); + for record in &records { + let scan_record = ScanRecord::from_core(py, record, row_type)?; + py_records.push(Py::new(py, scan_record)?); + total_count += 1; } + records_by_bucket.insert(py_bucket, py_records); } - Ok(result) + Ok(ScanRecords { + records_by_bucket, + total_count, + }) } /// Poll for batches with metadata. 
diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py index 09586aa8..bfa97897 100644 --- a/bindings/python/test/test_log_table.py +++ b/bindings/python/test/test_log_table.py @@ -492,11 +492,22 @@ async def test_partitioned_table_append_scan(connection, admin): (8, "EU", 800), ] - records = _poll_records(scanner, expected_count=8) - assert len(records) == 8 + # Poll and verify per-bucket grouping + all_records = [] + deadline = time.monotonic() + 10 + while len(all_records) < 8 and time.monotonic() < deadline: + scan_records = scanner.poll(5000) + for bucket, bucket_records in scan_records.items(): + assert bucket.partition_id is not None, "Partitioned table should have partition_id" + # All records in a bucket should belong to the same partition + regions = {r.row["region"] for r in bucket_records} + assert len(regions) == 1, f"Bucket has mixed regions: {regions}" + all_records.extend(bucket_records) + + assert len(all_records) == 8 collected = sorted( - [(r.row["id"], r.row["region"], r.row["value"]) for r in records], + [(r.row["id"], r.row["region"], r.row["value"]) for r in all_records], key=lambda x: x[0], ) assert collected == expected @@ -652,6 +663,70 @@ async def test_partitioned_table_to_arrow(connection, admin): await admin.drop_table(table_path, ignore_if_not_exists=False) +async def test_scan_records_indexing_and_slicing(connection, admin): + """Test ScanRecords indexing, slicing (incl. 
negative steps), and iteration consistency.""" + table_path = fluss.TablePath("fluss", "py_test_scan_records_indexing") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())]) + ) + await admin.create_table(table_path, fluss.TableDescriptor(schema)) + + table = await connection.get_table(table_path) + writer = table.new_append().create_writer() + writer.write_arrow_batch( + pa.RecordBatch.from_arrays( + [pa.array(list(range(1, 9)), type=pa.int32()), + pa.array([f"v{i}" for i in range(1, 9)])], + schema=pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())]), + ) + ) + await writer.flush() + + scanner = await table.new_scan().create_log_scanner() + num_buckets = (await admin.get_table_info(table_path)).num_buckets + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + + # Poll until we get a non-empty ScanRecords (need ≥2 records for slice tests) + sr = None + deadline = time.monotonic() + 10 + while time.monotonic() < deadline: + sr = scanner.poll(5000) + if len(sr) >= 2: + break + assert sr is not None and len(sr) >= 2, "Expected at least 2 records" + n = len(sr) + offsets = [sr[i].offset for i in range(n)] + + # Iteration and indexing must produce the same order + assert [r.offset for r in sr] == offsets + + # Negative indexing + assert sr[-1].offset == offsets[-1] + assert sr[-n].offset == offsets[0] + + # Verify slices match the same operation on the offsets reference list + test_slices = [ + slice(1, n - 1), # forward subrange + slice(None, None, -1), # [::-1] full reverse + slice(n - 2, 0, -1), # reverse with bounds + slice(n - 1, 0, -2), # reverse with step + slice(None, None, 2), # [::2] + slice(1, None, 3), # [1::3] + slice(2, 2), # empty + ] + for s in test_slices: + result = [r.offset for r in sr[s]] + assert result == offsets[s], f"slice {s}: got {result}, expected {offsets[s]}" + + # Bucket-based 
indexing + for bucket in sr.buckets(): + assert len(sr[bucket]) > 0 + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -667,6 +742,8 @@ def _poll_records(scanner, expected_count, timeout_s=10): return collected + + def _poll_arrow_ids(scanner, expected_count, timeout_s=10): """Poll a batch scanner and extract 'id' column values.""" all_ids = [] diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index a07dd6c6..433c5da9 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -250,23 +250,60 @@ Read-only row view for scan results. Provides zero-copy access to string and byt `ScanRecord` is a value type that can be freely copied, stored, and accumulated across multiple `Poll()` calls. It shares ownership of the underlying scan data via reference counting. 
-| Field | Type | Description | -|----------------|-------------------------|----------------------------------| -| `bucket_id` | `int32_t` | Bucket this record belongs to | -| `partition_id` | `std::optional`| Partition ID (if partitioned) | -| `offset` | `int64_t` | Record offset in the log | -| `timestamp` | `int64_t` | Record timestamp | -| `change_type` | `ChangeType` | Type of change (see `ChangeType`)| -| `row` | `RowView` | Read-only row view for field access | +| Field | Type | Description | +|---------------|--------------|---------------------------------------------------------------------| +| `offset` | `int64_t` | Record offset in the log | +| `timestamp` | `int64_t` | Record timestamp | +| `change_type` | `ChangeType` | Change type (AppendOnly, Insert, UpdateBefore, UpdateAfter, Delete) | +| `row` | `RowView` | Row data (value type, shares ownership via reference counting) | ## `ScanRecords` -| Method | Description | -|----------------------------------------|--------------------------------------------| -| `Size() -> size_t` | Number of records | -| `Empty() -> bool` | Check if empty | -| `operator[](size_t idx) -> ScanRecord` | Access record by index | -| `begin() / end()` | Iterator support for range-based for loops | +### Flat Access + +| Method | Description | +|-----------------------------------------|--------------------------------------------| +| `Count() -> size_t` | Total number of records across all buckets | +| `IsEmpty() -> bool` | Check if empty | +| `begin() / end()` | Iterator support for range-based for loops | + +Flat iteration over all records (regardless of bucket): + +```cpp +for (const auto& rec : records) { + std::cout << "offset=" << rec.offset << std::endl; +} +``` + +### Per-Bucket Access + +| Method | Description | +|-----------------------------------------------------------------|-----------------------------------------------------------------------| +| `BucketCount() -> size_t` | Number of distinct buckets | +| 
`Buckets() -> std::vector` | List of distinct buckets | +| `Records(const TableBucket& bucket) -> BucketView` | Records for a specific bucket (empty view if bucket not present) | +| `BucketAt(size_t idx) -> BucketView` | Records by bucket index (0-based, O(1)) | + +## `BucketView` + +A view of records within a single bucket. Obtained from `ScanRecords::Records()` or `ScanRecords::BucketAt()`. `BucketView` is a value type — it shares ownership of the underlying scan data via reference counting, so it can safely outlive the `ScanRecords` that produced it. + +| Method | Description | +|------------------------------------------------|--------------------------------------------| +| `Size() -> size_t` | Number of records in this bucket | +| `Empty() -> bool` | Check if empty | +| `Bucket() -> const TableBucket&` | Get the bucket | +| `operator[](size_t idx) -> ScanRecord` | Access record by index within this bucket | +| `begin() / end()` | Iterator support for range-based for loops | + +## `TableBucket` + +| Field / Method | Description | +|---------------------------------------|-------------------------------------------------| +| `table_id -> int64_t` | Table ID | +| `bucket_id -> int32_t` | Bucket ID | +| `partition_id -> std::optional` | Partition ID (empty if non-partitioned) | +| `operator==(const TableBucket&) -> bool` | Equality comparison | ## `LookupResult` diff --git a/website/docs/user-guide/cpp/example/log-tables.md b/website/docs/user-guide/cpp/example/log-tables.md index 3a862c18..0125a4ce 100644 --- a/website/docs/user-guide/cpp/example/log-tables.md +++ b/website/docs/user-guide/cpp/example/log-tables.md @@ -60,6 +60,18 @@ for (const auto& rec : records) { << " timestamp=" << rec.row.GetInt64(2) << " @ offset=" << rec.offset << std::endl; } + +// Or per-bucket access +for (const auto& bucket : records.Buckets()) { + auto view = records.Records(bucket); + std::cout << "Bucket " << bucket.bucket_id << ": " + << view.Size() << " records" << std::endl; + 
for (const auto& rec : view) { + std::cout << " event_id=" << rec.row.GetInt32(0) + << " event_type=" << rec.row.GetString(1) + << " @ offset=" << rec.offset << std::endl; + } +} ``` **Continuous polling:** diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index af03058a..27a57dc1 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -137,17 +137,69 @@ Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`. | `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) | | `.unsubscribe(bucket_id)` | Unsubscribe from a bucket (non-partitioned tables) | | `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket | -| `.poll(timeout_ms) -> list[ScanRecord]` | Poll individual records (record scanner only) | +| `.poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) | | `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) | | `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) | | `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) | | `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) | +## `ScanRecords` + +Returned by `LogScanner.poll()`. Records are grouped by bucket. + +> **Note:** Flat iteration and integer indexing traverse buckets in an arbitrary order that is consistent within a single `ScanRecords` instance but may differ between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters. 
+ +```python +scan_records = scanner.poll(timeout_ms=5000) + +# Sequence access +scan_records[0] # first record +scan_records[-1] # last record +scan_records[:5] # first 5 records + +# Per-bucket access +for bucket, records in scan_records.items(): + for record in records: + print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}") + +# Flat iteration +for record in scan_records: + print(record.row) +``` + +### Methods + +| Method | Description | +|----------------------------------------|------------------------------------------------------------------| +| `.buckets() -> list[TableBucket]` | List of distinct buckets | +| `.records(bucket) -> list[ScanRecord]` | Records for a specific bucket (empty list if bucket not present) | +| `.count() -> int` | Total record count across all buckets | +| `.is_empty() -> bool` | Check if empty | + +### Indexing + +| Expression | Returns | Description | +|------------------------------|----------------------|-----------------------------------| +| `scan_records[0]` | `ScanRecord` | Record by flat index | +| `scan_records[-1]` | `ScanRecord` | Negative indexing | +| `scan_records[1:5]` | `list[ScanRecord]` | Slice | +| `scan_records[bucket]` | `list[ScanRecord]` | Records for a bucket | + +### Mapping Protocol + +| Method / Protocol | Description | +|--------------------------------|-------------------------------------------------| +| `.keys()` | Same as `.buckets()` | +| `.values()` | Lazy iterator over record lists, one per bucket | +| `.items()` | Lazy iterator over `(bucket, records)` pairs | +| `len(scan_records)` | Same as `.count()` | +| `bucket in scan_records` | Membership test | +| `for record in scan_records` | Flat iteration over all records | + ## `ScanRecord` | Property | Description | |------------------------------|---------------------------------------------------------------------| -| `.bucket -> TableBucket` | Bucket this record belongs to | | `.offset -> int` | Record offset in the log | 
| `.timestamp -> int` | Record timestamp | | `.change_type -> ChangeType` | Change type (AppendOnly, Insert, UpdateBefore, UpdateAfter, Delete) | diff --git a/website/docs/user-guide/python/example/log-tables.md b/website/docs/user-guide/python/example/log-tables.md index 6e44e061..adaa162a 100644 --- a/website/docs/user-guide/python/example/log-tables.md +++ b/website/docs/user-guide/python/example/log-tables.md @@ -83,13 +83,20 @@ while True: if result.num_rows > 0: print(result.to_pandas()) -# Record scanner: poll individual records with metadata +# Record scanner: poll individual records scanner = await table.new_scan().create_log_scanner() scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) while True: - for record in scanner.poll(timeout_ms=5000): + scan_records = scanner.poll(timeout_ms=5000) + + for record in scan_records: print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}") + + # Or per-bucket access (dict-like) + for bucket, records in scan_records.items(): + for record in records: + print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}") ``` ### Unsubscribing diff --git a/website/docs/user-guide/rust/example/log-tables.md b/website/docs/user-guide/rust/example/log-tables.md index 3ba33542..f5a4d0e2 100644 --- a/website/docs/user-guide/rust/example/log-tables.md +++ b/website/docs/user-guide/rust/example/log-tables.md @@ -63,6 +63,21 @@ log_scanner.subscribe(0, 0).await?; // Poll for records let records = log_scanner.poll(Duration::from_secs(10)).await?; +// Per-bucket access +for (bucket, bucket_records) in records.records_by_buckets() { + println!("Bucket {}: {} records", bucket.bucket_id(), bucket_records.len()); + for record in bucket_records { + let row = record.row(); + println!( + " event_id={}, event_type={} @ offset={}", + row.get_int(0), + row.get_string(1), + record.offset() + ); + } +} + +// Or flat iteration (consumes ScanRecords) for record in 
records { let row = record.row(); println!(