From 284b459da5bb2a7cb45e031a3d462e00d83a7b6d Mon Sep 17 00:00:00 2001
From: Anjan Chanda <anjan.chanda@iopsys.eu>
Date: Sun, 13 Dec 2020 16:38:23 +0100
Subject: [PATCH] cmdu_defrag() to reassemble rx-cmdu fragments

---
 src/cmdu.c            | 336 +++++++++++++++++++++++++++++++++++++++++-
 src/cmdu.h            |  65 ++++++++
 src/cmdu_ackq.h       |   4 +
 src/cmduqueue.c       |   2 +
 src/i1905.c           |  21 ++-
 src/i1905.h           |   1 +
 src/i1905_dm.c        |   1 +
 src/i1905_extension.c |   2 +
 src/util.c            |  22 ++-
 src/util.h            |   2 +
 10 files changed, 452 insertions(+), 4 deletions(-)

diff --git a/src/cmdu.c b/src/cmdu.c
index 70b93d35..a3b5db0a 100644
--- a/src/cmdu.c
+++ b/src/cmdu.c
@@ -6,6 +6,7 @@
 #include <stdbool.h>
 #include <string.h>
 
+#include "timer.h"
 #include "util.h"
 #include "bufutil.h"
 //#include "1905_defs.h"
@@ -421,7 +422,6 @@ struct cmdu_buff *cmdu_fragment(uint8_t *data, int datalen)
 	uint16_t plen;
 	int ppos = 0;
 	int len = 0;
-	int i;
 
 
 	/* tlv_printf(data, datalen); */
@@ -546,3 +546,337 @@ struct cmdu_buff *cmdu_fragment(uint8_t *data, int datalen)
 
 	return cmdu;
 }
+
+
+static struct cmdu_frag_rx *cmdufrag_lookup(void *rxfq, uint16_t type,
+					    uint16_t mid, uint8_t fid,
+					    uint8_t *origin)
+{
+	struct cmdufrag_queue *q = (struct cmdufrag_queue *)rxfq;
+	int idx = cmdu_frag_hash(type, mid, origin);
+	struct cmdu_frag_rx *frag = NULL;
+
+
+	pthread_mutex_lock(&q->qlock);
+	hlist_for_each_entry(frag, &q->table[idx], hlist) {
+		if (frag->type == type && frag->mid == mid &&
+		    !memcmp(frag->origin, origin, 6)) {
+			if (frag->fid == fid) {
+				pthread_mutex_unlock(&q->qlock);
+				return frag;
+			}
+		}
+	}
+	pthread_mutex_unlock(&q->qlock);
+
+	return NULL;
+}
+
+static void cmdu_fragqueue_delete_chain(struct cmdu_frag_rx *frag)
+{
+	struct cmdu_frag_rx *ee = NULL, *e;
+
+	if (!frag)
+		return;
+
+	for (e = frag; e; e = e->next) {
+		if (ee) {
+			printf("freeing ee ..fid = %d ....\n", ee->fid);
+			free(ee);
+		}
+		ee = e;
+	}
+
+	if (ee) {
+		printf("freeing ee ..fid = %d ...\n", ee->fid);
+		free(ee);
+	}
+}
+
+static void cmdu_fragqueue_free_entry(struct cmdu_frag_rx *frag)
+{
+	if (frag)
+		free(frag);
+}
+
+static struct cmdu_frag_rx *cmdu_create_rxfrag(struct cmdu_buff *cmdu,
+					       uint32_t timeout)
+{
+	struct cmdu_frag_rx *frag;
+	struct timeval tsp = { 0 };
+
+	frag = calloc(1, sizeof(*frag));
+	if (!frag) {
+		fprintf(stderr, "calloc failed. err = NOMEM\n");
+		return NULL;
+	}
+
+	frag->cmdu = cmdu;
+	frag->type = cmdu_get_type(cmdu);
+	frag->mid = cmdu_get_mid(cmdu);
+	frag->fid = cmdu_get_fid(cmdu);
+	frag->last_frag = IS_CMDU_LAST_FRAGMENT(cmdu->cdata) ? true : false;
+	memcpy(frag->origin, cmdu_get_origin(cmdu), 6);
+	frag->next = NULL;
+	frag->last = frag;
+	frag->tlen = 0;
+	frag->numfrags = 1;
+	gettimeofday(&tsp, NULL);
+	frag->ageing_time = timeout;
+	timeradd_msecs(&tsp, frag->ageing_time, &frag->ageing_tmo);
+	frag->ageing_tmo.tv_usec = (frag->ageing_tmo.tv_usec / 1000) * 1000;
+	fprintf(stderr,
+		"CREATE frag: type = 0x%04x  mid = %hu (%d) origin = " MACFMT " timeout = { %u (%lu:%lu) }\n",
+		cmdu_get_type(cmdu),
+		cmdu_get_mid(cmdu),
+		cmdu_get_fid(cmdu),
+		MAC2STR(cmdu_get_origin(cmdu)),
+		frag->ageing_time,
+		frag->ageing_tmo.tv_sec,
+		frag->ageing_tmo.tv_usec / 1000);
+
+	return frag;
+}
+
+int cmdufrag_queue_enqueue(void *rxfq, struct cmdu_buff *cmdu, uint32_t timeout)
+{
+	struct cmdufrag_queue *q = (struct cmdufrag_queue *)rxfq;
+	struct cmdu_frag_rx *frag = NULL;
+	uint8_t *origin;
+	uint16_t type;
+	uint16_t mid;
+	uint8_t fid;
+
+
+	type = cmdu_get_type(cmdu);
+	mid = cmdu_get_mid(cmdu);
+	fid = cmdu_get_fid(cmdu);
+	origin = cmdu_get_origin(cmdu);
+
+	frag = cmdufrag_lookup(rxfq, type, mid, fid, origin);
+	if (frag) {
+		fprintf(stderr,
+			"Duplicate: type = 0x%04x mid = %hu fid = %d origin = " MACFMT "\n",
+			type, mid, fid, MAC2STR(origin));
+
+		return -1;
+	}
+
+	frag = cmdu_create_rxfrag(cmdu, timeout);
+	if (frag) {
+		int idx = cmdu_frag_hash(type, mid, origin);
+
+		pthread_mutex_lock(&q->qlock);
+		q->pending_cnt++;
+		pthread_mutex_unlock(&q->qlock);
+
+		fprintf(stderr,
+			"ENQ: type = 0x%04x  mid = %hu fid = %d  origin = " MACFMT "\n",
+			type, mid, fid, MAC2STR(origin));
+
+		if (fid > 0) {
+			struct cmdu_frag_rx *firstfrag = NULL;
+
+			firstfrag = cmdufrag_lookup(rxfq, type, mid, 0, origin);
+			if (!firstfrag) {
+				fprintf(stderr,
+					"First fragment missing for mid = %hu\n", mid);
+				cmdu_fragqueue_free_entry(frag);
+				return -1;
+			}
+
+			pthread_mutex_lock(&q->qlock);
+			firstfrag->last->next = frag;
+			firstfrag->last = frag;
+			firstfrag->tlen += frag->cmdu->datalen;
+			firstfrag->numfrags++;
+
+			/* do not ageout fragments other than the first.
+			 * If the first one ages-out, then all the related
+			 * fragments will be cleaned up.
+			 */
+			pthread_mutex_unlock(&q->qlock);
+			return 0;
+		}
+
+		pthread_mutex_lock(&q->qlock);
+		hlist_add_head(&frag->hlist, &q->table[idx]);
+
+		if (timer_pending(&q->ageing_timer)) {
+			if (timercmp(&q->next_tmo, &frag->ageing_tmo, >)) {
+				q->next_tmo.tv_sec = frag->ageing_tmo.tv_sec;
+				q->next_tmo.tv_usec = frag->ageing_tmo.tv_usec;
+
+				timer_set(&q->ageing_timer, frag->ageing_time);
+			}
+		} else {
+			q->next_tmo.tv_sec = frag->ageing_tmo.tv_sec;
+			q->next_tmo.tv_usec = frag->ageing_tmo.tv_usec;
+			timer_set(&q->ageing_timer, frag->ageing_time);
+		}
+
+		pthread_mutex_unlock(&q->qlock);
+		return 0;
+	}
+
+	return -1;
+}
+
+static void cmdu_frag_ageout(struct cmdufrag_queue *st, struct hlist_head *head,
+			     struct timeval *min_next_tmo)
+{
+	struct cmdu_frag_rx *frag;
+	struct hlist_node *tmp;
+	struct timeval now = { 0 };
+
+
+	gettimeofday(&now, NULL);
+	now.tv_usec = (now.tv_usec / 1000) * 1000;
+
+	hlist_for_each_entry_safe(frag, tmp, head, hlist) {
+		if (timercmp(&frag->ageing_tmo, &now, <=)) {
+			st->pending_cnt--;
+			hlist_del(&frag->hlist, head);
+			fprintf(stderr, "Fragments from " MACFMT " aged out.\n",
+				MAC2STR(frag->origin));
+			cmdu_fragqueue_delete_chain(frag);
+		} else {
+			struct timeval new_next_tmo = { 0 };
+
+			timersub(&frag->ageing_tmo, &now, &new_next_tmo);
+			if (!timercmp(min_next_tmo, &new_next_tmo, <)) {
+				min_next_tmo->tv_sec = new_next_tmo.tv_sec;
+				min_next_tmo->tv_usec = new_next_tmo.tv_usec;
+			}
+		}
+	}
+}
+
+static void cmdu_fragqueue_ageing_timer_run(atimer_t *t)
+{
+	struct cmdufrag_queue *st = container_of(t, struct cmdufrag_queue, ageing_timer);
+	struct timeval min_next_tmo = { .tv_sec = 999999 };
+	int remain_cnt = 0;
+	struct timeval nu;
+	int i;
+
+	gettimeofday(&nu, NULL);
+
+	fprintf(stderr, "\n Timer now = %lu.%lu   cnt = %d\n",
+		nu.tv_sec, nu.tv_usec, st->pending_cnt);
+
+	for (i = 0; i < NUM_FRAGMENTS; i++) {
+		if (hlist_empty(&st->table[i]))
+			continue;
+
+		cmdu_frag_ageout(st, &st->table[i], &min_next_tmo);
+	}
+
+	remain_cnt = st->pending_cnt;
+	st->next_tmo.tv_sec = min_next_tmo.tv_sec;
+	st->next_tmo.tv_usec = min_next_tmo.tv_usec;
+
+	if (remain_cnt) {
+		uint32_t tmo_msecs = st->next_tmo.tv_sec * 1000 +
+					st->next_tmo.tv_usec / 1000;
+
+		if (tmo_msecs > 0)
+			timer_set(&st->ageing_timer, tmo_msecs);
+	}
+}
+
+struct cmdu_buff *cmdu_defrag(void *rxfq, struct cmdu_buff *lastfrag)
+{
+	struct cmdufrag_queue *q = (struct cmdufrag_queue *)rxfq;
+	struct cmdu_frag_rx *ee = NULL, *e;
+	struct cmdu_frag_rx *frag = NULL;
+	struct cmdu_buff *cmdu = NULL;
+	struct hlist_node *tmp;
+	uint32_t fidsum = 0;
+	bool is_lastfrag;
+	uint8_t *origin;
+	uint16_t type;
+	uint16_t mid;
+	uint8_t fid;
+	int idx;
+
+
+	if (!lastfrag || !lastfrag->cdata)
+		return NULL;
+
+	type = cmdu_get_type(lastfrag);
+	mid = cmdu_get_mid(lastfrag);
+	fid = cmdu_get_fid(lastfrag);
+	is_lastfrag = IS_CMDU_LAST_FRAGMENT(lastfrag->cdata);
+	origin = cmdu_get_origin(lastfrag);
+
+	if (!is_lastfrag)
+		return NULL;
+
+	idx = cmdu_frag_hash(type, mid, origin);
+
+	pthread_mutex_lock(&q->qlock);
+	hlist_for_each_entry_safe(frag, tmp, &q->table[idx], hlist) {
+		if (frag->type == type && frag->mid == mid &&
+		    !memcmp(frag->origin, origin, 6)) {
+
+			fprintf(stderr, "DEFRAG: type: %hu mid = %hu  fid = %d\n",
+				frag->type, frag->mid, frag->fid);
+
+			hlist_del(&frag->hlist, &q->table[idx]);
+			q->pending_cnt -= frag->numfrags;
+			break;
+		}
+	}
+	pthread_mutex_unlock(&q->qlock);
+
+	/* alloc unfragmented cmdu */
+	cmdu = cmdu_alloc(frag->tlen);
+	if (!cmdu) {
+		printf("-ENOMEM\n");
+		return NULL;
+	}
+
+	cmdu->cdata = (struct cmdu_linear *)cmdu->head;
+	cmdu->data = (uint8_t *)(cmdu->cdata + 1);
+	buf_put_be16((uint8_t *)&cmdu->cdata->hdr.type, type);
+	buf_put_be16((uint8_t *)&cmdu->cdata->hdr.mid, mid);
+	memcpy(cmdu->origin, origin, 6);
+
+	for (e = frag; e; e = e->next) {
+		if (ee) {
+			fprintf(stderr, "freeing fid = %d\n", ee->fid);
+			free(ee);
+		}
+		fidsum += e->fid;
+		memcpy(cmdu->tail, e->cmdu->data, e->cmdu->datalen);
+		cmdu->datalen += e->cmdu->datalen;
+		cmdu->tail += e->cmdu->datalen;
+		ee = e;
+	}
+
+	if (ee) {
+		fprintf(stderr, "freeing fid = %d\n", ee->fid);
+		free(ee);
+	}
+
+	fprintf(stderr, "frags sum = %d\n", fidsum);
+	if (fid * (fid + 1) != 2 * fidsum) {
+		fprintf(stderr, "Defrag Failure!\n");
+		//TODO: cmdu_free(cmdu);
+		return NULL;
+	}
+
+	return cmdu;
+}
+
+int cmdufrag_queue_init(void *rxfq)
+{
+	struct cmdufrag_queue *q = (struct cmdufrag_queue *)rxfq;
+
+	memset(q, 0, sizeof(*q));
+	pthread_mutex_init(&q->qlock, NULL);
+	timer_init(&q->ageing_timer, cmdu_fragqueue_ageing_timer_run);
+
+	return 0;
+}
diff --git a/src/cmdu.h b/src/cmdu.h
index 2991d033..8d1fae0a 100644
--- a/src/cmdu.h
+++ b/src/cmdu.h
@@ -6,8 +6,12 @@
 
 #include <stdint.h>
 #include <pthread.h>
+#include <sys/time.h>
 #include <libubox/list.h>
 
+#include "bufutil.h"
+#include "hlist.h"
+
 
 struct cmdu_header {
 	uint8_t version;
@@ -49,6 +53,7 @@ struct cmdu_buff {
 	uint8_t *tail;
 	uint8_t *end;
 	uint8_t dev_macaddr[6];
+	uint8_t origin[6];
 	uint16_t datalen;
 	uint16_t len;
 	struct cmdu_linear *cdata;
@@ -93,6 +98,28 @@ struct cmdu_buff *cmdu_alloc(int size);
 struct cmdu_buff *cmdu_alloc_default(void);
 void cmdu_free(struct cmdu_buff *c);
 
+static inline uint16_t cmdu_get_type(struct cmdu_buff *c)
+{
+	return (c && c->cdata) ?
+		buf_get_be16((uint8_t *)&c->cdata->hdr.type) : 0xffff;
+}
+
+static inline uint16_t cmdu_get_mid(struct cmdu_buff *c)
+{
+	return (c && c->cdata) ?
+		buf_get_be16((uint8_t *)&c->cdata->hdr.mid) : 0xffff;
+}
+
+static inline uint8_t cmdu_get_fid(struct cmdu_buff *c)
+{
+	return (c && c->cdata) ? c->cdata->hdr.fid : 0xff;
+}
+
+static inline uint8_t *cmdu_get_origin(struct cmdu_buff *c)
+{
+	return c ? c->origin : NULL;
+}
+
 int cmdu_size(struct cmdu_buff *c);
 uint16_t cmdu_get_next_mid(void);
 uint16_t cmdu_init_mid(void);
@@ -148,4 +175,42 @@ extern void cmdu_enqueue(struct cmdu_queue *q, struct cmdu_buff *c);
 extern struct cmdu_buff *cmdu_dequeue(struct cmdu_queue *q);
 
 
+struct cmdu_frag_rx {
+	struct cmdu_buff *cmdu;
+	uint16_t type;
+	uint16_t mid;
+	uint8_t fid;
+	bool last_frag;
+	uint8_t origin[6];
+	struct cmdu_frag_rx *next, *last;
+	uint32_t tlen;
+	uint16_t numfrags;
+	struct hlist_node hlist;
+	uint32_t ageing_time;    /* in msecs */
+	struct timeval ageing_tmo;
+};
+
+#ifndef MAC_ADDR_HASH
+#define MAC_ADDR_HASH(_a)	(_a[0] ^ _a[1] ^ _a[2] ^ _a[3] ^ _a[4] ^ _a[5])
+#endif
+
+#define NUM_FRAGMENTS	128
+
+// TODO: improve func
+#define cmdu_frag_hash(t, m, o)		\
+		((MAC_ADDR_HASH(o) ^ (t) ^ (m)) & (NUM_FRAGMENTS - 1))
+
+struct cmdufrag_queue {
+	pthread_mutex_t qlock;
+	struct hlist_head table[NUM_FRAGMENTS];
+	int pending_cnt;
+	atimer_t ageing_timer;
+	struct timeval next_tmo;
+};
+
+int cmdufrag_queue_init(void *rxfq);
+int cmdufrag_queue_destroy(void *rxfq);
+int cmdufrag_queue_enqueue(void *rxfq, struct cmdu_buff *frag, uint32_t timeout);
+struct cmdu_buff *cmdu_defrag(void *rxfq, struct cmdu_buff *lastfrag);
+
 #endif /* CMDU_H */
diff --git a/src/cmdu_ackq.h b/src/cmdu_ackq.h
index 6adb61df..96a04323 100644
--- a/src/cmdu_ackq.h
+++ b/src/cmdu_ackq.h
@@ -17,7 +17,11 @@
 #include "hlist.h"
 
 #define CMDU_BACKLOG_MAX	128
+
+#ifndef MAC_ADDR_HASH
 #define MAC_ADDR_HASH(a)	(a[0] ^ a[1] ^ a[2] ^ a[3] ^ a[4] ^ a[5])
+#endif
+
 // TODO: improve hash func
 #define cmdu_ackq_hash(t, m, o)		\
 	((MAC_ADDR_HASH(o) ^ (t) ^ (m)) & (CMDU_BACKLOG_MAX - 1))
diff --git a/src/cmduqueue.c b/src/cmduqueue.c
index e5a2cd06..7c60370c 100644
--- a/src/cmduqueue.c
+++ b/src/cmduqueue.c
@@ -10,6 +10,8 @@
 #include <stdlib.h>
 #include <pthread.h>
 
+#include "timer.h"
+#include "bufutil.h"
 #include "cmdu.h"
 
 int cmdu_queue_init(struct cmdu_queue *q)
diff --git a/src/i1905.c b/src/i1905.c
index 03b9b830..c09c5847 100644
--- a/src/i1905.c
+++ b/src/i1905.c
@@ -494,9 +494,24 @@ static void i1905_recv_1905(struct uloop_fd *fd, unsigned int events)
 		rxf->datalen = res - eth_hdrsize - sizeof(struct cmdu_header);
 		rxf->tail = rxf->data + rxf->datalen;
 		memcpy(rxf->dev_macaddr, iface->macaddr, 6);
+		memcpy(rxf->origin, rxf->head + 6, 6);
 
-		cmdu_enqueue(&pif->rxqueue, rxf);
-		cmdu_queue_signal(&pif->rxqueue);
+		if (!IS_CMDU_LAST_FRAGMENT(rxf->cdata)) {
+			cmdufrag_queue_enqueue(&pif->rxfrag_queue, rxf, 1000);
+		} else {
+			if (rxf->cdata->hdr.fid == 0) {
+				cmdu_enqueue(&pif->rxqueue, rxf);
+			} else {
+				struct cmdu_buff *rxff;
+
+				cmdufrag_queue_enqueue(&pif->rxfrag_queue, rxf, 1000);
+				rxff = cmdu_defrag(&pif->rxfrag_queue, rxf);
+				if (rxff)
+					cmdu_enqueue(&pif->rxqueue, rxf);
+			}
+
+			cmdu_queue_signal(&pif->rxqueue);
+		}
 	}
 }
 
@@ -607,6 +622,7 @@ static void i1905_recv_lldp(struct uloop_fd *fd, unsigned int events)
 		rxf->datalen = res - eth_hdrsize;
 		rxf->tail = rxf->data + rxf->datalen;
 		memcpy(rxf->dev_macaddr, iface->macaddr, 6);
+		memcpy(rxf->origin, rxf->head + 6, 6);
 
 		cmdu_enqueue(&pif->rxqueue, rxf);
 		cmdu_queue_signal(&pif->rxqueue);
@@ -687,6 +703,7 @@ static int i1905_setup_interface_priv(struct i1905_interface *n)
 	p->sock_lldp = -1;
 
 	cmdu_queue_init(&p->rxqueue);
+	cmdufrag_queue_init(&p->rxfrag_queue);
 
 	fprintf(stderr, "%s: starting rx worker...\n", __func__);
 	p->rxwork.data = p;
diff --git a/src/i1905.h b/src/i1905.h
index dcdca005..2e15c22b 100644
--- a/src/i1905.h
+++ b/src/i1905.h
@@ -28,6 +28,7 @@ struct i1905_interface_private {
 	struct uloop_fd uloop_1905;
 	struct uloop_fd uloop_lldp;
 	struct cmdu_queue rxqueue;
+	struct cmdufrag_queue rxfrag_queue;
 	struct worker rxwork;
 	struct cmdu_ackq txack_q;
 	void *i1905private;
diff --git a/src/i1905_dm.c b/src/i1905_dm.c
index ee778cd0..ca2c61c6 100644
--- a/src/i1905_dm.c
+++ b/src/i1905_dm.c
@@ -26,6 +26,7 @@
 #include <easy/easy.h>
 
 
+#include "timer.h"
 #include "util.h"
 #include "bufutil.h"
 #include "cmdu.h"
diff --git a/src/i1905_extension.c b/src/i1905_extension.c
index 7c196c9f..49d12b3b 100644
--- a/src/i1905_extension.c
+++ b/src/i1905_extension.c
@@ -4,7 +4,9 @@
 #include <stdint.h>
 #include <string.h>
 #include <dlfcn.h>
+#include <sys/time.h>
 
+#include "timer.h"
 #include "bufutil.h"
 #include "cmdu.h"
 #include "1905_tlvs.h"
diff --git a/src/util.c b/src/util.c
index e0c50f85..d133f007 100644
--- a/src/util.c
+++ b/src/util.c
@@ -15,6 +15,7 @@
 #include <sys/stat.h>
 #include <sys/socket.h>
 #include <sys/ioctl.h>
+#include <sys/time.h>
 #include <netlink/genl/genl.h>
 #include <netlink/genl/ctrl.h>
 #include <netlink/route/link.h>
@@ -22,7 +23,26 @@
 
 #include "util.h"
 
-/* TODO: move following two functions to separate file */
+/* TODO: move following three functions to separate file */
+int timeradd_msecs(struct timeval *a, unsigned long msecs, struct timeval *res)
+{
+	if (res) {
+		struct timeval t = { 0 };
+
+		if (msecs > 1000) {
+			t.tv_sec += msecs / 1000;
+			t.tv_usec = (msecs % 1000) * 1000;
+		} else {
+			t.tv_usec = msecs * 1000;
+		}
+
+		timeradd(a, &t, res);
+		return 0;
+	}
+
+	return -1;
+}
+
 void get_random_bytes(int num, uint8_t *buf)
 {
 	unsigned int seed = (unsigned int)time(NULL);
diff --git a/src/util.h b/src/util.h
index 3f16a43c..7adb7b1f 100644
--- a/src/util.h
+++ b/src/util.h
@@ -5,6 +5,8 @@
 
 #include <libubox/list.h>
 
+
+int timeradd_msecs(struct timeval *a, unsigned long msecs, struct timeval *res);
 void get_random_bytes(int num, uint8_t *buf);
 void bufprintf(uint8_t *buf, int len, const char *label);
 
-- 
GitLab