#pragma once
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/port/detail/ThreadFiber.h"
#include "object_pool.h"
#include "type_traits.h"

#include <utility>
namespace td {
/// Abstract hook exposing a single `resume()` operation.
///
/// The class is polymorphic: it has a virtual destructor, so implementations
/// may be destroyed through a pointer to this interface.
class PoolCatchupInterface {
 public:
  virtual ~PoolCatchupInterface() = default;

  /// Implemented by concrete types; takes no arguments and returns nothing.
  virtual void resume() = 0;
};
/// Wraps a `ThreadFiber`, resuming it on construction and stopping its loop on
/// destruction.
///
/// The template headers in this class had lost their parameter lists, the
/// destructor was prefixed with `template` (a destructor cannot be a template),
/// and `std::forward` / `const_cast` were missing their type argument. They are
/// restored here as `template <class T>`, `std::forward<T>` and
/// `const_cast<T &>`.
///
/// NOTE(review): `thread_fiber` is declared as a plain `ThreadFiber` but is
/// used through a boolean test, `operator->` and `reset()`. This assumes
/// ThreadFiber is a handle-like type offering those operations — confirm
/// against its declaration.
/// NOTE(review): another `class PoolCatchup` is defined further down in this
/// file; the two definitions need to be reconciled.
class PoolCatchup {
 public:
  /// Builds the fiber from `p`, preserving its value category, then resumes it
  /// if the fiber tests true.
  template <class T>
  explicit PoolCatchup(T &&p) : thread_fiber(std::forward<T>(p)) {
    if (thread_fiber) {
      thread_fiber->resume();
    }
  }

  /// Overload for const lvalues: constness is cast away before `p` is handed
  /// to the fiber, then the fiber is resumed if it tests true.
  ///
  /// NOTE(review): modifying an object that was originally declared const is
  /// undefined behaviour; verify that callers never pass such an object.
  template <class T>
  PoolCatchup(const T &p) : thread_fiber(const_cast<T &>(p)) {
    if (thread_fiber) {
      thread_fiber->resume();
    }
  }

  /// Stops the fiber's loop and releases the fiber, if one is held.
  ~PoolCatchup() {
    if (thread_fiber) {
      thread_fiber->stop_loop();
      thread_fiber.reset();
    }
  }

 private:
  ThreadFiber thread_fiber;

  /// Only ThreadFiber may call this method.
  ///
  /// Resumes the fiber if it tests true. The original `template` prefix is
  /// dropped: no template parameter is used by this method.
  /// NOTE(review): the method is private and no friend is declared in this
  /// class, so ThreadFiber cannot actually reach it as written.
  void resume() {
    if (thread_fiber) {
      thread_fiber->resume();
    }
  }
};
/// Abstract accessor giving callers a `PoolCatchupInterface` for an object.
///
/// Polymorphic base with a virtual destructor.
class CatchupInterface {
 public:
  virtual ~CatchupInterface() = default;

  /// Returns, by reference, the catch-up hook belonging to this object.
  virtual PoolCatchupInterface &get_pool_catchup() = 0;
};
class Pool {
public:
template
static Pool *new_threadsafe(T &&t) {
return new ThreadSafePool(std::forward(t));
}
template
static Pool *new_threadsafe() {
return new ThreadSafePool();
}
};
/// CatchupInterface implementation whose constructor resets `thread_fiber` to
/// a `ThreadFiber` built with `true`, and whose destructor stops that fiber's
/// loop.
///
/// NOTE(review): `class PoolCatchup` is already defined earlier in this file,
/// so this is a second definition of the same name.
/// NOTE(review): `thread_fiber` is declared as a plain `ThreadFiber` yet is
/// used through `reset()`, `->` and a boolean test; presumably it is meant to
/// be a pointer-like handle — confirm against ThreadFiber's declaration.
class PoolCatchup : public CatchupInterface {
public:
/// Installs a new fiber created with the argument `true` (the meaning of the
/// flag is not visible in this file).
PoolCatchup() {
thread_fiber.reset(new ThreadFiber(true));
}
/// Stops the fiber's loop if the fiber tests true. The fiber is not reset here.
~PoolCatchup() {
if (thread_fiber) {
thread_fiber->stop_loop();
}
}
/// Returns `thread_fiber`.
/// NOTE(review): CatchupInterface declares this method as returning
/// `PoolCatchupInterface &`; here the return type is `CatchupInterface &` and
/// the returned object is a `ThreadFiber` — verify the intended type.
CatchupInterface &get_pool_catchup() {
return thread_fiber;
}
private:
/// Not read or written anywhere in this class.
thread_fiber_id id;
ThreadFiber thread_fiber;
};
/// CatchupInterface implementation whose constructor resets `thread_fiber` to
/// a `ThreadFiber` built with `false`, and whose destructor stops that fiber's
/// loop.
///
/// NOTE(review): `thread_fiber` is declared as a plain `ThreadFiber` yet is
/// used through `reset()`, `->` and a boolean test; presumably it is meant to
/// be a pointer-like handle — confirm against ThreadFiber's declaration.
class PoolNoCatchup : public CatchupInterface {
public:
/// Installs a new fiber created with the argument `false` (the meaning of the
/// flag is not visible in this file).
PoolNoCatchup() {
thread_fiber.reset(new ThreadFiber(false));
}
/// Stops the fiber's loop if the fiber tests true. The fiber is not reset here.
~PoolNoCatchup() {
if (thread_fiber) {
thread_fiber->stop_loop();
}
}
/// NOTE(review): the function returns a reference but the body returns
/// `nullptr`, which cannot bind to a reference. The declared return type also
/// differs from the `PoolCatchupInterface &` declared in CatchupInterface.
/// Expressing "no catch-up" likely needs a pointer-returning interface —
/// confirm the intended contract.
CatchupInterface &get_pool_catchup() {
return nullptr;
}
private:
/// Not read or written anywhere in this class.
thread_fiber_id id;
ThreadFiber thread_fiber;
};
/// Pool holding two fibers, `pool_catchup` and `pool_no_catchup`.
///
/// NOTE(review): `class Pool` is already defined earlier in this file, so this
/// is a second definition of the same name.
class Pool {
public:
/// Initialises both fiber members and then calls `register_custom_caughtup`
/// with `pool_catchup.get_pool_catchup()` and `size` (default 100).
/// NOTE(review): `std::make_unique()` is written without a template argument,
/// and both members are declared as `ThreadFiber` rather than a smart pointer
/// — confirm the intended types.
/// NOTE(review): `register_custom_caughtup` is not declared in this class; the
/// only definition visible in this file is a member of PoolFactory.
Pool(size_t size = 100) :
pool_catchup(std::make_unique()),
pool_no_catchup(std::make_unique()) {
register_custom_caughtup(pool_catchup.get_pool_catchup(), size);
}
/// Performs the same initialisation as the constructor above; the leading
/// unnamed `int` parameter is not used.
Pool(int, size_t size = 100) :
pool_catchup(std::make_unique()),
pool_no_catchup(std::make_unique()) {
register_custom_caughtup(pool_catchup.get_pool_catchup(), size);
}
private:
/// Allocates a `T` constructed from `t` and returns it as `Pool *`.
/// NOTE(review): `template` carries no parameter list and `std::forward` has
/// no explicit type argument, so `T` is undeclared as written.
template
static Pool *new_instance(T &&t) {
return new T(std::forward(t));
}
/// Allocates a default-constructed `T` and returns it as `Pool *`.
/// NOTE(review): `template` carries no parameter list, so `T` is undeclared
/// as written.
template
static Pool *new_instance() {
return new T();
}
ThreadFiber pool_catchup;
ThreadFiber pool_no_catchup;
};
/// `ThreadSafePool` specialisation whose default constructor always passes the
/// value 1000 to the base-class constructor.
///
/// NOTE(review): the meaning of the 1000 argument depends on ThreadSafePool's
/// constructor, which is not visible in this file.
class ThreadSafePoolNoCatchup : public ThreadSafePool {
 public:
  ThreadSafePoolNoCatchup()
      : ThreadSafePool(1000) {}
};
template
Pool *PoolFactory::new_instance(T &&t) {
return new Pool(t);
}
template
Pool *PoolFactory::new_instance() {
return new Pool();
}
template
Pool *PoolFactory::new_threadsafe(T &&t) {
return ThreadSafePool(std::forward(t));
}
template
Pool *PoolFactory::new_threadsafe() {
return new ThreadSafePool();
}
template
void PoolFactory::register_custom_caughtup(CatchupInterface *iface, int size) {
if (!iface) {
return;
}
if (iface->get_pool_catchup() != nullptr) {
// The custom pool must wait for the thread to finish, it cannot be re-used to
// wait on any thread in the future.
dassert(false, "The custom pool cannot be re-used");
}
iface->set_pool_catchup(new PoolCatchup(size));
}
/// Creates a pool with `new_instance()` and registers that pool's
/// `pool_no_catchup` with a size of 100.
///
/// NOTE(review): `pool` is a local raw pointer that is neither stored nor
/// deleted in this function, so the allocation appears to be leaked — confirm
/// whether it should be kept in a member.
/// NOTE(review): `template` carries no parameter list; `pool_no_catchup` is
/// private in the `Pool` class visible in this file, and
/// `register_custom_caughtup` is not declared there.
template
void PoolFactory::register_threadsafe_pool() {
Pool *pool = new_instance();
pool->register_custom_caughtup(pool->pool_no_catchup.get_pool_catchup(), 100);
}
template
void PoolFactory::deregister_custom_caughtup(CatchupInterface *iface) {
if (!iface) {
return;
}
iface->set_pool_catchup(nullptr);
}
/// Creates a pool with `new_instance()`, deregisters both of its fibers'
/// catch-ups, and deletes the pool.
///
/// NOTE(review): the pool operated on is freshly created inside this function,
/// not one created by an earlier registration call — confirm this is intended.
/// NOTE(review): `template` carries no parameter list; `pool_catchup` and
/// `pool_no_catchup` are private in the `Pool` class visible in this file, and
/// `deregister_custom_caughtup` is not declared there.
template
void PoolFactory::deregister_threadsafe_pool() {
Pool *pool = new_instance();
pool->deregister_custom_caughtup(pool->pool_catchup.get_pool_catchup());
pool->deregister_custom_caughtup(pool->pool_no_catchup.get_pool_catchup());
delete pool;
}
template
PoolFactory::PoolFactory() {
register_threadsafe_pool();
}
/// Registers the no-catchup fiber of `this->pool` with size 1 and installs a
/// callback that creates a pool, runs a fiber on it, and writes the fiber's
/// return code through `return_code`.
///
/// Does nothing when `thread` tests false.
///
/// NOTE(review): both `template` headers carry no parameter lists, and
/// `Catchup>::get()` has an unmatched `>`; the original template arguments
/// cannot be recovered from this file.
/// NOTE(review): `return_code` is captured as a raw pointer and written inside
/// the callback, so it has to remain valid until the callback has run; its
/// lifetime is not visible here.
template
template
void PoolFactory::register_custom_spawn(E &&thread, E *return_code) {
if (!thread) {
return;
}
this->pool->register_custom_caughtup(this->pool->pool_no_catchup.get_pool_catchup(), 1);
// `thread` is captured by copy but is not used inside the callback body.
Catchup>::get()->set_callback([thread, return_code]() {
// The object returned by new_instance() is bound to a reference and is never
// deleted in this callback.
ThreadSafePoolNoCatchup &pool = *new_instance();
pool.pool_catchup.reset(new ThreadFiber(false));
if (!pool.pool_catchup) {
LOG_ERROR("thread safe pool cannot be created");
return;
}
// Run the fiber, record its return code, then stop and release it.
pool.pool_catchup->resume();
*return_code = pool.pool_catchup->get_caughtup_returncode();
pool.pool_catchup->stop_loop();
pool.pool_catchup.reset();
});
}
/// Intended to deregister the no-catchup fiber of `this->pool` and stop the
/// fiber obtained from the `Catchup` accessor.
///
/// NOTE(review): the first guard returns when `thread` tests true and the
/// second returns when it tests false, so every call returns before reaching
/// the deregistration code below. One of the two guards is presumably
/// unintended — confirm which.
/// NOTE(review): both `template` headers carry no parameter lists, and
/// `Catchup>::get()` has an unmatched `>`; the original template arguments
/// cannot be recovered from this file.
template
template
void PoolFactory::deregister_custom_spawn(E &&thread) {
if (thread) {
// ThreadFactory will automatically unregister itself
return;
}
if (!thread) {
return;
}
// Unreachable as written (see the note above).
this->pool->deregister_custom_caughtup(this->pool->pool_no_catchup.get_pool_catchup());
// `fiber` is dereferenced without a null check.
ThreadFiber *fiber = Catchup>::get()->get_caughtup();
fiber->stop_loop();
}
template
template
void PoolFactory::register_custom_spawn(E &&thread) {
if (!thread) {
return;
}
this->pool->register_custom_caughtup(this->pool->pool_no_catchup.get_