diff --git a/python/katana/analytics/__init__.py b/python/katana/analytics/__init__.py index dab88ff239..1c981a8fb8 100644 --- a/python/katana/analytics/__init__.py +++ b/python/katana/analytics/__init__.py @@ -30,6 +30,12 @@ .. automodule:: katana.analytics._independent_set +.. automodule:: katana.analytics._louvain_clustering + +.. automodule:: katana.analytics._local_clustering_coefficient + +.. automodule:: katana.analytics._subgraph_extraction + .. automodule:: katana.analytics._jaccard .. automodule:: katana.analytics._k_core @@ -67,6 +73,14 @@ IndependentSetStatistics, ) from katana.analytics._jaccard import jaccard, jaccard_assert_valid, JaccardPlan, JaccardStatistics +from katana.analytics._louvain_clustering import ( + louvain_clustering, + louvain_clustering_assert_valid, + LouvainClusteringPlan, + LouvainClusteringStatistics, +) +from katana.analytics._local_clustering_coefficient import local_clustering_coefficient, LocalClusteringCoefficientPlan +from katana.analytics._subgraph_extraction import subgraph_extraction, SubGraphExtractionPlan from katana.analytics._k_core import k_core, k_core_assert_valid, KCorePlan, KCoreStatistics from katana.analytics._k_truss import k_truss, k_truss_assert_valid, KTrussPlan, KTrussStatistics from katana.analytics._pagerank import pagerank, pagerank_assert_valid, PagerankPlan, PagerankStatistics diff --git a/python/katana/analytics/_local_clustering_coefficient.pyx b/python/katana/analytics/_local_clustering_coefficient.pyx new file mode 100644 index 0000000000..bcc496fd45 --- /dev/null +++ b/python/katana/analytics/_local_clustering_coefficient.pyx @@ -0,0 +1,119 @@ +from katana.cpp.libstd.iostream cimport ostream, ostringstream +from katana.cpp.libgalois.graphs.Graph cimport _PropertyGraph +from katana.cpp.libsupport.result cimport handle_result_void, handle_result_assert, raise_error_code, Result +from katana.analytics.plan cimport Plan, _Plan +from katana.property_graph cimport PropertyGraph + +from libcpp.string cimport string +from libcpp cimport bool + +from enum import Enum + +# TODO(amp): Module needs documenting. + + +cdef extern from "katana/analytics/local_clustering_coefficient/local_clustering_coefficient.h" namespace "katana::analytics" nogil: + cppclass _LocalClusteringCoefficientPlan "katana::analytics::LocalClusteringCoefficientPlan" (_Plan): + enum Algorithm: + kOrderedCountAtomics "katana::analytics::LocalClusteringCoefficientPlan::kOrderedCountAtomics" + kOrderedCountPerThread "katana::analytics::LocalClusteringCoefficientPlan::kOrderedCountPerThread" + + enum Relabeling: + kRelabel "katana::analytics::LocalClusteringCoefficientPlan::kRelabel" + kNoRelabel "katana::analytics::LocalClusteringCoefficientPlan::kNoRelabel" + kAutoRelabel "katana::analytics::LocalClusteringCoefficientPlan::kAutoRelabel" + + _LocalClusteringCoefficientPlan.Algorithm algorithm() const + _LocalClusteringCoefficientPlan.Relabeling relabeling() const + bool edges_sorted() const + + # LocalClusteringCoefficientPlan() + + @staticmethod + _LocalClusteringCoefficientPlan OrderedCountAtomics( + bool edges_sorted, + _LocalClusteringCoefficientPlan.Relabeling relabeling + ) + @staticmethod + _LocalClusteringCoefficientPlan OrderedCountPerThread( + bool edges_sorted, + _LocalClusteringCoefficientPlan.Relabeling relabeling + ) + + _LocalClusteringCoefficientPlan.Relabeling kDefaultRelabeling "katana::analytics::LocalClusteringCoefficientPlan::kDefaultRelabeling" + bool kDefaultEdgesSorted "katana::analytics::LocalClusteringCoefficientPlan::kDefaultEdgesSorted" + + Result[void] LocalClusteringCoefficient(_PropertyGraph* pfg, const string& output_property_name, _LocalClusteringCoefficientPlan plan) + + +class _LocalClusteringCoefficientPlanAlgorithm(Enum): + OrderedCountAtomics = _LocalClusteringCoefficientPlan.Algorithm.kOrderedCountAtomics + OrderedCountPerThread = _LocalClusteringCoefficientPlan.Algorithm.kOrderedCountPerThread + + +cdef _relabeling_to_python(v): + if v == _LocalClusteringCoefficientPlan.Relabeling.kRelabel: + return True + elif v == _LocalClusteringCoefficientPlan.Relabeling.kNoRelabel: + return False + else: + return None + + +cdef _relabeling_from_python(v): + if v is None: + return _LocalClusteringCoefficientPlan.Relabeling.kAutoRelabel + elif v: + return _LocalClusteringCoefficientPlan.Relabeling.kRelabel + else: + return _LocalClusteringCoefficientPlan.Relabeling.kNoRelabel + + +cdef class LocalClusteringCoefficientPlan(Plan): + cdef: + _LocalClusteringCoefficientPlan underlying_ + + cdef _Plan* underlying(self) except NULL: + return &self.underlying_ + + Algorithm = _LocalClusteringCoefficientPlanAlgorithm + + @staticmethod + cdef LocalClusteringCoefficientPlan make(_LocalClusteringCoefficientPlan u): + f = LocalClusteringCoefficientPlan.__new__(LocalClusteringCoefficientPlan) + f.underlying_ = u + return f + + @property + def algorithm(self) -> Algorithm: + return _LocalClusteringCoefficientPlanAlgorithm(self.underlying_.algorithm()) + + @property + def relabeling(self): + return self.underlying_.relabeling() + + @property + def edges_sorted(self) -> bool: + return self.underlying_.edges_sorted() + + @staticmethod + def ordered_count_atomics( + relabeling = _relabeling_to_python(kDefaultRelabeling), + bool edges_sorted = kDefaultEdgesSorted + ): + return LocalClusteringCoefficientPlan.make(_LocalClusteringCoefficientPlan.OrderedCountAtomics( + edges_sorted, _relabeling_from_python(relabeling))) + + @staticmethod + def ordered_count_per_thread( + relabeling = _relabeling_to_python(kDefaultRelabeling), + bool edges_sorted = kDefaultEdgesSorted + ): + return LocalClusteringCoefficientPlan.make(_LocalClusteringCoefficientPlan.OrderedCountPerThread( + edges_sorted, _relabeling_from_python(relabeling))) + + +def local_clustering_coefficient(PropertyGraph pg, str output_property_name, LocalClusteringCoefficientPlan plan = LocalClusteringCoefficientPlan()): + cdef string output_property_name_str = bytes(output_property_name, "utf-8") + with nogil: + handle_result_void(LocalClusteringCoefficient(pg.underlying.get(), output_property_name_str, plan.underlying_)) diff --git a/python/katana/analytics/_louvain_clustering.pyx b/python/katana/analytics/_louvain_clustering.pyx new file mode 100644 index 0000000000..c5d1d498f8 --- /dev/null +++ b/python/katana/analytics/_louvain_clustering.pyx @@ -0,0 +1,188 @@ +from katana.cpp.libstd.iostream cimport ostream, ostringstream +from katana.cpp.libgalois.graphs.Graph cimport _PropertyGraph +from katana.cpp.libsupport.result cimport handle_result_void, handle_result_assert, raise_error_code, Result +from katana.analytics.plan cimport Plan, _Plan +from katana.property_graph cimport PropertyGraph + +from libc.stdint cimport uint32_t, uint64_t +from libcpp.string cimport string +from libcpp cimport bool + +from enum import Enum + +# TODO(amp): Module needs documenting. + + +cdef extern from "katana/analytics/louvain_clustering/louvain_clustering.h" namespace "katana::analytics" nogil: + cppclass _LouvainClusteringPlan "katana::analytics::LouvainClusteringPlan" (_Plan): + enum Algorithm: + kDoAll "katana::analytics::LouvainClusteringPlan::kDoAll" + + _LouvainClusteringPlan.Algorithm algorithm() const + bool enable_vf() const + double modularity_threshold_per_round() const + double modularity_threshold_total() const + uint32_t max_iterations() const + uint32_t min_graph_size() const + + # LouvainClusteringPlan() + + @staticmethod + _LouvainClusteringPlan DoAll( + bool enable_vf, + double modularity_threshold_per_round, + double modularity_threshold_total, + uint32_t max_iterations, + uint32_t min_graph_size + ) + + bool kDefaultEnableVF "katana::analytics::LouvainClusteringPlan::kDefaultEnableVF" + double kDefaultModularityThresholdPerRound "katana::analytics::LouvainClusteringPlan::kDefaultModularityThresholdPerRound" + double kDefaultModularityThresholdTotal "katana::analytics::LouvainClusteringPlan::kDefaultModularityThresholdTotal" + uint32_t kDefaultMaxIterations "katana::analytics::LouvainClusteringPlan::kDefaultMaxIterations" + uint32_t kDefaultMinGraphSize "katana::analytics::LouvainClusteringPlan::kDefaultMinGraphSize" + + Result[void] LouvainClustering(_PropertyGraph* pfg, const string& edge_weight_property_name,const string& output_property_name, _LouvainClusteringPlan plan) + + Result[void] LouvainClusteringAssertValid(_PropertyGraph* pfg, + const string& edge_weight_property_name, + const string& output_property_name + ) + + cppclass _LouvainClusteringStatistics "katana::analytics::LouvainClusteringStatistics": + uint64_t n_clusters + uint64_t n_non_trivial_clusters + uint64_t largest_cluster_size + double largest_cluster_proportion + double modularity + + void Print(ostream os) + + @staticmethod + Result[_LouvainClusteringStatistics] Compute(_PropertyGraph* pfg, + const string& edge_weight_property_name, + const string& output_property_name + ) + + +class _LouvainClusteringPlanAlgorithm(Enum): + DoAll = _LouvainClusteringPlan.Algorithm.kDoAll + + +cdef class LouvainClusteringPlan(Plan): + cdef: + _LouvainClusteringPlan underlying_ + + cdef _Plan* underlying(self) except NULL: + return &self.underlying_ + + Algorithm = _LouvainClusteringPlanAlgorithm + + @staticmethod + cdef LouvainClusteringPlan make(_LouvainClusteringPlan u): + f = LouvainClusteringPlan.__new__(LouvainClusteringPlan) + f.underlying_ = u + return f + + @property + def algorithm(self) -> Algorithm: + return _LouvainClusteringPlanAlgorithm(self.underlying_.algorithm()) + + @property + def enable_vf(self) -> bool: + return self.underlying_.enable_vf() + + @property + def modularity_threshold_per_round(self) -> double: + return self.underlying_.modularity_threshold_per_round() + + @property + def modularity_threshold_total(self) -> double: + return self.underlying_.modularity_threshold_total() + + @property + def max_iterations(self) -> uint32_t: + return self.underlying_.max_iterations() + + @property + def min_graph_size(self) -> uint32_t: + return self.underlying_.min_graph_size() + + + @staticmethod + def do_all( + bool enable_vf = kDefaultEnableVF, + double modularity_threshold_per_round = kDefaultModularityThresholdPerRound, + double modularity_threshold_total = kDefaultModularityThresholdTotal, + uint32_t max_iterations = kDefaultMaxIterations, + uint32_t min_graph_size = kDefaultMinGraphSize + ) -> LouvainClusteringPlan: + return LouvainClusteringPlan.make(_LouvainClusteringPlan.DoAll( + enable_vf, modularity_threshold_per_round, modularity_threshold_total, max_iterations, min_graph_size)) + + +def louvain_clustering(PropertyGraph pg, str edge_weight_property_name, str output_property_name, LouvainClusteringPlan plan = LouvainClusteringPlan()): + cdef string edge_weight_property_name_str = bytes(edge_weight_property_name, "utf-8") + cdef string output_property_name_str = bytes(output_property_name, "utf-8") + with nogil: + handle_result_void(LouvainClustering(pg.underlying.get(), edge_weight_property_name_str, output_property_name_str, plan.underlying_)) + + +def louvain_clustering_assert_valid(PropertyGraph pg, str edge_weight_property_name, str output_property_name ): + cdef string edge_weight_property_name_str = bytes(edge_weight_property_name, "utf-8") + cdef string output_property_name_str = bytes(output_property_name, "utf-8") + with nogil: + handle_result_assert(LouvainClusteringAssertValid(pg.underlying.get(), + edge_weight_property_name_str, + output_property_name_str + )) + + +cdef _LouvainClusteringStatistics handle_result_LouvainClusteringStatistics(Result[_LouvainClusteringStatistics] res) nogil except *: + if not res.has_value(): + with gil: + raise_error_code(res.error()) + return res.value() + + +cdef class LouvainClusteringStatistics: + cdef _LouvainClusteringStatistics underlying + + def __init__(self, PropertyGraph pg, + str edge_weight_property_name, + str output_property_name + ): + cdef string edge_weight_property_name_str = bytes(edge_weight_property_name, "utf-8") + cdef string output_property_name_str = bytes(output_property_name, "utf-8") + with nogil: + self.underlying = handle_result_LouvainClusteringStatistics(_LouvainClusteringStatistics.Compute( + pg.underlying.get(), + edge_weight_property_name_str, + output_property_name_str + )) + + @property + def n_clusters(self) -> uint64_t: + return self.underlying.n_clusters + + @property + def n_non_trivial_clusters(self) -> uint64_t: + return self.underlying.n_non_trivial_clusters + + @property + def largest_cluster_size(self) -> uint64_t: + return self.underlying.largest_cluster_size + + @property + def largest_cluster_proportion(self) -> double: + return self.underlying.largest_cluster_proportion + + @property + def modularity(self) -> double: + return self.underlying.modularity + + + def __str__(self) -> str: + cdef ostringstream ss + self.underlying.Print(ss) + return str(ss.str(), "ascii") diff --git a/python/katana/analytics/_subgraph_extraction.pyx b/python/katana/analytics/_subgraph_extraction.pyx new file mode 100644 index 0000000000..3efddfc482 --- /dev/null +++ b/python/katana/analytics/_subgraph_extraction.pyx @@ -0,0 +1,73 @@ +from katana.cpp.libgalois.graphs.Graph cimport _PropertyGraph +from katana.cpp.libstd.iostream cimport ostream, ostringstream +from katana.cpp.libsupport.result cimport handle_result_void, handle_result_assert, raise_error_code, Result +from katana.analytics.plan cimport Plan, _Plan +from katana.property_graph cimport PropertyGraph + +from libcpp.vector cimport vector +from libc.stdint cimport uint32_t +from pyarrow.lib cimport to_shared +from libcpp.memory cimport shared_ptr, unique_ptr +from katana.cpp.libsupport.result cimport Result + +from enum import Enum + +# TODO(amp): Module needs documenting. + + +cdef extern from "katana/analytics/subgraph_extraction/subgraph_extraction.h" namespace "katana::analytics" nogil: + cppclass _SubGraphExtractionPlan "katana::analytics::SubGraphExtractionPlan" (_Plan): + enum Algorithm: + kNodeSet "katana::analytics::SubGraphExtractionPlan::kNodeSet" + + _SubGraphExtractionPlan.Algorithm algorithm() const + + # SubGraphExtractionPlan() + + @staticmethod + _SubGraphExtractionPlan NodeSet( + ) + + Result[unique_ptr[_PropertyGraph]] SubGraphExtraction(_PropertyGraph* pfg, const vector[uint32_t]& node_vec, _SubGraphExtractionPlan plan) + + +class _SubGraphExtractionPlanAlgorithm(Enum): + NodeSet = _SubGraphExtractionPlan.Algorithm.kNodeSet + + +cdef class SubGraphExtractionPlan(Plan): + cdef: + _SubGraphExtractionPlan underlying_ + + cdef _Plan* underlying(self) except NULL: + return &self.underlying_ + + Algorithm = _SubGraphExtractionPlanAlgorithm + + @staticmethod + cdef SubGraphExtractionPlan make(_SubGraphExtractionPlan u): + f = SubGraphExtractionPlan.__new__(SubGraphExtractionPlan) + f.underlying_ = u + return f + + @property + def algorithm(self) -> Algorithm: + return _SubGraphExtractionPlanAlgorithm(self.underlying_.algorithm()) + + @staticmethod + def node_set() -> SubGraphExtractionPlan: + return SubGraphExtractionPlan.make(_SubGraphExtractionPlan.NodeSet()) + + +cdef shared_ptr[_PropertyGraph] handle_result_property_graph(Result[unique_ptr[_PropertyGraph]] res) nogil except *: + if not res.has_value(): + with gil: + raise_error_code(res.error()) + return to_shared(res.value()) + + +def subgraph_extraction(PropertyGraph pg, node_vec, SubGraphExtractionPlan plan = SubGraphExtractionPlan()) -> PropertyGraph: + cdef vector[uint32_t] vec = [n for n in node_vec] + with nogil: + v = handle_result_property_graph(SubGraphExtraction(pg.underlying.get(), vec, plan.underlying_)) + return PropertyGraph.make(v) diff --git a/python/katana/property_graph.pxd.jinja b/python/katana/property_graph.pxd.jinja index e473383709..d525d0adc3 100644 --- a/python/katana/property_graph.pxd.jinja +++ b/python/katana/property_graph.pxd.jinja @@ -14,6 +14,8 @@ cdef class PropertyGraph: @staticmethod cdef uint64_t _property_name_to_id(object prop, Schema schema) except -1 + @staticmethod + cdef PropertyGraph make(shared_ptr[_PropertyGraph] u) cdef GraphTopology topology(PropertyGraph) cpdef uint64_t num_nodes(PropertyGraph) diff --git a/python/katana/property_graph.pyx.jinja b/python/katana/property_graph.pyx.jinja index f360ce7afb..eb481247fb 100644 --- a/python/katana/property_graph.pyx.jinja +++ b/python/katana/property_graph.pyx.jinja @@ -60,6 +60,12 @@ cdef class PropertyGraph: opts.edge_properties = &edge_props self.underlying = handle_result_value(_PropertyGraph.Make(bytes(path, "utf-8"), opts)) + @staticmethod + cdef PropertyGraph make(shared_ptr[_PropertyGraph] u): + f = PropertyGraph.__new__(PropertyGraph) + f.underlying = u + return f + def write(self, path, command_line) : """ Write the property graph out the specified path or URL (or the original path it was loaded from if path is nor provided). Provide lineage information in the form of a command line. diff --git a/python/katana_setup.py b/python/katana_setup.py index 01878e1cd7..ae46dcb7ad 100644 --- a/python/katana_setup.py +++ b/python/katana_setup.py @@ -102,7 +102,7 @@ def _get_build_extension(): def test_cython_module(name, cython_code, python_code="", extension_options=None): extension_options = extension_options or {} - require_python_module("cython") + require_python_module("Cython") import Cython.Build import Cython.Build.Inline diff --git a/python/tests/test_cpp_algos.py b/python/tests/test_cpp_algos.py index 09e39e7bc3..26a7567a2f 100644 --- a/python/tests/test_cpp_algos.py +++ b/python/tests/test_cpp_algos.py @@ -1,6 +1,7 @@ import numpy as np from pyarrow import Schema, table from pytest import approx, raises +import pytest from katana import GaloisError from katana.analytics import * @@ -291,3 +292,54 @@ def test_k_truss_fail(): with raises(GaloisError): k_truss(property_graph, 1, "output2") + + +def test_louvain_clustering(): + property_graph = PropertyGraph(get_input("propertygraphs/rmat10_symmetric")) + + louvain_clustering(property_graph, "value", "output") + + louvain_clustering_assert_valid(property_graph, "value", "output") + + stats = LouvainClusteringStatistics(property_graph, "value", "output") + + # TODO(amp): This is non-deterministic. Are there bounds on the results we could check? + # assert stats.n_clusters == 83 + # assert stats.n_non_trivial_clusters == 13 + # assert stats.largest_cluster_size == 297 + + +def test_local_clustering_coefficient(): + property_graph = PropertyGraph(get_input("propertygraphs/rmat15_cleaned_symmetric")) + + local_clustering_coefficient(property_graph, "output") + property_graph: PropertyGraph + out = property_graph.get_node_property("output") + + assert out[-1].as_py() == 0 + assert not np.any(np.isnan(out)) + + +def test_subgraph_extraction(): + property_graph = PropertyGraph(get_input("propertygraphs/rmat15_cleaned_symmetric")) + sort_all_edges_by_dest(property_graph) + nodes = [1, 3, 11, 120] + + expected_edges = [ + [ + nodes.index(property_graph.get_edge_dst(e)) + for e in property_graph.edges(i) + if property_graph.get_edge_dst(e) in nodes + ] + for i in nodes + ] + + pg = subgraph_extraction(property_graph, nodes) + + assert isinstance(pg, PropertyGraph) + assert len(pg) == len(nodes) + assert pg.num_edges() == 6 + + for i in range(len(expected_edges)): + assert len(pg.edges(i)) == len(expected_edges[i]) + assert [pg.get_edge_dst(e) for e in pg.edges(i)] == expected_edges[i]