diff --git a/mipengine/algorithms/descriptive_stats.py b/mipengine/algorithms/descriptive_stats.py
index 10535ca0ea5ba5c81632102cd58f3f5f7b5e0033..dbe1e55ce664730ac70341220d02f61978eeefb4 100644
--- a/mipengine/algorithms/descriptive_stats.py
+++ b/mipengine/algorithms/descriptive_stats.py
@@ -36,6 +36,7 @@ import pandas as pd
 from pydantic import BaseModel
 
 from mipengine.algorithms.helpers import get_transfer_data
+from mipengine.udfgen import MIN_ROW_COUNT
 from mipengine.udfgen import literal
 from mipengine.udfgen import merge_transfer
 from mipengine.udfgen import relation
@@ -152,13 +153,15 @@ def run(executor):
     data=relation(),
     numerical_vars=literal(),
     nominal_vars=literal(),
+    min_row_count=MIN_ROW_COUNT,
     return_type=transfer(),
 )
-def local(data: pd.DataFrame, numerical_vars: list, nominal_vars: list):
-    # TODO privacy threshold is hardcoded. Find beter solution.
-    # https://team-1617704806227.atlassian.net/browse/MIP-689
-    MIN_ROW_COUNT = 10
-
+def local(
+    data: pd.DataFrame,
+    numerical_vars: list,
+    nominal_vars: list,
+    min_row_count: int,
+):
     vars = numerical_vars + nominal_vars
 
     def record(var, dataset, data):
@@ -168,7 +171,7 @@ def local(data: pd.DataFrame, numerical_vars: list, nominal_vars: list):
         return [record(var, dataset, None) for var in numerical_vars + nominal_vars]
 
     def compute_records(data, dataset):
-        if len(data) < MIN_ROW_COUNT:
+        if len(data) < min_row_count:
             return get_empty_records(dataset)
 
         num_total = len(data)  # number datapoints/NA
@@ -191,7 +194,7 @@ def local(data: pd.DataFrame, numerical_vars: list, nominal_vars: list):
 
     def numerical_var_data(var):
         # if privacy threshold is not met, return empty record
-        if num_dtps[var] < MIN_ROW_COUNT:
+        if num_dtps[var] < min_row_count:
             return None
         return dict(
             num_dtps=num_dtps[var],
@@ -210,7 +213,7 @@ def local(data: pd.DataFrame, numerical_vars: list, nominal_vars: list):
 
     def nominal_var_data(var):
         # if privacy threshold is not met, return empty record
-        if num_dtps[var] < MIN_ROW_COUNT:
+        if num_dtps[var] < min_row_count:
             return None
         return dict(
             num_dtps=num_dtps[var],
diff --git a/mipengine/node/tasks/udfs.py b/mipengine/node/tasks/udfs.py
index 3ba622a3f01a96c92c9aaa4e47bdbef2960545d4..c2e3b45ebb8862e732e90fe0fd3ba40e95d7deee 100644
--- a/mipengine/node/tasks/udfs.py
+++ b/mipengine/node/tasks/udfs.py
@@ -498,6 +498,7 @@ def _generate_udf_statements(
         {
             "udf_name": udf_name,
             "node_id": node_config.identifier,
+            "min_row_count": node_config.privacy.minimum_row_count,
         }
     )
     udf_statements = _create_udf_statements(udf_execution_queries, templates_mapping)
diff --git a/mipengine/udfgen/__init__.py b/mipengine/udfgen/__init__.py
index 8f6e64c0529a338b8ae9ed7b39ae24a8754af152..ece2ec45ca7f066f150f4d21cbbe229a27300add 100644
--- a/mipengine/udfgen/__init__.py
+++ b/mipengine/udfgen/__init__.py
@@ -1,4 +1,5 @@
 from .udfgenerator import DEFERRED
+from .udfgenerator import MIN_ROW_COUNT
 from .udfgenerator import TensorBinaryOp
 from .udfgenerator import TensorUnaryOp
 from .udfgenerator import generate_udf_queries
@@ -32,4 +33,5 @@ __all__ = [
     "TensorBinaryOp",
     "make_unique_func_name",
     "DEFERRED",
+    "MIN_ROW_COUNT",
 ]
diff --git a/mipengine/udfgen/udfgenerator.py b/mipengine/udfgen/udfgenerator.py
index 68e7ac41151f59c837533793958f11df1b9cc9a0..9f11256cb7c49a1033c5f20e0483245a3ada20c3 100644
--- a/mipengine/udfgen/udfgenerator.py
+++ b/mipengine/udfgen/udfgenerator.py
@@ -771,6 +771,38 @@ def udf_logger():
     return UDFLoggerType()
 
 
+class PlaceholderType(InputType):
+    def __init__(self, name):
+        self.name = "$" + name
+
+
+def placeholder(name):
+    """
+    UDF input type factory for inserting an assignment to an arbitrary
+    placeholder in the UDF definition.
+
+    Examples
+    --------
+    Using this function in the udf decorator, as shown below,
+
+    >>> @udf(x=placeholder("some_name"))
+    ... def f(x):
+    ...     pass
+
+    will insert the line
+
+        x = $some_name
+
+    in the SQL UDF definition.
+    """
+    return PlaceholderType(name)
+
+
+# Special type for passing MIN_ROW_COUNT in a UDF. Only the Node knows the
+# actual value, so it is exported here as a placeholder and replaced by the Node.
+MIN_ROW_COUNT = placeholder("min_row_count")
+
+
 class TableType(ABC):
     @property
     @abstractmethod
@@ -986,6 +1018,15 @@ class UDFLoggerArg(UDFArgument):
         self.udf_name = udf_name
 
 
+class PlaceholderArg(UDFArgument):
+    def __init__(self, type):
+        self.type = type
+
+    @property
+    def name(self):
+        return self.type.name
+
+
 class TableArg(UDFArgument, ABC):
     type: TableType
 
@@ -1325,6 +1366,16 @@ class LoggerAssignment(ASTNode):
         return f"{name} = udfio.get_logger('{logger_arg.udf_name}', '{logger_arg.request_id}')"
 
 
+class PlaceholderAssignments(ASTNode):
+    def __init__(self, placeholders):
+        self.placeholders = placeholders
+
+    def compile(self) -> str:
+        return LN.join(
+            f"{name} = {arg.name}" for name, arg in self.placeholders.items()
+        )
+
+
 class UDFBodyStatements(ASTNode):
     def __init__(self, statements):
         self.returnless_stmts = [
@@ -1344,6 +1395,7 @@ class UDFBody(ASTNode):
         smpc_args: Dict[str, SMPCSecureTransferArg],
         literal_args: Dict[str, LiteralArg],
         logger_arg: Optional[Tuple[str, UDFLoggerArg]],
+        placeholder_args: Dict[str, PlaceholderArg],
         statements: list,
         main_return_name: str,
         main_return_type: OutputType,
@@ -1364,6 +1416,7 @@ class UDFBody(ASTNode):
         self.smpc_builds = SMPCBuilds(smpc_args)
         self.literals = LiteralAssignments(literal_args)
         self.logger = LoggerAssignment(logger_arg)
+        self.placeholders = PlaceholderAssignments(placeholder_args)
         all_types = (
             [arg.type for arg in table_args.values()]
             + [main_return_type]
@@ -1388,6 +1441,7 @@ class UDFBody(ASTNode):
             self.smpc_builds.compile(),
             self.literals.compile(),
             self.logger.compile(),
+            self.placeholders.compile(),
             self.returnless_stmts.compile(),
             self.loopback_return_stmts.compile(),
             self.return_stmt.compile(),
@@ -1442,6 +1496,7 @@ class UDFDefinition(ASTNode):
         smpc_args: Dict[str, SMPCSecureTransferArg],
         literal_args: Dict[str, LiteralArg],
         logger_arg: Optional[Tuple[str, UDFLoggerArg]],
+        placeholder_args: Dict[str, PlaceholderArg],
         main_output_type: OutputType,
         sec_output_types: List[OutputType],
         smpc_used: bool,
@@ -1457,6 +1512,7 @@ class UDFDefinition(ASTNode):
             smpc_args=smpc_args,
             literal_args=literal_args,
             logger_arg=logger_arg,
+            placeholder_args=placeholder_args,
             statements=funcparts.body_statements,
             main_return_name=funcparts.main_return_name,
             main_return_type=main_output_type,
@@ -2230,6 +2286,14 @@ def get_udf_args(request_id, funcparts, posargs, keywordargs) -> Dict[str, UDFAr
         request_id=request_id,
         udf_name=funcparts.qualname,
     )
+    placeholders = get_items_of_type(PlaceholderType, funcparts.sig.parameters)
+    if placeholders:
+        udf_args.update(
+            {
+                name: PlaceholderArg(type=placeholder)
+                for name, placeholder in placeholders.items()
+            }
+        )
     return udf_args
 
 
@@ -2335,6 +2399,7 @@ def get_udf_definition_template(
     logger_param = funcparts.logger_param_name
     if logger_param:
        logger_arg = (logger_param, input_args[logger_param])
+    placeholder_args = get_items_of_type(PlaceholderArg, input_args)
 
     verify_declared_and_passed_param_types_match(
         funcparts.table_input_types, table_args
@@ -2345,6 +2410,7 @@ def get_udf_definition_template(
         smpc_args=smpc_args,
         literal_args=literal_args,
         logger_arg=logger_arg,
+        placeholder_args=placeholder_args,
         main_output_type=main_output_type,
         sec_output_types=sec_output_types,
         smpc_used=smpc_used,
diff --git a/tests/algorithm_validation_tests/five_node_deployment_template.toml b/tests/algorithm_validation_tests/five_node_deployment_template.toml
index 53d3f98ed0ab2c7c5184a904dcf6d9936db0c4ab..af5919f7b20c866db3ff9c1e2b283ad10d7be149 100644
--- a/tests/algorithm_validation_tests/five_node_deployment_template.toml
+++ b/tests/algorithm_validation_tests/five_node_deployment_template.toml
@@ -14,7 +14,7 @@ celery_cleanup_task_timeout=2
 celery_run_udf_task_timeout = 300
 
 [privacy]
-minimum_row_count = 0
+minimum_row_count = 1
 
 [cleanup]
 nodes_cleanup_interval=30
diff --git a/tests/algorithm_validation_tests/one_node_deployment_template.toml b/tests/algorithm_validation_tests/one_node_deployment_template.toml
index e9fd9193acb9aed4507471880e21dc648e864196..f724055d3e3d0ed3509adb3ab3ca9664f40213d8 100644
--- a/tests/algorithm_validation_tests/one_node_deployment_template.toml
+++ b/tests/algorithm_validation_tests/one_node_deployment_template.toml
@@ -14,7 +14,7 @@ celery_cleanup_task_timeout=2
 celery_run_udf_task_timeout = 120
 
 [privacy]
-minimum_row_count = 0
+minimum_row_count = 1
 
 [cleanup]
 nodes_cleanup_interval=30
diff --git a/tests/algorithm_validation_tests/test_anova_oneway_validation.py b/tests/algorithm_validation_tests/test_anova_oneway_validation.py
index 7fbdccd243d002fe5b1c96d9540c3f468481a66c..ff8d2adb5b045a827d6ca04f85ef139a7282314c 100644
--- a/tests/algorithm_validation_tests/test_anova_oneway_validation.py
+++ b/tests/algorithm_validation_tests/test_anova_oneway_validation.py
@@ -12,7 +12,14 @@ algorithm_name = "anova_oneway"
 expected_file = Path(__file__).parent / "expected" / f"{algorithm_name}_expected.json"
 
 
-@pytest.mark.parametrize("test_input, expected", get_test_params(expected_file))
+@pytest.mark.parametrize(
+    "test_input, expected",
+    get_test_params(
+        expected_file,
+        skip_indices=[10, 13, 19],
+        skip_reason="Awaiting https://team-1617704806227.atlassian.net/browse/MIP-698",
+    ),
+)
 def test_anova_algorithm(test_input, expected):
     response = algorithm_request(algorithm_name, test_input)
 
diff --git a/tests/algorithm_validation_tests/test_logisticregression_validation.py b/tests/algorithm_validation_tests/test_logisticregression_validation.py
index 15b8d5cf2f779cc5b2a103e02778444dd5614f6b..681333e9f125069066f692225425bc7289235672 100644
--- a/tests/algorithm_validation_tests/test_logisticregression_validation.py
+++ b/tests/algorithm_validation_tests/test_logisticregression_validation.py
@@ -13,7 +13,14 @@ algorithm_name = "logistic_regression"
 expected_file = Path(__file__).parent / "expected" / f"{algorithm_name}_expected.json"
 
 
-@pytest.mark.parametrize("test_input, expected", get_test_params(expected_file))
+@pytest.mark.parametrize(
+    "test_input, expected",
+    get_test_params(
+        expected_file,
+        skip_indices=[13],
+        skip_reason="Awaiting https://team-1617704806227.atlassian.net/browse/MIP-698",
+    ),
+)
 def test_logisticregression_algorithm(test_input, expected, subtests):
     response = algorithm_request(algorithm_name, test_input)
     result = json.loads(response.content)
diff --git a/tests/standalone_tests/test_udfgenerator.py b/tests/standalone_tests/test_udfgenerator.py
index 0ce63a3f86169965c6b4e230035f09de852d3535..4ac9232f22b0cfe9f04d9d196fc5ef8295663703 100644
--- a/tests/standalone_tests/test_udfgenerator.py
+++ b/tests/standalone_tests/test_udfgenerator.py
@@ -48,6 +48,7 @@ from mipengine.udfgen.udfgenerator import mapping_inverse
 from mipengine.udfgen.udfgenerator import mappings_coincide
 from mipengine.udfgen.udfgenerator import merge_mappings_consistently
 from mipengine.udfgen.udfgenerator import merge_transfer
+from mipengine.udfgen.udfgenerator import placeholder
 from mipengine.udfgen.udfgenerator import recursive_repr
 from mipengine.udfgen.udfgenerator import relation
 from mipengine.udfgen.udfgenerator import scalar
@@ -5625,6 +5626,48 @@ FROM
     assert udf_output == expected_udf_output
 
 
+class TestUDFGen_PlaceholderInputType(TestUDFGenBase, _TestGenerateUDFQueries):
+    @pytest.fixture(scope="class")
+    def udfregistry(self):
+        @udf(a=placeholder("some_name"), return_type=transfer())
+        def f(a):
+            result = {"a": a}
+            return result
+
+        return udf.registry
+
+    @pytest.fixture(scope="class")
+    def expected_udfdef(self):
+        return """\
+CREATE OR REPLACE FUNCTION
+$udf_name()
+RETURNS
+TABLE("transfer" CLOB)
+LANGUAGE PYTHON
+{
+    import pandas as pd
+    import udfio
+    import json
+    a = $some_name
+    result = {'a': a}
+    return json.dumps(result)
+}"""
+
+    def test_generate_udf_queries(
+        self,
+        funcname,
+        expected_udfdef,
+    ):
+        udf_execution_queries = generate_udf_queries(
+            request_id="",
+            func_name=funcname,
+            positional_args=[],
+            keyword_args={},
+            smpc_used=False,
+        )
+        assert udf_execution_queries.udf_definition_query.template == expected_udfdef
+
+
 # ~~~~~~~~~~~~~~~~~~~~~~ Test SQL Generator ~~~~~~~~~~~~~~~~~~ #
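
Note on the mechanism, outside the diff: udfgen now leaves the literal text
$min_row_count in the generated UDF definition, and the Node, the only
component that knows node_config.privacy.minimum_row_count, fills in the real
value through the "min_row_count" entry it adds to the templates mapping in
_generate_udf_statements. Below is a minimal sketch of that flow, assuming
string.Template-style substitution; the names definition and
render_udf_definition are hypothetical, written only for illustration:

    # Hypothetical sketch, not part of the patch: how a $min_row_count
    # placeholder left by udfgen gets filled in on the Node side.
    from string import Template

    # udfgen emits a UDF definition template containing raw placeholders ...
    definition = Template(
        "CREATE OR REPLACE FUNCTION $udf_name()\n"
        "{\n"
        "    min_row_count = $min_row_count\n"
        "}"
    )

    # ... and the Node substitutes them from its own configuration.
    def render_udf_definition(template: Template, minimum_row_count: int) -> str:
        # The real templates mapping also carries "udf_name" and "node_id";
        # only "min_row_count" comes from the privacy section of the config.
        return template.substitute(udf_name="my_udf", min_row_count=minimum_row_count)

    print(render_udf_definition(definition, minimum_row_count=10))

The payoff of routing the threshold through a placeholder is visible in
descriptive_stats.py above: the algorithm no longer hardcodes MIN_ROW_COUNT = 10
but declares min_row_count=MIN_ROW_COUNT and receives each node's configured
value as an ordinary argument.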