From 16de8c25ae9292d788e7e92adb4227d7a4558f3e Mon Sep 17 00:00:00 2001
From: Philipp Spilger <philipp.spilger@kip.uni-heidelberg.de>
Date: Thu, 18 Apr 2024 20:36:09 +0200
Subject: [PATCH] fix: QuiggeldyConnection to work across server restarts

* Server restarts can happen at two points in time:
  * before a remote call
  * during a remote call
* In both cases we retry connecting to the server and executing the
  remote call a configurable number of times, waiting a configurable
  duration between attempts.
* The default is to retry 200 times, once every 100ms, yielding a
  maximum recovery time of 20s without a connection to the server.

Depends-On: 22604
Change-Id: I01345647556de660b6d51732d0d9e5c5c484ada8
---
 include/hxcomm/common/quiggeldy_connection.h  |  6 ++
 .../hxcomm/common/quiggeldy_connection.tcc    | 58 ++++++++-----------
 .../common/quiggeldy_connection_impl.tcc      | 24 ++++----
 tests/sw/hxcomm/test-quiggeldy.cpp            | 53 +++++++++++++++++
 4 files changed, 96 insertions(+), 45 deletions(-)

diff --git a/include/hxcomm/common/quiggeldy_connection.h b/include/hxcomm/common/quiggeldy_connection.h
index b0ac034..c85e868 100644
--- a/include/hxcomm/common/quiggeldy_connection.h
+++ b/include/hxcomm/common/quiggeldy_connection.h
@@ -305,6 +305,23 @@ protected:
 	template <typename Submitter>
 	auto submit(Submitter const&);
 
+	/**
+	 * Run function through rcf_extensions::retrying_client_invoke with clients obtained from
+	 * setup_client(with_user_data) and the configured attempt limit and wait duration.
+	 * @param with_user_data Passed on to setup_client() when a client is created
+	 * @param function Callable performing the remote call on a client
+	 * @param args Further arguments handed on to the retry helper
+	 * @return Whatever rcf_extensions::retrying_client_invoke returns
+	 */
+	template <typename Function, typename... Args>
+	auto retrying_client_invoke(bool with_user_data, Function&& function, Args&&... args);
+
+	/**
+	 * Const variant of retrying_client_invoke() for use from const member functions.
+	 */
+	template <typename Function, typename... Args>
+	auto retrying_client_invoke(bool with_user_data, Function&& function, Args&&... args) const;
+
 	/**
 	 * Return function that creates RCF clients with this connection's info.
 	 */
diff --git a/include/hxcomm/common/quiggeldy_connection.tcc b/include/hxcomm/common/quiggeldy_connection.tcc
index 7546459..654c1e2 100644
--- a/include/hxcomm/common/quiggeldy_connection.tcc
+++ b/include/hxcomm/common/quiggeldy_connection.tcc
@@ -12,6 +12,7 @@
 #include "hxcomm/common/logger.h"
 #include "hxcomm/common/quiggeldy_common.h"
 #include "hxcomm/common/quiggeldy_connection.h"
+#include "rcf-extensions/retrying-client-invoke.h"
 
 #include "slurm/vision_defines.h"
 
@@ -19,6 +20,28 @@ namespace hxcomm {
 
 using namespace std::literals::chrono_literals;
 
+template <typename ConnectionParameter, typename RcfClient>
+template <typename Function, typename... Args>
+auto QuiggeldyConnection<ConnectionParameter, RcfClient>::retrying_client_invoke(
+    bool with_user_data, Function&& function, Args&&... args)
+{
+	return rcf_extensions::retrying_client_invoke(
+	    [this, with_user_data]() { return setup_client(with_user_data); }, // client factory
+	    m_connection_attempt_num_max, m_connection_attempt_wait_after,
+	    std::forward<Function>(function), std::forward<Args>(args)...);
+}
+
+template <typename ConnectionParameter, typename RcfClient>
+template <typename Function, typename... Args>
+auto QuiggeldyConnection<ConnectionParameter, RcfClient>::retrying_client_invoke(
+    bool with_user_data, Function&& function, Args&&... args) const
+{
+	return rcf_extensions::retrying_client_invoke(
+	    [this, with_user_data]() { return setup_client(with_user_data); }, // client factory
+	    m_connection_attempt_num_max, m_connection_attempt_wait_after,
+	    std::forward<Function>(function), std::forward<Args>(args)...);
+}
+
 template <typename ConnectionParameter, typename RcfClient>
 template <typename Submitter>
 auto QuiggeldyConnection<ConnectionParameter, RcfClient>::submit(Submitter const& submitter)
@@ -27,40 +50,7 @@ auto QuiggeldyConnection<ConnectionParameter, RcfClient>::submit(Submitter const
 	m_reinit_uploader->refresh();
 
 	auto const cur_sequence_num = next_sequence_number();
-
-	auto client = setup_client();
-	size_t attempts_performed = 0;
-
-	auto last_user_notification = std::chrono::system_clock::now();
-	for (attempts_performed = 1; attempts_performed <= m_connection_attempt_num_max;
-	     ++attempts_performed) {
-		// build request and send it to server
-		try {
-			return submitter(client, cur_sequence_num);
-		} catch (const RCF::Exception& e) {
-			if (e.getErrorId() != RCF::RcfError_ClientConnectFail.getErrorId() ||
-			    attempts_performed == m_connection_attempt_num_max) {
-				// reraise if something unexpected happened or we reached the
-				// maximum number of tries
-				throw;
-			}
-		}
-		using namespace std::chrono_literals;
-		// Give the user feedback once per second in order to not spam the
-		// terminal
-		if ((std::chrono::system_clock::now() - last_user_notification) > 1s) {
-			HXCOMM_LOG_INFO(
-			    m_logger, "Server not ready yet, waiting "
-			                  << m_connection_attempt_wait_after.count()
-			                  << " ms in between attempts.. [Attempt: " << attempts_performed << "/"
-			                  << m_connection_attempt_num_max << "]");
-			last_user_notification = std::chrono::system_clock::now();
-		}
-		std::this_thread::sleep_for(m_connection_attempt_wait_after);
-	}
-	// NOTE: Should never be reached.
-	HXCOMM_LOG_FATAL(m_logger, "Could not submit request.");
-	throw std::runtime_error("Error submitting request.");
+	return retrying_client_invoke(true, submitter, cur_sequence_num);
 }
 
 namespace detail {
diff --git a/include/hxcomm/common/quiggeldy_connection_impl.tcc b/include/hxcomm/common/quiggeldy_connection_impl.tcc
index 093de13..2ba108b 100644
--- a/include/hxcomm/common/quiggeldy_connection_impl.tcc
+++ b/include/hxcomm/common/quiggeldy_connection_impl.tcc
@@ -85,7 +85,7 @@ QuiggeldyConnection<ConnectionParameter, RcfClient>::QuiggeldyConnection(
     typename QuiggeldyConnection<ConnectionParameter, RcfClient>::connect_parameters_type const&
         params) :
     m_connect_parameters{params},
-    m_connection_attempt_num_max(100),
+    m_connection_attempt_num_max(200),
     m_connection_attempt_wait_after(100ms),
     m_logger(log4cxx::Logger::getLogger("QuiggeldyConnection")),
     m_reinit_uploader{new reinit_uploader_type{
@@ -102,7 +102,8 @@ QuiggeldyConnection<ConnectionParameter, RcfClient>::QuiggeldyConnection(
 	m_session_uuid = boost::uuids::random_generator()();
 	try {
 		// Check if remote site has munge enabled.
-		m_use_munge = setup_client(false)->get_use_munge();
+		m_use_munge = retrying_client_invoke(
+		    false, [](auto const& client) { return client->get_use_munge(); });
 	} catch (const RCF::Exception& e) {
 		HXCOMM_LOG_ERROR(m_logger, "Could not request munge status from remote site: " << e.what());
 		m_use_munge = false;
@@ -287,9 +288,10 @@ QuiggeldyConnection<ConnectionParameter, RcfClient>::get_reinit_stack() const
 template <typename ConnectionParameter, typename RcfClient>
 void QuiggeldyConnection<ConnectionParameter, RcfClient>::reinit_enforce()
 {
-	setup_client()->reinit_enforce();
+	// Use retrying_client_invoke so the call is retried across server restarts.
+	retrying_client_invoke(true, [](auto const& client) { return client->reinit_enforce(); });
 }
 
 template <typename ConnectionParameter, typename RcfClient>
 void QuiggeldyConnection<ConnectionParameter, RcfClient>::set_connection_attempts_max(size_t num)
 {
@@ -401,29 +403,29 @@ template <typename ConnectionParameter, typename RcfClient>
 std::string QuiggeldyConnection<ConnectionParameter, RcfClient>::get_unique_identifier(
     std::optional<std::string> hwdb_path) const
 {
-	auto client = setup_client();
-	return client->get_unique_identifier(hwdb_path);
+	return retrying_client_invoke(
+	    true, [hwdb_path](auto const& client) { return client->get_unique_identifier(hwdb_path); });
 }
 
 template <typename ConnectionParameter, typename RcfClient>
 std::string QuiggeldyConnection<ConnectionParameter, RcfClient>::get_bitfile_info() const
 {
-	auto client = setup_client();
-	return client->get_bitfile_info();
+	auto const query = [](auto const& rcf) { return rcf->get_bitfile_info(); };
+	return retrying_client_invoke(true, query); // retried across server restarts
 }
 
 template <typename ConnectionParameter, typename RcfClient>
 std::string QuiggeldyConnection<ConnectionParameter, RcfClient>::get_remote_repo_state() const
 {
-	auto client = setup_client();
-	return client->get_remote_repo_state();
+	auto const query = [](auto const& rcf) { return rcf->get_remote_repo_state(); };
+	return retrying_client_invoke(true, query); // retried across server restarts
 }
 
 template <typename ConnectionParameter, typename RcfClient>
 std::string QuiggeldyConnection<ConnectionParameter, RcfClient>::get_version_string() const
 {
-	auto client = setup_client();
-	return client->get_version_string();
+	auto const query = [](auto const& rcf) { return rcf->get_version_string(); };
+	return retrying_client_invoke(true, query); // retried across server restarts
 }
 
 template <typename ConnectionParameter, typename RcfClient>
diff --git a/tests/sw/hxcomm/test-quiggeldy.cpp b/tests/sw/hxcomm/test-quiggeldy.cpp
index 7c45f42..0fe924e 100644
--- a/tests/sw/hxcomm/test-quiggeldy.cpp
+++ b/tests/sw/hxcomm/test-quiggeldy.cpp
@@ -10,6 +10,7 @@
 #include <charconv>
 #include <chrono>
 #include <cstring>
+#include <future>
 #include <thread>
 
 #include <sys/types.h>
@@ -155,3 +156,60 @@ TEST(Quiggeldy, SimpleMockModeReinit)
 		ASSERT_EQ(WEXITSTATUS(status), 0);
 	}
 }
+
+// Check that a client keeps working when quiggeldy is killed and restarted between remote calls.
+TEST(Quiggeldy, ServerRestart)
+{
+	using namespace hxcomm;
+
+	auto log = log4cxx::Logger::getLogger("TestQuiggeldy");
+	HXCOMM_LOG_TRACE(log, "Starting");
+	int status;
+
+	hxcomm::port_t port = get_unused_port();
+
+	size_t const num_runs = 20;
+
+	int quiggeldy_pid = setup_quiggeldy(
+	    "quiggeldy", port, "--mock-mode", "--timeout", "20",
+	    hxcomm::is_munge_available() ? "" : "--no-munge");
+	using namespace std::literals::chrono_literals;
+	std::this_thread::sleep_for(1s);
+
+	// Client issuing one remote call per second while the server is restarted underneath it.
+	auto const run_client = [port, &log]() -> int {
+		auto client = hxcomm::vx::QuiggeldyConnection("127.0.0.1", port);
+		StreamRC<decltype(client)> stream{client};
+
+		for (size_t i = 0; i < num_runs; ++i) {
+			// calling some remote method
+			auto const version = client.get_version_string();
+			(void) log;
+			HXCOMM_LOG_TRACE(log, "Executed program.");
+			std::this_thread::sleep_for(1s);
+		}
+		return 0;
+	};
+	auto ret = std::async(std::launch::async, run_client);
+
+	std::this_thread::sleep_for(10s);
+	HXCOMM_LOG_TRACE(log, "Killing quiggeldy.");
+	kill(quiggeldy_pid, SIGTERM);
+	std::this_thread::sleep_for(10s);
+	// Reap the killed server so it does not linger as a zombie and is known to have exited
+	// before a new instance is started on the same port.
+	waitpid(quiggeldy_pid, nullptr, 0);
+
+	HXCOMM_LOG_TRACE(log, "Starting again");
+	quiggeldy_pid = setup_quiggeldy(
+	    "quiggeldy", port, "--mock-mode", "--timeout", "20",
+	    hxcomm::is_munge_available() ? "" : "--no-munge");
+	std::this_thread::sleep_for(10s);
+
+	HXCOMM_LOG_TRACE(log, "Waiting for quiggeldy to terminate.");
+	waitpid(quiggeldy_pid, &status, 0); // wait for the child to exit
+	ASSERT_TRUE(WIFEXITED(status));
+	ASSERT_EQ(WEXITSTATUS(status), 0);
+
+	EXPECT_EQ(ret.get(), 0);
+}
-- 
GitLab