From b74434f0b4e31c44a313e1cb4b1997b55183dec6 Mon Sep 17 00:00:00 2001 From: "jinli.zjw" Date: Tue, 2 Jun 2026 16:24:28 +0800 Subject: [PATCH] feat(fs): introduce file system tests --- src/paimon/common/fs/external_path_provider.h | 74 + .../common/fs/external_path_provider_test.cpp | 68 + src/paimon/common/fs/file_system_test.cpp | 1407 +++++++++++++++++ .../common/fs/resolving_file_system_test.cpp | 235 +++ 4 files changed, 1784 insertions(+) create mode 100644 src/paimon/common/fs/external_path_provider.h create mode 100644 src/paimon/common/fs/external_path_provider_test.cpp create mode 100644 src/paimon/common/fs/file_system_test.cpp create mode 100644 src/paimon/common/fs/resolving_file_system_test.cpp diff --git a/src/paimon/common/fs/external_path_provider.h b/src/paimon/common/fs/external_path_provider.h new file mode 100644 index 0000000..568d3ac --- /dev/null +++ b/src/paimon/common/fs/external_path_provider.h @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/common/utils/path_util.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { +/// Provider for external data paths. +class ExternalPathProvider { + public: + static Result> Create( + const std::vector& external_table_paths, + const std::string& relative_bucket_path) { + if (external_table_paths.empty()) { + return Status::Invalid("external table paths cannot be empty"); + } + return std::unique_ptr( + new ExternalPathProvider(external_table_paths, relative_bucket_path)); + } + + /// Get the next external data path. + /// + /// @return the next external data path + std::string GetNextExternalDataPath(const std::string& file_name) { + position_++; + if (position_ == external_table_paths_.size()) { + position_ = 0; + } + return PathUtil::JoinPath( + PathUtil::JoinPath(external_table_paths_[position_], relative_bucket_path_), file_name); + } + + private: + ExternalPathProvider(const std::vector& external_table_paths, + const std::string& relative_bucket_path) + : external_table_paths_(external_table_paths), relative_bucket_path_(relative_bucket_path) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dist(0, external_table_paths_.size() - 1); + position_ = dist(gen); + } + + private: + std::vector external_table_paths_; + std::string relative_bucket_path_; + size_t position_; +}; +} // namespace paimon diff --git a/src/paimon/common/fs/external_path_provider_test.cpp b/src/paimon/common/fs/external_path_provider_test.cpp new file mode 100644 index 0000000..6893bfb --- /dev/null +++ b/src/paimon/common/fs/external_path_provider_test.cpp @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/fs/external_path_provider.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(ExternalPathProviderTest, TestInvalidExternalDataPath) { + std::vector external_table_paths; + std::string relative_bucket_path = "p0=1/p1=0/bucket-0"; + + auto provider = ExternalPathProvider::Create(external_table_paths, relative_bucket_path); + ASSERT_NOK_WITH_MSG(provider.status(), "external table paths cannot be empty"); +} + +TEST(ExternalPathProviderTest, TestGetNextExternalDataPath) { + std::vector external_table_paths; + external_table_paths.emplace_back("/tmp/external_path/"); + std::string relative_bucket_path = "p0=1/p1=0/bucket-0"; + + ASSERT_OK_AND_ASSIGN(std::unique_ptr provider, + ExternalPathProvider::Create(external_table_paths, relative_bucket_path)); + ASSERT_EQ(provider->GetNextExternalDataPath("file.orc"), + "/tmp/external_path/p0=1/p1=0/bucket-0/file.orc"); +} + +TEST(ExternalPathProviderTest, TestGetNextExternalDataPath2) { + std::vector external_table_paths; + external_table_paths.emplace_back("/tmp/external_path_a/"); + external_table_paths.emplace_back("/tmp/external_path_b/"); + external_table_paths.emplace_back("/tmp/external_path_c/"); + std::string relative_bucket_path = "p0=1/p1=0/bucket-0"; + + ASSERT_OK_AND_ASSIGN(std::unique_ptr provider, + ExternalPathProvider::Create(external_table_paths, relative_bucket_path)); + + std::set result_data_paths; + result_data_paths.insert(provider->GetNextExternalDataPath("file.orc")); + result_data_paths.insert(provider->GetNextExternalDataPath("file.orc")); + result_data_paths.insert(provider->GetNextExternalDataPath("file.orc")); + + ASSERT_EQ(result_data_paths, std::set({ + "/tmp/external_path_a/p0=1/p1=0/bucket-0/file.orc", + "/tmp/external_path_b/p0=1/p1=0/bucket-0/file.orc", + "/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc", + })); +} +} // namespace paimon::test diff --git a/src/paimon/common/fs/file_system_test.cpp b/src/paimon/common/fs/file_system_test.cpp new file mode 100644 index 0000000..815b983 --- /dev/null +++ b/src/paimon/common/fs/file_system_test.cpp @@ -0,0 +1,1407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/fs/file_system.h" + +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/executor/future.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/uuid.h" +#include "paimon/executor.h" +#include "paimon/factories/factory_creator.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class FileSystemTest : public ::testing::Test, public ::testing::WithParamInterface { + public: + void SetUp() override { + std::string file_system = GetParam(); + dir_ = paimon::test::UniqueTestDirectory::Create(file_system); + ASSERT_TRUE(dir_); + test_root_ = dir_->Str(); + fs_ = dir_->GetFileSystem(); + } + + void TearDown() override { + dir_.reset(); + fs_.reset(); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + /// Creates a random string with a length within the given interval. The string contains + /// only characters that can be represented as a single code point. + /// + /// @param min_length The minimum string length. + /// @param max_length The maximum string length (inclusive). + /// @param min_value The minimum character value to occur. + /// @param max_value The maximum character value to occur. + /// @return A random String. + static std::string GetRandomString(int32_t min_length, int32_t max_length, char min_value, + char max_value) { + int32_t len = std::rand() % (max_length - min_length + 1) + min_length; + + std::vector data; + data.resize(len); + int32_t diff = max_value - min_value + 1; + + for (int32_t i = 0; i < len; i++) { + data[i] = static_cast(std::rand() % diff + min_value); + } + return std::string(data.data(), data.size()); + } + + static std::string RandomName() { + return GetRandomString(16, 16, 'a', 'z'); + } + + std::string MakeDir(const std::string& test_root) { + EXPECT_OK_AND_ASSIGN(bool exist, fs_->Exists(test_root)); + if (!exist) { + EXPECT_OK(fs_->Mkdirs(test_root)); + } + return test_root; + } + + void CreateFile(const std::string& file) { + ASSERT_OK_AND_ASSIGN(std::unique_ptr out, + fs_->Create(file, /*overwrite=*/true)); + std::string input = "paimon"; + char chars[8] = {1, 2, 3, 4, 5, 6, 7, 8}; + ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(chars, input.size())); + ASSERT_EQ(size, input.size()); + ASSERT_OK(out->Flush()); + ASSERT_OK(out->Close()); + } + + std::string CreateRandomFileInDirectory(const std::string& test_root) { + std::string directory = MakeDir(test_root); + std::string file_path = PathUtil::JoinPath(directory, RandomName()); + CreateFile(file_path); + return file_path; + } + + std::string RemoveLastSlashInPath(const std::string& path) const { + std::string new_path = path; + PathUtil::TrimLastDelim(&new_path); + return new_path; + } + + void CheckFileStatus(const std::vector>& actual_statuses, + const std::set& expected_files, + const std::set& expected_dirs) const { + ASSERT_EQ(actual_statuses.size(), expected_files.size() + expected_dirs.size()); + std::set actual_files; + std::set actual_dirs; + for (const auto& file_status : actual_statuses) { + if (file_status->IsDir()) { + actual_dirs.insert(RemoveLastSlashInPath(file_status->GetPath())); + } else { + actual_files.insert(file_status->GetPath()); + ASSERT_GT(file_status->GetLen(), 0); + int64_t modification_time = file_status->GetModificationTime(); + ASSERT_GT(modification_time, 10000000000L); // MIN_VALID_FILE_MODIFICATION_MS + ASSERT_LT(modification_time, 10000000000000L); // MAX_VALID_FILE_MODIFICATION_MS + } + } + std::set normalized_expected_dirs; + for (const auto& path : expected_dirs) { + normalized_expected_dirs.insert(RemoveLastSlashInPath(path)); + } + ASSERT_EQ(actual_files, expected_files); + ASSERT_EQ(actual_dirs, normalized_expected_dirs); + } + + void CheckBasicFileStatus(const std::vector>& actual_statuses, + const std::set& expected_files, + const std::set& expected_dirs) const { + ASSERT_EQ(actual_statuses.size(), expected_files.size() + expected_dirs.size()); + std::set actual_files; + std::set actual_dirs; + for (const auto& file_status : actual_statuses) { + if (file_status->IsDir()) { + actual_dirs.insert(RemoveLastSlashInPath(file_status->GetPath())); + } else { + actual_files.insert(file_status->GetPath()); + } + } + std::set normalized_expected_dirs; + for (const auto& path : expected_dirs) { + normalized_expected_dirs.insert(RemoveLastSlashInPath(path)); + } + ASSERT_EQ(actual_files, expected_files); + ASSERT_EQ(actual_dirs, normalized_expected_dirs); + } + + std::string GetTestDir() const { + std::string file_system = GetParam(); + if (file_system == "local") { + return paimon::test::GetDataDir(); + } else if (file_system == "jindo") { + return "oss://paimon-unittest/test_data/"; + } + assert(false); + return ""; + } + + private: + std::shared_ptr fs_; + std::unique_ptr dir_; + std::string test_root_; +}; + +TEST(FileSystemStaticTest, TestNoneFileSystemFactory) { + std::map fs_options; + Result> fs = + FileSystemFactory::Get("not exist", "/tmp", fs_options); + ASSERT_TRUE(fs.status().IsInvalid()); +} + +TEST_P(FileSystemTest, TestFactoryCreator) { + std::vector factory_registered_type = + FactoryCreator::GetInstance()->GetRegisteredType(); + + auto test_registered = [&](const std::string& identifier) { + bool is_exist = false; + for (auto registered_type : factory_registered_type) { + if (registered_type == identifier) { + is_exist = true; + } + } + ASSERT_TRUE(is_exist); + }; + test_registered(GetParam()); +} + +// --- create +TEST_P(FileSystemTest, TestCreate) { + std::string path = PathUtil::JoinPath(test_root_, "/test_file"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr out, fs_->Create(path, /*overwrite=*/true)); + ASSERT_TRUE(out); + std::string input = "paimon"; + ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(input.data(), input.size())); + ASSERT_EQ(size, input.size()); + ASSERT_OK(out->Close()); + + ASSERT_NOK_WITH_MSG(fs_->Create(path, /*overwrite=*/false), "already exists"); +} + +// --- write&read +TEST_P(FileSystemTest, TestSimpleWriteAndRead) { + std::string content = "abcdefghijk"; + std::string file_path = test_root_ + "/file.data"; + // write process + ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(content.data(), content.size())); + ASSERT_EQ(write_len, content.size()); + + ASSERT_OK(out_stream->Flush()); + ASSERT_OK_AND_ASSIGN(int64_t pos, out_stream->GetPos()); + ASSERT_EQ(pos, content.size()); + + ASSERT_OK_AND_ASSIGN(std::string uri, out_stream->GetUri()); + ASSERT_EQ(uri, file_path); + ASSERT_OK(out_stream->Close()); + + // read process + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos()); + ASSERT_EQ(pos, 0); + + // read from cur pos + std::string read_content(content.size(), '\0'); + ASSERT_OK_AND_ASSIGN(int32_t read_len, + in_stream->Read(read_content.data(), read_content.size())); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ(content, read_content); + + // read from offset + ASSERT_OK_AND_ASSIGN(read_len, in_stream->Read(read_content.data(), read_content.size(), 0)); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ(content, read_content); + + ASSERT_OK_AND_ASSIGN(uri, in_stream->GetUri()); + ASSERT_EQ(uri, file_path); + ASSERT_OK_AND_ASSIGN(uint64_t file_len, in_stream->Length()); + ASSERT_EQ(file_len, content.size()); + + ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos()); + ASSERT_EQ(pos, content.size()); + ASSERT_OK(in_stream->Close()); +} + +TEST_P(FileSystemTest, TestWriteMultipleTimes) { + std::vector content_vec = {"abc", "defg", "hi", "j", "k"}; + std::string content = "abcdefghijk"; + std::string file_path = test_root_ + "/file.data"; + // write process + ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, /*overwrite=*/true)); + for (const auto& str : content_vec) { + ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(str.data(), str.size())); + ASSERT_EQ(write_len, str.size()); + } + ASSERT_OK(out_stream->Flush()); + ASSERT_OK_AND_ASSIGN(int64_t pos, out_stream->GetPos()); + ASSERT_EQ(pos, content.size()); + ASSERT_OK(out_stream->Close()); + + // read process + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + std::string read_content(content.size(), '\0'); + ASSERT_OK_AND_ASSIGN(int32_t read_len, + in_stream->Read(read_content.data(), read_content.size())); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ(content, read_content); +} + +TEST_P(FileSystemTest, TestWriteInNotExistDir) { + std::string file_path = test_root_ + "/no_exist/file.data"; + // write process + std::string content = "abcdefghijk"; + ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t write_len, + out_stream->Write(content.data(), content.size())); + ASSERT_OK(out_stream->Flush()); + ASSERT_OK(out_stream->Close()); + + // read process + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + std::string read_content(content.size(), '\0'); + ASSERT_OK_AND_ASSIGN(int32_t read_len, + in_stream->Read(read_content.data(), read_content.size())); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ(content, read_content); + + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(test_root_ + "/no_exist/")); + ASSERT_TRUE(is_exist); +} + +TEST_P(FileSystemTest, TestWriteEmptyFile) { + std::string file_path = test_root_ + "/file.data"; + // write process + std::string content = ""; + ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(content.data(), content.size())); + ASSERT_EQ(write_len, 0); + ASSERT_OK(out_stream->Flush()); + ASSERT_OK(out_stream->Close()); + + // get file status + ASSERT_OK_AND_ASSIGN(auto st, fs_->GetFileStatus(file_path)); + ASSERT_EQ(st->GetPath(), file_path); + ASSERT_FALSE(st->IsDir()); + ASSERT_EQ(st->GetLen(), 0); + auto modification_time = st->GetModificationTime(); + ASSERT_GT(modification_time, 10000000000L); + ASSERT_LT(modification_time, 10000000000000L); + + // read process + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + std::string read_content(content.size(), '\0'); + ASSERT_OK_AND_ASSIGN(int32_t read_len, + in_stream->Read(read_content.data(), read_content.size())); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ(content, read_content); +} + +TEST_P(FileSystemTest, TestWriteWithOverwrite) { + std::string content = "abcdefghijk"; + std::string file_path = test_root_ + "/file.data"; + // write process + ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(content.data(), content.size())); + ASSERT_EQ(write_len, content.size()); + ASSERT_OK(out_stream->Flush()); + ASSERT_OK(out_stream->Close()); + + // write file which already exist + { + std::string new_content = "helloworld"; + ASSERT_OK_AND_ASSIGN(auto out_stream2, fs_->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN(write_len, out_stream2->Write(new_content.data(), new_content.size())); + ASSERT_EQ(write_len, new_content.size()); + ASSERT_OK(out_stream2->Flush()); + ASSERT_OK(out_stream2->Close()); + + // read process + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + std::string read_content(new_content.size(), '\0'); + ASSERT_OK_AND_ASSIGN(int32_t read_len, + in_stream->Read(read_content.data(), read_content.size())); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ(new_content, read_content); + } + { + // test file exist and overwrite = false + ASSERT_NOK_WITH_MSG(fs_->Create(file_path, /*overwrite=*/false), "do not allow overwrite"); + } +} + +TEST_P(FileSystemTest, TestAsyncRead) { + std::string content = "abcdefghijk"; + std::string file_path = test_root_ + "/file.data"; + // write process + ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t write_len, + out_stream->Write(content.data(), content.size())); + ASSERT_OK(out_stream->Flush()); + ASSERT_OK(out_stream->Close()); + + // read process + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + std::string read_content(content.size(), '\0'); + bool read_finished = false; + std::promise promise; + std::future future = promise.get_future(); + auto callback = [&](Status status) { + EXPECT_OK(status); + if (status.ok()) { + read_finished = true; + promise.set_value(10); + } else { + read_finished = false; + promise.set_value(20); + } + }; + in_stream->ReadAsync(read_content.data(), read_content.size(), /*offset=*/0, callback); + ASSERT_EQ(future.get(), 10); + ASSERT_TRUE(read_finished); + ASSERT_EQ(content, read_content); + ASSERT_OK(in_stream->Close()); +} + +TEST_P(FileSystemTest, TestInvalidRead) { + std::string content = "abcdefghijk"; + std::string file_path = test_root_ + "/file.data"; + // write process + ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t write_len, + out_stream->Write(content.data(), content.size())); + ASSERT_OK(out_stream->Flush()); + ASSERT_OK(out_stream->Close()); + { + // seek to end, then read + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + ASSERT_OK(in_stream->Seek(/*offset=*/11, SeekOrigin::FS_SEEK_SET)); + ASSERT_OK_AND_ASSIGN(auto pos, in_stream->GetPos()); + ASSERT_EQ(pos, 11); + // read from cur pos + std::string read_content(3, '\0'); + ASSERT_NOK(in_stream->Read(read_content.data(), read_content.size())); + ASSERT_OK_AND_ASSIGN(size_t actual_read, in_stream->Read(read_content.data(), 0)); + ASSERT_EQ(actual_read, 0); + } + { + // read invalid bulk data + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + std::string read_content(20, '\0'); + ASSERT_NOK(in_stream->Read(read_content.data(), read_content.size())); + ASSERT_OK_AND_ASSIGN(auto pos, in_stream->GetPos()); + ASSERT_EQ(pos, 11); + } + { + // read invalid with oversize offset + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + std::string read_content(4, '\0'); + ASSERT_NOK(in_stream->Read(read_content.data(), read_content.size(), /*offset=*/20)); + } +} + +TEST_P(FileSystemTest, TestInvalidAsyncRead) { + std::string content = "abcdefghijk"; + std::string file_path = test_root_ + "/file.data"; + // write process + ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t write_len, + out_stream->Write(content.data(), content.size())); + ASSERT_OK(out_stream->Flush()); + ASSERT_OK(out_stream->Close()); + + { + // test read overflow + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + std::string read_content(20, '\0'); + bool read_finished = false; + std::promise promise; + std::future future = promise.get_future(); + auto callback = [&](Status status) { + EXPECT_NOK(status); + if (status.ok()) { + read_finished = true; + promise.set_value(10); + } else { + read_finished = false; + promise.set_value(20); + } + }; + // invalid async read + in_stream->ReadAsync(read_content.data(), read_content.size(), /*offset=*/0, callback); + ASSERT_EQ(future.get(), 20); + ASSERT_FALSE(read_finished); + ASSERT_NE(read_content, content); + ASSERT_OK(in_stream->Close()); + } + { + // test read with invalid offset + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + std::string read_content(content.size(), '\0'); + bool read_finished = false; + std::promise promise; + std::future future = promise.get_future(); + auto callback = [&](Status status) { + EXPECT_NOK(status); + if (status.ok()) { + read_finished = true; + promise.set_value(10); + } else { + read_finished = false; + promise.set_value(20); + } + }; + // invalid async read + in_stream->ReadAsync(read_content.data(), read_content.size(), /*offset=*/20, callback); + ASSERT_EQ(future.get(), 20); + ASSERT_FALSE(read_finished); + ASSERT_NE(read_content, content); + ASSERT_OK(in_stream->Close()); + } +} + +TEST_P(FileSystemTest, TestReadAndWriteAndAtomicStoreFile) { + std::string file_path = test_root_ + "/file.data"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_FALSE(is_exist); + + std::string content = "content"; + ASSERT_OK(fs_->AtomicStore(file_path, content)); + std::string read_content; + ASSERT_OK(fs_->ReadFile(file_path, &read_content)); + ASSERT_EQ(read_content, content); + + std::string new_content = "hello world"; + ASSERT_OK(fs_->WriteFile(file_path, new_content, /*overwrite=*/true)); + ASSERT_OK(fs_->ReadFile(file_path, &read_content)); + ASSERT_EQ(read_content, new_content); + + ASSERT_NOK_WITH_MSG(fs_->WriteFile(file_path, content, /*overwrite=*/false), + "do not allow overwrite"); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + + ASSERT_NOK_WITH_MSG(fs_->AtomicStore(file_path, content), "dst file already exist"); + ASSERT_OK(fs_->ReadFile(file_path, &read_content)); + ASSERT_EQ(read_content, new_content); +} + +TEST(FileSystemStaticTest, TestIsObjectStore) { + ASSERT_OK_AND_ASSIGN(bool is_object_store, FileSystem::IsObjectStore("file:///tmp/test.txt")); + ASSERT_FALSE(is_object_store); + ASSERT_OK_AND_ASSIGN(is_object_store, FileSystem::IsObjectStore("/tmp/test.txt")); + ASSERT_FALSE(is_object_store); + ASSERT_OK_AND_ASSIGN(is_object_store, FileSystem::IsObjectStore("dfs://tmp/test.txt")); + ASSERT_FALSE(is_object_store); + ASSERT_OK_AND_ASSIGN(is_object_store, FileSystem::IsObjectStore("hdfs://tmp/test.txt")); + ASSERT_FALSE(is_object_store); + + ASSERT_OK_AND_ASSIGN(is_object_store, FileSystem::IsObjectStore("oss://bucket/test.txt")); + ASSERT_TRUE(is_object_store); + ASSERT_OK_AND_ASSIGN(is_object_store, FileSystem::IsObjectStore("s3://bucket/test.txt")); + ASSERT_TRUE(is_object_store); +} + +// --- seek +TEST_P(FileSystemTest, TestSeek) { + std::string path = PathUtil::JoinPath(test_root_, "/test_file"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr out, fs_->Create(path, /*overwrite=*/true)); + std::string input = "paimon"; + ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(input.data(), input.size())); + ASSERT_EQ(size, input.size()); + ASSERT_OK(out->Close()); + + ASSERT_OK_AND_ASSIGN(std::unique_ptr in, fs_->Open(path)); + ASSERT_OK(in->Seek(/*offset=*/1, SeekOrigin::FS_SEEK_SET)); + ASSERT_OK_AND_ASSIGN(int64_t pos, in->GetPos()); + ASSERT_EQ(pos, 1); + + ASSERT_OK(in->Seek(/*offset=*/1, SeekOrigin::FS_SEEK_CUR)); + ASSERT_OK_AND_ASSIGN(int64_t pos2, in->GetPos()); + ASSERT_EQ(pos2, 2); + + ASSERT_OK(in->Seek(/*offset=*/-5, SeekOrigin::FS_SEEK_END)); + ASSERT_OK_AND_ASSIGN(int64_t pos3, in->GetPos()); + ASSERT_EQ(pos3, 1); + + std::string read_content(3, '\0'); + ASSERT_OK_AND_ASSIGN(int32_t read_len, in->Read(read_content.data(), read_content.size())); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ("aim", read_content); + + // read from offset + ASSERT_OK_AND_ASSIGN(read_len, + in->Read(read_content.data(), read_content.size(), /*offset=*/2)); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ("imo", read_content); + + ASSERT_OK_AND_ASSIGN(pos, in->GetPos()); + ASSERT_EQ(pos, 4); +} + +TEST_P(FileSystemTest, TestSeek2) { + std::string content = "abcdefghijk"; + std::string file_path = test_root_ + "/file.data"; + // write process + ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(content.data(), content.size())); + ASSERT_EQ(write_len, content.size()); + ASSERT_OK(out_stream->Flush()); + ASSERT_OK(out_stream->Close()); + + // read process + ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path)); + ASSERT_OK_AND_ASSIGN(auto pos, in_stream->GetPos()); + ASSERT_EQ(pos, 0); + + // valid seek + ASSERT_OK(in_stream->Seek(/*offset=*/2, SeekOrigin::FS_SEEK_SET)); + ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos()); + ASSERT_EQ(pos, 2); + + ASSERT_OK(in_stream->Seek(/*offset=*/4, SeekOrigin::FS_SEEK_CUR)); + ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos()); + ASSERT_EQ(pos, 6); + + ASSERT_OK(in_stream->Seek(/*offset=*/-3, SeekOrigin::FS_SEEK_END)); + ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos()); + ASSERT_EQ(pos, 8); + + // read from cur pos + std::string read_content(3, '\0'); + ASSERT_OK_AND_ASSIGN(int32_t read_len, + in_stream->Read(read_content.data(), read_content.size())); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ("ijk", read_content); + + // read from offset + ASSERT_OK_AND_ASSIGN(read_len, + in_stream->Read(read_content.data(), read_content.size(), /*offset=*/4)); + ASSERT_EQ(read_len, read_content.size()); + ASSERT_EQ("efg", read_content); + + ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos()); + ASSERT_EQ(pos, 11); + ASSERT_OK(in_stream->Close()); +} + +// --- rename +TEST_P(FileSystemTest, TestRename) { + std::string path = PathUtil::JoinPath(test_root_, "/test_file"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr out, fs_->Create(path, /*overwrite=*/true)); + ASSERT_TRUE(out); + std::string input = "paimon"; + ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(input.data(), input.size())); + ASSERT_EQ(size, input.size()); + ASSERT_OK(out->Flush()); + ASSERT_OK(out->Close()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr in, fs_->Open(path)); + ASSERT_TRUE(in); + char* data = new char[input.size() * 2]; + ASSERT_OK_AND_ASSIGN(int32_t size_read, in->Read(data, input.size(), /*offset=*/0)); + ASSERT_EQ(size_read, input.size()); + std::string read_data(data, input.size()); + ASSERT_EQ(read_data, input); + delete[] data; + ASSERT_OK(in->Close()); + + std::string path2 = PathUtil::JoinPath(test_root_, "/test_file_renamed"); + ASSERT_OK(fs_->Rename(path, path2)); + + std::string no_exist_path = PathUtil::JoinPath(test_root_, "/no_exist_file"); + ASSERT_NOK_WITH_MSG( + fs_->Rename(no_exist_path, PathUtil::JoinPath(test_root_, "/no_exist_file_renamed")), + "src file not exist"); + + ASSERT_NOK_WITH_MSG( + fs_->Rename(path2, PathUtil::JoinPath(test_root_, "/wrong_path_file_renamed/")), + "src file is not a dir"); +} + +TEST_P(FileSystemTest, TestRename2) { + { + // test rename dir + std::string dir_path = test_root_ + "/file_dir/"; + std::string dir_path2 = test_root_ + "/file_dir2/"; + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + ASSERT_OK(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path2)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2)); + ASSERT_TRUE(is_exist); + } + { + // test rename itself + std::string dir_path = test_root_ + "/file_dir_itself/"; + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path), + "dst file already exist"); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + } + { + // test rename from non-exist dir + std::string dir_path = test_root_ + "/non_exist_file_dir/"; + std::string dir_path2 = test_root_ + "/non_exist_file_dir2/"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path2), "src file not exist"); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2)); + ASSERT_FALSE(is_exist); + } + { + // test rename to exist dir + std::string dir_path = test_root_ + "/file_src_dir/"; + std::string dir_path2 = test_root_ + "/file_dst_dir/"; + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK(fs_->Mkdirs(dir_path2)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2)); + ASSERT_TRUE(is_exist); + + ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path2), + "dst file already exist"); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2)); + ASSERT_TRUE(is_exist); + } + { + // test rename file + std::string file_path = test_root_ + "/file1/file2/file3"; + ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + std::string file_path2 = test_root_ + "/file1/file4"; + ASSERT_OK(fs_->Rename(/*src=*/file_path, /*dst=*/file_path2)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path)); + ASSERT_FALSE(is_exist); + + std::string data; + ASSERT_OK(fs_->ReadFile(file_path2, &data)); + ASSERT_EQ(data, "content"); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file1/file2/")); + ASSERT_TRUE(is_exist); + } + { + // test rename file to exist file + std::string file_path = test_root_ + "/file11/file12/file13"; + ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + std::string file_path2 = test_root_ + "/file15/file16/file13"; + ASSERT_OK(fs_->WriteFile(file_path2, "HelloWorld", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path2)); + ASSERT_TRUE(is_exist); + + ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/file_path, /*dst=*/file_path2), + "dst file already exist"); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + std::string data; + ASSERT_OK(fs_->ReadFile(file_path2, &data)); + ASSERT_EQ(data, "HelloWorld"); + } + { + // test rename from non-exist file + std::string file_path = test_root_ + "/non_exist_file"; + std::string file_path2 = test_root_ + "/non_exist_file2"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_FALSE(is_exist); + ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/file_path, /*dst=*/file_path2), + "src file not exist"); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path2)); + ASSERT_FALSE(is_exist); + } +} + +// --- exists +TEST_P(FileSystemTest, TestFileExists) { + std::string file_path = CreateRandomFileInDirectory(test_root_); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); +} + +TEST_P(FileSystemTest, TestFileDoesNotExist) { + std::string path = PathUtil::JoinPath(test_root_, RandomName()); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(path)); + ASSERT_FALSE(is_exist); +} + +TEST_P(FileSystemTest, TestExists) { + std::string file_path = test_root_ + "/file.data"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_FALSE(is_exist); + + ASSERT_OK(fs_->WriteFile(file_path, "/content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + + std::string dir_path = test_root_ + "/file_dir/"; + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + + ASSERT_OK(fs_->WriteFile(dir_path + "/file.data", "content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); +} + +// --- delete +TEST_P(FileSystemTest, TestExistingFileDeletion) { + auto check = [&](bool recursive) { + std::string file_path = CreateRandomFileInDirectory(test_root_); + ASSERT_OK(fs_->Delete(file_path, recursive)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_FALSE(is_exist); + }; + check(true); + check(false); +} + +TEST_P(FileSystemTest, TestNotExistingFileDeletion) { + auto check = [&](bool recursive) { + std::string path = PathUtil::JoinPath(test_root_, RandomName()); + ASSERT_TRUE(fs_->Delete(path, recursive).IsIOError()); + }; + check(true); + check(false); +} + +TEST_P(FileSystemTest, TestExistingEmptyDirectoryDeletion) { + auto check = [&](bool recursive) { + std::string path = PathUtil::JoinPath(test_root_, RandomName()); + ASSERT_OK(fs_->Mkdirs(path)); + ASSERT_OK(fs_->Delete(path, recursive)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(path)); + ASSERT_FALSE(is_exist); + }; + check(true); + check(false); +} + +TEST_P(FileSystemTest, TestExistingNonEmptyDirectoryRecursiveDeletion) { + { + std::string file_path = CreateRandomFileInDirectory(test_root_); + ASSERT_OK(fs_->Delete(test_root_, /*recursive=*/true)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_FALSE(is_exist); + } + { + std::string file_path = CreateRandomFileInDirectory(test_root_); + ASSERT_NOK(fs_->Delete(test_root_, /*recursive=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + } +} + +TEST_P(FileSystemTest, TestExistingNonEmptyDirectoryWithSubDirRecursiveDeletion) { + { + std::string level1_subdir_with_file = PathUtil::JoinPath(test_root_, RandomName()); + const std::string file_in_level1_subdir = + CreateRandomFileInDirectory(level1_subdir_with_file); + std::string level2_subdir_with_file = + PathUtil::JoinPath(level1_subdir_with_file, RandomName()); + const std::string file_in_level2_subdir = + CreateRandomFileInDirectory(level2_subdir_with_file); + ASSERT_OK(fs_->Delete(test_root_, /*recursive=*/true)); + ASSERT_FALSE(fs_->Exists(file_in_level1_subdir).value()); + ASSERT_FALSE(fs_->Exists(level2_subdir_with_file).value()); + ASSERT_FALSE(fs_->Exists(file_in_level2_subdir).value()); + } + { + std::string level1_subdir_with_file = PathUtil::JoinPath(test_root_, RandomName()); + const std::string file_in_level1_subdir = + CreateRandomFileInDirectory(level1_subdir_with_file); + std::string level2_subdir_with_file = + PathUtil::JoinPath(level1_subdir_with_file, RandomName()); + const std::string file_in_level2_subdir = + CreateRandomFileInDirectory(level2_subdir_with_file); + ASSERT_NOK(fs_->Delete(test_root_, /*recursive=*/false)); + ASSERT_TRUE(fs_->Exists(file_in_level1_subdir).value()); + ASSERT_TRUE(fs_->Exists(level2_subdir_with_file).value()); + ASSERT_TRUE(fs_->Exists(file_in_level2_subdir).value()); + } +} + +TEST_P(FileSystemTest, TestDelete) { + { + std::string dir_path = test_root_ + "/file_dir/"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + ASSERT_OK(fs_->Delete(dir_path, /*recursive=*/true)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + } + { + // test recursive delete + std::string dir_path = test_root_ + "/file_dir1/file_dir2/file_dir3"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + + // rm file_dir1/file_dir2/file_dir3/ + ASSERT_OK(fs_->Delete(dir_path, /*recursive=*/true)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file_dir1/")); + ASSERT_TRUE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file_dir1/file_dir2")); + ASSERT_TRUE(is_exist); + + // rm file_dir1/ + ASSERT_OK(fs_->Delete(test_root_ + "/file_dir1/", /*recursive=*/true)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file_dir1/")); + ASSERT_FALSE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file_dir1/file_dir2")); + ASSERT_FALSE(is_exist); + } + { + // test delete file + std::string file_path = test_root_ + "/file1/file2/file3"; + ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + + ASSERT_OK(fs_->Delete(file_path, /*recursive=*/true)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file1/file2/")); + ASSERT_TRUE(is_exist); + + ASSERT_NOK_WITH_MSG(fs_->Delete(test_root_ + "/file1", /*recursive=*/false), + "is not empty"); + } + { + // test recursive delete + std::string file_path = test_root_ + "/file1/file2/file3"; + ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + + ASSERT_OK(fs_->Delete(test_root_ + "/file1/", /*recursive=*/true)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file1/file2/")); + ASSERT_FALSE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file1/")); + ASSERT_FALSE(is_exist); + } + { + // test recursive delete + std::string dir_path = test_root_ + "/file_recursive_dir1/file_dir2/file_dir3"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + + ASSERT_NOK_WITH_MSG(fs_->Delete(test_root_ + "/file_recursive_dir1", /*recursive=*/false), + "is not empty"); + } + { + // test delete non-exist file + std::string file_path = test_root_ + "/file_non_exist"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_FALSE(is_exist); + ASSERT_NOK(fs_->Delete(file_path, /*recursive=*/false)); + } +} + +// --- mkdirs +TEST_P(FileSystemTest, TestMkdirsReturnsTrueWhenCreatingDirectory) { + ASSERT_OK(fs_->Mkdirs(test_root_)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(test_root_)); + ASSERT_TRUE(is_exist); +} + +TEST_P(FileSystemTest, TestMkdirsCreatesParentDirectories) { + std::string deep_path = PathUtil::JoinPath(test_root_, RandomName()); + ASSERT_OK(fs_->Mkdirs(deep_path)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(deep_path)); + ASSERT_TRUE(is_exist); +} + +TEST_P(FileSystemTest, TestMkdirsReturnsTrueForExistingDirectory) { + std::string file_path = CreateRandomFileInDirectory(test_root_); + ASSERT_OK(fs_->Mkdirs(test_root_)); +} + +TEST_P(FileSystemTest, TestMkdirsFailsForExistingFile) { + std::string file_path = CreateRandomFileInDirectory(test_root_); + auto status = fs_->Mkdirs(file_path); + ASSERT_TRUE(status.IsIOError()); +} + +TEST_P(FileSystemTest, TestMkdirsFailsWithExistingParentFile) { + std::string file_path = CreateRandomFileInDirectory(test_root_); + std::string dir_under_file = PathUtil::JoinPath(file_path, RandomName()); + ASSERT_TRUE(fs_->Mkdirs(dir_under_file).IsIOError()); +} + +TEST_P(FileSystemTest, TestMkdir) { + ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp.txt/tmpB")); + ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmpA/tmpB/")); + + ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp/local/f/1")); + ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1")); + ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1/f2/")); + ASSERT_OK(fs_->Mkdirs("/")); + ASSERT_NOK_WITH_MSG(fs_->Mkdirs(""), "path is an empty string."); +} + +TEST_P(FileSystemTest, TestMkdir2) { + { + std::string dir_path = test_root_ + "/file_dir/"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + } + { + // test recursive mkdir + std::string dir_path = test_root_ + "/file_dir1/file_dir2/file_dir3"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file_dir1/")); + ASSERT_TRUE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file_dir1/file_dir2")); + ASSERT_TRUE(is_exist); + } +} + +// test for create multi dir such as "/table/partition1/bucket1" and "/table/partition1/bucket2" +TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameNonExistParentDir) { + uint32_t runs_count = 10; + uint32_t thread_count = 10; + auto executor = CreateDefaultExecutor(thread_count); + + for (uint32_t i = 0; i < runs_count; i++) { + std::string uuid; + ASSERT_TRUE(UUID::Generate(&uuid)); + std::vector> futures; + for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) { + futures.push_back(Via(executor.get(), [this, &uuid, thread_idx]() -> void { + std::string dir_path = + PathUtil::JoinPath(test_root_, uuid + "/" + std::to_string(thread_idx)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + })); + } + Wait(futures); + } +} + +// test for create multi dir such as "/table/partition1" and "/table/partition1" +TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameName) { + uint32_t runs_count = 10; + uint32_t thread_count = 10; + auto executor = CreateDefaultExecutor(thread_count); + + for (uint32_t i = 0; i < runs_count; i++) { + std::string uuid; + ASSERT_TRUE(UUID::Generate(&uuid)); + std::vector> futures; + for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) { + futures.push_back(Via(executor.get(), [this, &uuid]() -> void { + std::string dir_path = PathUtil::JoinPath(test_root_, uuid); + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + })); + } + Wait(futures); + } +} + +// test for create multi dir such as "partition1" and "partition1" (relative path) +TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameNameWithRelativePath) { + uint32_t runs_count = 10; + uint32_t thread_count = 10; + auto executor = CreateDefaultExecutor(thread_count); + + for (uint32_t i = 0; i < runs_count; i++) { + std::string uuid; + ASSERT_TRUE(UUID::Generate(&uuid)); + std::vector> futures; + for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) { + futures.push_back(Via(executor.get(), [this, &uuid]() -> void { + std::string dir_path = uuid; + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + })); + } + Wait(futures); + } +} + +TEST_P(FileSystemTest, TestInvalidMkdir) { + { + // test mkdir with one exist dir + std::string dir_path = test_root_ + "/file_dir/"; + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + } + { + // test mkdir with one exist file with same name + std::string file_path = test_root_ + "/file_dir/file.data"; + ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + + ASSERT_NOK_WITH_MSG(fs_->Mkdirs(file_path), "already exists"); + } +} + +// --- file status +TEST_P(FileSystemTest, TestGetFileStatus1) { + { + // test dir simple + std::string dir_path = test_root_ + "/file_dir/"; + ASSERT_OK(fs_->Mkdirs(dir_path)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_TRUE(is_exist); + ASSERT_OK_AND_ASSIGN(std::unique_ptr st, fs_->GetFileStatus(dir_path)); + ASSERT_EQ(RemoveLastSlashInPath(st->GetPath()), RemoveLastSlashInPath(dir_path)); + ASSERT_TRUE(st->IsDir()); + auto modification_time = st->GetModificationTime(); + ASSERT_GT(modification_time, 10000000000L); + ASSERT_LT(modification_time, 10000000000000L); + + std::string file_path = test_root_ + "/file_dir/file.data"; + ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + + // check meta in dir + ASSERT_OK_AND_ASSIGN(st, fs_->GetFileStatus(dir_path)); + ASSERT_EQ(RemoveLastSlashInPath(st->GetPath()), RemoveLastSlashInPath(dir_path)); + ASSERT_TRUE(st->IsDir()); + modification_time = st->GetModificationTime(); + ASSERT_GT(modification_time, 10000000000L); + ASSERT_LT(modification_time, 10000000000000L); + } + { + // test non-exist dir + std::string dir_path = test_root_ + "/non_exist_file_dir/"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_NOK(fs_->GetFileStatus(dir_path)); + } + { + // test file simple + std::string content = "content"; + std::string file_path = test_root_ + "/file.data"; + ASSERT_OK(fs_->WriteFile(file_path, content, /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + + ASSERT_OK_AND_ASSIGN(std::unique_ptr st, fs_->GetFileStatus(file_path)); + ASSERT_EQ(st->GetPath(), file_path); + ASSERT_FALSE(st->IsDir()); + ASSERT_EQ(st->GetLen(), content.size()); + auto modification_time = st->GetModificationTime(); + ASSERT_GT(modification_time, 10000000000L); + ASSERT_LT(modification_time, 10000000000000L); + } + { + // test non-exist file + std::string dir_path = test_root_ + "/non_exist_file"; + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path)); + ASSERT_FALSE(is_exist); + ASSERT_NOK(fs_->GetFileStatus(dir_path)); + } +} + +TEST_P(FileSystemTest, TestGetFileStatus2) { + const std::string test_path = GetTestDir() + "orc/append_09.db/append_09"; + { + // input is a dir, with a trailing '/' + std::string dir_name = test_path + "/"; + std::vector> status_list; + ASSERT_OK_AND_ASSIGN(std::unique_ptr file_status, fs_->GetFileStatus(dir_name)); + status_list.emplace_back(std::move(file_status)); + CheckFileStatus(status_list, /*expected_files=*/{}, /*expected_dirs=*/{dir_name}); + } + { + // input is a dir, without a trailing '/' + std::vector> status_list; + ASSERT_OK_AND_ASSIGN(std::unique_ptr file_status, + fs_->GetFileStatus(test_path)); + status_list.emplace_back(std::move(file_status)); + CheckFileStatus(status_list, /*expected_files=*/{}, /*expected_dirs=*/{test_path}); + } + { + // input is a file + std::vector> status_list; + std::string file_name = PathUtil::JoinPath(test_path, "README"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr file_status, + fs_->GetFileStatus(file_name)); + status_list.emplace_back(std::move(file_status)); + CheckFileStatus(status_list, /*expected_files=*/{file_name}, /*expected_dirs=*/{}); + } + { + // input is not exist + std::vector> status_list; + ASSERT_NOK(fs_->GetFileStatus(PathUtil::JoinPath(test_path, "NOT_EXIST"))); + } +} + +TEST_P(FileSystemTest, TestInvalidListFileStatus) { + { + // list non exist dir will return ok + std::vector> file_status_list; + ASSERT_OK(fs_->ListFileStatus(test_root_ + "/non-exist/", &file_status_list)); + ASSERT_TRUE(file_status_list.empty()); + } + { + // test data path + std::string file_path = test_root_ + "/file.data"; + ASSERT_OK(fs_->WriteFile(file_path, "hello", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + std::vector> file_status_list; + ASSERT_OK(fs_->ListFileStatus(file_path, &file_status_list)); + ASSERT_EQ(file_status_list[0]->GetPath(), file_path); + ASSERT_EQ(file_status_list[0]->GetLen(), 5); + ASSERT_FALSE(file_status_list[0]->IsDir()); + } +} + +TEST_P(FileSystemTest, TestListFileStatus1) { + std::string file_path = test_root_ + "/file_dir1/file.data"; + ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + + std::vector> file_status_list; + ASSERT_OK(fs_->ListFileStatus(test_root_, &file_status_list)); + ASSERT_EQ(file_status_list.size(), 1); + std::set expected_dirs = {test_root_ + "/file_dir1/"}; + CheckFileStatus(file_status_list, /*expected_files=*/{}, expected_dirs); + + auto dir_path2 = test_root_ + "/file_dir2/"; + ASSERT_OK(fs_->Mkdirs(dir_path2)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2)); + ASSERT_TRUE(is_exist); + + auto dir_path3 = test_root_ + "/file_dir1/file_dir3/"; + ASSERT_OK(fs_->Mkdirs(dir_path3)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path3)); + ASSERT_TRUE(is_exist); + + std::string file_path2 = test_root_ + "/file_dir1/second_file.data"; + ASSERT_OK(fs_->WriteFile(file_path2, "hello!", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path2)); + ASSERT_TRUE(is_exist); + + std::vector> file_status_list2; + ASSERT_OK(fs_->ListFileStatus(test_root_, &file_status_list2)); + ASSERT_EQ(file_status_list2.size(), 2); + expected_dirs = {test_root_ + "/file_dir1/", dir_path2}; + CheckFileStatus(file_status_list2, /*expected_files=*/{}, expected_dirs); + + std::vector> file_status_list3; + ASSERT_OK(fs_->ListFileStatus(test_root_ + "/file_dir1/", &file_status_list3)); + ASSERT_EQ(file_status_list3.size(), 3); + expected_dirs = {dir_path3}; + std::set expected_files = {file_path, file_path2}; + CheckFileStatus(file_status_list3, expected_files, expected_dirs); +} + +TEST_P(FileSystemTest, TestListFileStatus2) { + const std::string test_path = GetTestDir() + "orc/append_09.db/append_09"; + const std::set expected_files = {PathUtil::JoinPath(test_path, "README")}; + const std::set expected_dirs = { + PathUtil::JoinPath(test_path, "f1=10"), PathUtil::JoinPath(test_path, "commit_messages"), + PathUtil::JoinPath(test_path, "f1=20"), PathUtil::JoinPath(test_path, "data_splits"), + PathUtil::JoinPath(test_path, "manifest"), PathUtil::JoinPath(test_path, "schema"), + PathUtil::JoinPath(test_path, "snapshot")}; + { + // input is a dir, with a trailing '/' + std::vector> status_list; + ASSERT_OK(fs_->ListFileStatus(test_path + "/", &status_list)); + CheckFileStatus(status_list, expected_files, expected_dirs); + } + { + // input is a dir, without a trailing '/' + std::vector> status_list; + ASSERT_OK(fs_->ListFileStatus(test_path, &status_list)); + CheckFileStatus(status_list, expected_files, expected_dirs); + } + { + // input is a file + std::vector> status_list; + ASSERT_OK(fs_->ListFileStatus(PathUtil::JoinPath(test_path, "README"), &status_list)); + CheckFileStatus(status_list, expected_files, /*expected_dirs=*/{}); + } + { + // input is not exist + std::vector> status_list; + ASSERT_OK(fs_->ListFileStatus(PathUtil::JoinPath(test_path, "NOT_EXIST"), &status_list)); + CheckFileStatus(status_list, /*expected_files=*/{}, /*expected_dirs=*/{}); + } +} + +TEST_P(FileSystemTest, TestListDir1) { + std::string file_path = test_root_ + "/file_dir1/file.data"; + ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); + + auto dir_path1 = test_root_ + "/file_dir1/"; + std::vector> file_status_list; + ASSERT_OK(fs_->ListDir(test_root_, &file_status_list)); + ASSERT_EQ(file_status_list.size(), 1); + std::set expected_dirs = {dir_path1}; + CheckBasicFileStatus(file_status_list, std::set(), expected_dirs); + + auto dir_path2 = test_root_ + "/file_dir2/"; + ASSERT_OK(fs_->Mkdirs(dir_path2)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2)); + ASSERT_TRUE(is_exist); + + auto dir_path3 = test_root_ + "/file_dir1/file_dir3/"; + ASSERT_OK(fs_->Mkdirs(dir_path3)); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path3)); + ASSERT_TRUE(is_exist); + + std::vector> file_status_list2; + ASSERT_OK(fs_->ListDir(test_root_, &file_status_list2)); + ASSERT_EQ(file_status_list2.size(), 2); + expected_dirs = {dir_path1, dir_path2}; + CheckBasicFileStatus(file_status_list2, std::set(), expected_dirs); + + std::vector> file_status_list3; + ASSERT_OK(fs_->ListDir(dir_path1, &file_status_list3)); + ASSERT_EQ(file_status_list3.size(), 2); + std::set expected_files = {file_path}; + expected_dirs = {dir_path3}; + CheckBasicFileStatus(file_status_list3, expected_files, expected_dirs); + + // list non exist dir will return ok + std::vector> file_status_list4; + ASSERT_OK(fs_->ListDir(test_root_ + "/non-exist/", &file_status_list4)); + ASSERT_TRUE(file_status_list4.empty()); + + // list invalid path, a data file path + std::vector> file_status_list5; + ASSERT_NOK_WITH_MSG(fs_->ListDir(file_path, &file_status_list5), "is not a directory"); +} + +TEST_P(FileSystemTest, TestListDir2) { + const std::string test_path = GetTestDir() + "orc/append_09.db/append_09"; + const std::set expected_files = {PathUtil::JoinPath(test_path, "README")}; + const std::set expected_dirs = { + PathUtil::JoinPath(test_path, "f1=10"), PathUtil::JoinPath(test_path, "commit_messages"), + PathUtil::JoinPath(test_path, "f1=20"), PathUtil::JoinPath(test_path, "data_splits"), + PathUtil::JoinPath(test_path, "manifest"), PathUtil::JoinPath(test_path, "schema"), + PathUtil::JoinPath(test_path, "snapshot")}; + { + // input is a dir, with a trailing '/' + std::vector> status_list; + ASSERT_OK(fs_->ListDir(test_path + "/", &status_list)); + CheckBasicFileStatus(status_list, expected_files, expected_dirs); + } + { + // input is a dir, without a trailing '/' + std::vector> status_list; + ASSERT_OK(fs_->ListDir(test_path, &status_list)); + CheckBasicFileStatus(status_list, expected_files, expected_dirs); + } + { + // input is a file + std::vector> status_list; + ASSERT_NOK_WITH_MSG(fs_->ListDir(PathUtil::JoinPath(test_path, "README"), &status_list), + "file " + PathUtil::JoinPath(test_path, "README") + + " already exists and is not a directory"); + } + { + // input is not exist + std::vector> status_list; + ASSERT_OK(fs_->ListDir(PathUtil::JoinPath(test_path, "NOT_EXIST"), &status_list)); + CheckBasicFileStatus(status_list, /*expected_files=*/{}, + /*expected_dirs=*/{}); + } +} + +// --- exception +TEST_P(FileSystemTest, TestGetNotExistFileStatus) { + std::string path = PathUtil::JoinPath(test_root_, RandomName()); + ASSERT_NOK(fs_->GetFileStatus(path)); +} + +// --- atomic store +TEST_P(FileSystemTest, TestAtomicStore) { + std::string path = PathUtil::JoinPath(test_root_, RandomName()); + std::string content = "test_content"; + ASSERT_OK(fs_->AtomicStore(path, content)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(path)); + ASSERT_TRUE(is_exist); +} + +TEST_P(FileSystemTest, TestAtomicStoreAlreadyExist) { + std::string file_path = CreateRandomFileInDirectory(test_root_); + std::string content = "test_content"; + ASSERT_NOK(fs_->AtomicStore(file_path, content)); + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path)); + ASSERT_TRUE(is_exist); +} + +INSTANTIATE_TEST_SUITE_P(UseLocal, FileSystemTest, ::testing::Values("local" /*, "jindo"*/)); + +} // namespace paimon::test diff --git a/src/paimon/common/fs/resolving_file_system_test.cpp b/src/paimon/common/fs/resolving_file_system_test.cpp new file mode 100644 index 0000000..c459c00 --- /dev/null +++ b/src/paimon/common/fs/resolving_file_system_test.cpp @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/fs/resolving_file_system.h" + +#include +#include +#include +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "paimon/common/executor/future.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/executor.h" +#include "paimon/factories/factory_creator.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/fs/local/local_file_system_factory.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class GmockFileSystem : public FileSystem { + public: + GmockFileSystem(const std::string& identifier, const std::string& authority) + : identifier_(identifier), authority_(authority) {} + + MOCK_METHOD(Result>, Open, (const std::string& path), + (const, override)); + MOCK_METHOD(Result>, Create, + (const std::string& path, bool overwrite), (const, override)); + MOCK_METHOD(Status, Mkdirs, (const std::string& path), (const, override)); + MOCK_METHOD(Status, Rename, (const std::string& src, const std::string& dst), + (const, override)); + MOCK_METHOD(Status, Delete, (const std::string& path, bool recursive), (const, override)); + MOCK_METHOD(Result>, GetFileStatus, (const std::string& path), + (const, override)); + MOCK_METHOD(Status, ListDir, + (const std::string& directory, + std::vector>* file_status_list), + (const, override)); + MOCK_METHOD(Status, ListFileStatus, + (const std::string& path, + std::vector>* file_status_list), + (const, override)); + MOCK_METHOD(Result, Exists, (const std::string& path), (const, override)); + + const std::string& GetIdentifier() const { + return identifier_; + } + const std::string& GetAuthority() const { + return authority_; + } + + private: + std::string identifier_; + std::string authority_; +}; + +class GmockFileSystemFactory : public FileSystemFactory { + public: + explicit GmockFileSystemFactory(const std::string& identifier) : identifier_(identifier) {} + + const char* Identifier() const override { + return identifier_.c_str(); + } + + Result> Create( + const std::string& path_str, + const std::map& options) const override { + PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(path_str)); + return std::make_unique>(identifier_, path.authority); + } + + private: + std::string identifier_; +}; + +class ResolvingFileSystemTest : public testing::Test { + public: + void SetUp() override { + auto factory_creator = FactoryCreator::GetInstance(); + + // Register mock factories + factory_creator->Register("mock_local", new GmockFileSystemFactory("mock_local")); + factory_creator->Register("mock_hdfs", new GmockFileSystemFactory("mock_hdfs")); + factory_creator->Register("mock_oss", new GmockFileSystemFactory("mock_oss")); + } + + void TearDown() override { + auto factory_creator = FactoryCreator::GetInstance(); + factory_creator->TEST_Unregister("mock_local"); + factory_creator->TEST_Unregister("mock_hdfs"); + factory_creator->TEST_Unregister("mock_oss"); + } +}; + +TEST_F(ResolvingFileSystemTest, GetRealFileSystemWithFileScheme) { + std::map scheme_mapping = { + {"file", "mock_local"}, {"hdfs", "mock_hdfs"}, {"oss", "mock_oss"}}; + ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local", /*options=*/{}); + + auto check_result = [&](const std::string& uri, const std::string& expected_identifier) { + ASSERT_OK_AND_ASSIGN(auto real_fs, resolving_fs.GetRealFileSystem(uri)); + auto casted_fs = dynamic_cast(real_fs.get()); + ASSERT_TRUE(casted_fs); + ASSERT_EQ(casted_fs->GetIdentifier(), expected_identifier); + }; + check_result("file:///tmp/test.txt", "mock_local"); + check_result("hdfs://node:9000/tmp/test.txt", "mock_hdfs"); + check_result("oss://bucket/tmp/test.txt", "mock_oss"); + // Empty scheme should map to "file" scheme + check_result("/tmp/test.txt", "mock_local"); + check_result("tmp/test.txt", "mock_local"); + // Unknown scheme should use default file system + check_result("s3://bucket/tmp/test.txt", "mock_local"); +} + +TEST_F(ResolvingFileSystemTest, EmptySchemeMapping) { + std::map empty_scheme_mapping; + ResolvingFileSystem resolving_fs(empty_scheme_mapping, "mock_local", /*options=*/{}); + + // Should use default file system for all schemes + ASSERT_OK_AND_ASSIGN(auto real_fs, resolving_fs.GetRealFileSystem("any_scheme://path/test")); + auto casted_fs = dynamic_cast(real_fs.get()); + ASSERT_TRUE(casted_fs); + ASSERT_EQ(casted_fs->GetIdentifier(), "mock_local"); +} + +TEST_F(ResolvingFileSystemTest, FileSystemCaching) { + std::map scheme_mapping = {{"file", "mock_local"}}; + ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local", /*options=*/{}); + + // First call should create and cache the file system + ASSERT_OK_AND_ASSIGN(auto fs1, resolving_fs.GetRealFileSystem("file:///tmp/test1.txt")); + // Second call with same scheme should use cached file system + ASSERT_OK_AND_ASSIGN(auto fs2, resolving_fs.GetRealFileSystem("file:///tmp/test2.txt")); + ASSERT_EQ(fs1, fs2); +} + +TEST_F(ResolvingFileSystemTest, DifferentAuthoritiesCacheSeparately) { + std::map scheme_mapping = {{"hdfs", "mock_hdfs"}}; + ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local", /*options=*/{}); + + // Different authorities should create separate file system instances + ASSERT_OK_AND_ASSIGN(auto fs1, resolving_fs.GetRealFileSystem("hdfs://node1:9000/test.txt")); + ASSERT_OK_AND_ASSIGN(auto fs2, resolving_fs.GetRealFileSystem("hdfs://node2:9000/test.txt")); + ASSERT_NE(fs1, fs2); +} + +TEST_F(ResolvingFileSystemTest, ThreadSafety) { + std::map scheme_mapping = {{"file", "mock_local"}}; + ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local", /*options=*/{}); + const int32_t num_threads = 100; + const int32_t operations_per_thread = 10; + + auto executor = CreateDefaultExecutor(); + std::vector>>> futures; + for (int32_t i = 0; i < num_threads; ++i) { + futures.push_back( + Via(executor.get(), [&resolving_fs, i]() -> std::vector> { + std::vector> fs_list; + for (int32_t j = 0; j < operations_per_thread; ++j) { + std::string path = "file:///tmp/thread_" + std::to_string(i) + "_" + + std::to_string(j) + ".txt"; + EXPECT_OK_AND_ASSIGN(auto fs, resolving_fs.GetRealFileSystem(path)); + EXPECT_TRUE(fs); + fs_list.push_back(fs); + } + return fs_list; + })); + } + + // Verify that all threads should get the same file system instance + auto results = CollectAll(futures); + auto fs0 = results[0][0]; + for (const auto& fs_list : results) { + for (const auto& fs : fs_list) { + ASSERT_EQ(fs, fs0); + } + } +} + +TEST_F(ResolvingFileSystemTest, ThreadSafety2) { + std::map scheme_mapping = {{"oss", "mock_oss"}}; + ResolvingFileSystem resolving_fs(scheme_mapping, "mock_oss", /*options=*/{}); + const int32_t num_threads = 100; + const int32_t operations_per_thread = 10; + + auto executor = CreateDefaultExecutor(); + std::vector>>> futures; + for (int32_t i = 0; i < num_threads; ++i) { + futures.push_back( + Via(executor.get(), [&resolving_fs, i]() -> std::vector> { + std::vector> fs_list; + for (int32_t j = 0; j < operations_per_thread; ++j) { + std::string path = + "oss://bucket_" + std::to_string(i) + "/" + std::to_string(j) + ".txt"; + EXPECT_OK_AND_ASSIGN(auto fs, resolving_fs.GetRealFileSystem(path)); + EXPECT_TRUE(fs); + fs_list.push_back(fs); + } + return fs_list; + })); + } + + auto results = CollectAll(futures); + for (size_t i = 0; i < results.size(); ++i) { + for (size_t j = 0; j < results[i].size(); ++j) { + auto casted_fs = dynamic_cast(results[i][j].get()); + ASSERT_TRUE(casted_fs); + ASSERT_EQ(casted_fs->GetAuthority(), "bucket_" + std::to_string(i)); + } + } +} + +} // namespace paimon::test