From 7c56af24a42770bbfec817e3d4b9fff09b563ff3 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Tue, 2 Jun 2026 06:59:59 +0000 Subject: [PATCH] feat: add IO stream infrastructure --- include/paimon/io/buffered_input_stream.h | 91 ++++++ include/paimon/io/byte_array_input_stream.h | 67 +++++ include/paimon/io/data_input_stream.h | 97 +++++++ .../common/io/buffered_input_stream.cpp | 167 +++++++++++ .../common/io/buffered_input_stream_test.cpp | 270 ++++++++++++++++++ .../common/io/byte_array_input_stream.cpp | 107 +++++++ .../io/byte_array_input_stream_test.cpp | 84 ++++++ .../io/data_input_output_stream_test.cpp | 208 ++++++++++++++ src/paimon/common/io/data_input_stream.cpp | 129 +++++++++ src/paimon/common/io/data_output_stream.cpp | 70 +++++ src/paimon/common/io/data_output_stream.h | 79 +++++ .../io/memory_segment_output_stream.cpp | 99 +++++++ .../common/io/memory_segment_output_stream.h | 118 ++++++++ .../io/memory_segment_output_stream_test.cpp | 85 ++++++ src/paimon/common/io/offset_input_stream.cpp | 137 +++++++++ src/paimon/common/io/offset_input_stream.h | 67 +++++ .../common/io/offset_input_stream_test.cpp | 216 ++++++++++++++ 17 files changed, 2091 insertions(+) create mode 100644 include/paimon/io/buffered_input_stream.h create mode 100644 include/paimon/io/byte_array_input_stream.h create mode 100644 include/paimon/io/data_input_stream.h create mode 100644 src/paimon/common/io/buffered_input_stream.cpp create mode 100644 src/paimon/common/io/buffered_input_stream_test.cpp create mode 100644 src/paimon/common/io/byte_array_input_stream.cpp create mode 100644 src/paimon/common/io/byte_array_input_stream_test.cpp create mode 100644 src/paimon/common/io/data_input_output_stream_test.cpp create mode 100644 src/paimon/common/io/data_input_stream.cpp create mode 100644 src/paimon/common/io/data_output_stream.cpp create mode 100644 src/paimon/common/io/data_output_stream.h create mode 100644 src/paimon/common/io/memory_segment_output_stream.cpp create mode 
100644 src/paimon/common/io/memory_segment_output_stream.h create mode 100644 src/paimon/common/io/memory_segment_output_stream_test.cpp create mode 100644 src/paimon/common/io/offset_input_stream.cpp create mode 100644 src/paimon/common/io/offset_input_stream.h create mode 100644 src/paimon/common/io/offset_input_stream_test.cpp diff --git a/include/paimon/io/buffered_input_stream.h b/include/paimon/io/buffered_input_stream.h new file mode 100644 index 0000000..2af6b73 --- /dev/null +++ b/include/paimon/io/buffered_input_stream.h @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include +#include + +#include "paimon/fs/file_system.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/visibility.h" + +namespace paimon { +class Bytes; +class MemoryPool; + +/// A buffered input stream that wraps another `InputStream` to provide buffering capabilities. +/// +/// `BufferedInputStream` improves I/O performance by reducing the number of system calls +/// through internal buffering. It reads data from the underlying stream in larger chunks +/// and serves subsequent read requests from the internal buffer when possible. 
+class PAIMON_EXPORT BufferedInputStream : public InputStream { + public: + /// Creates a new buffered input stream that wraps the provided input stream. + /// The buffer is allocated from the specified memory pool. + /// + /// @param in The underlying input stream to wrap. + /// @param buffer_size Size of the internal buffer in bytes. + /// @param pool Memory pool for buffer allocation. + BufferedInputStream(const std::shared_ptr& in, int32_t buffer_size, + MemoryPool* pool); + + ~BufferedInputStream() noexcept override; + + /// Moves the logical read position. When the target falls inside the bytes currently held + /// in the buffer only the in-buffer cursor is adjusted; otherwise the underlying stream is + /// repositioned and the buffer is discarded. + Status Seek(int64_t offset, SeekOrigin origin) override; + + /// @return The logical position: the underlying stream position minus the buffered bytes + /// that have not been consumed yet. + Result GetPos() const override; + + /// Reads `size` bytes into `buffer`, refilling the internal buffer as needed. + /// @return The number of bytes read, or an error if the bytes cannot all be read. + Result Read(char* buffer, uint32_t size) override; + + /// Positional read is not supported; always returns an Invalid status. + Result Read(char* buffer, uint32_t size, uint64_t offset) override; + + /// Asynchronous read is not supported; `callback` is invoked immediately with a + /// NotImplemented status. + void ReadAsync(char* buffer, uint32_t size, uint64_t offset, + std::function&& callback) override; + + /// @return The length reported by the underlying stream. + Result Length() const override; + + /// Releases the internal buffer and resets the read state. The underlying stream is not + /// closed by this call. + Status Close() override; + + /// @return The URI reported by the underlying stream. + Result GetUri() const override; + + /// Default buffer size in bytes. It is not applied automatically: the constructor always + /// requires an explicit size. + static constexpr int32_t DEFAULT_BUFFER_SIZE = 8192; + + private: + /// Fill the internal buffer from the underlying stream. + Status Fill(); + + /// Internal read implementation. + /// @pre size > 0 + Result InnerRead(char* buffer, int32_t size); + + /// Validate that the expected number of bytes were read. + Status AssertReadLength(int32_t read_length, int32_t actual_read_length) const; + + private: + /// Capacity of buffer_ in bytes. + int32_t buffer_size_; + /// Index of the next unread byte in buffer_. + int32_t pos_ = 0; + /// Number of valid bytes currently held in buffer_. + int32_t count_ = 0; + std::unique_ptr buffer_; + /// The wrapped stream that buffer_ is filled from. + std::shared_ptr in_; +}; + +} // namespace paimon diff --git a/include/paimon/io/byte_array_input_stream.h b/include/paimon/io/byte_array_input_stream.h new file mode 100644 index 0000000..e66c5e8 --- /dev/null +++ b/include/paimon/io/byte_array_input_stream.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/fs/file_system.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/visibility.h" + +namespace paimon { +/// Input stream for memory buffer, inherits from `InputStream`. +class PAIMON_EXPORT ByteArrayInputStream : public InputStream { + public: + /// @param buffer Start of the bytes to expose; must not be null. The pointer is stored, not + /// copied, so the memory has to stay valid while the stream is in use. + /// @param length Number of readable bytes at `buffer`. + ByteArrayInputStream(const char* buffer, uint64_t length); + ~ByteArrayInputStream() override = default; + + /// @return The raw data pointer of current pos.
+ const char* GetRawData() const; + + /// Moves the current position relative to the start, the current position or the end. + /// Fails with an Invalid status when the resulting position is outside [0, length]. + Status Seek(int64_t offset, SeekOrigin origin) override; + + /// @return The current read position, as a byte offset from the start of the buffer. + Result GetPos() const override { + return position_; + } + + /// Copies `size` bytes from the current position into `buffer` and advances the position. + /// Fails with an Invalid status when fewer than `size` bytes remain. + Result Read(char* buffer, uint32_t size) override; + + /// Copies `size` bytes starting at absolute `offset` into `buffer`. The current position is + /// not changed. Fails with an Invalid status when the range exceeds the length. + Result Read(char* buffer, uint32_t size, uint64_t offset) override; + + /// Performs the positional read synchronously, then invokes `callback` with the resulting + /// status before returning. + void ReadAsync(char* buffer, uint32_t size, uint64_t offset, + std::function&& callback) override; + + /// @return The length passed to the constructor. + Result Length() const override { + return length_; + } + + /// No-op; always returns OK. + Status Close() override; + + /// @return An empty string. + Result GetUri() const override; + + private: + /// Start of the wrapped bytes; never freed by this class. + const char* buffer_; + /// Number of readable bytes at buffer_. + const uint64_t length_; + /// Current read offset from buffer_. + int64_t position_; +}; +} // namespace paimon diff --git a/include/paimon/io/data_input_stream.h b/include/paimon/io/data_input_stream.h new file mode 100644 index 0000000..8dc15a2 --- /dev/null +++ b/include/paimon/io/data_input_stream.h @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/io/byte_order.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/visibility.h" + +namespace paimon { +class Bytes; +class InputStream; + +/// `DataInputStream` provides a convenient wrapper around `InputStream` for reading typed data.
+/// @note The default byte order is big-endian to maintain compatibility with the Java +/// implementation. +/// @note NOTE(review): every read/seek method is declared `const` although it operates on the +/// shared underlying stream; presumably these calls still move that stream's position - confirm +/// against the implementation. +class PAIMON_EXPORT DataInputStream { + public: + /// Constructs a `DataInputStream` wrapping the given `InputStream`. + /// @param input_stream The underlying input stream to read from. + explicit DataInputStream(const std::shared_ptr& input_stream); + + /// Seek to a specific position in the underlying input stream. + /// @param offset The absolute byte offset to seek to. + Status Seek(int64_t offset) const; + + /// Read a typed value from the stream. + /// @return Result containing the read value or an error status. + template + Result ReadValue() const; + + /// Read some bytes to a `Bytes` object from the stream. The length of bytes is the number of + /// bytes read from the stream. + /// NOTE(review): presumably this means exactly `bytes->size()` bytes are read - confirm. + /// @param bytes Buffer to store the read bytes. + Status ReadBytes(Bytes* bytes) const; + + /// Read raw data of specified size from the stream. + /// @param data Buffer to store the read data. + /// @param size Number of bytes to read. + Status Read(char* data, uint32_t size) const; + + /// Read string from the stream. + /// @note First read length (int16), then read string bytes. + Result ReadString() const; + + /// Get the current position in the underlying input stream. + Result GetPos() const; + + /// Get the total length of the underlying input stream. + Result Length() const; + + /// Set the byte order for endianness conversion. + /// @param order The byte order to use: `PAIMON_BIG_ENDIAN` or `PAIMON_LITTLE_ENDIAN`. + void SetOrder(ByteOrder order) { + byte_order_ = order; + } + + private: + /// Validate that the expected number of bytes were read. + /// @param read_length Expected number of bytes to read. + /// @param actual_read_length Actual number of bytes read. + Status AssertReadLength(int32_t read_length, int32_t actual_read_length) const; + + /// Check if there are enough bytes available to read. + /// @param need_length Number of bytes needed.
+ Status AssertBoundary(int32_t need_length) const; + + /// Determine if byte swapping is needed based on current byte order and system endianness. + /// @return `true` if byte swapping is required, `false` otherwise. + bool NeedSwap() const; + + private: + /// The wrapped stream, held through a shared pointer. + std::shared_ptr input_stream_; + /// Byte order selected through SetOrder(); big-endian unless changed. + ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN; +}; +} // namespace paimon diff --git a/src/paimon/common/io/buffered_input_stream.cpp b/src/paimon/common/io/buffered_input_stream.cpp new file mode 100644 index 0000000..d2b50b7 --- /dev/null +++ b/src/paimon/common/io/buffered_input_stream.cpp @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "paimon/io/buffered_input_stream.h" + +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/memory/bytes.h" + +namespace paimon { +class MemoryPool; + +// Wraps `in` and allocates a `buffer_size`-byte buffer from `pool`; the buffer starts empty. +BufferedInputStream::BufferedInputStream(const std::shared_ptr& in, + int32_t buffer_size, MemoryPool* pool) + : buffer_size_(buffer_size), in_(in) { + assert(buffer_size > 0); + buffer_ = std::make_unique(buffer_size, pool); +} + +BufferedInputStream::~BufferedInputStream() noexcept = default; + +Status BufferedInputStream::Seek(int64_t offset, SeekOrigin origin) { + // Convert all seek origins to an absolute offset so the buffer-hit fast + // path below can work uniformly on absolute positions. + int64_t target_abs_offset = offset; + if (origin == SeekOrigin::FS_SEEK_CUR) { + PAIMON_ASSIGN_OR_RAISE(int64_t cur_pos, GetPos()); + target_abs_offset = cur_pos + offset; + } else if (origin == SeekOrigin::FS_SEEK_END) { + PAIMON_ASSIGN_OR_RAISE(int64_t length, in_->Length()); + target_abs_offset = length + offset; + } + // else: FS_SEEK_SET — target_abs_offset is already absolute. + + // Fast path: if the new absolute offset still falls into the bytes already + // cached in buffer_ (i.e. the window from buf_start_abs to buf_end_abs), just + // adjust pos_ without touching the underlying stream. + if (count_ > 0) { + PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos()); + const int64_t buf_start_abs = in_pos - count_; + const int64_t buf_end_abs = in_pos; + if (target_abs_offset >= buf_start_abs && target_abs_offset <= buf_end_abs) { + pos_ = static_cast(target_abs_offset - buf_start_abs); + return Status::OK(); + } + } + + // Slow path: the target is outside the current buffer window, fall back to + // a real seek on the underlying stream and invalidate the buffer.
+ PAIMON_RETURN_NOT_OK(in_->Seek(target_abs_offset, FS_SEEK_SET)); + pos_ = 0; + count_ = 0; + return Status::OK(); +} + +// Logical position = underlying position minus the buffered bytes not consumed yet. +Result BufferedInputStream::GetPos() const { + PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos()); + return in_pos - count_ + pos_; +} + +// Reads exactly `size` bytes by looping over InnerRead(); any shortfall is reported as an error. +Result BufferedInputStream::Read(char* buffer, uint32_t size) { + // InnerRead() takes and returns int32_t byte counts, so a larger request cannot be + // represented; reject it instead of letting the size wrap to a negative value. + if (size > static_cast<uint32_t>(INT32_MAX)) { + return Status::Invalid(fmt::format( + "BufferedInputStream read size {} exceeds the supported maximum {}", size, INT32_MAX)); + } + uint32_t actual_read_len = 0; + while (actual_read_len < size) { + PAIMON_ASSIGN_OR_RAISE(int32_t nread, + InnerRead(buffer + actual_read_len, size - actual_read_len)); + // assert() is compiled out in release builds; without this check a zero-byte read from + // the underlying stream would spin forever and a negative one would corrupt the count. + if (nread <= 0) { + return Status::Invalid(fmt::format( + "BufferedInputStream read failed: got {} bytes from InnerRead, but still expect " + "read {} bytes", + nread, size - actual_read_len)); + } + actual_read_len += nread; + } + PAIMON_RETURN_NOT_OK(AssertReadLength(size, actual_read_len)); + return actual_read_len; +} + +Result BufferedInputStream::Read(char* buffer, uint32_t size, uint64_t offset) { + return Status::Invalid("BufferedInputStream does not support Read from offset"); +} + +void BufferedInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset, + std::function&& callback) { + callback(Status::NotImplemented("BufferedInputStream do not support ReadAsync")); +} + +Result BufferedInputStream::Length() const { + return in_->Length(); +} + +// Releases the buffer and resets the read state; the underlying stream is left untouched. +Status BufferedInputStream::Close() { + pos_ = 0; + count_ = 0; + buffer_.reset(); + return Status::OK(); +} + +Result BufferedInputStream::GetUri() const { + return in_->GetUri(); +} + +// Drops the buffer contents and reloads up to buffer_size_ bytes from the current position of +// the underlying stream (fewer when less than that remains before its end). +Status BufferedInputStream::Fill() { + pos_ = 0; + count_ = 0; + PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos()); + PAIMON_ASSIGN_OR_RAISE(int64_t length, in_->Length()); + int64_t left_to_read = std::min((length - in_pos), static_cast(buffer_size_)); + PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len, in_->Read(buffer_->data(), left_to_read)); + PAIMON_RETURN_NOT_OK(AssertReadLength(left_to_read, actual_read_len)); + count_ = actual_read_len; + return Status::OK(); +} + +// Serves at most `size` bytes: from the buffer when it has unread bytes, otherwise after a +// refill, or straight from the underlying stream when the request is at least buffer-sized. +Result BufferedInputStream::InnerRead(char* buffer, int32_t size) { + assert(size > 0); + int32_t avail = count_ - pos_; + if (avail <= 0) { + assert(avail == 0); + /* If the requested length is at least as large as the buffer, and + if
there is no mark/reset activity, do not bother to copy the + bytes into the local buffer. In this way buffered streams will + cascade harmlessly. */ + if (size >= buffer_size_) { + // The direct read advances in_ past bytes that never enter buffer_, so forget the + // old window first; otherwise Seek() would compute the window from the new in_ + // position and treat the stale bytes as a buffer hit, returning wrong data. + pos_ = 0; + count_ = 0; + return in_->Read(buffer, size); + } + PAIMON_RETURN_NOT_OK(Fill()); + avail = count_ - pos_; + if (avail <= 0) { + return Status::Invalid(fmt::format( + "InnerRead failed, after Fill(), still no bytes available (may read eof), but " + "expect read {} bytes", + size)); + } + } + int32_t copy_length = std::min(avail, size); + memcpy(buffer, buffer_->data() + pos_, copy_length); + pos_ += copy_length; + return copy_length; +} + +// Returns an Invalid status when the two byte counts differ, OK otherwise. +Status BufferedInputStream::AssertReadLength(int32_t read_length, + int32_t actual_read_length) const { + if (read_length != actual_read_length) { + return Status::Invalid( + fmt::format("assert read length failed: read length not match, read length {}, actual " + "read length {}", + read_length, actual_read_length)); + } + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/common/io/buffered_input_stream_test.cpp b/src/paimon/common/io/buffered_input_stream_test.cpp new file mode 100644 index 0000000..0f6c9ef --- /dev/null +++ b/src/paimon/common/io/buffered_input_stream_test.cpp @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/io/buffered_input_stream.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +// Covers sequential reads, seeks from all three origins, reading past the end, an invalid seek +// and the unsupported positional/async reads on a 6-byte stream with a 4-byte buffer. +TEST(BufferedInputStreamTest, TestSimple) { + auto pool = GetDefaultPool(); + auto output_stream = std::make_unique(/*segment_size=*/8, pool); + std::string str = "abcdef"; + auto bytes = std::make_shared(str, pool.get()); + output_stream->WriteBytes(bytes); + auto out_bytes = MemorySegmentUtils::CopyToBytes(output_stream->Segments(), 0, + output_stream->CurrentSize(), pool.get()); + auto in = std::make_shared(out_bytes->data(), out_bytes->size()); + auto input_stream = std::make_shared(in, /*buffer_size=*/4, pool.get()); + ASSERT_EQ(6, input_stream->Length().value()); + ASSERT_TRUE(input_stream->GetUri().value().empty()); + + // read from pos 0 + std::string value(3, '\0'); + ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value()); + ASSERT_EQ("abc", value); + ASSERT_EQ(3, input_stream->GetPos().value()); + ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value()); + ASSERT_EQ("def", value); + ASSERT_EQ(6, input_stream->GetPos().value()); + + // positional read (offset 1) is not supported and must leave the position unchanged + ASSERT_NOK_WITH_MSG(input_stream->Read(value.data(), value.size(), /*offset=*/1), + "BufferedInputStream does not support Read from offset"); + ASSERT_EQ(6, input_stream->GetPos().value()); + + // seek to pos 3 + ASSERT_OK(input_stream->Seek(3, SeekOrigin::FS_SEEK_SET)); + ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value()); + ASSERT_EQ("def", value); + ASSERT_EQ(6, input_stream->GetPos().value()); + + // seek to pos 2 + ASSERT_OK(input_stream->Seek(-4,
SeekOrigin::FS_SEEK_END)); + ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value()); + ASSERT_EQ("cde", value); + ASSERT_EQ(5, input_stream->GetPos().value()); + + // seek to pos 1 + ASSERT_OK(input_stream->Seek(-4, SeekOrigin::FS_SEEK_CUR)); + ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value()); + ASSERT_EQ("bcd", value); + ASSERT_EQ(4, input_stream->GetPos().value()); + + // test exceed eof, seek to pos 4, want to read 3 bytes but only 2 remain + ASSERT_OK(input_stream->Seek(-2, SeekOrigin::FS_SEEK_END)); + ASSERT_NOK_WITH_MSG(input_stream->Read(value.data(), value.size()), + "InnerRead failed, after Fill(), still no bytes available (may read eof), " + "but expect read 1 bytes"); + + // test invalid seek + ASSERT_NOK_WITH_MSG(input_stream->Seek(100, SeekOrigin::FS_SEEK_CUR), + "invalid seek, after seek, current pos 106, length 6"); + + // test ReadAsync not implemented + bool read_finished = false; + auto callback = [&](Status status) { + ASSERT_TRUE(status.IsNotImplemented()); + read_finished = true; + }; + input_stream->ReadAsync(value.data(), value.size(), /*offset=*/0, callback); + ASSERT_TRUE(read_finished); + + ASSERT_OK(input_stream->Close()); +} + +// Walks Seek() through its buffer-hit (fast) and buffer-miss (slow) paths and checks the +// internal cursor, valid-byte count and buffer contents after every step. +TEST(BufferedInputStreamTest, TestSeek) { + // Data: "0123456789abcdef" (16 bytes), buffer_size = 8 + auto pool = GetDefaultPool(); + std::string data = "0123456789abcdef"; + auto in = std::make_shared(reinterpret_cast(data.data()), + data.size()); + auto stream = std::make_shared(in, /*buffer_size=*/8, pool.get()); + + // Helper: verify buffer_ content matches expected substring of data + auto check_buffer = [&](const BufferedInputStream& s, const std::string& expected_content, + int32_t expected_pos, int32_t expected_count) { + ASSERT_EQ(s.pos_, expected_pos); + ASSERT_EQ(s.count_, expected_count); + std::string actual(s.buffer_->data(), s.buffer_->data() + expected_count); + ASSERT_EQ(actual, expected_content) << "buffer content mismatch: actual=\"" << actual + << "\", expected=\"" <<
expected_content << "\""; + }; + + std::string buf(4, '\0'); + + // Initial state: buffer empty + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + + // FS_SEEK_SET slow path (buffer empty, count_==0): seek to pos 4, read "4567" + // First Read calls Fill: reads 8 bytes from pos 4 -> buffer = "456789ab", count_=8. + // Then consumes 4 bytes -> pos_=4. + { + ASSERT_OK(stream->Seek(4, SeekOrigin::FS_SEEK_SET)); + // After seek: slow path, buffer invalidated + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + ASSERT_EQ(4, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("4567", buf); + check_buffer(*stream, "456789ab", 4, 8); + } + + // FS_SEEK_CUR buffer hit: pos=8, seek -4 -> target=4, inside buffer [4..12) + // Fast path: only adjusts pos_ to 0, count_ stays 8, buffer unchanged. + { + ASSERT_OK(stream->Seek(-4, SeekOrigin::FS_SEEK_CUR)); + check_buffer(*stream, "456789ab", 0, 8); + ASSERT_EQ(4, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("4567", buf); + check_buffer(*stream, "456789ab", 4, 8); + } + + // FS_SEEK_SET buffer hit: pos=8, seek to 5, inside buffer [4..12) + // Fast path: pos_ = 5 - 4 = 1, count_ stays 8, buffer unchanged. + { + ASSERT_OK(stream->Seek(5, SeekOrigin::FS_SEEK_SET)); + check_buffer(*stream, "456789ab", 1, 8); + ASSERT_EQ(5, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("5678", buf); + check_buffer(*stream, "456789ab", 5, 8); + } + + // FS_SEEK_END buffer hit: target = 16 + (-6) = 10. + // Current buffer covers [4..12) and pos 10 is inside it. + // Fast path: pos_ = 10 - 4 = 6, count_ stays 8. + { + ASSERT_OK(stream->Seek(-6, SeekOrigin::FS_SEEK_END)); + check_buffer(*stream, "456789ab", 6, 8); + ASSERT_EQ(10, stream->GetPos().value()); + + // Read 4 bytes from pos 10: 2 left in buffer ("ab"), then refill.
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("abcd", buf); + // After consuming "ab" (pos_==count_==8), InnerRead calls Fill from in_ pos 12, + // reads [12..16) -> buffer = "cdef", count_=4, then consumes 2 -> pos_=2. + check_buffer(*stream, "cdef", 2, 4); + } + + // Buffer miss slow path: seek to pos 0, current buffer covers [12..16). + // Target 0 is outside [12..16) -> slow path, buffer invalidated. + { + ASSERT_OK(stream->Seek(0, SeekOrigin::FS_SEEK_SET)); + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + ASSERT_EQ(0, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("0123", buf); + // Fill reads [0..8) -> buffer = "01234567", count_=8, pos_=4 + check_buffer(*stream, "01234567", 4, 8); + } + + // Buffer hit at exact boundary start: seek to pos 0 (= buf_start_abs=0) + // Fast path: pos_ = 0, count_ stays 8, buffer unchanged. + { + ASSERT_OK(stream->Seek(0, SeekOrigin::FS_SEEK_SET)); + check_buffer(*stream, "01234567", 0, 8); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("0123", buf); + check_buffer(*stream, "01234567", 4, 8); + } + + // Buffer hit at exact boundary end: seek to pos 8 (= buf_end_abs=8) + // Fast path: pos_ = 8 == count_, buffer unchanged but next Read triggers refill.
+ { + ASSERT_OK(stream->Seek(8, SeekOrigin::FS_SEEK_SET)); + check_buffer(*stream, "01234567", 8, 8); + ASSERT_EQ(8, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("89ab", buf); + // pos_==count_ triggered Fill: buffer = "89abcdef", count_=8, pos_=4 + check_buffer(*stream, "89abcdef", 4, 8); + } + + // FS_SEEK_CUR buffer miss: pos=12, seek -12 -> target=0, outside [8..16) + { + ASSERT_OK(stream->Seek(-12, SeekOrigin::FS_SEEK_CUR)); + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("0123", buf); + check_buffer(*stream, "01234567", 4, 8); + } + + // FS_SEEK_END buffer hit: fill buffer near file end, then seek within via FS_SEEK_END. + { + ASSERT_OK(stream->Seek(12, SeekOrigin::FS_SEEK_SET)); + // pos 12 is outside current buffer [0..8) -> slow path + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("cdef", buf); + // Fill from 12: buffer = "cdef", count_=4, pos_=4 + check_buffer(*stream, "cdef", 4, 4); + + // Seek -4 from end -> target = 12, buffer covers [12..16), 12 is inside. + // Fast path: pos_ = 12 - 12 = 0 + ASSERT_OK(stream->Seek(-4, SeekOrigin::FS_SEEK_END)); + check_buffer(*stream, "cdef", 0, 4); + ASSERT_EQ(12, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("cdef", buf); + check_buffer(*stream, "cdef", 4, 4); + } + + // Empty buffer (count_==0): fresh stream, seek always takes slow path.
+ { + auto in2 = std::make_shared( + reinterpret_cast(data.data()), data.size()); + auto fresh = std::make_shared(in2, /*buffer_size=*/8, pool.get()); + ASSERT_EQ(fresh->pos_, 0); + ASSERT_EQ(fresh->count_, 0); + + ASSERT_OK(fresh->Seek(10, SeekOrigin::FS_SEEK_SET)); + ASSERT_EQ(fresh->pos_, 0); + ASSERT_EQ(fresh->count_, 0); + ASSERT_EQ(10, fresh->GetPos().value()); + + ASSERT_EQ(4, fresh->Read(buf.data(), 4).value()); + ASSERT_EQ("abcd", buf); + // Fill from 10: buffer = "abcdef", count_=6, pos_=4 + check_buffer(*fresh, "abcdef", 4, 6); + } +} +} // namespace paimon::test diff --git a/src/paimon/common/io/byte_array_input_stream.cpp b/src/paimon/common/io/byte_array_input_stream.cpp new file mode 100644 index 0000000..a88d6b8 --- /dev/null +++ b/src/paimon/common/io/byte_array_input_stream.cpp @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "paimon/io/byte_array_input_stream.h" + +#include +#include +#include + +#include "fmt/format.h" + +namespace paimon { +// Stores the pointer and length as given (no copy is made) and starts reading at offset 0. +ByteArrayInputStream::ByteArrayInputStream(const char* buffer, uint64_t length) + : buffer_(buffer), length_(length), position_(0) { + assert(buffer_); +} + +const char* ByteArrayInputStream::GetRawData() const { + return buffer_ + position_; +} + +// Moves the read position relative to `origin`. The position is only updated when the target +// lies inside [0, length_]; a rejected seek leaves the stream where it was. +Status ByteArrayInputStream::Seek(int64_t offset, SeekOrigin origin) { + // Compute the target in a local first: writing position_ before validating would leave the + // stream at an out-of-range position after a failed seek. + int64_t new_position = 0; + switch (origin) { + case SeekOrigin::FS_SEEK_SET: { + new_position = offset; + break; + } + case SeekOrigin::FS_SEEK_CUR: { + new_position = position_ + offset; + break; + } + case SeekOrigin::FS_SEEK_END: { + PAIMON_ASSIGN_OR_RAISE(uint64_t length, Length()); + new_position = static_cast<int64_t>(length) + offset; + break; + } + default: + return Status::Invalid( + "invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END"); + } + if (new_position < 0 || new_position > static_cast<int64_t>(length_)) { + return Status::Invalid(fmt::format("invalid seek, after seek, current pos {}, length {}", + new_position, length_)); + } + position_ = new_position; + return Status::OK(); +} + +// Copies `size` bytes from the current position and advances it; fails when fewer remain. +Result ByteArrayInputStream::Read(char* buffer, uint32_t size) { + if (position_ + static_cast(size) > static_cast(length_)) { + return Status::Invalid( + fmt::format("ByteArrayInputStream assert boundary failed: need length {}, current " + "position {}, exceed length {}", + size, position_, length_)); + } + memcpy(buffer, buffer_ + position_, size); + position_ += size; + return size; +} + +// Copies `size` bytes starting at absolute `offset`; the current position is not touched. +Result ByteArrayInputStream::Read(char* buffer, uint32_t size, uint64_t offset) { + // Written without `offset + size` so that a huge offset cannot wrap around and slip past + // the bounds check. + if (offset > length_ || static_cast<uint64_t>(size) > length_ - offset) { + return Status::Invalid( + fmt::format("ByteArrayInputStream assert boundary failed: need length {}, read offset " + "{}, exceed length {}", + size, offset, length_)); + } + memcpy(buffer, buffer_ + offset, size); + return size; +} + +// Runs the positional read synchronously and reports its outcome through `callback`. +void ByteArrayInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset, + std::function&& callback) { + Result read_size = Read(buffer,
size, offset); + Status status = Status::OK(); + if (read_size.ok() && static_cast(read_size.value()) != size) { + status = Status::Invalid(fmt::format( + "ByteArrayInputStream async read size {} != expected {}", read_size.value(), size)); + } else if (!read_size.ok()) { + status = read_size.status(); + } + callback(status); +} + +// Nothing to release: the wrapped memory is not owned by the stream. +Status ByteArrayInputStream::Close() { + return Status::OK(); +} + +// An in-memory stream has no location, so the URI is empty. +Result ByteArrayInputStream::GetUri() const { + return std::string(); +} +} // namespace paimon diff --git a/src/paimon/common/io/byte_array_input_stream_test.cpp b/src/paimon/common/io/byte_array_input_stream_test.cpp new file mode 100644 index 0000000..adac48f --- /dev/null +++ b/src/paimon/common/io/byte_array_input_stream_test.cpp @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "paimon/io/byte_array_input_stream.h" + +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(ByteArrayInputStreamTest, TestSimple) { + auto pool = GetDefaultPool(); + auto output_stream = std::make_unique(/*segment_size=*/8, pool); + std::string str = "abcdef"; + auto bytes = std::make_shared(str, pool.get()); + output_stream->WriteBytes(bytes); + auto out_bytes = MemorySegmentUtils::CopyToBytes(output_stream->Segments(), 0, + output_stream->CurrentSize(), pool.get()); + auto input_stream = + std::make_shared(out_bytes->data(), out_bytes->size()); + ASSERT_EQ(6, input_stream->Length().value()); + ASSERT_TRUE(input_stream->GetUri().value().empty()); + + // read from pos 1 + std::string value(4, '\0'); + ASSERT_EQ(4, input_stream->Read(value.data(), value.size(), /*offset=*/1).value()); + ASSERT_EQ("bcde", value); + ASSERT_EQ(0, input_stream->GetPos().value()); + + // seek to pos 2 + ASSERT_OK(input_stream->Seek(2, SeekOrigin::FS_SEEK_SET)); + ASSERT_EQ(4, input_stream->Read(value.data(), value.size()).value()); + ASSERT_EQ("cdef", value); + ASSERT_EQ(6, input_stream->GetPos().value()); + + // although seek to pos 2, read set offset 0 + ASSERT_OK(input_stream->Seek(2, SeekOrigin::FS_SEEK_SET)); + bool read_finished = false; + auto callback = [&](Status status) { + ASSERT_OK(status); + if (status.ok()) { + read_finished = true; + } + }; + input_stream->ReadAsync(value.data(), value.size(), /*offset=*/0, callback); + ASSERT_TRUE(read_finished); + ASSERT_EQ("abcd", value); + ASSERT_EQ(2, input_stream->GetPos().value()); + + // test exceed eof, seek to pos 3, want to read 4 bytes + ASSERT_OK(input_stream->Seek(-3, SeekOrigin::FS_SEEK_END)); + ASSERT_NOK_WITH_MSG( + 
input_stream->Read(value.data(), value.size()), + "assert boundary failed: need length 4, current position 3, exceed length 6"); + + // test invalid seek + ASSERT_NOK_WITH_MSG(input_stream->Seek(100, SeekOrigin::FS_SEEK_CUR), + "invalid seek, after seek, current pos 103, length 6"); + ASSERT_OK(input_stream->Close()); +} + +} // namespace paimon::test diff --git a/src/paimon/common/io/data_input_output_stream_test.cpp b/src/paimon/common/io/data_input_output_stream_test.cpp new file mode 100644 index 0000000..4e60637 --- /dev/null +++ b/src/paimon/common/io/data_input_output_stream_test.cpp @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/io/data_output_stream.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/io/buffered_input_stream.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/byte_order.h" +#include "paimon/io/data_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class DataInputOutputStreamTest : public ::testing::Test, + public ::testing::WithParamInterface { + protected: + void SetUp() override { + pool_ = GetDefaultPool(); + std::vector bytes_big_endian = { + 127, 125, 67, 127, -2, -57, 127, 127, -1, -1, -1, -1, -1, -1, + -3, 1, 0, 39, 84, 104, 105, 115, 32, 105, 115, 32, 97, 32, + 118, 101, 114, 121, 32, 118, 101, 114, 121, 32, 118, 101, 114, 121, + 32, 108, 111, 110, 103, 32, 115, 101, 110, 116, 101, 110, 99, 101, + 46, -26, -120, -111, -26, -104, -81, -28, -72, -128, -28, -72, -86, -25, + -78, -119, -27, -120, -73, -27, -116, -96, -17, -67, -98}; + serialized_bytes_big_endian_ = + std::make_shared(bytes_big_endian.size(), pool_.get()); + memcpy(serialized_bytes_big_endian_->data(), bytes_big_endian.data(), + bytes_big_endian.size()); + + std::vector bytes_little_endian = { + 127, 67, 125, 127, -57, -2, 127, -3, -1, -1, -1, -1, -1, -1, + 127, 1, 39, 0, 84, 104, 105, 115, 32, 105, 115, 32, 97, 32, + 118, 101, 114, 121, 32, 118, 101, 114, 121, 32, 118, 101, 114, 121, + 32, 108, 111, 110, 103, 32, 115, 101, 110, 116, 101, 110, 99, 101, + 46, -26, -120, -111, -26, -104, -81, -28, -72, -128, -28, -72, -86, -25, + -78, -119, -27, -120, -73, -27, -116, -96, -17, -67, -98}; + serialized_bytes_little_endian_ = + 
std::make_shared(bytes_little_endian.size(), pool_.get()); + memcpy(serialized_bytes_little_endian_->data(), bytes_little_endian.data(), + bytes_little_endian.size()); + } + + template + void WriteValues(T* data_output_stream) const { + (void)data_output_stream->WriteValue(static_cast(127)); // 1 byte + (void)data_output_stream->WriteValue(static_cast(32067)); // 2 bytes + (void)data_output_stream->WriteValue(static_cast(2147403647)); // 4 bytes + (void)data_output_stream->WriteValue(static_cast(9223372036854775805)); // 8 bytes + (void)data_output_stream->WriteValue(true); // 1 byte + std::string str1 = "This is a very very very long sentence."; + if constexpr (std::is_same_v) { + (void)data_output_stream->WriteString(str1); // 39 bytes + 2 bytes len + } else { + (void)data_output_stream->WriteString(str1); // 39 bytes + 2 bytes len + } + std::string str2 = "我是一个粉刷匠~"; // 24 bytes + auto bytes = std::make_shared(str2, pool_.get()); + (void)data_output_stream->WriteBytes(bytes); + } + + void CheckResult(const InputStream* input_stream, + const DataInputStream* data_input_stream) const { + ASSERT_EQ(127, data_input_stream->ReadValue().value()); + ASSERT_EQ(32067, data_input_stream->ReadValue().value()); + ASSERT_EQ((int32_t)2147403647, data_input_stream->ReadValue().value()); + ASSERT_EQ((int64_t)9223372036854775805, data_input_stream->ReadValue().value()); + ASSERT_EQ(true, data_input_stream->ReadValue().value()); + std::string str1 = "This is a very very very long sentence."; + ASSERT_EQ(str1, data_input_stream->ReadString().value()); + std::string str2 = "我是一个粉刷匠~"; // 24 bytes + auto bytes = std::make_shared(str2, pool_.get()); + auto read_bytes = std::make_shared(str2.length(), pool_.get()); + ASSERT_OK(data_input_stream->ReadBytes(read_bytes.get())); + ASSERT_EQ(*bytes, *read_bytes); + // test GetPos + ASSERT_OK_AND_ASSIGN(int64_t in_pos, input_stream->GetPos()); + ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, in_pos); + // read eof, return bad status + 
ASSERT_NOK(data_input_stream->ReadString()); + // test seek + ASSERT_OK(data_input_stream->Seek(3)); + ASSERT_EQ((int32_t)2147403647, data_input_stream->ReadValue().value()); + } + void TearDown() override {} + + private: + std::shared_ptr pool_; + std::shared_ptr serialized_bytes_big_endian_; + std::shared_ptr serialized_bytes_little_endian_; +}; +INSTANTIATE_TEST_SUITE_P(ByteOrder, DataInputOutputStreamTest, + ::testing::Values(ByteOrder::PAIMON_BIG_ENDIAN, + ByteOrder::PAIMON_LITTLE_ENDIAN)); + +TEST_P(DataInputOutputStreamTest, TestFileStream) { + ByteOrder byte_order = GetParam(); + auto file_system = std::make_unique(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string test_file = dir->Str() + "/test"; + // prepare output stream + ASSERT_OK_AND_ASSIGN(std::shared_ptr output_stream, + file_system->Create(test_file, /*overwrite=*/true)); + auto data_output_stream = std::make_unique(output_stream); + data_output_stream->SetOrder(byte_order); + WriteValues(data_output_stream.get()); + ASSERT_OK_AND_ASSIGN(int64_t out_pos, output_stream->GetPos()); + ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, out_pos); + ASSERT_OK(output_stream->Close()); + // check file content + ASSERT_OK_AND_ASSIGN(bool exist, file_system->Exists(test_file)); + ASSERT_TRUE(exist); + std::string out_str; + ASSERT_OK(file_system->ReadFile(test_file, &out_str)); + auto out_bytes = std::make_shared(out_str, pool_.get()); + // print_hex(out_str); + if (byte_order == ByteOrder::PAIMON_BIG_ENDIAN) { + ASSERT_EQ(*serialized_bytes_big_endian_, *out_bytes); + } else { + ASSERT_EQ(*serialized_bytes_little_endian_, *out_bytes); + } + + // prepare input stream + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, file_system->Open(test_file)); + auto data_input_stream = std::make_unique(input_stream); + data_input_stream->SetOrder(byte_order); + CheckResult(input_stream.get(), data_input_stream.get()); +} + +TEST_P(DataInputOutputStreamTest, TestInMemoryByteArrayStream) { + ByteOrder 
byte_order = GetParam(); + // prepare output stream + auto data_output_stream = + std::make_unique(/*segment_size=*/8, pool_); + data_output_stream->SetOrder(byte_order); + WriteValues(data_output_stream.get()); + ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, data_output_stream->CurrentSize()); + auto out_bytes = MemorySegmentUtils::CopyToBytes( + data_output_stream->Segments(), 0, data_output_stream->CurrentSize(), pool_.get()); + if (byte_order == ByteOrder::PAIMON_BIG_ENDIAN) { + ASSERT_EQ(*serialized_bytes_big_endian_, *out_bytes); + } else { + ASSERT_EQ(*serialized_bytes_little_endian_, *out_bytes); + } + // prepare input stream + auto input_stream = + std::make_shared(out_bytes->data(), out_bytes->size()); + auto data_input_stream = std::make_unique(input_stream); + data_input_stream->SetOrder(byte_order); + CheckResult(input_stream.get(), data_input_stream.get()); +} + +TEST_P(DataInputOutputStreamTest, TestBufferedStream) { + ByteOrder byte_order = GetParam(); + // prepare output stream + auto data_output_stream = + std::make_unique(/*segment_size=*/8, pool_); + data_output_stream->SetOrder(byte_order); + WriteValues(data_output_stream.get()); + ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, data_output_stream->CurrentSize()); + auto out_bytes = MemorySegmentUtils::CopyToBytes( + data_output_stream->Segments(), 0, data_output_stream->CurrentSize(), pool_.get()); + if (byte_order == ByteOrder::PAIMON_BIG_ENDIAN) { + ASSERT_EQ(*serialized_bytes_big_endian_, *out_bytes); + } else { + ASSERT_EQ(*serialized_bytes_little_endian_, *out_bytes); + } + // prepare input stream + auto input_stream = + std::make_unique(out_bytes->data(), out_bytes->size()); + auto buffered_input_stream = std::make_shared( + std::move(input_stream), /*buffer_size=*/4, pool_.get()); + auto data_input_stream = std::make_unique(buffered_input_stream); + data_input_stream->SetOrder(byte_order); + CheckResult(buffered_input_stream.get(), data_input_stream.get()); +} +} // namespace paimon::test diff --git 
a/src/paimon/common/io/data_input_stream.cpp b/src/paimon/common/io/data_input_stream.cpp new file mode 100644 index 0000000..63f8338 --- /dev/null +++ b/src/paimon/common/io/data_input_stream.cpp @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "paimon/io/data_input_stream.h" + +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/utils/math.h" +#include "paimon/fs/file_system.h" +#include "paimon/memory/bytes.h" + +namespace paimon { + +DataInputStream::DataInputStream(const std::shared_ptr& input_stream) + : input_stream_(input_stream) { + assert(input_stream_); +} + +Status DataInputStream::Seek(int64_t offset) const { + return input_stream_->Seek(offset, SeekOrigin::FS_SEEK_SET); +} + +template +Result DataInputStream::ReadValue() const { + static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); + int32_t read_length = sizeof(T); + PAIMON_RETURN_NOT_OK(AssertBoundary(read_length)); + T value; + PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_length, + input_stream_->Read(reinterpret_cast(&value), read_length)); + PAIMON_RETURN_NOT_OK(AssertReadLength(read_length, actual_read_length)); + if (NeedSwap()) { + value = EndianSwapValue(value); + } + return value; +} + +Status DataInputStream::ReadBytes(Bytes* bytes) const { + int32_t read_length = bytes->size(); + PAIMON_RETURN_NOT_OK(AssertBoundary(read_length)); + PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_length, + input_stream_->Read(bytes->data(), read_length)); + PAIMON_RETURN_NOT_OK(AssertReadLength(read_length, actual_read_length)); + return Status::OK(); +} + +Status DataInputStream::Read(char* data, uint32_t size) const { + PAIMON_RETURN_NOT_OK(AssertBoundary(size)); + PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_length, input_stream_->Read(data, size)); + PAIMON_RETURN_NOT_OK(AssertReadLength(size, actual_read_length)); + return Status::OK(); +} + +Result DataInputStream::ReadString() const { + uint16_t read_length = 0; + PAIMON_ASSIGN_OR_RAISE(read_length, ReadValue()); + PAIMON_RETURN_NOT_OK(AssertBoundary(read_length)); + std::string value(read_length, '\0'); + PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_length, + input_stream_->Read(value.data(), read_length)); + 
PAIMON_RETURN_NOT_OK(AssertReadLength(read_length, actual_read_length)); + return value; +} + +Result DataInputStream::GetPos() const { + return input_stream_->GetPos(); +} + +Result DataInputStream::Length() const { + return input_stream_->Length(); +} + +Status DataInputStream::AssertReadLength(int32_t read_length, int32_t actual_read_length) const { + if (read_length != actual_read_length) { + return Status::Invalid( + fmt::format("assert read length failed: read length not match, read length {}, actual " + "read length {}", + read_length, actual_read_length)); + } + return Status::OK(); +} + +Status DataInputStream::AssertBoundary(int32_t need_length) const { + // TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the overhead + // of I/O calls. + PAIMON_ASSIGN_OR_RAISE(int64_t pos, input_stream_->GetPos()); + PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length()); + if (pos + need_length > static_cast(length)) { + return Status::Invalid( + fmt::format("DataInputStream assert boundary failed: need length {}, current position " + "{}, exceed length {}", + need_length, pos, length)); + } + return Status::OK(); +} + +bool DataInputStream::NeedSwap() const { + return SystemByteOrder() != byte_order_; +} + +template Result DataInputStream::ReadValue() const; +template Result DataInputStream::ReadValue() const; +template Result DataInputStream::ReadValue() const; +template Result DataInputStream::ReadValue() const; +template Result DataInputStream::ReadValue() const; +template Result DataInputStream::ReadValue() const; +template Result DataInputStream::ReadValue() const; +template Result DataInputStream::ReadValue() const; +template Result DataInputStream::ReadValue() const; +} // namespace paimon diff --git a/src/paimon/common/io/data_output_stream.cpp b/src/paimon/common/io/data_output_stream.cpp new file mode 100644 index 0000000..14d7463 --- /dev/null +++ b/src/paimon/common/io/data_output_stream.cpp @@ -0,0 +1,70 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/io/data_output_stream.h" + +#include "fmt/format.h" +#include "paimon/memory/bytes.h" +#include "paimon/result.h" + +namespace paimon { +DataOutputStream::DataOutputStream(const std::shared_ptr& output_stream) + : output_stream_(output_stream) { + assert(output_stream_); +} + +Status DataOutputStream::WriteBytes(const std::shared_ptr& bytes) { + int32_t write_length = bytes->size(); + PAIMON_ASSIGN_OR_RAISE(int32_t actual_write_length, + output_stream_->Write(bytes->data(), write_length)); + PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length)); + return Status::OK(); +} + +Status DataOutputStream::WriteString(const std::string& value) { + uint16_t write_length = value.size(); + PAIMON_RETURN_NOT_OK(WriteValue(write_length)); + PAIMON_ASSIGN_OR_RAISE(int32_t actual_write_length, + output_stream_->Write(value.data(), write_length)); + PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length)); + return Status::OK(); +} + +Status DataOutputStream::AssertWriteLength(int32_t write_length, + int32_t actual_write_length) const { + if (write_length != actual_write_length) { + return Status::Invalid(fmt::format( 
+ "assert write length failed: write length not match, write length {}, actual " + "write length {}", + write_length, actual_write_length)); + } + return Status::OK(); +} + +bool DataOutputStream::NeedSwap() const { + return SystemByteOrder() != byte_order_; +} + +template Status DataOutputStream::WriteValue(const bool&); +template Status DataOutputStream::WriteValue(const char&); +template Status DataOutputStream::WriteValue(const int16_t&); +template Status DataOutputStream::WriteValue(const uint16_t&); +template Status DataOutputStream::WriteValue(const int32_t&); +template Status DataOutputStream::WriteValue(const int64_t&); +} // namespace paimon diff --git a/src/paimon/common/io/data_output_stream.h b/src/paimon/common/io/data_output_stream.h new file mode 100644 index 0000000..ace5976 --- /dev/null +++ b/src/paimon/common/io/data_output_stream.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/common/utils/math.h" +#include "paimon/fs/file_system.h" +#include "paimon/io/byte_order.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { +class Bytes; +class OutputStream; + +// data output stream, support WriteValue() and WriteString() from OutputStream, also do big-endian +// conversion to ensure cross-language compatibility +class PAIMON_EXPORT DataOutputStream { + public: + explicit DataOutputStream(const std::shared_ptr& output_stream); + + template + Status WriteValue(const T& value) { + static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); + T write_value = value; + if (NeedSwap()) { + write_value = EndianSwapValue(value); + } + int32_t write_length = sizeof(T); + PAIMON_ASSIGN_OR_RAISE( + int32_t actual_write_length, + output_stream_->Write(reinterpret_cast(&write_value), write_length)); + PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length)); + return Status::OK(); + } + + Status WriteBytes(const std::shared_ptr& bytes); + + /// First write length (int16), then write string bytes. + Status WriteString(const std::string& value); + + void SetOrder(ByteOrder order) { + byte_order_ = order; + } + + private: + Status AssertWriteLength(int32_t write_length, int32_t actual_write_length) const; + + bool NeedSwap() const; + + private: + std::shared_ptr output_stream_; + + ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN; +}; + +} // namespace paimon diff --git a/src/paimon/common/io/memory_segment_output_stream.cpp b/src/paimon/common/io/memory_segment_output_stream.cpp new file mode 100644 index 0000000..498b5ab --- /dev/null +++ b/src/paimon/common/io/memory_segment_output_stream.cpp @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/io/memory_segment_output_stream.h" + +#include + +#include "paimon/memory/bytes.h" + +namespace paimon { +class MemoryPool; + +MemorySegmentOutputStream::MemorySegmentOutputStream(int32_t segment_size, + const std::shared_ptr& pool) + : segment_size_(segment_size), pool_(pool) { + Advance(); +} + +void MemorySegmentOutputStream::Advance() { + current_segment_ = NextSegment(); + position_in_segment_ = 0; +} + +MemorySegment MemorySegmentOutputStream::NextSegment() { + MemorySegment memory_segment = MemorySegment::AllocateHeapMemory(segment_size_, pool_.get()); + memory_segments_.push_back(memory_segment); + return memory_segment; +} + +void MemorySegmentOutputStream::WriteBytes(const std::shared_ptr& bytes) { + auto segment = MemorySegment::Wrap(bytes); + Write(segment, 0, segment.Size()); +} + +void MemorySegmentOutputStream::WriteString(const std::string& str) { + WriteValue(str.length()); + Write(str.data(), str.size()); +} + +void MemorySegmentOutputStream::Write(const char* data, uint32_t size) { + auto bytes = std::make_shared(size, pool_.get()); + memcpy(bytes->data(), data, size); + auto segment = MemorySegment::Wrap(bytes); + Write(segment, 0, segment.Size()); +} + +void MemorySegmentOutputStream::Write(const MemorySegment& segment, 
int32_t offset, int32_t len) { + int32_t remaining = segment_size_ - position_in_segment_; + if (remaining >= len) { + segment.CopyTo(offset, ¤t_segment_, position_in_segment_, len); + position_in_segment_ += len; + } else { + if (remaining == 0) { + Advance(); + remaining = segment_size_ - position_in_segment_; + } + while (true) { + int32_t to_put = std::min(remaining, len); + segment.CopyTo(offset, ¤t_segment_, position_in_segment_, to_put); + offset += to_put; + len -= to_put; + + if (len > 0) { + position_in_segment_ = segment_size_; + Advance(); + remaining = segment_size_ - position_in_segment_; + } else { + position_in_segment_ += to_put; + break; + } + } + } +} + +int64_t MemorySegmentOutputStream::CurrentSize() const { + return segment_size_ * (memory_segments_.size() - 1) + CurrentPositionInSegment(); +} + +bool MemorySegmentOutputStream::NeedSwap() const { + return SystemByteOrder() != byte_order_; +} + +} // namespace paimon diff --git a/src/paimon/common/io/memory_segment_output_stream.h b/src/paimon/common/io/memory_segment_output_stream.h new file mode 100644 index 0000000..dbe3816 --- /dev/null +++ b/src/paimon/common/io/memory_segment_output_stream.h @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/common/memory/memory_segment.h" +#include "paimon/common/utils/math.h" +#include "paimon/io/byte_order.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +namespace paimon { +class Bytes; +class MemoryPool; + +// TODO(xinyu.lxy): use DataOutputStream to do big-endian conversion +class PAIMON_EXPORT MemorySegmentOutputStream { + public: + MemorySegmentOutputStream(int32_t segment_size, const std::shared_ptr& pool); + + static constexpr int32_t DEFAULT_SEGMENT_SIZE = 64 * 1024; + + void Write(const MemorySegment& segment, int32_t offset, int32_t len); + + template + void WriteValue(T v); + + /// First write length (int16), then write string bytes. + void WriteString(const std::string& str); + + void WriteBytes(const std::shared_ptr& bytes); + + void Write(const char* data, uint32_t size); + + int32_t CurrentPositionInSegment() const { + return position_in_segment_; + } + + int64_t CurrentSize() const; + + const std::vector& Segments() const { + return memory_segments_; + } + + void SetOrder(ByteOrder order) { + byte_order_ = order; + } + + private: + template + void WriteValueImpl(T v); + + void Advance(); + MemorySegment NextSegment(); + + bool NeedSwap() const; + + private: + int32_t segment_size_; + int32_t position_in_segment_; + std::shared_ptr pool_; + MemorySegment current_segment_; + std::vector memory_segments_; + + ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN; +}; + +template +void MemorySegmentOutputStream::WriteValueImpl(T v) { + if (position_in_segment_ <= segment_size_ - static_cast(sizeof(T))) { + current_segment_.PutValue(position_in_segment_, v); + position_in_segment_ += sizeof(T); + } else if (position_in_segment_ == segment_size_) { + Advance(); + WriteValueImpl(v); + } else { + for (size_t i = 0; i < sizeof(T); i++) 
{ + // because of endian swap, just copy the bytes of input v + uint8_t onebyte = 0; + memcpy(&onebyte, (reinterpret_cast(&v)) + i, sizeof(onebyte)); + WriteValueImpl(onebyte); + } + } +} + +template +void MemorySegmentOutputStream::WriteValue(T v) { + static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); + if (NeedSwap()) { + v = EndianSwapValue(v); + } + return WriteValueImpl(v); +} + +} // namespace paimon diff --git a/src/paimon/common/io/memory_segment_output_stream_test.cpp b/src/paimon/common/io/memory_segment_output_stream_test.cpp new file mode 100644 index 0000000..61fbfe3 --- /dev/null +++ b/src/paimon/common/io/memory_segment_output_stream_test.cpp @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "paimon/common/io/memory_segment_output_stream.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/data_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { + +class MemorySegmentOutputStreamTest : public ::testing::Test, + public ::testing::WithParamInterface {}; + +INSTANTIATE_TEST_SUITE_P(ByteOrder, MemorySegmentOutputStreamTest, + ::testing::Values(ByteOrder::PAIMON_BIG_ENDIAN, + ByteOrder::PAIMON_LITTLE_ENDIAN)); + +TEST_P(MemorySegmentOutputStreamTest, TestSimple) { + ByteOrder byte_order = GetParam(); + auto pool = GetDefaultPool(); + MemorySegmentOutputStream out(/*segment_size=*/8, pool); + out.SetOrder(byte_order); + out.WriteValue(static_cast(127)); // 1 byte + out.WriteValue(static_cast(32067)); // 2 bytes + out.WriteValue(static_cast(2147403647)); // 4 bytes + // move to next segment + out.WriteValue(static_cast(9223372036854775805)); // 8 bytes + out.WriteValue(true); // 1 byte + std::string str1 = "This is a very very very long sentence."; + out.WriteString(str1); // 39 bytes + 2 bytes len + ASSERT_EQ(out.CurrentSize(), 1 + 1 + 2 + 4 + 8 + 41); + std::string str2 = "yes"; + out.WriteString(str2); // 3 bytes + 2 bytes len + std::string str3 = "I have a dream."; + out.WriteString(str3); // 15 bytes + 2 bytes len + std::string str4 = "hello"; + out.WriteValue(str4.size()); // 4 bytes + out.Write(str4.data(), str4.size()); // 5 bytes + + ASSERT_EQ(out.CurrentSize(), 1 + 1 + 2 + 4 + 8 + 41 + 5 + 17 + 4 + 5); + + auto bytes = MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); + + auto input_stream = std::make_shared(bytes->data(), bytes->size()); + DataInputStream in(input_stream); + in.SetOrder(byte_order); + ASSERT_EQ(127, in.ReadValue().value()); + 
ASSERT_EQ(32067, in.ReadValue().value()); + ASSERT_EQ(2147403647, in.ReadValue().value()); + ASSERT_EQ(9223372036854775805l, in.ReadValue().value()); + ASSERT_EQ(true, in.ReadValue().value()); + ASSERT_EQ(str1, in.ReadString().value()); + ASSERT_EQ(str2, in.ReadString().value()); + ASSERT_EQ(str3, in.ReadString().value()); + ASSERT_EQ(str4.size(), in.ReadValue().value()); + std::string read_str4(str4.size(), '\0'); + ASSERT_OK(in.Read(read_str4.data(), str4.size())); + ASSERT_EQ(read_str4, str4); + ASSERT_EQ(out.CurrentSize(), input_stream->GetPos().value()); +} + +} // namespace paimon::test diff --git a/src/paimon/common/io/offset_input_stream.cpp b/src/paimon/common/io/offset_input_stream.cpp new file mode 100644 index 0000000..69593cd --- /dev/null +++ b/src/paimon/common/io/offset_input_stream.cpp @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "paimon/common/io/offset_input_stream.h" + +#include + +#include "fmt/format.h" +#include "paimon/macros.h" + +namespace paimon { +Result> OffsetInputStream::Create( + const std::shared_ptr& wrapped, int64_t length, int64_t offset) { + if (PAIMON_UNLIKELY(wrapped == nullptr)) { + return Status::Invalid("input stream is null pointer"); + } + if (PAIMON_UNLIKELY(offset < 0)) { + return Status::Invalid(fmt::format("offset {} is less than 0", offset)); + } + if (PAIMON_UNLIKELY(length < -1)) { + return Status::Invalid(fmt::format("length {} is less than -1", length)); + } + PAIMON_ASSIGN_OR_RAISE(uint64_t total_length, wrapped->Length()); + if (PAIMON_UNLIKELY((uint64_t)offset > total_length)) { + return Status::Invalid( + fmt::format("offset {} exceed total length {}", offset, total_length)); + } + if (length == -1) { + // length == -1 means it's dynamic length, should read to the end + length = total_length - offset; + } + if (PAIMON_UNLIKELY((uint64_t)offset + (uint64_t)length > total_length)) { + return Status::Invalid(fmt::format("offset {} + length {} exceed total length {}", offset, + length, total_length)); + } + PAIMON_RETURN_NOT_OK(wrapped->Seek(offset, SeekOrigin::FS_SEEK_SET)); + return std::unique_ptr( + new OffsetInputStream(std::move(wrapped), length, offset)); +} + +OffsetInputStream::OffsetInputStream(const std::shared_ptr& wrapped, int64_t length, + int64_t offset) + : wrapped_(wrapped), length_(length), offset_(offset) {} + +Status OffsetInputStream::Seek(int64_t offset, SeekOrigin origin) { + switch (origin) { + case SeekOrigin::FS_SEEK_SET: { + inner_position_ = offset; + PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_)); + return wrapped_->Seek(offset_ + inner_position_, SeekOrigin::FS_SEEK_SET); + } + case SeekOrigin::FS_SEEK_CUR: { + inner_position_ += offset; + PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_)); + return wrapped_->Seek(offset, SeekOrigin::FS_SEEK_CUR); + } + case SeekOrigin::FS_SEEK_END: { + 
inner_position_ = length_ + offset; + PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_)); + return wrapped_->Seek(offset_ + inner_position_, SeekOrigin::FS_SEEK_SET); + } + default: + return Status::Invalid( + "invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END"); + } + return Status::OK(); +} + +Result OffsetInputStream::Read(char* buffer, uint32_t size) { + PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_ + size)); + inner_position_ += size; + return wrapped_->Read(buffer, size); +} + +Result OffsetInputStream::Read(char* buffer, uint32_t size, uint64_t offset) { + PAIMON_RETURN_NOT_OK(AssertBoundary(offset)); + PAIMON_RETURN_NOT_OK(AssertBoundary(offset + size)); + return wrapped_->Read(buffer, size, offset_ + offset); +} + +void OffsetInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset, + std::function&& callback) { + auto status = AssertBoundary(offset); + if (!status.ok()) { + callback(status); + return; + } + status = AssertBoundary(offset + size); + if (!status.ok()) { + callback(status); + return; + } + wrapped_->ReadAsync(buffer, size, offset_ + offset, std::move(callback)); +} + +Status OffsetInputStream::Close() { + return wrapped_->Close(); +} + +Result OffsetInputStream::GetUri() const { + return wrapped_->GetUri(); +} + +Result OffsetInputStream::GetPos() const { + return inner_position_; +} + +Result OffsetInputStream::Length() const { + return length_; +} + +Status OffsetInputStream::AssertBoundary(int32_t inner_pos) const { + if (inner_pos < 0 || inner_pos > length_) { + return Status::Invalid( + fmt::format("OffsetInputStream assert boundary failed: inner pos {} exceed length {}", + inner_pos, length_)); + } + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/common/io/offset_input_stream.h b/src/paimon/common/io/offset_input_stream.h new file mode 100644 index 0000000..38f97ef --- /dev/null +++ b/src/paimon/common/io/offset_input_stream.h @@ -0,0 +1,67 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/fs/file_system.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/visibility.h" + +namespace paimon { +/// A `InputStream` wrapping another `InputStream` with offset and length. 
+class PAIMON_EXPORT OffsetInputStream : public InputStream { + public: + static Result> Create( + const std::shared_ptr& wrapped, int64_t length, int64_t offset); + ~OffsetInputStream() override = default; + + Status Seek(int64_t offset, SeekOrigin origin) override; + + Result GetPos() const override; + + Result Read(char* buffer, uint32_t size) override; + + Result Read(char* buffer, uint32_t size, uint64_t offset) override; + + void ReadAsync(char* buffer, uint32_t size, uint64_t offset, + std::function&& callback) override; + + Result Length() const override; + + Status Close() override; + + Result GetUri() const override; + + private: + OffsetInputStream(const std::shared_ptr& wrapped, int64_t length, int64_t offset); + Status AssertBoundary(int32_t inner_pos) const; + + private: + std::shared_ptr wrapped_; + const int64_t length_; + const int64_t offset_; + int64_t inner_position_ = 0; +}; +} // namespace paimon diff --git a/src/paimon/common/io/offset_input_stream_test.cpp b/src/paimon/common/io/offset_input_stream_test.cpp new file mode 100644 index 0000000..3734650 --- /dev/null +++ b/src/paimon/common/io/offset_input_stream_test.cpp @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "paimon/common/io/offset_input_stream.h" + +#include +#include + +#include "gtest/gtest.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(OffsetInputStreamTest, TestBasicConstruction) { + auto inner_stream = std::make_unique("abcdefghij", /*length=*/10); + ASSERT_OK_AND_ASSIGN(auto offset_stream, OffsetInputStream::Create(std::move(inner_stream), + /*length=*/6, /*offset=*/2)); + + ASSERT_OK_AND_ASSIGN(auto length, offset_stream->Length()); + ASSERT_EQ(6, length); + + ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos()); + ASSERT_EQ(0, pos); + + ASSERT_OK_AND_ASSIGN(auto uri, offset_stream->GetUri()); + ASSERT_EQ("", uri); +} + +TEST(OffsetInputStreamTest, TestSeekOperations) { + auto inner_stream = std::make_unique("abcdefghij", /*length=*/10); + ASSERT_OK_AND_ASSIGN(auto offset_stream, OffsetInputStream::Create(std::move(inner_stream), + /*length=*/6, /*offset=*/2)); + + // Test FS_SEEK_SET + ASSERT_OK(offset_stream->Seek(3, SeekOrigin::FS_SEEK_SET)); + ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos()); + ASSERT_EQ(3, pos); + + // Test FS_SEEK_CUR + ASSERT_OK(offset_stream->Seek(1, SeekOrigin::FS_SEEK_CUR)); + ASSERT_OK_AND_ASSIGN(pos, offset_stream->GetPos()); + ASSERT_EQ(4, pos); + + // Test FS_SEEK_END + ASSERT_OK(offset_stream->Seek(-2, SeekOrigin::FS_SEEK_END)); + ASSERT_OK_AND_ASSIGN(pos, offset_stream->GetPos()); + ASSERT_EQ(4, pos); + + // Test boundary conditions + ASSERT_NOK(offset_stream->Seek(-10, SeekOrigin::FS_SEEK_SET)); + ASSERT_NOK(offset_stream->Seek(10, SeekOrigin::FS_SEEK_SET)); + ASSERT_NOK(offset_stream->Seek(10, SeekOrigin::FS_SEEK_CUR)); + ASSERT_NOK(offset_stream->Seek(10, SeekOrigin::FS_SEEK_END)); +} + +TEST(OffsetInputStreamTest, TestReadOperations) { + auto inner_stream = std::make_unique("abcdefghij", /*length=*/10); + ASSERT_OK_AND_ASSIGN(auto offset_stream, OffsetInputStream::Create(std::move(inner_stream), + /*length=*/6, 
/*offset=*/2)); + + // Test sequential read + std::string buffer(4, '\0'); + ASSERT_OK_AND_ASSIGN(auto bytes_read, offset_stream->Read(buffer.data(), /*size=*/4)); + ASSERT_EQ(4, bytes_read); + ASSERT_EQ("cdef", buffer); + + ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos()); + ASSERT_EQ(4, pos); + + // Test read with offset + std::string buffer2(2, '\0'); + ASSERT_OK_AND_ASSIGN(bytes_read, offset_stream->Read(buffer2.data(), /*size=*/2, /*offset=*/1)); + ASSERT_EQ(2, bytes_read); + ASSERT_EQ("de", buffer2); + + // Position should not change after offset read + ASSERT_OK_AND_ASSIGN(pos, offset_stream->GetPos()); + ASSERT_EQ(4, pos); + + // Test close + ASSERT_OK(offset_stream->Close()); +} + +TEST(OffsetInputStreamTest, TestReadAsync) { + auto inner_stream = std::make_unique("abcdefghij", /*length=*/10); + ASSERT_OK_AND_ASSIGN(auto offset_stream, OffsetInputStream::Create(std::move(inner_stream), + /*length=*/6, /*offset=*/2)); + + std::string buffer1(3, '\0'), buffer2(2, '\0'); + bool callback_called1 = false, callback_called2 = false; + Status callback_status1, callback_status2; + + auto callback1 = [&](Status status) { + callback_called1 = true; + callback_status1 = status; + }; + auto callback2 = [&](Status status) { + callback_called2 = true; + callback_status2 = status; + }; + + offset_stream->ReadAsync(buffer1.data(), 3, 1, std::move(callback1)); + offset_stream->ReadAsync(buffer2.data(), 2, 3, std::move(callback2)); + + ASSERT_TRUE(callback_called1); + ASSERT_OK(callback_status1); + ASSERT_EQ("def", buffer1); + ASSERT_TRUE(callback_called2); + ASSERT_OK(callback_status2); + ASSERT_EQ("fg", buffer2); + + // Position should not change after offset read + ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos()); + ASSERT_EQ(0, pos); +} + +TEST(OffsetInputStreamTest, TestBoundaryValidation) { + auto inner_stream = std::make_unique("abcdefghij", /*length=*/10); + ASSERT_OK_AND_ASSIGN(auto offset_stream, OffsetInputStream::Create(std::move(inner_stream), + 
/*length=*/6, /*offset=*/2)); + + // Test read beyond boundary + std::string buffer(10, '\0'); + ASSERT_NOK_WITH_MSG(offset_stream->Read(buffer.data(), /*size=*/10), + "assert boundary failed: inner pos 10 exceed length 6"); + + // Test offset read beyond boundary + ASSERT_NOK_WITH_MSG(offset_stream->Read(buffer.data(), /*size=*/4, /*offset=*/5), + "assert boundary failed: inner pos 9 exceed length 6"); +} + +TEST(OffsetInputStreamTest, TestReadWithUnspecifiedLength) { + auto inner_stream = std::make_unique("abcdefghij", /*length=*/10); + // Use -1 for length to test dynamic length calculation + ASSERT_OK_AND_ASSIGN( + auto offset_stream, + OffsetInputStream::Create(std::move(inner_stream), /*length=*/-1, /*offset=*/2)); + + // Test that length is calculated correctly + ASSERT_OK_AND_ASSIGN(auto length, offset_stream->Length()); + // Should be total length (10) minus offset (2) = 8 + ASSERT_EQ(8, length); + + // Test sequential read within the calculated bounds + std::string buffer(4, '\0'); + ASSERT_OK_AND_ASSIGN(auto bytes_read, offset_stream->Read(buffer.data(), /*size=*/4)); + ASSERT_EQ(4, bytes_read); + ASSERT_EQ("cdef", buffer); + + ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos()); + ASSERT_EQ(4, pos); + + // Test read with offset within the calculated bounds + std::string buffer2(3, '\0'); + ASSERT_OK_AND_ASSIGN(bytes_read, offset_stream->Read(buffer2.data(), /*size=*/3, /*offset=*/5)); + ASSERT_EQ(3, bytes_read); + ASSERT_EQ("hij", buffer2); + + // Position should not change after offset read + ASSERT_OK_AND_ASSIGN(pos, offset_stream->GetPos()); + ASSERT_EQ(4, pos); + + // Test boundary validation with dynamic length + std::string buffer3(10, '\0'); + ASSERT_NOK_WITH_MSG(offset_stream->Read(buffer3.data(), /*size=*/10), + "assert boundary failed: inner pos 14 exceed length 8"); +} + +TEST(OffsetInputStreamTest, TestInvalidParameters) { + // Test null wrapped stream + ASSERT_NOK_WITH_MSG(OffsetInputStream::Create(nullptr, /*length=*/6, /*offset=*/2), 
+ "input stream is null pointer"); + + // Test negative offset + auto inner_stream = std::make_unique("abcdefghij", /*length=*/10); + ASSERT_NOK_WITH_MSG( + OffsetInputStream::Create(std::move(inner_stream), /*length=*/6, /*offset=*/-1), + "offset -1 is less than 0"); + + // Test length less than -1 + inner_stream = std::make_unique("abcdefghij", /*length=*/10); + ASSERT_NOK_WITH_MSG( + OffsetInputStream::Create(std::move(inner_stream), /*length=*/-2, /*offset=*/2), + "length -2 is less than -1"); + + // Test length + offset beyond wrapped stream length + inner_stream = std::make_unique("abcdefghij", /*length=*/10); + ASSERT_NOK_WITH_MSG( + OffsetInputStream::Create(std::move(inner_stream), /*length=*/8, /*offset=*/7), + "offset 7 + length 8 exceed total length 10"); + + // Test dynamic length with offset beyond wrapped stream length + inner_stream = std::make_unique("abcdefghij", /*length=*/10); + ASSERT_NOK_WITH_MSG( + OffsetInputStream::Create(std::move(inner_stream), /*length=*/-1, /*offset=*/15), + "offset 15 exceed total length 10"); +} + +} // namespace paimon::test