diff --git a/src/paimon/fs/local/local_file.cpp b/src/paimon/fs/local/local_file.cpp new file mode 100644 index 0000000..a7c26cc --- /dev/null +++ b/src/paimon/fs/local/local_file.cpp @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/fs/local/local_file.h" + +#include +#include +#include + +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/factories/io_hook.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/fs/local/local_file_status.h" + +namespace paimon { + +// TODO(yonghao.fyh): move io_hook.h to test/test_util and add a HookLocalFileSystem only for test +#define CHECK_HOOK() \ + if (hook_) { \ + PAIMON_RETURN_NOT_OK(hook_->Try(path_)); \ + } + +LocalFile::LocalFile(const std::string& path) : path_(path), hook_(IOHook::GetInstance()) {} + +Result LocalFile::Exists() const { + CHECK_HOOK(); + if (access(path_.c_str(), F_OK) == 0) { + return true; + } else if (errno == ENOENT) { + return false; + } + int32_t cur_errno = errno; + return Status::IOError( + fmt::format("path '{}' check exists fail, ec: {}", path_, std::strerror(cur_errno))); +} + +Result LocalFile::IsFile() const { + CHECK_HOOK(); + bool is_file = false; + struct stat buf; + if (stat(path_.c_str(), &buf) < 0) { + return Status::IOError( + fmt::format("path '{}' check isFile fail, ec: {}", path_, std::strerror(errno))); + } + if (S_ISREG(buf.st_mode)) { + is_file = true; + } + return is_file; +} + +Result LocalFile::IsDir() const { + CHECK_HOOK(); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, GetFileStatus()); + return file_status->IsDir(); +} + +Status LocalFile::List(std::vector* file_list) const { + CHECK_HOOK(); + file_list->clear(); + DIR* dp; + struct dirent* ep; + dp = opendir(path_.c_str()); + if (dp == nullptr) { + int32_t cur_errno = errno; + return Status::IOError( + fmt::format("list path '{}' fail, ec: {}", path_, std::strerror(cur_errno))); + } + + while ((ep = readdir(dp)) != nullptr) { + if (strcmp(ep->d_name, ".") == 0 || strcmp(ep->d_name, "..") == 0) { + continue; + } + file_list->push_back(ep->d_name); + } + if (closedir(dp) < 0) { + file_list->clear(); + int32_t cur_errno = errno; + return Status::IOError( + fmt::format("list path '{}' fail, ec: {}", path_, std::strerror(cur_errno))); + } + return Status::OK(); +} + +Status LocalFile::ListFiles(std::vector* file_list) const { + CHECK_HOOK(); + file_list->clear(); + std::vector file_names; + PAIMON_RETURN_NOT_OK(List(&file_names)); + for (const auto& file_name : file_names) { + file_list->emplace_back(PathUtil::JoinPath(path_, file_name)); + } + return Status::OK(); +} + +Status LocalFile::Delete() const { + CHECK_HOOK(); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists()); + if (is_exist) { + if (::remove(path_.c_str()) != 0) { + if (errno != ENOENT) { + return Status::IOError( + fmt::format("delete path '{}' fail, ec: {}", path_, std::strerror(errno))); + } + } + } + return Status::OK(); +} + +Result LocalFile::Mkdir() const { + CHECK_HOOK(); + return mkdir(path_.c_str(), 0755) == 0; +} + +Result> LocalFile::GetFileStatus() const { + CHECK_HOOK(); + struct stat buf; + if (stat(path_.c_str(), &buf) < 0) { + int32_t cur_errno = errno; + return Status::IOError( + fmt::format("get file '{}' status failed, ec: {}", path_, std::strerror(cur_errno))); + } + return std::make_unique(path_, buf.st_size, buf.st_mtime * 1000, + S_ISDIR(buf.st_mode)); +} + +Result LocalFile::Length() const { + CHECK_HOOK(); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, GetFileStatus()); + return file_status->GetLen(); +} + +Result LocalFile::LastModifiedTimeMs() const { + CHECK_HOOK(); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, GetFileStatus()); + return file_status->GetModificationTime(); +} + +LocalFile LocalFile::GetParentFile() const { + size_t pos = path_.rfind('/'); + if (pos == std::string::npos) { + return LocalFile(""); + } else { + std::string parent_dir = path_.substr(0, pos); + return LocalFile(parent_dir); + } +} + +const std::string& LocalFile::GetAbsolutePath() const { + return path_; +} + +Result LocalFile::Read(char* buffer, uint32_t length, uint64_t offset) { + if (file_) { + CHECK_HOOK(); + int32_t fd = fileno(file_); + auto more = static_cast(length); + if (more < 0) { + return Status::IOError(fmt::format( + "pread file '{}' fail, length overflow int32_t, ec: EC_BADARGS", path_)); + } + + uint64_t off = 0; + int32_t ret = 0; + while (more > 0) { + ret = ::pread(fd, buffer + off, more, offset + off); + if (ret == -1) { + return Status::IOError( + fmt::format("pread file '{}' fail at off {}, with error {}, ec: {}", path_, off, + strerror(errno), std::strerror(errno))); + } + if (ret == 0) { + break; + } + more -= ret; + off += ret; + } + return off; + } + return Status::IOError(fmt::format( + "read file '{}' fail, can not read file which is opened fail, ec: EBADF", path_)); +} + +Result LocalFile::Read(char* buffer, uint32_t length) { + if (file_) { + CHECK_HOOK(); + auto more = static_cast(length); + if (more < 0) { + return Status::IOError( + fmt::format("fileName '{}', length '{}', ec: EC_BADARGS", path_, length)); + } + + int32_t ret = 0; + uint64_t off = 0; + while (more > 0) { + ret = fread(buffer + off, 1, more, file_); + if (ferror(file_) != 0) { + return Status::IOError( + fmt::format("read file '{}' fail at off {}, with error {}, ec: {}", path_, off, + strerror(errno), std::strerror(errno))); + } + more -= ret; + off += ret; + if (feof(file_)) { + break; + } + } + return off; + } + + return Status::IOError(fmt::format( + "read file '{}' fail, can not read file which is opened fail, ec: EBADF", path_)); +} + +Result LocalFile::Write(const char* buffer, uint32_t length) { + if (file_) { + CHECK_HOOK(); + auto more = static_cast(length); + if (more < 0) { + return Status::IOError(fmt::format( + "write file '{}' fail, length overflow int32_t, ec: EC_BADARGS", path_)); + } + + int32_t ret = 0; + uint64_t off = 0; + while (more > 0) { + ret = fwrite(buffer + off, 1, more, file_); + if (ferror(file_) != 0) { + return Status::IOError(fmt::format("write file '{}' fail, with error {}, ec: {}", + path_, off, strerror(errno), + std::strerror(errno))); + } + more -= ret; + off += ret; + } + return off; + } + + return Status::IOError( + fmt::format("write file '{}' fail, can not write file which not opened, ec: EBADF", path_)); +} + +Status LocalFile::Flush() { + if (file_) { + CHECK_HOOK(); + int32_t ret = fflush(file_); + if (0 == ret) { + CHECK_HOOK(); + int32_t fd = fileno(file_); + ret |= fsync(fd); + } + if (0 != ret) { + return Status::IOError( + fmt::format("flush '{}' fail, ec: {}", path_, std::strerror(errno))); + } + return Status::OK(); + } + return Status::IOError( + fmt::format("flush '{}' fail, can not flush file which not opened, ec: EBADF", path_)); +} + +Status LocalFile::OpenFile(bool is_read_file) { + if (is_read_file) { + PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists()); + if (!is_exist) { + return Status::IOError( + fmt::format("direct openFile '{}' fail, file not exist, ec: ENOENT", path_)); + } + PAIMON_ASSIGN_OR_RAISE(bool is_dir, IsDir()); + if (is_dir) { + return Status::IOError(fmt::format( + "direct openFile '{}' fail, cannot open a directory, ec: EISDIR", path_)); + } + CHECK_HOOK(); + file_ = fopen(path_.c_str(), "r"); + } else { + CHECK_HOOK(); + file_ = fopen(path_.c_str(), "w"); + } + if (file_ == nullptr) { + return Status::IOError(fmt::format("open '{}' fail, ec: {}", path_, std::strerror(errno))); + } + return Status::OK(); +} + +Status LocalFile::Close() { + if (file_) { + CHECK_HOOK(); + if (fclose(file_) != 0) { + file_ = nullptr; + return Status::IOError( + fmt::format("close '{}' fail, ec: {}", path_, std::strerror(errno))); + } + file_ = nullptr; + } + return Status::OK(); +} + +Status LocalFile::Seek(int64_t offset, int32_t seek_origin) { + if (file_) { + CHECK_HOOK(); + int32_t ret = 0; + ret = fseek(file_, offset, seek_origin); + if (ret != 0) { + return Status::IOError( + fmt::format("seek '{}' fail, ec: {}", path_, std::strerror(errno))); + } + return Status::OK(); + } + return Status::IOError( + fmt::format("seek '{}' fail, can not read file which not opened, ec: EBADF", path_)); +} + +Result LocalFile::Tell() const { + if (file_) { + CHECK_HOOK(); + int64_t ret = ftell(file_); + if (ret < 0) { + return Status::IOError( + fmt::format("tell '{}' fail, ec: {}", path_, std::strerror(errno))); + } + return ret; + } + return Status::IOError( + fmt::format("tell '{}' fail, can not read file which not opened, ec: EBADF", path_)); +} + +} // namespace paimon diff --git a/src/paimon/fs/local/local_file.h b/src/paimon/fs/local/local_file.h new file mode 100644 index 0000000..b53a34d --- /dev/null +++ b/src/paimon/fs/local/local_file.h @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +class IOHook; +class LocalFileStatus; + +class LocalFile { + public: + explicit LocalFile(const std::string& path); + ~LocalFile() = default; + + Result Exists() const; + Result IsFile() const; + Result IsDir() const; + Status List(std::vector* file_list) const; + Status ListFiles(std::vector* file_list) const; + Status Delete() const; + const std::string& GetAbsolutePath() const; + LocalFile GetParentFile() const; + Result Mkdir() const; + Result> GetFileStatus() const; + Result Length() const; + Result LastModifiedTimeMs() const; + Status OpenFile(bool is_read_file); + Result Read() { + return Status::NotImplemented(""); + } + Result Read(char* buffer, uint32_t length); + Result Read(char* buffer, uint32_t length, uint64_t offset); + Result Write(const char* buffer, uint32_t length); + Status Flush(); + Status Close(); + Status Seek(int64_t offset, int32_t seek_origin); + Result Tell() const; + + bool IsEmpty() const { + return path_.empty(); + } + + private: + const std::string path_; + FILE* file_ = nullptr; + IOHook* hook_; +}; + +} // namespace paimon diff --git a/src/paimon/fs/local/local_file_status.h b/src/paimon/fs/local/local_file_status.h new file mode 100644 index 0000000..eb1af1f --- /dev/null +++ b/src/paimon/fs/local/local_file_status.h @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/fs/file_system.h" + +namespace paimon { + +class LocalBasicFileStatus : public BasicFileStatus { + public: + LocalBasicFileStatus(const std::string& path, bool is_dir) : path_(path), is_dir_(is_dir) {} + + std::string GetPath() const override { + return path_; + } + + bool IsDir() const override { + return is_dir_; + } + + private: + std::string path_; + bool is_dir_; +}; + +class LocalFileStatus : public FileStatus { + public: + LocalFileStatus(const std::string& path, uint64_t length, int64_t last_modification_time, + bool is_dir) + : path_(path), + length_(length), + last_modification_time_(last_modification_time), + is_dir_(is_dir) {} + + std::string GetPath() const override { + return path_; + } + + uint64_t GetLen() const override { + return length_; + } + + int64_t GetModificationTime() const override { + return last_modification_time_; + } + + bool IsDir() const override { + return is_dir_; + } + + private: + std::string path_; + uint64_t length_; + int64_t last_modification_time_; + bool is_dir_; +}; + +} // namespace paimon diff --git a/src/paimon/fs/local/local_file_system.cpp b/src/paimon/fs/local/local_file_system.cpp new file mode 100644 index 0000000..f5c0af3 --- /dev/null +++ b/src/paimon/fs/local/local_file_system.cpp @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/fs/local/local_file_system.h" + +#include + +#include +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/fs/local/local_file_status.h" + +namespace paimon { + +Result LocalFileSystem::ToFile(const std::string& path_string) const { + // local file system does not support path_string with scheme, e.g., "file:/tmp" will be + // rewritten to "/tmp" + PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(path_string)); + if (!path.scheme.empty() && StringUtils::ToLowerCase(path.scheme) != "file") { + return Status::Invalid(fmt::format("invalid scheme {} for local file system", path.scheme)); + } + return LocalFile(path.path); +} + +Result LocalFileSystem::Exists(const std::string& path) const { + PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); + return file.Exists(); +} + +Result> LocalFileSystem::Open(const std::string& path) const { + PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists(path)); + if (!is_exist) { + return Status::NotExist(fmt::format("File '{}' not exists", path)); + } + PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr in, LocalInputStream::Create(file)); + return in; +} + +Result> LocalFileSystem::Create(const std::string& path, + bool overwrite) const { + PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists(path)); + if (is_exist && !overwrite) { + return Status::Invalid( + fmt::format("do not allow overwrite, but the file {} already exists", path)); + } + PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); + LocalFile parent = file.GetParentFile(); + PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetAbsolutePath())); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr out, LocalOutputStream::Create(file)); + return out; +} + +Status LocalFileSystem::Mkdirs(const std::string& path) const { + PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); + return MkdirsInternal(file); +} + +Status LocalFileSystem::MkdirsInternal(const LocalFile& file) const { + // Important: The 'Exists()' check above must come before the 'IsDir()' + // check to be safe when multiple parallel instances try to create the directory + PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists()); + if (is_exist) { + PAIMON_ASSIGN_OR_RAISE(bool is_dir, file.IsDir()); + if (is_dir) { + return Status::OK(); + } else { + // exists and is not a directory -> is a regular file + return Status::IOError(fmt::format("file {} already exists and is not a directory", + file.GetAbsolutePath())); + } + } + + auto parent = file.GetParentFile(); + if (!parent.IsEmpty()) { + PAIMON_RETURN_NOT_OK(MkdirsInternal(parent)); + } + PAIMON_ASSIGN_OR_RAISE(bool success, file.Mkdir()); + if (!success) { + PAIMON_ASSIGN_OR_RAISE(bool is_dir, file.IsDir()); + if (is_dir) { + return Status::OK(); + } else { + return Status::IOError( + fmt::format("create directory '{}' failed", file.GetAbsolutePath())); + } + } + return Status::OK(); +} + +Result> LocalFileSystem::GetFileStatus(const std::string& path) const { + PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists()); + if (is_exist) { + return file.GetFileStatus(); + } else { + return Status::NotExist( + fmt::format("File {} does not exist or the user running " + "Paimon has insufficient permissions to access it.", + file.GetAbsolutePath())); + } +} + +Status LocalFileSystem::ListDir( + const std::string& directory, + std::vector>* file_status_list) const { + PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(directory)); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists()); + if (!is_exist) { + return Status::OK(); + } + PAIMON_ASSIGN_OR_RAISE(bool is_file, file.IsFile()); + if (is_file) { + return Status::IOError( + fmt::format("file {} already exists and is not a directory", file.GetAbsolutePath())); + } else { + std::vector file_list; + PAIMON_RETURN_NOT_OK(file.List(&file_list)); + file_status_list->reserve(file_status_list->size() + file_list.size()); + for (const auto& f : file_list) { + Result> file_status = + GetFileStatus(PathUtil::JoinPath(directory, f)); + if (!file_status.ok() && !file_status.status().IsNotExist()) { + return file_status.status(); + } else if (file_status.ok()) { + file_status_list->emplace_back(std::make_unique( + file_status.value()->GetPath(), file_status.value()->IsDir())); + } + } + return Status::OK(); + } +} + +Status LocalFileSystem::ListFileStatus( + const std::string& path, std::vector>* file_status_list) const { + PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists()); + if (!is_exist) { + return Status::OK(); + } + PAIMON_ASSIGN_OR_RAISE(bool is_file, file.IsFile()); + if (is_file) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, file.GetFileStatus()); + file_status_list->emplace_back(std::move(file_status)); + } else { + std::vector file_list; + PAIMON_RETURN_NOT_OK(file.List(&file_list)); + file_status_list->reserve(file_status_list->size() + file_list.size()); + for (const auto& f : file_list) { + Result> file_status = + GetFileStatus(PathUtil::JoinPath(path, f)); + if (!file_status.ok() && !file_status.status().IsNotExist()) { + return file_status.status(); + } else if (file_status.ok()) { + file_status_list->emplace_back(std::move(file_status).value()); + } + } + } + return Status::OK(); +} + +Status LocalFileSystem::Delete(const std::string& path, bool recursive) const { + PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); + PAIMON_ASSIGN_OR_RAISE(bool is_file, file.IsFile()); + if (is_file) { + return file.Delete(); + } + return Delete(file, recursive); +} + +Status LocalFileSystem::Delete(const LocalFile& f, bool recursive) const { + PAIMON_ASSIGN_OR_RAISE(bool is_dir, f.IsDir()); + if (is_dir) { + std::vector files; + PAIMON_RETURN_NOT_OK(f.ListFiles(&files)); + if (recursive == false && !files.empty()) { + return Status::IOError( + fmt::format("cannot delete {}, directory is not empty", f.GetAbsolutePath())); + } + for (const auto& file : files) { + PAIMON_RETURN_NOT_OK(Delete(file)); + } + } + // Now directory is empty + return f.Delete(); +} + +Status LocalFileSystem::Rename(const std::string& src, const std::string& dst) const { + std::string err_msg = fmt::format("rename '{}' to '{}' failed, because: ", src, dst); + PAIMON_ASSIGN_OR_RAISE(bool is_src_exist, Exists(src)); + if (!is_src_exist) { + return Status::NotExist(err_msg, "src file not exist"); + } + PAIMON_ASSIGN_OR_RAISE(bool is_dst_exist, Exists(dst)); + if (is_dst_exist) { + return Status::Invalid(err_msg, "dst file already exist"); + } + PAIMON_ASSIGN_OR_RAISE(LocalFile src_file, ToFile(src)); + PAIMON_ASSIGN_OR_RAISE(bool is_file, src_file.IsFile()); + std::string new_file_name = dst; + + if (is_file && new_file_name[new_file_name.length() - 1] == '/') { + return Status::Invalid(err_msg, "src file is not a dir"); + } + PAIMON_ASSIGN_OR_RAISE(LocalFile dst_file, ToFile(dst)); + auto parent = dst_file.GetParentFile(); + PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetAbsolutePath())); + if (::rename(src.c_str(), dst.c_str()) != 0) { + int32_t cur_errno = errno; + return Status::IOError(err_msg, std::strerror(cur_errno)); + } + return Status::OK(); +} + +// input stream +Result> LocalInputStream::Create(LocalFile& file) { + PAIMON_RETURN_NOT_OK(file.OpenFile(/*is_read_file=*/true)); + return std::unique_ptr(new LocalInputStream(file)); +} + +LocalInputStream::LocalInputStream(const LocalFile& file) : file_(file) {} + +Status LocalInputStream::Seek(int64_t offset, SeekOrigin origin) { + if (origin != FS_SEEK_SET && origin != FS_SEEK_CUR && origin != FS_SEEK_END) { + return Status::Invalid( + "invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END"); + } + auto convert_origin = [](SeekOrigin origin) -> int32_t { + switch (origin) { + case FS_SEEK_SET: + return SEEK_SET; + case FS_SEEK_CUR: + return SEEK_CUR; + case FS_SEEK_END: + return SEEK_END; + default: + return SEEK_SET; + } + }; + return file_.Seek(offset, convert_origin(origin)); +} + +Result LocalInputStream::GetPos() const { + return file_.Tell(); +} + +Result LocalInputStream::Read(char* buffer, uint32_t size) { + PAIMON_ASSIGN_OR_RAISE(int32_t read_length, file_.Read(buffer, size)); + if (read_length != static_cast(size)) { + return Status::IOError(fmt::format("file '{}' read size {} != expected {}", + file_.GetAbsolutePath(), read_length, size)); + } + return read_length; +} + +Result LocalInputStream::Read(char* buffer, uint32_t size, uint64_t offset) { + PAIMON_ASSIGN_OR_RAISE(int32_t read_length, file_.Read(buffer, size, offset)); + if (read_length != static_cast(size)) { + return Status::IOError(fmt::format("file '{}' read size {} != expected {}", + file_.GetAbsolutePath(), read_length, size)); + } + return read_length; +} + +void LocalInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset, + std::function&& callback) { + Result read_size = Read(buffer, size, offset); + Status status = Status::OK(); + if (!read_size.ok()) { + status = read_size.status(); + } else { + assert(read_size.value() == static_cast(size)); + } + callback(status); +} + +Result LocalInputStream::Length() const { + return file_.Length(); +} + +Status LocalInputStream::Close() { + return file_.Close(); +} + +// output stream +Result> LocalOutputStream::Create(LocalFile& file) { + PAIMON_RETURN_NOT_OK(file.OpenFile(/*is_read_file=*/false)); + return std::unique_ptr(new LocalOutputStream(file)); +} + +LocalOutputStream::LocalOutputStream(const LocalFile& file) : file_(file) {} + +Result LocalOutputStream::GetPos() const { + return file_.Tell(); +} +Result LocalOutputStream::Write(const char* buffer, uint32_t size) { + return file_.Write(buffer, size); +} +Status LocalOutputStream::Flush() { + return file_.Flush(); +} +Status LocalOutputStream::Close() { + return file_.Close(); +} + +} // namespace paimon diff --git a/src/paimon/fs/local/local_file_system.h b/src/paimon/fs/local/local_file_system.h new file mode 100644 index 0000000..0fe6abd --- /dev/null +++ b/src/paimon/fs/local/local_file_system.h @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +// `FileSystem` for local file. +class LocalFileSystem : public FileSystem { + public: + LocalFileSystem() = default; + ~LocalFileSystem() override = default; + + Result> Open(const std::string& path) const override; + Result> Create(const std::string& path, + bool overwrite) const override; + + Status Mkdirs(const std::string& path) const override; + Status Rename(const std::string& src, const std::string& dst) const override; + Status Delete(const std::string& path, bool recursive = true) const override; + Result> GetFileStatus(const std::string& path) const override; + Status ListDir(const std::string& directory, + std::vector>* file_status_list) const override; + Status ListFileStatus( + const std::string& path, + std::vector>* file_status_list) const override; + Result Exists(const std::string& path) const override; + + /// Converts the given %Path to a File for this file system. If the path is empty, + /// we will return `new File(".")` instead of `new File("")`, since + /// the latter returns `false` for `isDirectory` judgement. + Result ToFile(const std::string& path) const; + + private: + // the lock to ensure atomic renaming + static const std::mutex RENAME_LOCK; + + Status Delete(const LocalFile& f, bool recursive = true) const; + std::string GetParentPath(const std::string& path) const; + Status MkdirsInternal(const LocalFile& file) const; +}; + +class LocalInputStream : public InputStream { + public: + static Result> Create(LocalFile& file); + + Status Seek(int64_t offset, SeekOrigin origin) override; + Result GetPos() const override; + Result Read(char* buffer, uint32_t size) override; + Result Read(char* buffer, uint32_t size, uint64_t offset) override; + void ReadAsync(char* buffer, uint32_t size, uint64_t offset, + std::function&& callback) override; + + Status Close() override; + Result GetUri() const override { + return file_.GetAbsolutePath(); + } + Result Length() const override; + + private: + explicit LocalInputStream(const LocalFile& file); + + LocalFile file_; +}; + +class LocalOutputStream : public OutputStream { + public: + static Result> Create(LocalFile& file); + + Result GetPos() const override; + Result Write(const char* buffer, uint32_t size) override; + Status Flush() override; + Status Close() override; + Result GetUri() const override { + return file_.GetAbsolutePath(); + } + + private: + explicit LocalOutputStream(const LocalFile& file); + + LocalFile file_; +}; + +} // namespace paimon diff --git a/src/paimon/fs/local/local_file_system_factory.cpp b/src/paimon/fs/local/local_file_system_factory.cpp new file mode 100644 index 0000000..affac39 --- /dev/null +++ b/src/paimon/fs/local/local_file_system_factory.cpp @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/fs/local/local_file_system_factory.h" + +#include "paimon/factories/factory.h" + +namespace paimon { + +const char LocalFileSystemFactory::IDENTIFIER[] = "local"; + +REGISTER_PAIMON_FACTORY(LocalFileSystemFactory); + +} // namespace paimon diff --git a/src/paimon/fs/local/local_file_system_factory.h b/src/paimon/fs/local/local_file_system_factory.h new file mode 100644 index 0000000..1a3f763 --- /dev/null +++ b/src/paimon/fs/local/local_file_system_factory.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/fs/file_system_factory.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/result.h" + +namespace paimon { +class FileSystem; + +class LocalFileSystemFactory : public FileSystemFactory { + public: + static const char IDENTIFIER[]; + + const char* Identifier() const override { + return IDENTIFIER; + } + + Result> Create( + const std::string& path, const std::map& options) const override { + return std::make_unique(); + } +}; + +} // namespace paimon diff --git a/src/paimon/fs/local/local_file_test.cpp b/src/paimon/fs/local/local_file_test.cpp new file mode 100644 index 0000000..8b4b9d7 --- /dev/null +++ b/src/paimon/fs/local/local_file_test.cpp @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/fs/local/local_file.h" + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(LocalFileTest, TestReadWriteEmptyContent) { + auto test_root_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + LocalFile dir = LocalFile(test_root); + if (dir.Exists().ok()) { + ASSERT_TRUE(dir.Delete().ok()); + } + ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_TRUE(success); + std::string path = test_root + "/test.txt"; + LocalFile file = LocalFile(path); + if (file.Exists().ok()) { + ASSERT_TRUE(file.Delete().ok()); + } + ASSERT_OK_AND_ASSIGN(bool is_exist, file.Exists()); + ASSERT_FALSE(is_exist); + + ASSERT_OK(file.OpenFile(/*is_read_file=*/false)); + + const char* str = ""; + const int32_t str_size = 0; + ASSERT_OK_AND_ASSIGN(int32_t write_size, file.Write(str, str_size)); + ASSERT_EQ(write_size, str_size); + + ASSERT_OK(file.Flush()); + ASSERT_TRUE(file.Exists().value()); + + ASSERT_OK(file.Close()); + + LocalFile file2(path); + ASSERT_OK(file2.OpenFile(/*is_read_file=*/true)); + char buffer[10]; + ASSERT_OK_AND_ASSIGN(int32_t read_len, file2.Read(buffer, 10)); + ASSERT_EQ(0, read_len); +} + +TEST(LocalFileTest, TestSimple) { + auto test_root_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + LocalFile dir = LocalFile(test_root); + if (dir.Exists().ok()) { + ASSERT_OK(dir.Delete()); + } + ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_TRUE(success); + std::string path = test_root + "/test.txt"; + LocalFile file = LocalFile(path); + if (file.Exists().ok()) { + ASSERT_OK(file.Delete()); + } + ASSERT_OK_AND_ASSIGN(bool is_exist, file.Exists()); + ASSERT_FALSE(is_exist); + + ASSERT_OK(file.OpenFile(/*is_read_file=*/false)); + + const char* str = "test_data"; + const int32_t str_size = 9; + ASSERT_OK_AND_ASSIGN(int32_t write_size, file.Write(str, str_size)); + ASSERT_EQ(write_size, str_size); + + ASSERT_OK(file.Flush()); + ASSERT_OK(file.Close()); + + ASSERT_OK_AND_ASSIGN(bool is_file, file.IsFile()); + ASSERT_TRUE(is_file); + + ASSERT_OK_AND_ASSIGN(bool is_dir, file.IsDir()); + ASSERT_FALSE(is_dir); + + std::vector file_list; + ASSERT_NOK(file.List(&file_list)); + + ASSERT_OK_AND_ASSIGN(size_t len, file.Length()); + ASSERT_EQ(len, str_size); + + LocalFile file2 = LocalFile(path); + ASSERT_OK(file2.Exists()); + + ASSERT_OK(file2.OpenFile(true)); + char str_read[str_size + 1]; + { + ASSERT_OK_AND_ASSIGN(int32_t read_size, file2.Read(str_read, 4)); + ASSERT_EQ(read_size, 4); + str_read[read_size] = '\0'; + ASSERT_EQ(strcmp(str_read, "test"), 0); + } + { + ASSERT_OK_AND_ASSIGN(int64_t pos, file2.Tell()); + ASSERT_EQ(pos, 4); + ASSERT_OK(file2.Seek(5, SEEK_SET)); + ASSERT_OK_AND_ASSIGN(int32_t read_size, file2.Read(str_read, 4)); + ASSERT_EQ(read_size, 4); + str_read[read_size] = '\0'; + ASSERT_EQ(strcmp(str_read, "data"), 0); + } + { + ASSERT_OK_AND_ASSIGN(int32_t read_size, file2.Read(str_read, str_size, 0)); + ASSERT_EQ(read_size, str_size); + str_read[read_size] = '\0'; + ASSERT_EQ(strcmp(str_read, "test_data"), 0); + } + + // dir already exists + ASSERT_OK_AND_ASSIGN(success, dir.Mkdir()); + ASSERT_FALSE(success); + + ASSERT_OK(file2.Delete()); + ASSERT_FALSE(file2.Exists().value()); +} + +TEST(LocalFileTest, TestUsage) { + std::string test_root = "local_file_test_usage"; + LocalFile dir = LocalFile(test_root); + ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_TRUE(success); + std::vector file_list; + ASSERT_OK(dir.List(&file_list)); + std::string path_deep_dir = test_root + "/tmp2"; + LocalFile deep_dir = LocalFile(path_deep_dir); + ASSERT_OK_AND_ASSIGN(success, deep_dir.Mkdir()); + ASSERT_TRUE(success); + LocalFile parent_deep_dir = deep_dir.GetParentFile(); + ASSERT_EQ(parent_deep_dir.GetAbsolutePath(), test_root); + ASSERT_OK(deep_dir.Delete()); + ASSERT_OK(parent_deep_dir.Delete()); + ASSERT_OK(dir.Delete()); +} + +TEST(LocalFileTest, TestOpenFile) { + auto test_root_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + LocalFile dir = LocalFile(test_root); + if (dir.Exists().ok()) { + ASSERT_OK(dir.Delete()); + } + ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_TRUE(success); + std::string path = test_root + "/test.txt"; + LocalFile file = LocalFile(path); + if (file.Exists().ok()) { + ASSERT_OK(file.Delete()); + } + ASSERT_OK_AND_ASSIGN(bool is_exist, file.Exists()); + ASSERT_FALSE(is_exist); + + ASSERT_NOK_WITH_MSG(file.OpenFile(/*is_read_file=*/true), "file not exist"); + ASSERT_NOK_WITH_MSG(dir.OpenFile(/*is_read_file=*/true), "cannot open a directory"); + + std::string path3 = "test.txt"; + LocalFile file3 = LocalFile(path3); + ASSERT_OK(file3.OpenFile(/*is_read_file=*/false)); + ASSERT_OK_AND_ASSIGN(int64_t modify_time, file3.LastModifiedTimeMs()); + ASSERT_GE(modify_time, -1); + + LocalFile dir2 = LocalFile("/"); + ASSERT_OK_AND_ASSIGN(success, dir2.Mkdir()); + ASSERT_FALSE(success); + LocalFile dir3 = LocalFile(test_root + "/"); + ASSERT_OK_AND_ASSIGN(success, dir3.Mkdir()); + ASSERT_FALSE(success); +} + +TEST(LocalFileTest, TestMkdir) { + auto test_root_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + { + LocalFile dir = LocalFile(test_root + "tmp/local/f/1"); + ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_FALSE(success); + } + { + LocalFile dir = LocalFile(test_root + "tmp1"); + ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_TRUE(success); + } + { + LocalFile dir = LocalFile(test_root + "tmp1/f2/"); + ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_TRUE(success); + } + { + LocalFile dir = LocalFile("/"); + ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_FALSE(success); + } + { + LocalFile dir = LocalFile(""); + ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_FALSE(success); + } +} + +} // namespace paimon::test