Skip to content
Snippets Groups Projects
Commit 16de8c25 authored by Philipp Spilger
Browse files

fix: QuiggeldyConnection to work across server restarts

* Server restarts can happen at two points in time
  * before a remote call
  * during a remote call
* in both cases we retry connecting to the server and executing the
  remote call a configurable number of times, waiting a
  configurable duration between attempts
* default is to retry 200 times every 100ms, yielding a maximum recovery
  time of 20s without connection to server

Depends-On: 22604
Change-Id: I01345647556de660b6d51732d0d9e5c5c484ada8
parent 25bf97e5
No related branches found
No related tags found
No related merge requests found
......@@ -305,6 +305,12 @@ protected:
/**
 * Submit a request to the server using the given submitter callable.
 *
 * NOTE(review): the definition is not fully visible here; it appears to hand the
 * submitter together with the next sequence number to retrying_client_invoke —
 * confirm against the implementation file.
 */
template <typename Submitter>
auto submit(Submitter const&);
/**
 * Invoke a callable via rcf_extensions::retrying_client_invoke, using this
 * connection's maximum attempt count and wait duration between attempts.
 *
 * @param with_user_data Passed to setup_client() when a client is created
 * @param function Callable handed to the retry helper
 * @param args Further arguments forwarded to the retry helper
 * @return Result of rcf_extensions::retrying_client_invoke
 */
template <typename Function, typename... Args>
auto retrying_client_invoke(bool with_user_data, Function&& function, Args&&... args);
/**
 * Const-qualified overload of retrying_client_invoke; takes the same parameters
 * and can be used from const member functions.
 */
template <typename Function, typename... Args>
auto retrying_client_invoke(bool with_user_data, Function&& function, Args&&... args) const;
/**
* Return function that creates RCF clients with this connection's info.
*/
......
......@@ -12,6 +12,7 @@
#include "hxcomm/common/logger.h"
#include "hxcomm/common/quiggeldy_common.h"
#include "hxcomm/common/quiggeldy_connection.h"
#include "rcf-extensions/retrying-client-invoke.h"
#include "slurm/vision_defines.h"
......@@ -19,6 +20,28 @@ namespace hxcomm {
using namespace std::literals::chrono_literals;
/**
 * Delegate a remote call to rcf_extensions::retrying_client_invoke.
 *
 * The retry helper receives a factory lambda that calls
 * setup_client(with_user_data), the configured maximum number of connection
 * attempts, the configured wait duration, the callable and its arguments.
 *
 * @param with_user_data Passed on to setup_client() by the client factory
 * @param function Callable handed to the retry helper (passed as lvalue)
 * @param args Additional arguments, perfectly forwarded to the retry helper
 * @return Whatever rcf_extensions::retrying_client_invoke returns
 */
template <typename ConnectionParameter, typename RcfClient>
template <typename Function, typename... Args>
auto QuiggeldyConnection<ConnectionParameter, RcfClient>::retrying_client_invoke(
    bool with_user_data, Function&& function, Args&&... args)
{
	return rcf_extensions::retrying_client_invoke(
	    [this, with_user_data]() { return this->setup_client(with_user_data); },
	    this->m_connection_attempt_num_max, this->m_connection_attempt_wait_after, function,
	    std::forward<Args>(args)...);
}
/**
 * Const-qualified variant: delegate a remote call to
 * rcf_extensions::retrying_client_invoke.
 *
 * The retry helper receives a factory lambda that calls
 * setup_client(with_user_data), the configured maximum number of connection
 * attempts, the configured wait duration, the callable and its arguments.
 *
 * @param with_user_data Passed on to setup_client() by the client factory
 * @param function Callable handed to the retry helper (passed as lvalue)
 * @param args Additional arguments, perfectly forwarded to the retry helper
 * @return Whatever rcf_extensions::retrying_client_invoke returns
 */
template <typename ConnectionParameter, typename RcfClient>
template <typename Function, typename... Args>
auto QuiggeldyConnection<ConnectionParameter, RcfClient>::retrying_client_invoke(
    bool with_user_data, Function&& function, Args&&... args) const
{
	return rcf_extensions::retrying_client_invoke(
	    [this, with_user_data]() { return this->setup_client(with_user_data); },
	    this->m_connection_attempt_num_max, this->m_connection_attempt_wait_after, function,
	    std::forward<Args>(args)...);
}
template <typename ConnectionParameter, typename RcfClient>
template <typename Submitter>
auto QuiggeldyConnection<ConnectionParameter, RcfClient>::submit(Submitter const& submitter)
......@@ -27,40 +50,7 @@ auto QuiggeldyConnection<ConnectionParameter, RcfClient>::submit(Submitter const
m_reinit_uploader->refresh();
auto const cur_sequence_num = next_sequence_number();
auto client = setup_client();
size_t attempts_performed = 0;
auto last_user_notification = std::chrono::system_clock::now();
for (attempts_performed = 1; attempts_performed <= m_connection_attempt_num_max;
++attempts_performed) {
// build request and send it to server
try {
return submitter(client, cur_sequence_num);
} catch (const RCF::Exception& e) {
if (e.getErrorId() != RCF::RcfError_ClientConnectFail.getErrorId() ||
attempts_performed == m_connection_attempt_num_max) {
// reraise if something unexpected happened or we reached the
// maximum number of tries
throw;
}
}
using namespace std::chrono_literals;
// Give the user feedback once per second in order to not spam the
// terminal
if ((std::chrono::system_clock::now() - last_user_notification) > 1s) {
HXCOMM_LOG_INFO(
m_logger, "Server not ready yet, waiting "
<< m_connection_attempt_wait_after.count()
<< " ms in between attempts.. [Attempt: " << attempts_performed << "/"
<< m_connection_attempt_num_max << "]");
last_user_notification = std::chrono::system_clock::now();
}
std::this_thread::sleep_for(m_connection_attempt_wait_after);
}
// NOTE: Should never be reached.
HXCOMM_LOG_FATAL(m_logger, "Could not submit request.");
throw std::runtime_error("Error submitting request.");
return retrying_client_invoke(true, submitter, cur_sequence_num);
}
namespace detail {
......
......@@ -85,7 +85,7 @@ QuiggeldyConnection<ConnectionParameter, RcfClient>::QuiggeldyConnection(
typename QuiggeldyConnection<ConnectionParameter, RcfClient>::connect_parameters_type const&
params) :
m_connect_parameters{params},
m_connection_attempt_num_max(100),
m_connection_attempt_num_max(200),
m_connection_attempt_wait_after(100ms),
m_logger(log4cxx::Logger::getLogger("QuiggeldyConnection")),
m_reinit_uploader{new reinit_uploader_type{
......@@ -102,7 +102,8 @@ QuiggeldyConnection<ConnectionParameter, RcfClient>::QuiggeldyConnection(
m_session_uuid = boost::uuids::random_generator()();
try {
// Check if remote site has munge enabled.
m_use_munge = setup_client(false)->get_use_munge();
m_use_munge = retrying_client_invoke(
false, [](auto const& client) { return client->get_use_munge(); });
} catch (const RCF::Exception& e) {
HXCOMM_LOG_ERROR(m_logger, "Could not request munge status from remote site: " << e.what());
m_use_munge = false;
......@@ -287,9 +288,10 @@ QuiggeldyConnection<ConnectionParameter, RcfClient>::get_reinit_stack() const
/**
 * Ask the remote side to enforce a reinit.
 *
 * The call goes exclusively through retrying_client_invoke so that it is issued
 * once and via the retrying path, instead of additionally being issued directly
 * on a freshly set up client.
 */
template <typename ConnectionParameter, typename RcfClient>
void QuiggeldyConnection<ConnectionParameter, RcfClient>::reinit_enforce()
{
	retrying_client_invoke(true, [](auto const& client) { return client->reinit_enforce(); });
}
template <typename ConnectionParameter, typename RcfClient>
void QuiggeldyConnection<ConnectionParameter, RcfClient>::set_connection_attempts_max(size_t num)
{
......@@ -401,29 +403,29 @@ template <typename ConnectionParameter, typename RcfClient>
/**
 * Request the unique identifier from the remote side.
 *
 * The remote call is issued only through retrying_client_invoke; a preceding
 * direct call on a client would return first and leave the retrying call
 * unreachable.
 *
 * @param hwdb_path Optional path passed on to the remote get_unique_identifier call
 * @return String returned by the remote call
 */
std::string QuiggeldyConnection<ConnectionParameter, RcfClient>::get_unique_identifier(
    std::optional<std::string> hwdb_path) const
{
	return retrying_client_invoke(
	    true, [hwdb_path](auto const& client) { return client->get_unique_identifier(hwdb_path); });
}
/**
 * Request the bitfile info from the remote side.
 *
 * The remote call is issued only through retrying_client_invoke; a preceding
 * direct call on a client would return first and leave the retrying call
 * unreachable.
 *
 * @return String returned by the remote get_bitfile_info call
 */
template <typename ConnectionParameter, typename RcfClient>
std::string QuiggeldyConnection<ConnectionParameter, RcfClient>::get_bitfile_info() const
{
	return retrying_client_invoke(
	    true, [](auto const& client) { return client->get_bitfile_info(); });
}
/**
 * Request the remote repository state from the remote side.
 *
 * The remote call is issued only through retrying_client_invoke; a preceding
 * direct call on a client would return first and leave the retrying call
 * unreachable.
 *
 * @return String returned by the remote get_remote_repo_state call
 */
template <typename ConnectionParameter, typename RcfClient>
std::string QuiggeldyConnection<ConnectionParameter, RcfClient>::get_remote_repo_state() const
{
	return retrying_client_invoke(
	    true, [](auto const& client) { return client->get_remote_repo_state(); });
}
/**
 * Request the version string from the remote side.
 *
 * The remote call is issued only through retrying_client_invoke; a preceding
 * direct call on a client would return first and leave the retrying call
 * unreachable.
 *
 * @return String returned by the remote get_version_string call
 */
template <typename ConnectionParameter, typename RcfClient>
std::string QuiggeldyConnection<ConnectionParameter, RcfClient>::get_version_string() const
{
	return retrying_client_invoke(
	    true, [](auto const& client) { return client->get_version_string(); });
}
template <typename ConnectionParameter, typename RcfClient>
......
......@@ -10,6 +10,7 @@
#include <charconv>
#include <chrono>
#include <cstring>
#include <future>
#include <thread>
#include <sys/types.h>
......@@ -155,3 +156,55 @@ TEST(Quiggeldy, SimpleMockModeReinit)
ASSERT_EQ(WEXITSTATUS(status), 0);
}
}
/**
 * Exercise a QuiggeldyConnection client across a server restart.
 *
 * A quiggeldy instance is started in mock mode, a client thread issues
 * get_version_string() once per second for num_runs iterations, and in the
 * meantime the server is sent SIGTERM and a new instance is started on the same
 * port. The test expects the second server to exit with status 0 and the
 * client thread to return 0.
 */
TEST(Quiggeldy, ServerRestart)
{
	using namespace hxcomm;
	auto log = log4cxx::Logger::getLogger("TestQuiggeldy");
	HXCOMM_LOG_TRACE(log, "Starting");

	int status;
	hxcomm::port_t port = get_unused_port();
	size_t const num_runs = 20;

	int quiggeldy_pid = setup_quiggeldy(
	    "quiggeldy", port, "--mock-mode", "--timeout", "20",
	    hxcomm::is_munge_available() ? "" : "--no-munge");

	using namespace std::literals::chrono_literals;
	std::this_thread::sleep_for(1s);

	auto const run_client = [port, &log]() -> int {
		auto client = hxcomm::vx::QuiggeldyConnection("127.0.0.1", port);
		StreamRC<decltype(client)> stream{client};
		for (size_t i = 0; i < num_runs; ++i) {
			// calling some remote method
			auto const version = client.get_version_string();
			(void) log;
			HXCOMM_LOG_TRACE(log, "Executed program.");
			std::this_thread::sleep_for(1s);
		}
		return 0;
	};
	auto ret = std::async(std::launch::async, run_client);

	std::this_thread::sleep_for(10s);
	HXCOMM_LOG_TRACE(log, "Killing quiggeldy.");
	kill(quiggeldy_pid, SIGTERM);
	std::this_thread::sleep_for(10s);
	// Reap the first server process before its pid is overwritten below, otherwise
	// it stays around as a zombie. WNOHANG keeps the timing of the test unchanged:
	// if the process has not exited yet, nothing is reaped and we do not block.
	waitpid(quiggeldy_pid, nullptr, WNOHANG);

	HXCOMM_LOG_TRACE(log, "Starting again");
	quiggeldy_pid = setup_quiggeldy(
	    "quiggeldy", port, "--mock-mode", "--timeout", "20",
	    hxcomm::is_munge_available() ? "" : "--no-munge");
	std::this_thread::sleep_for(10s);

	HXCOMM_LOG_TRACE(log, "Waiting for quiggeldy to terminate.");
	waitpid(quiggeldy_pid, &status, 0); // wait for the child to exit
	ASSERT_TRUE(WIFEXITED(status));
	ASSERT_EQ(WEXITSTATUS(status), 0);
	EXPECT_EQ(ret.get(), 0);
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment