Skip to content
Snippets Groups Projects
Commit 1fcc0eb7 authored by Your Name's avatar Your Name
Browse files

Finished multiupstream membership aggregation.

parent 2b093063
Branches
No related tags found
No related merge requests found
......@@ -29,6 +29,7 @@
struct source;
struct interface_infos;
class simple_routing_data;
class querier;
class test_status;
......@@ -75,6 +76,8 @@ private:
std::map<unsigned int, mem_source_state> m_data;
source_list<source> get_source_list(const std::set<addr_storage>& addr_set);
filter_source_state get_source_filter(rb_interface_direction if_direction, const std::string & input_if_name, const std::shared_ptr<interface>& iface, const addr_storage& gaddr, bool explicit_if_name);
mem_source_state get_mem_source_state(const std::unique_ptr<querier>& querier, const addr_storage & gaddr);
filter_source_state& convert_wildcard_filter(filter_source_state& rb_filter) const;
......
......@@ -22,7 +22,6 @@
#include "include/hamcast_logging.h"
#include "include/proxy/simple_membership_aggregation.hpp"
#include "include/proxy/interfaces.hpp"
#include "include/proxy/proxy_instance.hpp"
#include "include/proxy/simple_routing_data.hpp"
......@@ -296,19 +295,28 @@ source_list<source> simple_membership_aggregation::get_source_list(const std::se
return result;
}
std::map<unsigned int, mem_source_state> simple_membership_aggregation::get_merged_mem(const addr_storage& gaddr)
filter_source_state simple_membership_aggregation::get_source_filter(rb_interface_direction if_direction, const std::string& input_if_name, const std::shared_ptr<interface>& iface, const addr_storage& gaddr, bool explicit_if_name)
{
HC_LOG_TRACE("");
rb_filter_type tmp_ft = iface->get_filter_type(if_direction);
const source_list<source>& tmp_srcl = get_source_list(iface->get_saddr_set(ID_IN, input_if_name, gaddr, explicit_if_name));
return filter_source_state(tmp_ft, tmp_srcl);
}
//filters group memberships of the queries with the interface filter
auto get_filtered_mem = [&gaddr, this](const std::pair<const unsigned int, downstream_infos>& dstream, const std::string & input_if_name, bool explicite_if_name) {
rb_filter_type tmp_ft = dstream.second.m_interface->get_filter_type(ID_OUT);
const source_list<source>& tmp_srcl = get_source_list(dstream.second.m_interface->get_saddr_set(ID_OUT, input_if_name, gaddr, explicite_if_name));
auto filter = filter_source_state(tmp_ft, tmp_srcl);
mem_source_state simple_membership_aggregation::get_mem_source_state(const std::unique_ptr<querier>& querier, const addr_storage& gaddr)
{
HC_LOG_TRACE("");
return mem_source_state(querier->get_group_membership_infos(gaddr));
}
std::pair<mc_filter, source_list<source>> tmp_gmem_src = dstream.second.m_querier->get_group_membership_infos(gaddr);
auto gmem_src = mem_source_state(tmp_gmem_src);
std::map<unsigned int, mem_source_state> simple_membership_aggregation::get_merged_mem(const addr_storage& gaddr)
{
HC_LOG_TRACE("");
//filters group memberships of the queries with the interface filter
auto get_filtered_mem = [&gaddr, this](const std::pair<const unsigned int, downstream_infos>& dstream, const std::string & input_if_name, bool explicit_if_name) {
auto filter = get_source_filter(ID_OUT, input_if_name, dstream.second.m_interface, gaddr, explicit_if_name);
auto gmem_src = get_mem_source_state(dstream.second.m_querier, gaddr);
merge_memberships_filter(gmem_src, filter); //(p1)
return gmem_src;
};
......@@ -323,7 +331,7 @@ std::map<unsigned int, mem_source_state> simple_membership_aggregation::get_merg
for (auto & ustream : m_ii->m_upstreams) {
mem_source_state if_specific_merge;
std::string uif_name = interfaces::get_if_name(ustream.m_if_index);
std::string uif_name = ustream.m_interface->get_if_name();
for (auto & dstream : m_ii->m_downstreams) {
merge_group_memberships(if_specific_merge, get_filtered_mem(dstream, uif_name, true)); //(p2)
}
......@@ -334,12 +342,13 @@ std::map<unsigned int, mem_source_state> simple_membership_aggregation::get_merg
return downstreams_mem_merge;
}
void simple_membership_aggregation::process_upstream_in_first(const addr_storage& gaddr)
{
HC_LOG_TRACE("");
std::map<unsigned int, mem_source_state> downstreams_mem_merge = get_merged_mem(gaddr); //(p3) and result
mem_source_state reminder;
mem_source_state disjoiner;
for (auto & ustream : m_ii->m_upstreams) {
auto d_mem_merge_it = downstreams_mem_merge.find(ustream.m_if_index);
if (d_mem_merge_it == downstreams_mem_merge.end()) {
......@@ -350,11 +359,10 @@ void simple_membership_aggregation::process_upstream_in_first(const addr_storage
mem_source_state& d_mem_merge = d_mem_merge_it->second; //(p3)
rb_filter_type tmp_ft = ustream.m_interface->get_filter_type(ID_IN);
const source_list<source>& tmp_srcl = get_source_list(ustream.m_interface->get_saddr_set(ID_IN, interfaces::get_if_name(ustream.m_if_index), gaddr));
auto filter = filter_source_state(tmp_ft, tmp_srcl);
auto filter = get_source_filter(ID_IN, ustream.m_interface->get_if_name(), ustream.m_interface, gaddr, false);
merge_memberships_filter(d_mem_merge, filter);
merge_memberships_filter_reminder(reminder, d_mem_merge, filter);
disjoin_group_memberships(d_mem_merge, disjoiner);
merge_group_memberships(disjoiner, d_mem_merge);
}
m_data = std::move(downstreams_mem_merge);
......@@ -363,8 +371,54 @@ void simple_membership_aggregation::process_upstream_in_first(const addr_storage
void simple_membership_aggregation::process_upstream_in_mutex(const addr_storage& gaddr)
{
std::map<unsigned int, mem_source_state> downstreams_mem_merge = get_merged_mem(gaddr); //(p3) and result
std::map<unsigned int, mem_source_state> ustream_recvd_source_map;
mem_source_state dstream_recvd_sources;
const std::map<addr_storage, unsigned int>& tmp_if_map = m_routing_data->get_interface_map(gaddr);
//1.) convert m_routing_data->get_interface_map(gaddr) to up- and downstream_recvd_source;
for (auto & ustream : m_ii->m_upstreams) {
ustream_recvd_source_map.insert(std::make_pair(ustream.m_if_index, mem_source_state(INCLUDE_MODE, source_list<source> {})));
}
for (auto & e : tmp_if_map) {
auto ursm_it = ustream_recvd_source_map.find(e.second);
if (ursm_it == ustream_recvd_source_map.end()) {
dstream_recvd_sources.m_source_list.insert(e.first);
} else {
for (auto uit = ustream_recvd_source_map.begin(); uit != ustream_recvd_source_map.end(); ++uit) {
if (uit != ursm_it) {
uit->second.m_source_list.insert(e.first);
}
}
}
}
//2.) disjoin and merge with interface filters
for (auto & ustream : m_ii->m_upstreams) {
auto d_mem_merge_it = downstreams_mem_merge.find(ustream.m_if_index);
if (d_mem_merge_it == downstreams_mem_merge.end()) {
HC_LOG_ERROR("upstream interface index (" << ustream.m_if_index << ") in downstreams_mem_merge not found");
downstreams_mem_merge.erase(d_mem_merge_it);
continue;
}
auto ursm_it = ustream_recvd_source_map.find(ustream.m_if_index);
if (ursm_it == ustream_recvd_source_map.end()) {
HC_LOG_ERROR("upstream interface index (" << ustream.m_if_index << ") in ustream_recvd_source_map not found");
continue;
}
auto filter = get_source_filter(ID_IN, ustream.m_interface->get_if_name(), ustream.m_interface, gaddr, false);
mem_source_state& d_mem_merge = d_mem_merge_it->second;
merge_group_memberships(ursm_it->second, dstream_recvd_sources);
disjoin_group_memberships(d_mem_merge, ursm_it->second);
merge_memberships_filter(d_mem_merge, filter);
}
m_data = std::move(downstreams_mem_merge);
}
std::pair<mc_filter, const source_list<source>&> simple_membership_aggregation::get_group_memberships(unsigned int upstream_if_index) const
......
......@@ -168,8 +168,7 @@ const std::map<addr_storage, unsigned int>& simple_routing_data::get_interface_m
if(it != std::end(m_data)){
return it->second.m_if_map;
}else{
static std::map<addr_storage, unsigned int> result;
result.clear();
const static std::map<addr_storage, unsigned int> result;
return result;
}
}
......
......@@ -200,6 +200,18 @@ struct test_membership_aggregation {
UT_CHECK(check_disjoin_group_memberships(ex_a, in_b, mss(EXCLUDE_MODE, sl {s1,s2,s3})));
UT_CHECK(check_disjoin_group_memberships(ex_a, ex_b, mss(INCLUDE_MODE, sl {s2})));
UT_SUMMARY;
}
test_status test_mem_aggregation() {
HC_LOG_TRACE("");
UT_INITIALISATION;
UT_SUMMARY;
}
};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment