From 1914b3ee2a18102d0a94cbdbbfeae1afa03edd17 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" <roger@atchoo.org> Date: Sat, 8 Jun 2024 09:41:08 +0100 Subject: [PATCH] Use separate trees for shared/normal subs Fixes Eclipse #217, #218. --- ChangeLog.txt | 4 +++ src/database.c | 13 +++++++--- src/loop.c | 3 ++- src/mosquitto_broker_internal.h | 3 ++- src/persist_write.c | 8 +++++- src/subs.c | 42 ++++++++++++++++++++++++-------- test/broker/data/REGRESSION.json | 31 +++++++++++++++++++++++ test/broker/msg_sequence_test.py | 4 +++ test/unit/subs_test.c | 7 +++--- 9 files changed, 95 insertions(+), 20 deletions(-) Index: mosquitto-2.0.18/src/database.c =================================================================== --- mosquitto-2.0.18.orig/src/database.c +++ mosquitto-2.0.18/src/database.c @@ -199,12 +199,16 @@ int db__open(struct mosquitto__config *c /* Initialize the hashtable */ db.clientid_index_hash = NULL; - db.subs = NULL; + db.normal_subs = NULL; + db.shared_subs = NULL; - subhier = sub__add_hier_entry(NULL, &db.subs, "", 0); + subhier = sub__add_hier_entry(NULL, &db.shared_subs, "", 0); if(!subhier) return MOSQ_ERR_NOMEM; - subhier = sub__add_hier_entry(NULL, &db.subs, "$SYS", (uint16_t)strlen("$SYS")); + subhier = sub__add_hier_entry(NULL, &db.normal_subs, "", 0); + if(!subhier) return MOSQ_ERR_NOMEM; + + subhier = sub__add_hier_entry(NULL, &db.normal_subs, "$SYS", (uint16_t)strlen("$SYS")); if(!subhier) return MOSQ_ERR_NOMEM; retain__init(); @@ -240,7 +244,8 @@ static void subhier_clean(struct mosquit int db__close(void) { - subhier_clean(&db.subs); + subhier_clean(&db.normal_subs); + subhier_clean(&db.shared_subs); retain__clean(&db.retains); db__msg_store_clean(); Index: mosquitto-2.0.18/src/loop.c =================================================================== --- mosquitto-2.0.18.orig/src/loop.c +++ mosquitto-2.0.18/src/loop.c @@ -241,7 +241,8 @@ int mosquitto_main_loop(struct mosquitto flag_reload = false; } if(flag_tree_print){ - sub__tree_print(db.subs, 0); + sub__tree_print(db.normal_subs, 0); + sub__tree_print(db.shared_subs, 0); flag_tree_print = false; #ifdef WITH_XTREPORT xtreport(); Index: mosquitto-2.0.18/src/mosquitto_broker_internal.h =================================================================== --- mosquitto-2.0.18.orig/src/mosquitto_broker_internal.h +++ mosquitto-2.0.18/src/mosquitto_broker_internal.h @@ -442,7 +442,8 @@ struct mosquitto_message_v5{ struct mosquitto_db{ dbid_t last_db_id; - struct mosquitto__subhier *subs; + struct mosquitto__subhier *normal_subs; + struct mosquitto__subhier *shared_subs; struct mosquitto__retainhier *retains; struct mosquitto *contexts_by_id; struct mosquitto *contexts_by_sock; Index: mosquitto-2.0.18/src/persist_write.c =================================================================== --- mosquitto-2.0.18.orig/src/persist_write.c +++ mosquitto-2.0.18/src/persist_write.c @@ -266,7 +266,13 @@ static int persist__subs_save_all(FILE * { struct mosquitto__subhier *subhier, *subhier_tmp; - HASH_ITER(hh, db.subs, subhier, subhier_tmp){ + HASH_ITER(hh, db.normal_subs, subhier, subhier_tmp){ + if(subhier->children){ + persist__subs_save(db_fptr, subhier->children, "", 0); + } + } + + HASH_ITER(hh, db.shared_subs, subhier, subhier_tmp){ if(subhier->children){ persist__subs_save(db_fptr, subhier->children, "", 0); } Index: mosquitto-2.0.18/src/subs.c =================================================================== --- mosquitto-2.0.18.orig/src/subs.c +++ mosquitto-2.0.18/src/subs.c @@ -595,16 +595,29 @@ int sub__add(struct mosquitto *context, mosquitto__free(topics); return MOSQ_ERR_INVAL; } - HASH_FIND(hh, db.subs, topics[0], topiclen, subhier); - if(!subhier){ - subhier = sub__add_hier_entry(NULL, &db.subs, topics[0], (uint16_t)topiclen); + + if(sharename){ + HASH_FIND(hh, db.shared_subs, topics[0], topiclen, subhier); if(!subhier){ - mosquitto__free(local_sub); - mosquitto__free(topics); - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + subhier = sub__add_hier_entry(NULL, &db.shared_subs, topics[0], (uint16_t)topiclen); + if(!subhier){ + mosquitto__free(local_sub); + mosquitto__free(topics); + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; + } + } + }else{ + HASH_FIND(hh, db.normal_subs, topics[0], topiclen, subhier); + if(!subhier){ + subhier = sub__add_hier_entry(NULL, &db.normal_subs, topics[0], (uint16_t)topiclen); + if(!subhier){ + mosquitto__free(local_sub); + mosquitto__free(topics); + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; + } } - } rc = sub__add_context(context, sub, qos, identifier, options, subhier, topics, sharename); @@ -627,7 +640,11 @@ int sub__remove(struct mosquitto *contex rc = sub__topic_tokenise(sub, &local_sub, &topics, &sharename); if(rc) return rc; - HASH_FIND(hh, db.subs, topics[0], strlen(topics[0]), subhier); + if(sharename){ + HASH_FIND(hh, db.shared_subs, topics[0], strlen(topics[0]), subhier); + }else{ + HASH_FIND(hh, db.normal_subs, topics[0], strlen(topics[0]), subhier); + } if(subhier){ *reason = MQTT_RC_NO_SUBSCRIPTION_EXISTED; rc = sub__remove_recurse(context, subhier, topics, reason, sharename); @@ -656,7 +673,17 @@ int sub__messages_queue(const char *sour */ db__msg_store_ref_inc(*stored); - HASH_FIND(hh, db.subs, split_topics[0], strlen(split_topics[0]), subhier); + HASH_FIND(hh, db.normal_subs, split_topics[0], strlen(split_topics[0]), subhier); + if(subhier){ + rc = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored); + } + + HASH_FIND(hh, db.shared_subs, split_topics[0], strlen(split_topics[0]), subhier); + if(subhier){ + rc = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored); + } + + HASH_FIND(hh, db.shared_subs, split_topics[0], strlen(split_topics[0]), subhier); if(subhier){ rc = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored); }