/**
 * Copyright 2023 AntGroup CO., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */

#include <functional>
#include <string>
#include <utility>
#include <vector>

#include "gtest/gtest.h"
#include "./ut_utils.h"
#include "lgraph/olap_on_db.h"
#include "fma-common/utils.h"
#include "import/import_v3.h"

using namespace lgraph_api;
using namespace lgraph_api::olap;

class TestOlapOnDB : public TuGraphTest {};

/**
 * \brief   Write every (file name, content) pair in `data` to disk.
 *
 * Entries whose file name is empty are skipped. Each written file is reported
 * through UT_LOG().
 *
 * \param   data    Iterable of pair-like elements: `.first` is the path,
 *                  `.second` is the exact bytes to write.
 */
template <class T>
void CreateCsvFiles(const T& data) {
    fma_common::OutputFmaStream stream;
    for (auto& kv : data) {
        const std::string& file_name = kv.first;
        const std::string& content = kv.second;
        if (file_name.empty()) continue;
        stream.Open(file_name);
        stream.Write(content.data(), content.size());
        stream.Close();
        UT_LOG() << " " << file_name << " created";
    }
}

/**
 * \brief   Remove every path listed in `data`.
 *
 * Each element is passed to fma_common::file_system::RemoveDir() and the
 * removal is reported through UT_LOG().
 *
 * \param   data    Iterable of elements convertible to `const std::string&`.
 */
template <class T>
void ClearCsvFiles(const T& data) {
    for (const std::string& file_name : data) {
        fma_common::file_system::RemoveDir(file_name.c_str());
        UT_LOG() << " " << file_name << " deleted";
    }
}

/**
 * \brief   Create the import configuration and the two CSV inputs used by the test.
 *
 * Files written (relative to the working directory):
 *   - test_olap_on_db.conf : schema with vertex label "node" (INT32 primary key "id")
 *                            and edge label "edge" (DOUBLE "weight"), plus the file list.
 *   - test_vertices.csv    : 21 vertex ids, 0..20, one per line.
 *   - test_weighted.csv    : 35 "src,dst,weight" records, one per line; in this data the
 *                            weight always equals the destination id.
 */
static void WriteOlapDbFiles() {
    static const std::vector<std::pair<std::string, std::string>> data_import = {
        {"test_olap_on_db.conf",
         R"(
{
    "schema": [
        {
            "label" : "node",
            "type" : "VERTEX",
            "primary" : "id",
            "properties" : [
                {"name" : "id", "type":"INT32"}
            ]
        },
        {
            "label" : "edge",
            "type" : "EDGE",
            "properties" : [
                {"name" : "weight", "type" : "DOUBLE"}
            ]
        }
    ],
    "files" : [
        {
            "path" : "test_vertices.csv",
            "format" : "CSV",
            "label" : "node",
            "header" : 0,
            "columns" : ["id"]
        },
        {
            "path" : "test_weighted.csv",
            "format" : "CSV",
            "label" : "edge",
            "header" : 0,
            "SRC_ID" : "node",
            "DST_ID" : "node",
            "columns" : ["SRC_ID","DST_ID", "weight"]
        }
    ]
}
)"},
        {"test_vertices.csv",
         R"(0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
)"},
        {"test_weighted.csv",
         R"(0,1,1
0,2,2
0,3,3
0,4,4
0,5,5
1,2,2
1,3,3
1,5,5
2,4,4
2,6,6
2,8,8
3,4,4
4,5,5
4,17,17
5,6,6
5,10,10
5,19,19
7,11,11
7,13,13
8,10,10
9,12,12
9,14,14
9,20,20
10,11,11
10,13,13
11,13,13
12,15,15
12,18,18
13,15,15
13,20,20
14,15,15
15,17,17
15,19,19
16,17,17
17,18,18
)"}};
    CreateCsvFiles(data_import);
}

/**
 * \brief   Vertex-data extractor used by the test: stores the constant 2 for every vertex.
 *
 * \param           vit             Vertex iterator (unused).
 * \param   [out]   vertex_data     Receives the value 2.
 *
 * \return  Always true.
 */
template <typename VertexData>
std::function<bool(VertexIterator &, VertexData &)> vertex_extract =
    [] (VertexIterator &vit, VertexData &vertex_data) {
        vertex_data = 2;
        return true;
    };

/**
 * \brief   Vertex filter used by the test: rejects the vertex whose id is 0.
 *
 * \param   vid     Iterator positioned on the candidate vertex.
 *
 * \return  False for vertex id 0, true for every other vertex.
 */
std::function<bool(VertexIterator &)> vertex_filter =
    [] (VertexIterator &vid) -> bool{
        if (vid.GetId() == 0) {
            return false;
        }
        return true;
    };

/**
 * Imports the 21-vertex / 35-edge fixture offline, then builds OlapOnDB snapshots with
 * different flag / filter / edge-converter combinations and checks degrees, sizes and
 * id mapping for each of them.
 *
 * NOTE(review): the element types used below (double for snapshots that take an edge
 * converter, Empty for those that do not, int / size_t / double for the Alloc*, Process*
 * and Extract* calls) are chosen from how the values are used in the assertions --
 * confirm against the OlapOnDB API and project history.
 */
TEST_F(TestOlapOnDB, OlapOnDB) {
    // configure test data
    WriteOlapDbFiles();
    std::string db_path = "./testdb_olap";
    lgraph::import_v3::Importer::Config config;
    config.config_file = "./test_olap_on_db.conf";
    config.db_dir = db_path;
    config.delete_if_exists = true;
    config.graph = "default";
    config.parse_block_threads = 1;
    config.parse_file_threads = 1;
    config.generate_sst_threads = 1;
    lgraph::import_v3::Importer importer(config);
    importer.DoImportOffline();

    Galaxy g(db_path);
    g.SetCurrentUser("admin", "73@TuGraph");
    GraphDB db = g.OpenGraph("default");
    auto txn = db.CreateReadTxn();

    // test ConstructWithVid()
    // Undirected snapshot: the expected edge count is 70, twice the 35 imported records.
    OlapOnDB<double> test_db_one(db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_UNDIRECTED, nullptr,
                                 edge_convert_default<double>);
    UT_EXPECT_EQ(test_db_one.OutDegree(0), 5);
    UT_EXPECT_EQ(test_db_one.InDegree(0), 5);
    UT_EXPECT_EQ(test_db_one.OutDegree(6), 2);
    UT_EXPECT_EQ(test_db_one.InDegree(6), 2);
    UT_EXPECT_EQ(test_db_one.OutDegree(17), 4);
    UT_EXPECT_EQ(test_db_one.InDegree(17), 4);
    UT_EXPECT_EQ(test_db_one.OutDegree(20), 2);
    UT_EXPECT_EQ(test_db_one.InDegree(20), 2);
    auto active = test_db_one.AllocVertexSubset();
    active.Fill();
    UT_EXPECT_EQ(active.Size(), 21);
    auto label = test_db_one.AllocVertexArray<int>();
    UT_EXPECT_EQ(test_db_one.NumVertices(), 21);
    UT_EXPECT_EQ(test_db_one.NumEdges(), 70);
    test_db_one.ProcessVertexActive<size_t>(
        [&](size_t vi) {
            label[vi] = -1;
            return 1;
        },
        active);
    UT_EXPECT_EQ(label[0], -1);
    for (auto& edge : test_db_one.OutEdges(0)) {
        UT_EXPECT_EQ(edge.edge_data, 1.0);
    }
    auto vertex_lists = test_db_one.ExtractVertexData<double>(vertex_extract<double>);
    UT_EXPECT_EQ(vertex_lists[0], 2);

    // Same undirected view, built without SNAPSHOT_PARALLEL and without an edge converter.
    OlapOnDB<Empty> unparallel_one(db, txn, SNAPSHOT_UNDIRECTED);
    UT_EXPECT_EQ(unparallel_one.OutDegree(0), 5);
    UT_EXPECT_EQ(unparallel_one.InDegree(0), 5);
    UT_EXPECT_EQ(unparallel_one.OutDegree(6), 2);
    UT_EXPECT_EQ(unparallel_one.InDegree(6), 2);
    UT_EXPECT_EQ(unparallel_one.OutDegree(17), 4);
    UT_EXPECT_EQ(unparallel_one.InDegree(17), 4);
    UT_EXPECT_EQ(unparallel_one.OutDegree(20), 2);
    UT_EXPECT_EQ(unparallel_one.InDegree(20), 2);

    // Undirected, non-parallel, edge data taken through edge_convert_weight.
    OlapOnDB<double> unparallel_db_two(db, txn, SNAPSHOT_UNDIRECTED, nullptr,
                                       edge_convert_weight<double>);
    UT_EXPECT_EQ(unparallel_db_two.OutDegree(0), 5);
    UT_EXPECT_EQ(unparallel_db_two.InDegree(0), 5);
    UT_EXPECT_EQ(unparallel_db_two.OutDegree(6), 2);
    UT_EXPECT_EQ(unparallel_db_two.InDegree(6), 2);
    UT_EXPECT_EQ(unparallel_db_two.OutDegree(17), 4);
    UT_EXPECT_EQ(unparallel_db_two.InDegree(17), 4);
    UT_EXPECT_EQ(unparallel_db_two.OutDegree(20), 2);
    UT_EXPECT_EQ(unparallel_db_two.InDegree(20), 2);
    // Vertex 0 only has outgoing records, and every record's weight equals its destination.
    for (auto& edge : unparallel_db_two.OutEdges(0)) {
        UT_EXPECT_EQ(edge.edge_data, edge.neighbour);
    }

    // Directed, parallel, edge data taken through edge_convert_weight.
    OlapOnDB<double> parallel_one(db, txn, SNAPSHOT_PARALLEL, nullptr,
                                  edge_convert_weight<double>);
    UT_EXPECT_EQ(parallel_one.OutDegree(0), 5);
    UT_EXPECT_EQ(parallel_one.InDegree(0), 0);
    UT_EXPECT_EQ(parallel_one.OutDegree(6), 0);
    UT_EXPECT_EQ(parallel_one.InDegree(6), 2);
    UT_EXPECT_EQ(parallel_one.OutDegree(17), 1);
    UT_EXPECT_EQ(parallel_one.InDegree(17), 3);
    UT_EXPECT_EQ(parallel_one.OutDegree(20), 0);
    UT_EXPECT_EQ(parallel_one.InDegree(20), 2);
    for (auto& edge : parallel_one.OutEdges(10)) {
        UT_EXPECT_EQ(edge.edge_data, edge.neighbour);
    }

    // Directed, parallel, edge data taken through edge_convert_default.
    OlapOnDB<double> parallel_two(db, txn, SNAPSHOT_PARALLEL, nullptr,
                                  edge_convert_default<double>);
    UT_EXPECT_EQ(parallel_two.OutDegree(0), 5);
    UT_EXPECT_EQ(parallel_two.InDegree(0), 0);
    UT_EXPECT_EQ(parallel_two.OutDegree(6), 0);
    UT_EXPECT_EQ(parallel_two.InDegree(6), 2);
    UT_EXPECT_EQ(parallel_two.OutDegree(17), 1);
    UT_EXPECT_EQ(parallel_two.InDegree(17), 3);
    UT_EXPECT_EQ(parallel_two.OutDegree(20), 0);
    UT_EXPECT_EQ(parallel_two.InDegree(20), 2);
    for (auto& edge : parallel_two.OutEdges(10)) {
        UT_EXPECT_EQ(edge.edge_data, 1.0);
    }

    // test Construct()
    OlapOnDB<Empty> test_db_two(db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_IDMAPPING);
    UT_EXPECT_EQ(test_db_two.OutDegree(0), 5);
    UT_EXPECT_EQ(test_db_two.InDegree(0), 0);
    UT_EXPECT_EQ(test_db_two.OutDegree(6), 0);
    UT_EXPECT_EQ(test_db_two.InDegree(6), 2);
    UT_EXPECT_EQ(test_db_two.OutDegree(17), 1);
    UT_EXPECT_EQ(test_db_two.InDegree(17), 3);
    UT_EXPECT_EQ(test_db_two.OutDegree(20), 0);
    UT_EXPECT_EQ(test_db_two.InDegree(20), 2);
    UT_EXPECT_EQ(test_db_two.NumVertices(), 21);
    UT_EXPECT_EQ(test_db_two.NumEdges(), 35);
    UT_EXPECT_EQ(test_db_two.MappedVid(0), 0);
    UT_EXPECT_EQ(test_db_two.OriginalVid(0), 0);
    UT_EXPECT_EQ(test_db_two.MappedVid(test_db_two.OriginalVid(4)), 4);

    // vertex_filter rejects vertex id 0, so 20 vertices are expected and the snapshot's
    // vertex indices no longer line up with the ids in the CSV.
    OlapOnDB<double> unparallel_two(db, txn, SNAPSHOT_UNDIRECTED, vertex_filter,
                                    edge_convert_weight<double>);
    UT_EXPECT_EQ(unparallel_two.OutDegree(0), 3);
    UT_EXPECT_EQ(unparallel_two.InDegree(0), 3);
    UT_EXPECT_EQ(unparallel_two.OutDegree(6), 2);
    UT_EXPECT_EQ(unparallel_two.InDegree(6), 2);
    UT_EXPECT_EQ(unparallel_two.OutDegree(13), 2);
    UT_EXPECT_EQ(unparallel_two.InDegree(13), 2);
    UT_EXPECT_EQ(unparallel_two.OutDegree(20), 0);
    UT_EXPECT_EQ(unparallel_two.InDegree(20), 0);
    UT_EXPECT_EQ(unparallel_two.NumVertices(), 20);
    UT_EXPECT_EQ(unparallel_two.NumEdges(), 60);
    UT_EXPECT_EQ(unparallel_two.MappedVid(unparallel_two.OriginalVid(17)), 17);

    // test ConstructWithDegree()
    OlapOnDB<Empty> test_db_three(db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_UNDIRECTED);
    UT_EXPECT_EQ(test_db_three.OutDegree(0), 5);
    UT_EXPECT_EQ(test_db_three.InDegree(0), 5);
    UT_EXPECT_EQ(test_db_three.OutDegree(6), 2);
    UT_EXPECT_EQ(test_db_three.InDegree(6), 2);
    UT_EXPECT_EQ(test_db_three.OutDegree(13), 5);
    UT_EXPECT_EQ(test_db_three.InDegree(13), 5);
    UT_EXPECT_EQ(test_db_three.OutDegree(20), 2);
    UT_EXPECT_EQ(test_db_three.InDegree(20), 2);
    UT_EXPECT_EQ(test_db_three.NumVertices(), 21);
    UT_EXPECT_EQ(test_db_three.NumEdges(), 70);
    // NOTE(review): the next two checks query test_db_two, not test_db_three -- possibly a
    // copy-paste slip; kept as written, confirm the intended target.
    UT_EXPECT_EQ(test_db_two.MappedVid(3), 3);
    UT_EXPECT_EQ(test_db_two.OriginalVid(7), 7);

    OlapOnDB<Empty> directed_three(db, txn, SNAPSHOT_PARALLEL);
    UT_EXPECT_EQ(directed_three.OutDegree(0), 5);
    UT_EXPECT_EQ(directed_three.InDegree(0), 0);
    UT_EXPECT_EQ(directed_three.OutDegree(6), 0);
    UT_EXPECT_EQ(directed_three.InDegree(6), 2);
    UT_EXPECT_EQ(directed_three.OutDegree(13), 2);
    UT_EXPECT_EQ(directed_three.InDegree(13), 3);
    UT_EXPECT_EQ(directed_three.OutDegree(20), 0);
    UT_EXPECT_EQ(directed_three.InDegree(20), 2);
    UT_EXPECT_EQ(directed_three.NumVertices(), 21);
    UT_EXPECT_EQ(directed_three.NumEdges(), 35);
    UT_EXPECT_EQ(directed_three.MappedVid(1), 1);
    UT_EXPECT_EQ(directed_three.OriginalVid(20), 20);
    txn.Commit();

    // test ExtractVertexData
    // These snapshots are built on a write transaction; each lives in its own scope so it
    // is destroyed before write_txn.Commit().
    auto write_txn = db.CreateWriteTxn();
    {
        OlapOnDB<Empty> test_db_four(db, write_txn, SNAPSHOT_PARALLEL);
        UT_EXPECT_EQ(test_db_four.NumEdges(), 35);
        auto vertex_list = test_db_four.ExtractVertexData<double>(vertex_extract<double>);
        UT_EXPECT_EQ(vertex_list[0], 2);
        UT_EXPECT_EQ(vertex_list.Size(), 21);
        UT_EXPECT_EQ(test_db_four.InDegree(0), 0);
    }

    {
        OlapOnDB<Empty> parallel_four(db, write_txn, SNAPSHOT_PARALLEL | SNAPSHOT_IDMAPPING);
        auto extract_list = parallel_four.ExtractVertexData<double>(vertex_extract<double>);
        UT_EXPECT_EQ(extract_list[0], 2);
        UT_EXPECT_EQ(extract_list.Size(), 21);
    }
    write_txn.Commit();

    // Remove the generated import inputs; the database directory itself is left in place.
    std::vector<std::string> del_filename = {"test_olap_on_db.conf", "test_vertices.csv",
                                             "test_weighted.csv"};
    ClearCsvFiles(del_filename);
}