Federated Algorithms User Guide
Intro
This guide will walk you through the process of writing federated algorithms using Exareme. Exareme's focus is primarily on machine learning and statistical algorithms, but the framework is so general as to allow any type of algorithm to be implemented, provided that the input data is scattered across multiple decentralized data sources.
System Overview
Exareme consists of multiple services distributed across remote nodes. There are multiple local nodes and a single global node. Local nodes host the primary data sources, while the global node is responsible for receiving data from the local nodes and performing global computations.
Getting started
Federated algorithms overview
At the highest level, a federated algorithm is composed of three ingredients: local computations, global computations and the algorithm flow. A local computation takes place in each local node and usually produces some aggregate of the primary data found in that node. A global computation takes place in a special node called the global node, and usually consolidates the local aggregates into a global aggregate. Finally, the algorithm flow is responsible for coordinating these computations and the data exchange between nodes.
Writing a federated algorithm
In order to write a federated algorithm we need to define local and global computations and write an algorithm flow that ties the computations together.
Let's break down the steps one by one. We'll begin by writing a simple algorithm for computing the mean of a single variable. This algorithm will be written as a simple Python script, running on a single machine. The purpose of this exercise is to illustrate how an algorithm is decomposed into local and global steps, and how the flow coordinates these steps. Later, we'll add the necessary ingredients to run the algorithm in an actual federated environment, using Exareme.
Since we will run the algorithm on a single machine, we will represent the federated data as a list of dataframes, data: list[pandas.DataFrame].
Local computations
A local computation is a Python function taking local data and returning a dictionary of local aggregates. The example below demonstrates a local computation function that calculates the sum and count of some variable.
def mean_local(local_data):
    # Compute sum and count
    sx = local_data.sum()
    n = len(local_data)
    # Pack results into single dictionary which will
    # be transferred to global node
    results = {"sx": sx, "n": n}
    return results
The result packs the two aggregates, sx and n, into a dictionary. A separate instance of this function will run on each local node, so sx and n are different for each node and reflect each node's local data.
Global computations
A global computation is also a Python function, and it usually takes the output of a local computation as input. The local computations, coming from multiple nodes, produce a list of results, from which a global aggregate is computed. We can then perform further computations, as in the current example where the mean is computed by dividing the global sx by the global n.
def mean_global(local_results):
    # Sum aggregates from all nodes
    sx = sum(res["sx"] for res in local_results)
    n = sum(res["n"] for res in local_results)
    # Compute global mean
    mean = sx / n
    return mean
Algorithm flow
The algorithm flow coordinates local and global computations, as well as the exchange of data between nodes. We can write the algorithm flow as a Python function run that calls mean_local on all local nodes and then calls mean_global on the global node, passing it the results of the local computations.
def run(all_data):
    local_results = [mean_local(local_data) for local_data in all_data]
    mean = mean_global(local_results)
    return mean
We can then run the algorithm flow like this:
import pandas as pd
x1 = pd.DataFrame({"var": [1, 2, 3]})
x2 = pd.DataFrame({"var": [4, 5, 6]})
all_data = [x1, x2]
print(run(all_data))
Porting to Exareme
Let's now port the above algorithm to Exareme so that it can run in a real federated environment. We will still write functions to represent local and global computations, but we need to take a few extra steps to help Exareme run them in the federated environment.
Local and global steps as database UDFs
The first thing we need to do is to inform Exareme about which functions should be treated as local and global computations. This is done with the udf decorator, imported from the udfgen module. The name UDF stands for user-defined function and comes from the fact that the decorated functions will run as database UDFs.
More importantly, we also need to inform Exareme about the types of the variables involved in the local and global computations. Python is a dynamic language where type annotations are optional. On the other hand, code written for Exareme will run in an environment which is not just a single Python interpreter. The various local and global steps will run in separate interpreters, each embedded in the corresponding relational database. These computations need to communicate with the database in order to read data from it and write data to it. Moreover, the outputs of these local and global computations can have different fates: some will be sent across the network to other nodes, while others will be stored in the same node for later processing. Having variables with dynamic types would make the communication with the database and the communication between nodes very difficult to implement efficiently. To overcome these difficulties, the udfgen module defines a number of types for the input and output variables of local and global computations.
Let's rewrite the local/global functions of the previous examples as Exareme UDFs. First the local UDF.
from exareme2.udfgen import udf, relation, transfer

@udf(local_data=relation(), return_type=transfer())
def mean_local(local_data):
    # Compute two aggregates, sx and n
    sx = local_data.sum()
    n = len(local_data)
    # Pack results into single dictionary which will
    # be transferred to global node
    results = {"sx": sx, "n": n}
    return results
The actual function is exactly the same as before; the difference lies in the udf decorator. local_data is declared to be of type relation. This means that the variable will be a relational table, implemented in Python as a pandas dataframe. The output is of type transfer. This means that we intend to transfer the output to another node. In our Python implementation this is a plain dictionary, but it will be converted to a JSON object in order to be transferred. This means that the contents of the dictionary must be JSON serializable.
Now, let's write the global UDF.
from exareme2.udfgen import udf, transfer, merge_transfer

@udf(local_results=merge_transfer(), return_type=transfer())
def mean_global(local_results):
    # Sum aggregates from all nodes
    sx = sum(res["sx"] for res in local_results)
    n = sum(res["n"] for res in local_results)
    # Compute global mean
    mean = sx / n
    # Pack result into dictionary
    result = {"mean": mean}
    return result
The type of local_results is merge_transfer. This means that local_results will be a list of dictionaries, corresponding to one mean_local output per node. The return type is again transfer since, unlike in the single-machine example, we now need to transfer the global result to the algorithm flow, which might run on a different machine.
Algorithm flow in Exareme
Finally, let's write the algorithm flow. This will be quite different from the single-machine case. The flow is encapsulated in a Python object exposing a run method. This object is instantiated by the Exareme algorithm execution engine, which is the mechanism for executing federated algorithms and takes care of relaying work to the nodes and routing the transfer of data between nodes. As algorithm writers, we need to inform the algorithm execution engine about the UDF execution order and data transfers, and this is done through the algorithm execution interface. To have access to this interface we have to inherit from the Algorithm base class.
from exareme2.algorithms.algorithm import Algorithm

class MyAlgorithm(Algorithm, algname="mean"):
    def run(self, data, metadata):
        local_results = self.engine.run_udf_on_local_nodes(
            func=mean_local,
            keyword_args={"local_data": data},
            share_to_global=True,
        )
        result = self.engine.run_udf_on_global_node(
            func=mean_global,
            keyword_args={"local_results": local_results},
        )
        return result
The attribute engine, inherited from Algorithm, has two methods for calling UDFs. run_udf_on_local_nodes runs a particular UDF on all local nodes, each with the corresponding local data. run_udf_on_global_node runs a UDF on the global node. Since we want the local results to be transferred to the global node for further computations, we have to pass the share_to_global=True argument to the first method.
Data Loader
One issue that did not come up in the single-machine version is data loading. In the single-machine version this is a trivial operation. In the federated case, however, the actual data content affects the algorithm orchestration: for a particular data choice by the user, all nodes having no data, or having data below some privacy threshold, will not participate in the run. This is something that Exareme needs to know before the start of the algorithm execution. It is achieved by defining a separate class, extending AlgorithmDataLoader, where the algorithm writer implements the logic according to which the data are loaded from the database into Python dataframes.
The main method to implement is get_variable_groups, which returns a list[list[str]]. Each inner list is a list of column names, while the outer list represents a list of dataframes. The user-requested column names can be found in self._variables.
In our case we need a very simple data loader for a single dataframe with a single column, as requested by the user (see Examples for more advanced uses).
from exareme2.algorithms.algorithm import AlgorithmDataLoader

class MyDataLoader(AlgorithmDataLoader, algname="mean"):
    def get_variable_groups(self):
        return [self._variables.x]
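For illustration, a hypothetical loader for an algorithm with a dependent variable y and independent variables x might return two groups, so the flow's data argument would contain two dataframes (a sketch; the algname and the assumption that the y variables are exposed as self._variables.y are illustrative):

class MyRegressionDataLoader(AlgorithmDataLoader, algname="my_regression"):
    def get_variable_groups(self):
        # Two groups -> the flow receives two dataframes:
        # one with the y columns, one with the x columns.
        return [self._variables.y, self._variables.x]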
Algorithm Specifications
Finally, we need to define a specification for each algorithm. This contains information about the public API of each algorithm, such as the number and types of variables, whether they are required or optional, the names and types of additional parameters, etc.
The full description can be found in
exareme2.algorithms.specifications.AlgorithmSpecification
.
The algorithm writer needs to provide a JSON file, with the same name and
location as the file where the algorithm is defined. E.g. for an algorithm
defined in dir1/dir2/my_algorithm.py
we also create
dir1/dir2/my_algorithm.json
. The contents of the file are a JSON object with
the exact same structure as
exareme2.algorithms.specifications.AlgorithmSpecification
.
In our example the specs are
{
    "name": "mean",
    "desc": "Computes the mean of a single variable.",
    "label": "Mean",
    "enabled": true,
    "inputdata": {
        "x": {
            "label": "Variable",
            "desc": "A unique numerical variable.",
            "types": ["real"],
            "stattypes": ["numerical"],
            "notblank": true,
            "multiple": false
        }
    }
}
Running the algorithm
Once all building blocks are in place, and our system is deployed, we can run
the algorithm either by performing a POST request to Exareme, or by using
run_algorithm
from the
command line.
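For illustration only, a POST request issued from Python might look like the sketch below. The host, port, endpoint path and request body are assumptions and depend on the deployment and Exareme version; consult your installation's API documentation for the exact format.

import requests

# Hypothetical endpoint and request body; adjust to your deployment.
response = requests.post(
    "http://localhost:5000/algorithms/mean",
    json={
        "inputdata": {
            "data_model": "dementia:0.1",          # assumed data model
            "datasets": ["dataset1", "dataset2"],  # assumed dataset names
            "x": ["lefthippocampus"],              # the single requested variable
        }
    },
)
print(response.json())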
Advanced topics
The previous example is enough to get you started, but Exareme offers a few more features, giving you the necessary tools to write more complex algorithms. Let's explore some of these tools in this section.
UDF generator
The UDF generator module is responsible for translating the udf-decorated Python functions into actual UDFs which run in the database. This translation has a few subtle points, mostly related to the conflict between dynamically typed Python on one hand, and statically typed SQL on the other. The udfgen module offers a few types to be used as input/output types for UDFs. These encode information about how to read a Python object from, or write it to, the database.
API
For a detailed explanation of the various types see the module's docstring. Here we present a few important ones.
relation(schema=None)
The type relation is used for relational tables. In the database these are plain tables, whereas in Python they are encoded as pandas dataframes. The table's schema can be declared by passing the schema arg to the constructor, e.g. relation(schema=[('var1', int), ('var2', float)]). If schema is None, the schema is generic, i.e. it will work with any schema passed at runtime.
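As a sketch, a UDF that expects a table with a known schema can declare it explicitly (the column names and types below are illustrative):

from exareme2.udfgen import udf, relation, transfer

@udf(
    data=relation(schema=[("var1", int), ("var2", float)]),
    return_type=transfer(),
)
def column_means(data):
    # data arrives as a pandas dataframe with the declared columns
    return {
        "mean_var1": float(data["var1"].mean()),
        "mean_var2": float(data["var2"].mean()),
    }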
tensor(dtype, ndims)
A tensor is an n-dimensional array. In Python these are encoded as numpy arrays. Tensors are fundamentally different from relational tables in that their types are homogeneous and the order of their rows matters. Tensors are used when the algorithmic context is linear algebra, rather than relational algebra. dtype is the tensor's datatype, and can be of type type or exareme2.datatypes.DType. ndims is an int and defines the number of the tensor's dimensions. Another benefit of tensors is that their data are stored in a contiguous block of memory (unlike relations, where individual columns are contiguous), which results in better efficiency when used within frameworks like numpy, which make heavy use of the vectorization capabilities of the CPU.
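For instance, a local step in a linear-algebra setting might receive the design matrix as a two-dimensional tensor and produce a local aggregate (a minimal sketch; the function and key names are illustrative):

from exareme2.udfgen import udf, tensor, transfer

@udf(X=tensor(dtype=float, ndims=2), return_type=transfer())
def local_gramian(X):
    # X is a numpy array; compute the local Gram matrix X^T X
    gramian = X.T @ X
    # Nested lists keep the transfer dictionary JSON serializable
    return {"gramian": gramian.tolist(), "n_rows": X.shape[0]}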
literal()
Literals are used to pass small, often scalar, values to UDFs. Examples are a single int, float or str, or a small list or dict; in general, values which it wouldn't make much sense to encode as tables in the database. These are not passed to the UDF as inputs. Instead, they are printed literally into the UDF's code, hence the name.
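A sketch of a UDF taking a literal parameter, here a hypothetical exponent supplied by the algorithm flow:

from exareme2.udfgen import udf, tensor, literal, transfer

@udf(x=tensor(dtype=float, ndims=1), degree=literal(), return_type=transfer())
def local_power_sum(x, degree):
    # degree is printed literally into the generated UDF code,
    # so it behaves like a constant known at code-generation time
    return {"power_sum": float((x ** degree).sum())}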
transfer()
Transfer objects are used to send data to and from local/global nodes. In Python they are plain dictionaries, but they are transformed to JSON for the data transfer, so all values in the dict must be JSON serializable, and all keys must be strings. transfer does not encrypt data for SMPC and thus should be used for non-sensitive data and for sending data from the global node to the local nodes.
secure_transfer(sum_op=False, min_op=False, max_op=False)
Type used for sending data through the SMPC cluster. See SMPC for more details.
state()
State objects are used to store data in the same node where they are produced, for later consumption. Like transfer/secure_transfer, they are Python dictionaries, but they are serialized as binary objects using pickle.
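A sketch of the pattern: the first UDF stores intermediate values as state on the node where it runs, and a later UDF on the same node consumes it. This assumes state is also accepted as an input type, which iterative algorithms typically rely on; the function names are illustrative.

from exareme2.udfgen import udf, relation, state, transfer

@udf(local_data=relation(), return_type=state())
def init_step(local_data):
    # Stored on this node with pickle; never leaves the node
    return {"sx": float(local_data.iloc[:, 0].sum()), "n": len(local_data)}

@udf(prev=state(), return_type=transfer())
def next_step(prev):
    # prev is the dictionary saved by init_step on the same node
    return {"mean": prev["sx"] / prev["n"]}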
Multiple outputs
A UDF can also have multiple outputs of different kinds. The typical use-case is when we want to store part of the output locally for later use in the same node, and we want to transfer the other part of the output to another node.
from exareme2.udfgen import udf, relation, state, transfer

@udf(input=relation(), return_type=[state(), transfer()])
def two_outputs(input):
    ...  # compute stuff
    output_state = {}  # output_state is a dict where we store variables for later use
    ...  # add stuff to output_state
    output_transfer = {}  # output_transfer is a dict with variables we want to transfer
    ...  # add stuff to output_transfer
    return output_state, output_transfer  # multiple return statement
Note that this time we declared a list of outputs in the udf decorator. Then we simply use Python's multiple-value return statement and Exareme takes care of the rest. In the database, the first element of the return statement is returned as a table (the usual way of returning things from a UDF), while the remaining elements are returned through the loopback query mechanism, a mechanism for executing database queries from within the code of the UDF. It is therefore advised to place the object with the largest memory footprint first in the list of returned objects.
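In the algorithm flow, the two outputs come back as two separate results. A sketch of how this might look inside a run method, assuming share_to_global accepts one flag per output (check the engine's API for the exact signature):

# inside MyAlgorithm.run
local_state, local_transfer = self.engine.run_udf_on_local_nodes(
    func=two_outputs,
    keyword_args={"input": data},
    # keep the state on the local nodes, share only the transfer part
    share_to_global=[False, True],
)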
Secure multi-party computation
Secure multi-party computation is a cryptographic technique used when multiple parties want to jointly perform a computation on their data, usually some form of aggregation, but wish for their individual data to remain private. For more details see Wikipedia. In Exareme, SMPC is used to compute global aggregates. When a global aggregate must be computed, all participating local nodes first compute the local aggregates. These are then fed to the SMPC cluster in an encrypted form. The SMPC cluster performs the global aggregation and sends the result back to Exareme, where it is passed as an input to the global UDF.
To implement an SMPC computation we need to have a local UDF with a
secure_transfer
output.
from exareme2.udfgen import udf, relation, secure_transfer

@udf(local_data=relation(), return_type=secure_transfer(sum_op=True))
def mean_local(local_data):
    sx = local_data.sum()
    n = len(local_data)
    results = {
        "sx": {"data": sx, "operation": "sum", "type": float},
        "n": {"data": n, "operation": "sum", "type": int},
    }
    return results
First we have to activate one or more aggregation operations. Here we activate summation, passing sum_op=True. Then we have to pass some more information in the output dict, namely the data, the operation used in the aggregation, and the datatype. Values for the "data" key can be scalars or nested lists.
The global UDF then needs to declare its input using transfer
.
@udf(local_results=transfer(), return_type=transfer())
def mean_global(local_results):
    sx = local_results['sx']
    n = local_results['n']
    mean = sx / n
    result = {"mean": mean}
    return result
Note that we no longer need to perform the actual summation, as we did previously, since it is now performed by the SMPC cluster.
Best practices
Memory efficiency
The whole point of translating Python functions into database UDFs (see UDF
generator) is to avoid unnecessary data copying, as the
database can transfer data to UDFs with zero cost. If we are not careful when
writing udf
functions, we could end up performing numerous unnecessary copies
of the original data, effectively canceling the zero-cost transfer.
For example, say we want to compute the product of three matrices, A, B and C, and then sum the elements of the resulting matrix. The result is a single float, so we ought to be able to allocate just a single float. If we are not careful and write
result = (A @ B @ C).sum()
Python will allocate a new matrix for the result of A @ B
, then another one for A @ B @ C
and then it will sum the elements of this last matrix to obtain the final result.
To overcome this we can use numpy.einsum
like this
result = numpy.einsum('ij,jk,kl->', A, B, C)
This will only allocate a single float!
There are multiple tricks like this one that we can use to reduce the memory footprint of our UDFs. You can find a few in this commit together with a short summary of the main ideas.
Time efficiency
When writing a federated algorithm, we usually decompose an existing algorithm into local and global computations. This decomposition is not unique in general, thus the same algorithm can be written in more than one way. However, different decompositions might lead to important differences in execution time, related to the overall number of local and global steps. Let's call a sequence of one local and one global step a federation round. Each federation round, together with the required data exchange between nodes, takes a non-negligible amount of time. It is therefore desirable to find a decomposition that minimizes the number of federation rounds.
Consider, for example, the computation of the total sum of squares. This quantity is needed in many algorithms, such as linear regression or ANOVA. The total sum of squares (TSS) is given by
\text{TSS} = \sum_i^N (y_i - \hat{y})^2
where \hat{y} denotes the mean of y. We might think that we have to compute \hat{y} in a first round, share the result with the local nodes, and finally compute the TSS in a second round. But in fact, the whole computation can be done in a single round. We first expand the square of the difference:
\text{TSS} = \sum_i^N y_i^2 - 2 \hat{y} \sum_i^N y_i + N \hat{y}^2
It follows that we only need to compute the sum, the sum of squares and N in the local step. Then, in the global step, we can compute \hat{y} and the above expression for the TSS. We managed to compute the result in a single federation round, instead of two.
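To make the derivation concrete, here is a minimal sketch of the single-round decomposition as Exareme UDFs, reusing the types introduced above (the function and key names are illustrative):

from exareme2.udfgen import udf, relation, transfer, merge_transfer

@udf(y=relation(), return_type=transfer())
def tss_local(y):
    yv = y.iloc[:, 0]
    # Local aggregates: sum, sum of squares and count
    return {"sy": float(yv.sum()), "syy": float((yv ** 2).sum()), "n": len(yv)}

@udf(local_results=merge_transfer(), return_type=transfer())
def tss_global(local_results):
    sy = sum(r["sy"] for r in local_results)
    syy = sum(r["syy"] for r in local_results)
    n = sum(r["n"] for r in local_results)
    y_hat = sy / n
    # TSS = sum(y_i^2) - 2 * y_hat * sum(y_i) + N * y_hat^2
    tss = syy - 2 * y_hat * sy + n * y_hat ** 2
    return {"tss": tss}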