From 0c28172ac57906543c4b9956bf2d1e3e95615e0e Mon Sep 17 00:00:00 2001 From: YangZhou Date: Tue, 7 Feb 2023 20:04:06 +0800 Subject: [PATCH] rm threadpool --- runtime/engine/common/utils/threadpool.cc | 118 -------------- runtime/engine/common/utils/threadpool.h | 151 ------------------ .../engine/common/utils/threadpool_test.cc | 65 -------- 3 files changed, 334 deletions(-) delete mode 100644 runtime/engine/common/utils/threadpool.cc delete mode 100644 runtime/engine/common/utils/threadpool.h delete mode 100644 runtime/engine/common/utils/threadpool_test.cc diff --git a/runtime/engine/common/utils/threadpool.cc b/runtime/engine/common/utils/threadpool.cc deleted file mode 100644 index 6544cda9b..000000000 --- a/runtime/engine/common/utils/threadpool.cc +++ /dev/null @@ -1,118 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. */ - -#include "utils/threadpool.h" - -#include - -#include "gflags/gflags.h" -#include "glog/logging.h" - -DEFINE_int32(io_threadpool_size, - 100, - "number of threads used for doing IO, default 100"); - -DECLARE_int32(dist_threadpool_size); - -namespace paddle { -namespace framework { -std::unique_ptr ThreadPool::threadpool_(nullptr); -std::once_flag ThreadPool::init_flag_; - -ThreadPool* ThreadPool::GetInstance() { - std::call_once(init_flag_, &ThreadPool::Init); - return threadpool_.get(); -} - -void ThreadPool::Init() { - if (threadpool_.get() == nullptr) { - // TODO(Yancey1989): specify the max threads number - int num_threads = std::thread::hardware_concurrency(); - if (FLAGS_dist_threadpool_size > 0 && - num_threads > FLAGS_dist_threadpool_size) { - num_threads = FLAGS_dist_threadpool_size; - VLOG(1) << "set dist_threadpool_size to " << num_threads; - } - CHECK_GT(num_threads, 0); - threadpool_.reset(new ThreadPool(num_threads)); - } -} - -ThreadPool::ThreadPool(int num_threads) : running_(true) { - threads_.resize(num_threads); - for (auto& thread : threads_) { - // TODO(Yancey1989): binding the thread on the specify CPU number - thread.reset(new std::thread(std::bind(&ThreadPool::TaskLoop, this))); - } -} - -ThreadPool::~ThreadPool() { - { - // notify all threads to stop running - std::unique_lock l(mutex_); - running_ = false; - } - scheduled_.notify_all(); - - for (auto& t : threads_) { - t->join(); - t.reset(nullptr); - } -} - -void ThreadPool::TaskLoop() { - while (true) { - Task task; - - { - std::unique_lock lock(mutex_); - scheduled_.wait(lock, [this] { - return !this->tasks_.empty() || !this->running_; - }); - - if (!running_ && tasks_.empty()) { - return; - } - - if (tasks_.empty()) { - LOG(ERROR) << "Current thread has no task to Run."; - throw std::bad_function_call(); - } - - // pop a task from the task queue - task = std::move(tasks_.front()); - tasks_.pop(); - } - // run the task - task(); - } -} - -std::unique_ptr ThreadPoolIO::io_threadpool_(nullptr); -std::once_flag ThreadPoolIO::io_init_flag_; - -ThreadPool* ThreadPoolIO::GetInstanceIO() { - std::call_once(io_init_flag_, &ThreadPoolIO::InitIO); - return io_threadpool_.get(); -} - -void ThreadPoolIO::InitIO() { - if (io_threadpool_.get() == nullptr) { - // TODO(typhoonzero1986): make this configurable - io_threadpool_.reset(new ThreadPool(FLAGS_io_threadpool_size)); - } -} - -} // namespace framework -} // namespace paddle diff --git a/runtime/engine/common/utils/threadpool.h b/runtime/engine/common/utils/threadpool.h deleted file mode 100644 index 29d0261ab..000000000 --- a/runtime/engine/common/utils/threadpool.h +++ /dev/null @@ -1,151 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#pragma once - -#include // NOLINT -#include -#include // NOLINT -#include -#include // NOLINT -#include -#include // NOLINT -#include -#include - -#include "glog/logging.h" - -namespace paddle { -namespace framework { - -struct ExceptionHandler { - mutable std::future> future_; - explicit ExceptionHandler( - std::future>&& f) - : future_(std::move(f)) {} - void operator()() const { - auto ex = this->future_.get(); - if (ex != nullptr) { - PADDLE_THROW(platform::errors::Fatal( - "The exception is thrown inside the thread pool. You " - "should use RunAndGetException to handle the exception." - "The exception is:\n %s.", - ex->what())); - } - } -}; - -// ThreadPool maintains a queue of tasks, and runs them using a fixed -// number of threads. -class ThreadPool { - public: - explicit ThreadPool(int num_threads); - - using Task = std::packaged_task()>; - - // Returns the singleton of ThreadPool. - static ThreadPool* GetInstance(); - - ~ThreadPool(); - - // Run pushes a function to the task queue and returns a std::future - // object. To wait for the completion of the task, call - // std::future::wait(). - template - std::future Run(Callback fn) { - auto f = this->RunAndGetException(fn); - return std::async(std::launch::deferred, - ExceptionHandler(std::move(f))); - } - - template - std::future> RunAndGetException( - Callback fn) { - Task task([fn]() -> std::unique_ptr { - try { - fn(); - } catch (platform::EnforceNotMet& ex) { - return std::unique_ptr( - new platform::EnforceNotMet(ex)); - } catch (const std::exception& e) { - PADDLE_THROW(platform::errors::Fatal( - "Unexpected exception is catched in thread pool. All " - "throwable exception in Paddle should be an EnforceNotMet." - "The exception is:\n %s.", - e.what())); - } - return nullptr; - }); - std::future> f = - task.get_future(); - { - std::unique_lock lock(mutex_); - if (!running_) { - PADDLE_THROW(platform::errors::Unavailable( - "Task is enqueued into stopped ThreadPool.")); - } - tasks_.push(std::move(task)); - } - scheduled_.notify_one(); - return f; - } - - private: - DISABLE_COPY_AND_ASSIGN(ThreadPool); - - // The constructor starts threads to run TaskLoop, which retrieves - // and runs tasks from the queue. - void TaskLoop(); - - // Init is called by GetInstance. - static void Init(); - - private: - static std::unique_ptr threadpool_; - static std::once_flag init_flag_; - - std::vector> threads_; - - std::queue tasks_; - std::mutex mutex_; - bool running_; - std::condition_variable scheduled_; -}; - -class ThreadPoolIO : ThreadPool { - public: - static ThreadPool* GetInstanceIO(); - static void InitIO(); - - private: - // NOTE: threadpool in base will be inhereted here. - static std::unique_ptr io_threadpool_; - static std::once_flag io_init_flag_; -}; - -// Run a function asynchronously. -// NOTE: The function must return void. If the function need to return a value, -// you can use lambda to capture a value pointer. -template -std::future Async(Callback callback) { - return ThreadPool::GetInstance()->Run(callback); -} - -template -std::future AsyncIO(Callback callback) { - return ThreadPoolIO::GetInstanceIO()->Run(callback); -} - -} // namespace framework -} // namespace paddle diff --git a/runtime/engine/common/utils/threadpool_test.cc b/runtime/engine/common/utils/threadpool_test.cc deleted file mode 100644 index f6865c128..000000000 --- a/runtime/engine/common/utils/threadpool_test.cc +++ /dev/null @@ -1,65 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include "utils/threadpool.h" - -#include - -#include - -namespace framework = paddle::framework; - -void do_sum(std::vector>* fs, - std::mutex* mu, - std::atomic* sum, - int cnt) { - for (int i = 0; i < cnt; ++i) { - std::lock_guard l(*mu); - fs->push_back(framework::Async([sum]() { sum->fetch_add(1); })); - } -} - -TEST(ThreadPool, ConcurrentInit) { - framework::ThreadPool* pool; - int n = 50; - std::vector threads; - for (int i = 0; i < n; ++i) { - std::thread t( - [&pool]() { pool = framework::ThreadPool::GetInstance(); }); - threads.push_back(std::move(t)); - } - for (auto& t : threads) { - t.join(); - } -} - -TEST(ThreadPool, ConcurrentRun) { - std::atomic sum(0); - std::vector threads; - std::vector> fs; - std::mutex fs_mu; - int n = 50; - // sum = (n * (n + 1)) / 2 - for (int i = 1; i <= n; ++i) { - std::thread t(do_sum, &fs, &fs_mu, &sum, i); - threads.push_back(std::move(t)); - } - for (auto& t : threads) { - t.join(); - } - for (auto& t : fs) { - t.wait(); - } - EXPECT_EQ(sum, ((n + 1) * n) / 2); -}