From 1914b3ee2a18102d0a94cbdbbfeae1afa03edd17 Mon Sep 17 00:00:00 2001
From: "Roger A. Light" <roger@atchoo.org>
Date: Sat, 8 Jun 2024 09:41:08 +0100
Subject: [PATCH] Use separate trees for shared/normal subs

Fixes Eclipse #217, #218.
---
 ChangeLog.txt                    |  4 +++
 src/database.c                   | 13 +++++++---
 src/loop.c                       |  3 ++-
 src/mosquitto_broker_internal.h  |  3 ++-
 src/persist_write.c              |  8 +++++-
 src/subs.c                       | 42 ++++++++++++++++++++++++--------
 test/broker/data/REGRESSION.json | 31 +++++++++++++++++++++++
 test/broker/msg_sequence_test.py |  4 +++
 test/unit/subs_test.c            |  7 +++---
 9 files changed, 95 insertions(+), 20 deletions(-)

Index: mosquitto-2.0.18/src/database.c
===================================================================
--- mosquitto-2.0.18.orig/src/database.c
+++ mosquitto-2.0.18/src/database.c
@@ -199,12 +199,16 @@ int db__open(struct mosquitto__config *c
 	/* Initialize the hashtable */
 	db.clientid_index_hash = NULL;
 
-	db.subs = NULL;
+	db.normal_subs = NULL;
+	db.shared_subs = NULL;
 
-	subhier = sub__add_hier_entry(NULL, &db.subs, "", 0);
+	subhier = sub__add_hier_entry(NULL, &db.shared_subs, "", 0);
 	if(!subhier) return MOSQ_ERR_NOMEM;
 
-	subhier = sub__add_hier_entry(NULL, &db.subs, "$SYS", (uint16_t)strlen("$SYS"));
+	subhier = sub__add_hier_entry(NULL, &db.normal_subs, "", 0);
+	if(!subhier) return MOSQ_ERR_NOMEM;
+
+	subhier = sub__add_hier_entry(NULL, &db.normal_subs, "$SYS", (uint16_t)strlen("$SYS"));
 	if(!subhier) return MOSQ_ERR_NOMEM;
 
 	retain__init();
@@ -240,7 +244,8 @@ static void subhier_clean(struct mosquit
 
 int db__close(void)
 {
-	subhier_clean(&db.subs);
+	subhier_clean(&db.normal_subs);
+	subhier_clean(&db.shared_subs);
 	retain__clean(&db.retains);
 	db__msg_store_clean();
 
Index: mosquitto-2.0.18/src/loop.c
===================================================================
--- mosquitto-2.0.18.orig/src/loop.c
+++ mosquitto-2.0.18/src/loop.c
@@ -241,7 +241,8 @@ int mosquitto_main_loop(struct mosquitto
 			flag_reload = false;
 		}
 		if(flag_tree_print){
-			sub__tree_print(db.subs, 0);
+			sub__tree_print(db.normal_subs, 0);
+			sub__tree_print(db.shared_subs, 0);
 			flag_tree_print = false;
 #ifdef WITH_XTREPORT
 			xtreport();
Index: mosquitto-2.0.18/src/mosquitto_broker_internal.h
===================================================================
--- mosquitto-2.0.18.orig/src/mosquitto_broker_internal.h
+++ mosquitto-2.0.18/src/mosquitto_broker_internal.h
@@ -442,7 +442,8 @@ struct mosquitto_message_v5{
 
 struct mosquitto_db{
 	dbid_t last_db_id;
-	struct mosquitto__subhier *subs;
+	struct mosquitto__subhier *normal_subs;
+	struct mosquitto__subhier *shared_subs;
 	struct mosquitto__retainhier *retains;
 	struct mosquitto *contexts_by_id;
 	struct mosquitto *contexts_by_sock;
Index: mosquitto-2.0.18/src/persist_write.c
===================================================================
--- mosquitto-2.0.18.orig/src/persist_write.c
+++ mosquitto-2.0.18/src/persist_write.c
@@ -266,7 +266,13 @@ static int persist__subs_save_all(FILE *
 {
 	struct mosquitto__subhier *subhier, *subhier_tmp;
 
-	HASH_ITER(hh, db.subs, subhier, subhier_tmp){
+	HASH_ITER(hh, db.normal_subs, subhier, subhier_tmp){
+		if(subhier->children){
+			persist__subs_save(db_fptr, subhier->children, "", 0);
+		}
+	}
+
+	HASH_ITER(hh, db.shared_subs, subhier, subhier_tmp){
 		if(subhier->children){
 			persist__subs_save(db_fptr, subhier->children, "", 0);
 		}
Index: mosquitto-2.0.18/src/subs.c
===================================================================
--- mosquitto-2.0.18.orig/src/subs.c
+++ mosquitto-2.0.18/src/subs.c
@@ -595,16 +595,29 @@ int sub__add(struct mosquitto *context,
 		mosquitto__free(topics);
 		return MOSQ_ERR_INVAL;
 	}
-	HASH_FIND(hh, db.subs, topics[0], topiclen, subhier);
-	if(!subhier){
-		subhier = sub__add_hier_entry(NULL, &db.subs, topics[0], (uint16_t)topiclen);
+
+	if(sharename){
+		HASH_FIND(hh, db.shared_subs, topics[0], topiclen, subhier);
 		if(!subhier){
-			mosquitto__free(local_sub);
-			mosquitto__free(topics);
-			log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
-			return MOSQ_ERR_NOMEM;
+			subhier = sub__add_hier_entry(NULL, &db.shared_subs, topics[0], (uint16_t)topiclen);
+			if(!subhier){
+				mosquitto__free(local_sub);
+				mosquitto__free(topics);
+				log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
+				return MOSQ_ERR_NOMEM;
+			}
+		}
+	}else{
+		HASH_FIND(hh, db.normal_subs, topics[0], topiclen, subhier);
+		if(!subhier){
+			subhier = sub__add_hier_entry(NULL, &db.normal_subs, topics[0], (uint16_t)topiclen);
+			if(!subhier){
+				mosquitto__free(local_sub);
+				mosquitto__free(topics);
+				log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
+				return MOSQ_ERR_NOMEM;
+			}
 		}
-
 	}
 	rc = sub__add_context(context, sub, qos, identifier, options, subhier, topics, sharename);
 
@@ -627,7 +640,11 @@ int sub__remove(struct mosquitto *contex
 	rc = sub__topic_tokenise(sub, &local_sub, &topics, &sharename);
 	if(rc) return rc;
 
-	HASH_FIND(hh, db.subs, topics[0], strlen(topics[0]), subhier);
+	if(sharename){
+		HASH_FIND(hh, db.shared_subs, topics[0], strlen(topics[0]), subhier);
+	}else{
+		HASH_FIND(hh, db.normal_subs, topics[0], strlen(topics[0]), subhier);
+	}
 	if(subhier){
 		*reason = MQTT_RC_NO_SUBSCRIPTION_EXISTED;
 		rc = sub__remove_recurse(context, subhier, topics, reason, sharename);
@@ -656,7 +673,17 @@ int sub__messages_queue(const char *sour
 	*/
 	db__msg_store_ref_inc(*stored);
 
-	HASH_FIND(hh, db.subs, split_topics[0], strlen(split_topics[0]), subhier);
+	HASH_FIND(hh, db.normal_subs, split_topics[0], strlen(split_topics[0]), subhier);
+	if(subhier){
+		rc = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored);
+	}
+
+	HASH_FIND(hh, db.shared_subs, split_topics[0], strlen(split_topics[0]), subhier);
+	if(subhier){
+		rc = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored);
+	}
+
+	HASH_FIND(hh, db.shared_subs, split_topics[0], strlen(split_topics[0]), subhier);
 	if(subhier){
 		rc = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored);
 	}