diff --git a/include/asterisk/devicestate.h b/include/asterisk/devicestate.h index ccc46311e31cdf3c435d26704dffc31ef1ae18e8..4f9aa739bf9f88a275240913ddb92368a444f8d6 100644 --- a/include/asterisk/devicestate.h +++ b/include/asterisk/devicestate.h @@ -59,7 +59,7 @@ enum ast_device_state { AST_DEVICE_RINGING, /*!< Device is ringing */ AST_DEVICE_RINGINUSE, /*!< Device is ringing *and* in use */ AST_DEVICE_ONHOLD, /*!< Device is on hold */ - AST_DEVICE_TOTAL, /*/ Total num of device states, used for testing */ + AST_DEVICE_TOTAL, /*!< Total num of device states, used for testing */ }; /*! \brief Device State Cachability diff --git a/include/asterisk/event.h b/include/asterisk/event.h index 3178de5c221e828414e7154e502c2e5f22e134d9..7eea0581d962dfa56101d4ecf4b7a9b500d223f5 100644 --- a/include/asterisk/event.h +++ b/include/asterisk/event.h @@ -25,33 +25,27 @@ /*! * \page AstGenericEvents Generic event system * - * The purpose of this API is to provide a generic way to share events between - * Asterisk modules. Code can generate events, and other code can subscribe to - * them. + * Prior to the creation of \ref stasis, the purpose of this API was to provide + * a generic way to share events between Asterisk modules. Once there was a need + * to disseminate data whose definition was provided by the producers/consumers, + * it was no longer possible to use the binary representation in the generic + * event system. + * + * That aside, the generic event system is still useful and used by several + * modules in Asterisk. + * - CEL uses the \ref ast_event representation to pass information to registered + * backends. + * - The \file res_corosync module publishes \ref ast_event representations of + * information to other Asterisk instances in a cluster. + * - Security event represent their event types and data using this system. + * - Theoretically, any \ref stasis message can use this system to pass + * information around in a binary format. * * Events have an associated event type, as well as information elements. The * information elements are the meta data that go along with each event. For * example, in the case of message waiting indication, the event type is MWI, * and each MWI event contains at least three information elements: the * mailbox, the number of new messages, and the number of old messages. - * - * Subscriptions to events consist of an event type and information elements, - * as well. Subscriptions can be to all events, or a certain subset of events. - * If an event type is provided, only events of that type will be sent to this - * subscriber. Furthermore, if information elements are supplied with the - * subscription, only events that contain the specified information elements - * with specified values will be sent to the subscriber. For example, when a - * SIP phone subscribes to MWI for mailbox 1234, then chan_sip can subscribe - * to internal Asterisk MWI events with the MAILBOX information element with - * a value of "1234". - * - * Another key feature of this event system is the ability to cache events. - * It is useful for some types of events to be able to remember the last known - * value. These are usually events that indicate some kind of state change. - * In the example of MWI, app_voicemail can instruct the event core to cache - * these events based on the mailbox. So, the last known MWI state of each - * mailbox will be cached, and other modules can retrieve this information - * on demand without having to poll the mailbox directly. */ #ifndef AST_EVENT_H @@ -109,9 +103,6 @@ struct ast_event *ast_event_new(enum ast_event_type event_type, ...); * * \return Nothing * - * \note Events that have been queued should *not* be destroyed by the code that - * created the event. It will be automatically destroyed after being - * dispatched to the appropriate subscribers. */ void ast_event_destroy(struct ast_event *event); @@ -149,6 +140,55 @@ int ast_event_append_ie_str(struct ast_event **event, enum ast_event_ie_type ie_ int ast_event_append_ie_uint(struct ast_event **event, enum ast_event_ie_type ie_type, uint32_t data); +/*! + * \brief Append an information element that has a bitflags payload + * + * \param event the event that the IE will be appended to + * \param ie_type the type of IE to append + * \param bitflags the flags that are the payload of the IE + * + * \retval 0 success + * \retval -1 failure + * \since 1.8 + * + * The pointer to the event will get updated with the new location for the event + * that now contains the appended information element. If the re-allocation of + * the memory for this event fails, it will be set to NULL. + */ +int ast_event_append_ie_bitflags(struct ast_event **event, enum ast_event_ie_type ie_type, + uint32_t bitflags); + +/*! + * \brief Append an information element that has a raw payload + * + * \param event the event that the IE will be appended to + * \param ie_type the type of IE to append + * \param data A pointer to the raw data for the payload of the IE + * \param data_len The amount of data to copy into the payload + * + * \retval 0 success + * \retval -1 failure + * + * The pointer to the event will get updated with the new location for the event + * that now contains the appended information element. If the re-allocation of + * the memory for this event fails, it will be set to NULL. + */ +int ast_event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type, + const void *data, size_t data_len); + +/*! + * \brief Append the global EID IE + * + * \param event the event to append IE to + * + * \note For ast_event_new() that includes IEs, this is done automatically + * for you. + * + * \retval 0 success + * \retval -1 failure + */ +int ast_event_append_eid(struct ast_event **event); + /*! * \brief Get the value of an information element that has an integer payload * @@ -172,6 +212,38 @@ uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_ */ const char *ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type); +/*! + * \brief Get the value of an information element that has a raw payload + * + * \param event The event to get the IE from + * \param ie_type the type of information element to retrieve + * + * \return This returns the payload of the information element with the given type. + * If the information element isn't found, NULL will be returned. + */ +const void *ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type); + +/*! + * \brief Get the length of the raw payload for a particular IE + * + * \param event The event to get the IE payload length from + * \param ie_type the type of information element to get the length of + * + * \return If an IE of type ie_type is found, its payload length is returned. + * Otherwise, 0 is returned. + */ +uint16_t ast_event_get_ie_raw_payload_len(const struct ast_event *event, enum ast_event_ie_type ie_type); + +/*! + * \brief Get the string representation of the type of the given event + * + * \arg event the event to get the type of + * + * \return the string representation of the event type of the provided event + * \since 1.6.1 + */ +const char *ast_event_get_type_name(const struct ast_event *event); + /*! * \brief Get the string representation of an information element type * @@ -273,6 +345,13 @@ uint32_t ast_event_iterator_get_ie_uint(struct ast_event_iterator *iterator); */ const char *ast_event_iterator_get_ie_str(struct ast_event_iterator *iterator); +/*! + * \brief Get the minimum length of an ast_event. + * + * \return minimum amount of memory that will be consumed by any ast_event. + */ +size_t ast_event_minimum_length(void); + #if defined(__cplusplus) || defined(c_plusplus) } #endif diff --git a/include/asterisk/event_defs.h b/include/asterisk/event_defs.h index f8f98ace0ca5cbf51f9ee0e5b2c189afe03d26e0..80a8d7dda171c9f8be4667ee4f9012f3dd84ef69 100644 --- a/include/asterisk/event_defs.h +++ b/include/asterisk/event_defs.h @@ -25,23 +25,41 @@ #ifndef AST_EVENT_DEFS_H #define AST_EVENT_DEFS_H -/*! \brief Event types - * \note These values no longer go over the wire and can change when items are removed. */ enum ast_event_type { /*! Reserved to provide the ability to subscribe to all events. A specific * event should never have a payload of 0. */ AST_EVENT_ALL = 0x00, /*! This event type is reserved for use by third-party modules to create - * custom events without having to modify this file. + * custom events without having to modify this file. * \note There are no "custom" IE types, because IEs only have to be * unique to the event itself, not necessarily across all events. */ AST_EVENT_CUSTOM = 0x01, + /*! Voicemail message waiting indication */ + AST_EVENT_MWI = 0x02, /*! Someone has subscribed to events */ - AST_EVENT_SUB = 0x02, + AST_EVENT_SUB = 0x03, + /*! Someone has unsubscribed from events */ + AST_EVENT_UNSUB = 0x04, + /*! The aggregate state of a device across all servers configured to be + * a part of a device state cluster has changed. */ + AST_EVENT_DEVICE_STATE = 0x05, + /*! The state of a device has changed on _one_ server. This should not be used + * directly, in general. Use AST_EVENT_DEVICE_STATE instead. */ + AST_EVENT_DEVICE_STATE_CHANGE = 0x06, /*! Channel Event Logging events */ - AST_EVENT_CEL = 0x03, + AST_EVENT_CEL = 0x07, + /*! A report of a security related event (see security_events.h) */ + AST_EVENT_SECURITY = 0x08, + /*! Used by res_stun_monitor to alert listeners to an exernal network address change. */ + AST_EVENT_NETWORK_CHANGE = 0x09, + /*! The presence state for a presence provider */ + AST_EVENT_PRESENCE_STATE = 0x0a, + /*! Used to alert listeners when a named ACL has changed. */ + AST_EVENT_ACL_CHANGE = 0x0b, + /*! Send out a ping for debugging distributed events */ + AST_EVENT_PING = 0x0c, /*! Number of event types. This should be the last event type + 1 */ - AST_EVENT_TOTAL = 0x04, + AST_EVENT_TOTAL = 0x0d, }; /*! \brief Event Information Element types */ @@ -49,199 +67,243 @@ enum ast_event_ie_type { /*! Used to terminate the arguments to event functions */ AST_EVENT_IE_END = -1, - /*! + /*! + * \brief Number of new messages + * Used by: AST_EVENT_MWI + * Payload type: UINT + */ + AST_EVENT_IE_NEWMSGS = 0x0001, + /*! + * \brief Number of + * Used by: AST_EVENT_MWI + * Payload type: UINT + */ + AST_EVENT_IE_OLDMSGS = 0x0002, + /*! + * \brief Mailbox name \verbatim (mailbox[@context]) \endverbatim + * Used by: AST_EVENT_MWI + * Payload type: STR + */ + AST_EVENT_IE_MAILBOX = 0x0003, + /*! * \brief Unique ID * Used by: AST_EVENT_SUB, AST_EVENT_UNSUB * Payload type: UINT */ - AST_EVENT_IE_UNIQUEID = 0x0001, - /*! - * \brief Event type + AST_EVENT_IE_UNIQUEID = 0x0004, + /*! + * \brief Event type * Used by: AST_EVENT_SUB, AST_EVENT_UNSUB * Payload type: UINT */ - AST_EVENT_IE_EVENTTYPE = 0x0002, + AST_EVENT_IE_EVENTTYPE = 0x0005, /*! * \brief Hint that someone cares that an IE exists * Used by: AST_EVENT_SUB * Payload type: UINT (ast_event_ie_type) */ - AST_EVENT_IE_EXISTS = 0x0003, - /*! - * \brief Context IE - * Used by AST_EVENT_MWI - * Payload type: str - */ - AST_EVENT_IE_CONTEXT = 0x0004, - /*! + AST_EVENT_IE_EXISTS = 0x0006, + /*! + * \brief Device Name + * Used by AST_EVENT_DEVICE_STATE_CHANGE + * Payload type: STR + */ + AST_EVENT_IE_DEVICE = 0x0007, + /*! + * \brief Generic State IE + * Used by AST_EVENT_DEVICE_STATE_CHANGE + * Payload type: UINT + * The actual state values depend on the event which + * this IE is a part of. + */ + AST_EVENT_IE_STATE = 0x0008, + /*! + * \brief Context IE + * Used by AST_EVENT_MWI + * Payload type: str + */ + AST_EVENT_IE_CONTEXT = 0x0009, + /*! * \brief Channel Event Type * Used by: AST_EVENT_CEL * Payload type: UINT */ - AST_EVENT_IE_CEL_EVENT_TYPE = 0x0005, - /*! + AST_EVENT_IE_CEL_EVENT_TYPE = 0x000a, + /*! * \brief Channel Event Time (seconds) * Used by: AST_EVENT_CEL * Payload type: UINT */ - AST_EVENT_IE_CEL_EVENT_TIME = 0x0006, - /*! + AST_EVENT_IE_CEL_EVENT_TIME = 0x000b, + /*! * \brief Channel Event Time (micro-seconds) * Used by: AST_EVENT_CEL * Payload type: UINT */ - AST_EVENT_IE_CEL_EVENT_TIME_USEC = 0x0007, - /*! + AST_EVENT_IE_CEL_EVENT_TIME_USEC = 0x000c, + /*! * \brief Channel Event User Event Name * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_USEREVENT_NAME = 0x0008, - /*! + AST_EVENT_IE_CEL_USEREVENT_NAME = 0x000d, + /*! * \brief Channel Event CID name * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_CIDNAME = 0x0009, - /*! + AST_EVENT_IE_CEL_CIDNAME = 0x000e, + /*! * \brief Channel Event CID num * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_CIDNUM = 0x000a, - /*! + AST_EVENT_IE_CEL_CIDNUM = 0x000f, + /*! * \brief Channel Event extension name * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_EXTEN = 0x000b, - /*! + AST_EVENT_IE_CEL_EXTEN = 0x0010, + /*! * \brief Channel Event context name * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_CONTEXT = 0x000c, - /*! + AST_EVENT_IE_CEL_CONTEXT = 0x0011, + /*! * \brief Channel Event channel name * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_CHANNAME = 0x000d, - /*! + AST_EVENT_IE_CEL_CHANNAME = 0x0012, + /*! * \brief Channel Event app name * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_APPNAME = 0x000e, - /*! + AST_EVENT_IE_CEL_APPNAME = 0x0013, + /*! * \brief Channel Event app args/data * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_APPDATA = 0x000f, - /*! + AST_EVENT_IE_CEL_APPDATA = 0x0014, + /*! * \brief Channel Event AMA flags * Used by: AST_EVENT_CEL * Payload type: UINT */ - AST_EVENT_IE_CEL_AMAFLAGS = 0x0010, - /*! + AST_EVENT_IE_CEL_AMAFLAGS = 0x0015, + /*! * \brief Channel Event AccountCode * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_ACCTCODE = 0x0011, - /*! + AST_EVENT_IE_CEL_ACCTCODE = 0x0016, + /*! * \brief Channel Event UniqueID * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_UNIQUEID = 0x0012, - /*! + AST_EVENT_IE_CEL_UNIQUEID = 0x0017, + /*! * \brief Channel Event Userfield * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_USERFIELD = 0x0013, - /*! + AST_EVENT_IE_CEL_USERFIELD = 0x0018, + /*! * \brief Channel Event CID ANI field * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_CIDANI = 0x0014, - /*! + AST_EVENT_IE_CEL_CIDANI = 0x0019, + /*! * \brief Channel Event CID RDNIS field * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_CIDRDNIS = 0x0015, - /*! + AST_EVENT_IE_CEL_CIDRDNIS = 0x001a, + /*! * \brief Channel Event CID dnid * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_CIDDNID = 0x0016, - /*! + AST_EVENT_IE_CEL_CIDDNID = 0x001b, + /*! * \brief Channel Event Peer -- for Things involving multiple channels, like BRIDGE * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_PEER = 0x0017, - /*! + AST_EVENT_IE_CEL_PEER = 0x001c, + /*! * \brief Channel Event LinkedID * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_LINKEDID = 0x0018, - /*! + AST_EVENT_IE_CEL_LINKEDID = 0x001d, + /*! * \brief Channel Event peeraccount * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_PEERACCT = 0x0019, - /*! + AST_EVENT_IE_CEL_PEERACCT = 0x001e, + /*! * \brief Channel Event extra data * Used by: AST_EVENT_CEL * Payload type: STR */ - AST_EVENT_IE_CEL_EXTRA = 0x001a, + AST_EVENT_IE_CEL_EXTRA = 0x001f, /*! * \brief Description * Used by: AST_EVENT_SUB, AST_EVENT_UNSUB * Payload type: STR */ - AST_EVENT_IE_DESCRIPTION = 0x001b, + AST_EVENT_IE_DESCRIPTION = 0x0020, /*! * \brief Entity ID * Used by All events * Payload type: RAW * This IE indicates which server the event originated from */ - AST_EVENT_IE_EVENT_VERSION = 0x001c, - AST_EVENT_IE_SERVICE = 0x001d, - AST_EVENT_IE_MODULE = 0x001e, - AST_EVENT_IE_ACCOUNT_ID = 0x001f, - AST_EVENT_IE_SESSION_ID = 0x0020, - AST_EVENT_IE_SESSION_TV = 0x0021, - AST_EVENT_IE_ACL_NAME = 0x0022, - AST_EVENT_IE_LOCAL_ADDR = 0x0023, - AST_EVENT_IE_REMOTE_ADDR = 0x0024, - AST_EVENT_IE_EVENT_TV = 0x0025, - AST_EVENT_IE_REQUEST_TYPE = 0x0026, - AST_EVENT_IE_REQUEST_PARAMS = 0x0027, - AST_EVENT_IE_AUTH_METHOD = 0x0028, - AST_EVENT_IE_SEVERITY = 0x0029, - AST_EVENT_IE_EXPECTED_ADDR = 0x002a, - AST_EVENT_IE_CHALLENGE = 0x002b, - AST_EVENT_IE_RESPONSE = 0x002c, - AST_EVENT_IE_EXPECTED_RESPONSE = 0x002e, - AST_EVENT_IE_RECEIVED_CHALLENGE = 0x002f, - AST_EVENT_IE_RECEIVED_HASH = 0x0030, - AST_EVENT_IE_USING_PASSWORD = 0x0031, - AST_EVENT_IE_ATTEMPTED_TRANSPORT = 0x0032, + AST_EVENT_IE_EID = 0x0021, + AST_EVENT_IE_SECURITY_EVENT = 0x0022, + AST_EVENT_IE_EVENT_VERSION = 0x0023, + AST_EVENT_IE_SERVICE = 0x0024, + AST_EVENT_IE_MODULE = 0x0025, + AST_EVENT_IE_ACCOUNT_ID = 0x0026, + AST_EVENT_IE_SESSION_ID = 0x0027, + AST_EVENT_IE_SESSION_TV = 0x0028, + AST_EVENT_IE_ACL_NAME = 0x0029, + AST_EVENT_IE_LOCAL_ADDR = 0x002a, + AST_EVENT_IE_REMOTE_ADDR = 0x002b, + AST_EVENT_IE_EVENT_TV = 0x002c, + AST_EVENT_IE_REQUEST_TYPE = 0x002d, + AST_EVENT_IE_REQUEST_PARAMS = 0x002e, + AST_EVENT_IE_AUTH_METHOD = 0x002f, + AST_EVENT_IE_SEVERITY = 0x0030, + AST_EVENT_IE_EXPECTED_ADDR = 0x0031, + AST_EVENT_IE_CHALLENGE = 0x0032, + AST_EVENT_IE_RESPONSE = 0x0033, + AST_EVENT_IE_EXPECTED_RESPONSE = 0x0034, + AST_EVENT_IE_RECEIVED_CHALLENGE = 0x0035, + AST_EVENT_IE_RECEIVED_HASH = 0x0036, + AST_EVENT_IE_USING_PASSWORD = 0x0037, + AST_EVENT_IE_ATTEMPTED_TRANSPORT = 0x0038, + AST_EVENT_IE_PRESENCE_PROVIDER = 0x0039, + AST_EVENT_IE_PRESENCE_STATE = 0x003a, + AST_EVENT_IE_PRESENCE_SUBTYPE = 0x003b, + AST_EVENT_IE_PRESENCE_MESSAGE = 0x003c, + /*! + * \brief Event non-cachability flag + * Used by: All events + * Payload type: UINT + */ + AST_EVENT_IE_CACHABLE = 0x003d, /*! \brief Must be the last IE value +1 */ - AST_EVENT_IE_TOTAL = 0x0033, + AST_EVENT_IE_TOTAL = 0x003e, }; /*! @@ -249,12 +311,16 @@ enum ast_event_ie_type { */ enum ast_event_ie_pltype { AST_EVENT_IE_PLTYPE_UNKNOWN = -1, + /*! Just check if it exists, not the value */ + AST_EVENT_IE_PLTYPE_EXISTS, /*! Unsigned Integer (Can be used for signed, too ...) */ AST_EVENT_IE_PLTYPE_UINT, /*! String */ AST_EVENT_IE_PLTYPE_STR, /*! Raw data, compared with memcmp */ AST_EVENT_IE_PLTYPE_RAW, + /*! Bit flags (unsigned integer, compared using boolean logic) */ + AST_EVENT_IE_PLTYPE_BITFLAGS, }; /*! diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 20870e6d6b9b22b04437903fcf95c8c1eb528b87..f5b4a60d9914a59ff69f800cab868dd0c68b444a 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -171,6 +171,7 @@ #include "asterisk/json.h" #include "asterisk/manager.h" #include "asterisk/utils.h" +#include "asterisk/event.h" /*! @{ */ @@ -255,6 +256,21 @@ struct stasis_message_vtable { */ struct ast_manager_event_blob *(*to_ami)( struct stasis_message *message); + + /*! + * \since 12.3.0 + * \brief Build the \ref ast_event representation of the message. + * + * May be \c NULL, or may return \c NULL, to indicate no representation. + * The returned object should be free'd. + * + * \param message Message to convert to an \ref ast_event. + * \return Newly allocated \ref ast_event. + * \return \c NULL on error. + * \return \c NULL if AMI format is not supported. + */ + struct ast_event *(*to_event)( + struct stasis_message *message); }; /*! @@ -389,6 +405,19 @@ struct ast_json *stasis_message_to_json(struct stasis_message *message, struct s struct ast_manager_event_blob *stasis_message_to_ami( struct stasis_message *message); +/*! + * \brief Build the \ref AstGenericEvents representation of the message. + * + * May return \c NULL, to indicate no representation. The returned object should + * be disposed of via \ref ast_event_destroy. + * + * \param message Message to convert to AMI. + * \return \c NULL on error. + * \return \c NULL if AMI format is not supported. + */ +struct ast_event *stasis_message_to_event( + struct stasis_message *message); + /*! @} */ /*! @{ */ @@ -1020,6 +1049,7 @@ void stasis_log_bad_type_access(const char *name); * STASIS_MESSAGE_TYPE_DEFN(ast_foo_type, * .to_ami = foo_to_ami, * .to_json = foo_to_json, + * .to_event = foo_to_event, * ); * \endcode * @@ -1046,6 +1076,7 @@ void stasis_log_bad_type_access(const char *name); * STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type, * .to_ami = foo_to_ami, * .to_json = foo_to_json, + * .to_event = foo_to_event, * ); * \endcode * diff --git a/main/app.c b/main/app.c index a5c4c7af560f80509405ef4dfcf37fd1ab6c1e4b..3c2f33c51f1160cb9c304cc49c0d2d82857d39e0 100644 --- a/main/app.c +++ b/main/app.c @@ -96,10 +96,43 @@ static struct stasis_topic *queue_topic_all; static struct stasis_topic_pool *queue_topic_pool; /* @} */ +/*! \brief Convert a MWI \ref stasis_message to a \ref ast_event */ +static struct ast_event *mwi_to_event(struct stasis_message *message) +{ + struct ast_event *event; + struct ast_mwi_state *mwi_state; + char *mailbox; + char *context; + + if (!message) { + return NULL; + } + + mwi_state = stasis_message_data(message); + + /* Strip off @context */ + context = mailbox = ast_strdupa(mwi_state->uniqueid); + strsep(&context, "@"); + if (ast_strlen_zero(context)) { + context = "default"; + } + + event = ast_event_new(AST_EVENT_MWI, + AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, + AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, + AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, mwi_state->new_msgs, + AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, mwi_state->old_msgs, + AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, &mwi_state->eid, sizeof(mwi_state->eid), + AST_EVENT_IE_END); + + return event; +} + /* * @{ \brief Define \ref stasis message types for MWI */ -STASIS_MESSAGE_TYPE_DEFN(ast_mwi_state_type); +STASIS_MESSAGE_TYPE_DEFN(ast_mwi_state_type, + .to_event = mwi_to_event, ); STASIS_MESSAGE_TYPE_DEFN(ast_mwi_vm_app_type); /* @} */ diff --git a/main/devicestate.c b/main/devicestate.c index 1199e68ab403fba5c891d2aa7283cb85fe821ea0..3580b1af7f57659a976ad426988564ad57b3e37d 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -224,9 +224,12 @@ static struct stasis_caching_topic *device_state_topic_cached; static struct stasis_topic_pool *device_state_topic_pool; static struct ast_manager_event_blob *devstate_to_ami(struct stasis_message *msg); +static struct ast_event *devstate_to_event(struct stasis_message *msg); + STASIS_MESSAGE_TYPE_DEFN(ast_device_state_message_type, .to_ami = devstate_to_ami, + .to_event = devstate_to_event, ); /* Forward declarations */ @@ -925,3 +928,33 @@ static struct ast_manager_event_blob *devstate_to_ami(struct stasis_message *msg "State: %s\r\n", dev_state->device, ast_devstate_str(dev_state->state)); } + +/*! \brief Convert a \ref stasis_message to a \ref ast_event */ +static struct ast_event *devstate_to_event(struct stasis_message *message) +{ + struct ast_event *event; + struct ast_device_state_message *device_state; + + if (!message) { + return NULL; + } + + device_state = stasis_message_data(message); + + if (device_state->eid) { + event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE, + AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device_state->device, + AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, device_state->state, + AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, device_state->cachable, + AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, device_state->eid, sizeof(*device_state->eid), + AST_EVENT_IE_END); + } else { + event = ast_event_new(AST_EVENT_DEVICE_STATE, + AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device_state->device, + AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, device_state->state, + AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, device_state->cachable, + AST_EVENT_IE_END); + } + + return event; +} diff --git a/main/event.c b/main/event.c index 990f62161d19459e28bfe3bdd8238575e5c8844c..876e53b88f0f80ff2002215c7f411d0a2df8c92f 100644 --- a/main/event.c +++ b/main/event.c @@ -44,10 +44,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/cli.h" -static int event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type, - const void *data, size_t data_len); -static const void *event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type); - /*! * \brief An event information element * @@ -109,19 +105,42 @@ struct ast_event_ie_val { size_t raw_datalen; }; -struct ie_map { - enum ast_event_ie_pltype ie_pltype; - const char *name; +/*! + * \brief Event Names + */ +static const char * const event_names[AST_EVENT_TOTAL] = { + [AST_EVENT_ALL] = "All", + [AST_EVENT_CUSTOM] = "Custom", + [AST_EVENT_MWI] = "MWI", + [AST_EVENT_SUB] = "Subscription", + [AST_EVENT_UNSUB] = "Unsubscription", + [AST_EVENT_DEVICE_STATE] = "DeviceState", + [AST_EVENT_DEVICE_STATE_CHANGE] = "DeviceStateChange", + [AST_EVENT_CEL] = "CEL", + [AST_EVENT_SECURITY] = "Security", + [AST_EVENT_NETWORK_CHANGE] = "NetworkChange", + [AST_EVENT_PRESENCE_STATE] = "PresenceState", + [AST_EVENT_ACL_CHANGE] = "ACLChange", + [AST_EVENT_PING] = "Ping", }; /*! * \brief IE payload types and names */ -static const struct ie_map ie_maps[AST_EVENT_IE_TOTAL] = { +static const struct ie_map { + enum ast_event_ie_pltype ie_pltype; + const char *name; +} ie_maps[AST_EVENT_IE_TOTAL] = { + [AST_EVENT_IE_NEWMSGS] = { AST_EVENT_IE_PLTYPE_UINT, "NewMessages" }, + [AST_EVENT_IE_OLDMSGS] = { AST_EVENT_IE_PLTYPE_UINT, "OldMessages" }, + [AST_EVENT_IE_MAILBOX] = { AST_EVENT_IE_PLTYPE_STR, "Mailbox" }, [AST_EVENT_IE_UNIQUEID] = { AST_EVENT_IE_PLTYPE_UINT, "UniqueID" }, [AST_EVENT_IE_EVENTTYPE] = { AST_EVENT_IE_PLTYPE_UINT, "EventType" }, [AST_EVENT_IE_EXISTS] = { AST_EVENT_IE_PLTYPE_UINT, "Exists" }, + [AST_EVENT_IE_DEVICE] = { AST_EVENT_IE_PLTYPE_STR, "Device" }, + [AST_EVENT_IE_STATE] = { AST_EVENT_IE_PLTYPE_UINT, "State" }, [AST_EVENT_IE_CONTEXT] = { AST_EVENT_IE_PLTYPE_STR, "Context" }, + [AST_EVENT_IE_EID] = { AST_EVENT_IE_PLTYPE_RAW, "EntityID" }, [AST_EVENT_IE_CEL_EVENT_TYPE] = { AST_EVENT_IE_PLTYPE_UINT, "CELEventType" }, [AST_EVENT_IE_CEL_EVENT_TIME] = { AST_EVENT_IE_PLTYPE_UINT, "CELEventTime" }, [AST_EVENT_IE_CEL_EVENT_TIME_USEC] = { AST_EVENT_IE_PLTYPE_UINT, "CELEventTimeUSec" }, @@ -144,6 +163,7 @@ static const struct ie_map ie_maps[AST_EVENT_IE_TOTAL] = { [AST_EVENT_IE_CEL_LINKEDID] = { AST_EVENT_IE_PLTYPE_STR, "CELLinkedID" }, [AST_EVENT_IE_CEL_PEERACCT] = { AST_EVENT_IE_PLTYPE_STR, "CELPeerAcct" }, [AST_EVENT_IE_CEL_EXTRA] = { AST_EVENT_IE_PLTYPE_STR, "CELExtra" }, + [AST_EVENT_IE_SECURITY_EVENT] = { AST_EVENT_IE_PLTYPE_STR, "SecurityEvent" }, [AST_EVENT_IE_EVENT_VERSION] = { AST_EVENT_IE_PLTYPE_UINT, "EventVersion" }, [AST_EVENT_IE_SERVICE] = { AST_EVENT_IE_PLTYPE_STR, "Service" }, [AST_EVENT_IE_MODULE] = { AST_EVENT_IE_PLTYPE_STR, "Module" }, @@ -166,8 +186,27 @@ static const struct ie_map ie_maps[AST_EVENT_IE_TOTAL] = { [AST_EVENT_IE_RECEIVED_HASH] = { AST_EVENT_IE_PLTYPE_STR, "ReceivedHash" }, [AST_EVENT_IE_USING_PASSWORD] = { AST_EVENT_IE_PLTYPE_UINT, "UsingPassword" }, [AST_EVENT_IE_ATTEMPTED_TRANSPORT] = { AST_EVENT_IE_PLTYPE_STR, "AttemptedTransport" }, + [AST_EVENT_IE_CACHABLE] = { AST_EVENT_IE_PLTYPE_UINT, "Cachable" }, + [AST_EVENT_IE_PRESENCE_PROVIDER] = { AST_EVENT_IE_PLTYPE_STR, "PresenceProvider" }, + [AST_EVENT_IE_PRESENCE_STATE] = { AST_EVENT_IE_PLTYPE_UINT, "PresenceState" }, + [AST_EVENT_IE_PRESENCE_SUBTYPE] = { AST_EVENT_IE_PLTYPE_STR, "PresenceSubtype" }, + [AST_EVENT_IE_PRESENCE_MESSAGE] = { AST_EVENT_IE_PLTYPE_STR, "PresenceMessage" }, }; +const char *ast_event_get_type_name(const struct ast_event *event) +{ + enum ast_event_type type; + + type = ast_event_get_type(event); + + if (type < 0 || type >= ARRAY_LEN(event_names)) { + ast_log(LOG_ERROR, "Invalid event type - '%d'\n", type); + return ""; + } + + return event_names[type]; +} + const char *ast_event_get_ie_type_name(enum ast_event_ie_type ie_type) { if (ie_type <= 0 || ie_type >= ARRAY_LEN(ie_maps)) { @@ -257,7 +296,7 @@ uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_ { const uint32_t *ie_val; - ie_val = event_get_ie_raw(event, ie_type); + ie_val = ast_event_get_ie_raw(event, ie_type); return ie_val ? ntohl(get_unaligned_uint32(ie_val)) : 0; } @@ -266,12 +305,12 @@ const char *ast_event_get_ie_str(const struct ast_event *event, enum ast_event_i { const struct ast_event_ie_str_payload *str_payload; - str_payload = event_get_ie_raw(event, ie_type); + str_payload = ast_event_get_ie_raw(event, ie_type); return str_payload ? str_payload->str : NULL; } -static const void *event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type) +const void *ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type) { struct ast_event_iterator iterator; int res; @@ -285,6 +324,26 @@ static const void *event_get_ie_raw(const struct ast_event *event, enum ast_even return NULL; } +static uint16_t event_iterator_get_ie_raw_payload_len(struct ast_event_iterator *iterator) +{ + return ntohs(iterator->ie->ie_payload_len); +} + +uint16_t ast_event_get_ie_raw_payload_len(const struct ast_event *event, enum ast_event_ie_type ie_type) +{ + struct ast_event_iterator iterator; + int res; + + for (res = ast_event_iterator_init(&iterator, event); !res; res = ast_event_iterator_next(&iterator)) { + if (ast_event_iterator_get_ie_type(&iterator) == ie_type) { + return event_iterator_get_ie_raw_payload_len(&iterator); + } + } + + return 0; +} + + int ast_event_append_ie_str(struct ast_event **event, enum ast_event_ie_type ie_type, const char *str) { @@ -297,17 +356,24 @@ int ast_event_append_ie_str(struct ast_event **event, enum ast_event_ie_type ie_ strcpy(str_payload->str, str); str_payload->hash = ast_str_hash(str); - return event_append_ie_raw(event, ie_type, str_payload, payload_len); + return ast_event_append_ie_raw(event, ie_type, str_payload, payload_len); } int ast_event_append_ie_uint(struct ast_event **event, enum ast_event_ie_type ie_type, uint32_t data) { data = htonl(data); - return event_append_ie_raw(event, ie_type, &data, sizeof(data)); + return ast_event_append_ie_raw(event, ie_type, &data, sizeof(data)); +} + +int ast_event_append_ie_bitflags(struct ast_event **event, enum ast_event_ie_type ie_type, + uint32_t flags) +{ + flags = htonl(flags); + return ast_event_append_ie_raw(event, ie_type, &flags, sizeof(flags)); } -static int event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type, +int ast_event_append_ie_raw(struct ast_event **event, enum ast_event_ie_type ie_type, const void *data, size_t data_len) { struct ast_event_ie *ie; @@ -361,11 +427,16 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...) memset(ie_value, 0, sizeof(*ie_value)); ie_value->ie_type = ie_type; ie_value->ie_pltype = va_arg(ap, enum ast_event_ie_pltype); + switch (ie_value->ie_pltype) { case AST_EVENT_IE_PLTYPE_UINT: ie_value->payload.uint = va_arg(ap, uint32_t); insert = 1; break; + case AST_EVENT_IE_PLTYPE_BITFLAGS: + ie_value->payload.uint = va_arg(ap, uint32_t); + insert = 1; + break; case AST_EVENT_IE_PLTYPE_STR: ie_value->payload.str = va_arg(ap, const char *); insert = 1; @@ -381,6 +452,7 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...) break; } case AST_EVENT_IE_PLTYPE_UNKNOWN: + case AST_EVENT_IE_PLTYPE_EXISTS: break; } @@ -407,10 +479,14 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...) case AST_EVENT_IE_PLTYPE_UINT: ast_event_append_ie_uint(&event, ie_val->ie_type, ie_val->payload.uint); break; + case AST_EVENT_IE_PLTYPE_BITFLAGS: + ast_event_append_ie_bitflags(&event, ie_val->ie_type, ie_val->payload.uint); + break; case AST_EVENT_IE_PLTYPE_RAW: - event_append_ie_raw(&event, ie_val->ie_type, + ast_event_append_ie_raw(&event, ie_val->ie_type, ie_val->payload.raw, ie_val->raw_datalen); break; + case AST_EVENT_IE_PLTYPE_EXISTS: case AST_EVENT_IE_PLTYPE_UNKNOWN: break; } @@ -421,10 +497,27 @@ struct ast_event *ast_event_new(enum ast_event_type type, ...) } } + if (!ast_event_get_ie_raw(event, AST_EVENT_IE_EID)) { + /* If the event is originating on this server, add the server's + * entity ID to the event. */ + ast_event_append_eid(&event); + } + return event; } +int ast_event_append_eid(struct ast_event **event) +{ + return ast_event_append_ie_raw(event, AST_EVENT_IE_EID, + &ast_eid_default, sizeof(ast_eid_default)); +} + void ast_event_destroy(struct ast_event *event) { ast_free(event); } + +size_t ast_event_minimum_length(void) +{ + return sizeof(struct ast_event); +} diff --git a/main/stasis.c b/main/stasis.c index 4d05f18e85c9a7d1f32ec9f7d8d46052b79e8980..5eca791ef158f61ca1cc9181895dca89cd18de2f 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -686,15 +686,17 @@ struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward) from = forward->from_topic; to = forward->to_topic; - topic_lock_both(to, from); - AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from, - AST_VECTOR_ELEM_CLEANUP_NOOP); + if (from && to) { + topic_lock_both(to, from); + AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from, + AST_VECTOR_ELEM_CLEANUP_NOOP); - for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) { - topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx)); + for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) { + topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx)); + } + ao2_unlock(from); + ao2_unlock(to); } - ao2_unlock(from); - ao2_unlock(to); ao2_cleanup(forward); @@ -717,6 +719,11 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, return NULL; } + /* Forwards to ourselves are implicit. */ + if (to_topic == from_topic) { + return ao2_bump(forward); + } + forward->from_topic = ao2_bump(from_topic); forward->to_topic = ao2_bump(to_topic); diff --git a/main/stasis_message.c b/main/stasis_message.c index 1db2ae97a0c4a7a7f67b446a060fffba40be24c0..6132efc20d1f8c0fadd2f4c58a78f20f5ddd40e5 100644 --- a/main/stasis_message.c +++ b/main/stasis_message.c @@ -187,3 +187,8 @@ struct ast_json *stasis_message_to_json( { return INVOKE_VIRTUAL(to_json, msg, sanitize); } + +struct ast_event *stasis_message_to_event(struct stasis_message *msg) +{ + return INVOKE_VIRTUAL(to_event, msg); +} \ No newline at end of file diff --git a/res/res_corosync.c b/res/res_corosync.c index ce4165498cc7347269120a9695d982a6c4fc87da..6c4a3d1e71eb888af753bbcdb5179b0d15e7bead 100644 --- a/res/res_corosync.c +++ b/res/res_corosync.c @@ -44,20 +44,125 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/event.h" #include "asterisk/cli.h" #include "asterisk/devicestate.h" +#include "asterisk/app.h" +#include "asterisk/stasis.h" +#include "asterisk/stasis_message_router.h" AST_RWLOCK_DEFINE_STATIC(event_types_lock); +static void publish_mwi_to_stasis(struct ast_event *event); +static void publish_device_state_to_stasis(struct ast_event *event); + +/*! \brief The internal topic used for message forwarding and pings */ +static struct stasis_topic *corosync_aggregate_topic; + +/*! \brief Our \ref stasis message router */ +static struct stasis_message_router *stasis_router; + +/*! \brief Internal accessor for our topic */ +static struct stasis_topic *corosync_topic(void) +{ + return corosync_aggregate_topic; +} + +/*! \brief A payload wrapper around a corosync ping event */ +struct corosync_ping_payload { + /*! The corosync ping event being passed over \ref stasis */ + struct ast_event *event; +}; + +/*! \brief Destructor for the \ref corosync_ping_payload wrapper object */ +static void corosync_ping_payload_dtor(void *obj) +{ + struct corosync_ping_payload *payload = obj; + + ast_free(payload->event); +} + +/*! \brief Convert a Corosync PING to a \ref ast_event */ +static struct ast_event *corosync_ping_to_event(struct stasis_message *message) +{ + struct corosync_ping_payload *payload; + struct ast_event *event; + struct ast_eid *event_eid; + + if (!message) { + return NULL; + } + + payload = stasis_message_data(message); + + if (!payload->event) { + return NULL; + } + + event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID); + + event = ast_event_new(AST_EVENT_PING, + AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid), + AST_EVENT_IE_END); + + return event; +} + +STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type, + .to_event = corosync_ping_to_event, ); + +/*! \brief Publish a Corosync ping to \ref stasis */ +static void publish_corosync_ping_to_stasis(struct ast_event *event) +{ + struct corosync_ping_payload *payload; + struct stasis_message *message; + + ast_assert(ast_event_get_type(event) == AST_EVENT_PING); + ast_assert(event != NULL); + + payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload"); + if (!payload) { + return; + } + payload->event = event; + + message = stasis_message_create(corosync_ping_message_type(), payload); + if (!message) { + ao2_t_ref(payload, -1, "Destroy payload on off nominal"); + return; + } + + stasis_publish(corosync_topic(), message); + + ao2_t_ref(payload, -1, "Hand ref to stasis"); + ao2_t_ref(message, -1, "Hand ref to stasis"); +} + static struct { const char *name; - struct ast_event_sub *sub; + struct stasis_forward *sub; unsigned char publish; unsigned char publish_default; unsigned char subscribe; unsigned char subscribe_default; + struct stasis_topic *(* topic_fn)(void); + struct stasis_cache *(* cache_fn)(void); + struct stasis_message_type *(* message_type_fn)(void); + void (* publish_to_stasis)(struct ast_event *); } event_types[] = { - [AST_EVENT_MWI] = { .name = "mwi", }, - [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", }, - [AST_EVENT_PING] = { .name = "ping", .publish_default = 1, .subscribe_default = 1 }, + [AST_EVENT_MWI] = { .name = "mwi", + .topic_fn = ast_mwi_topic_all, + .cache_fn = ast_mwi_state_cache, + .message_type_fn = ast_mwi_state_type, + .publish_to_stasis = publish_mwi_to_stasis, }, + [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", + .topic_fn = ast_device_state_topic_all, + .cache_fn = ast_device_state_cache, + .message_type_fn = ast_device_state_message_type, + .publish_to_stasis = publish_device_state_to_stasis, }, + [AST_EVENT_PING] = { .name = "ping", + .publish_default = 1, + .subscribe_default = 1, + .topic_fn = corosync_topic, + .message_type_fn = corosync_ping_message_type, + .publish_to_stasis = publish_corosync_ping_to_stasis, }, }; static struct { @@ -88,6 +193,71 @@ static corosync_cfg_callbacks_t cfg_callbacks = { .corosync_cfg_shutdown_callback = cfg_shutdown_cb, }; +/*! \brief Publish a received MWI \ref ast_event to \ref stasis */ +static void publish_mwi_to_stasis(struct ast_event *event) +{ + const char *mailbox; + const char *context; + unsigned int new_msgs; + unsigned int old_msgs; + struct ast_eid *event_eid; + + ast_assert(ast_event_get_type(event) == AST_EVENT_MWI); + + mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX); + context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT); + new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); + old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS); + event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + + if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) { + return; + } + + if (new_msgs > INT_MAX) { + new_msgs = INT_MAX; + } + + if (old_msgs > INT_MAX) { + old_msgs = INT_MAX; + } + + if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs, + (int)old_msgs, NULL, event_eid)) { + char eid[16]; + ast_eid_to_str(eid, sizeof(eid), event_eid); + ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n", + mailbox, context, eid); + } +} + +/*! \brief Publish a received device state \ref ast_event to \ref stasis */ +static void publish_device_state_to_stasis(struct ast_event *event) +{ + const char *device; + enum ast_device_state state; + unsigned int cachable; + struct ast_eid *event_eid; + + ast_assert(ast_event_get_type(event) == AST_EVENT_DEVICE_STATE_CHANGE); + + device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE); + state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE); + cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE); + event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + + if (ast_strlen_zero(device)) { + return; + } + + if (ast_publish_device_state_full(device, state, cachable, event_eid)) { + char eid[16]; + ast_eid_to_str(eid, sizeof(eid), event_eid); + ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n", + device, eid); + } +} + static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len); @@ -101,8 +271,6 @@ static cpg_callbacks_t cpg_callbacks = { .cpg_confchg_fn = cpg_confchg_cb, }; -static void ast_event_cb(const struct ast_event *event, void *data); - #ifdef HAVE_COROSYNC_CFG_STATE_TRACK static void cfg_state_track_cb( corosync_cfg_state_notification_buffer_t *notification_buffer, @@ -120,6 +288,8 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { struct ast_event *event; + void (*publish_handler)(struct ast_event *) = NULL; + enum ast_event_type event_type; if (msg_len < ast_event_minimum_length()) { ast_debug(1, "Ignoring event that's too small. %u < %u\n", @@ -133,9 +303,17 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam return; } + event_type = ast_event_get_type(msg); + if (event_type > AST_EVENT_TOTAL) { + /* Egads, we don't support this */ + return; + } + ast_rwlock_rdlock(&event_types_lock); - if (!event_types[ast_event_get_type(msg)].subscribe) { - /* We are not configured to subscribe to these events. */ + publish_handler = event_types[event_type].publish_to_stasis; + if (!event_types[event_type].subscribe || !publish_handler) { + /* We are not configured to subscribe to these events or + we have no way to publish it internally. */ ast_rwlock_unlock(&event_types_lock); return; } @@ -147,20 +325,80 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam memcpy(event, msg, msg_len); + if (event_type == AST_EVENT_PING) { + const struct ast_eid *eid; + char buf[128] = ""; + + eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid); + ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf); + } + ast_debug(5, "Publishing event %s (%d) to stasis\n", + ast_event_get_type_name(event), event_type); + publish_handler(event); +} + +static void publish_to_corosync(struct stasis_message *message) +{ + cs_error_t cs_err; + struct iovec iov; + struct ast_event *event; + + event = stasis_message_to_event(message); + if (!event) { + return; + } + + if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) { + /* If the event didn't originate from this server, don't send it back out. */ + ast_event_destroy(event); + return; + } + if (ast_event_get_type(event) == AST_EVENT_PING) { const struct ast_eid *eid; char buf[128] = ""; eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid); - ast_log(LOG_NOTICE, "(cpg_deliver_cb) Got event PING from server with EID: '%s'\n", buf); + ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf); + } + + iov.iov_base = (void *)event; + iov.iov_len = ast_event_get_size(event); + + ast_debug(5, "Publishing event %s (%d) to corosync\n", + ast_event_get_type_name(event), ast_event_get_type(event)); - ast_event_queue(event); - } else { - ast_event_queue_and_cache(event); + /* The stasis subscription will only exist if we are configured to publish + * these events, so just send away. */ + if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { + ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err); } } +static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + if (!message) { + return; + } + + publish_to_corosync(message); +} + +static int dump_cache_cb(void *obj, void *arg, int flags) +{ + struct stasis_message *message = obj; + + if (!message) { + return 0; + } + + publish_to_corosync(message); + + return 0; +} + static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, @@ -176,20 +414,27 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam } for (i = 0; i < ARRAY_LEN(event_types); i++) { - struct ast_event_sub *event_sub; + struct ao2_container *messages; ast_rwlock_rdlock(&event_types_lock); if (!event_types[i].publish) { ast_rwlock_unlock(&event_types_lock); continue; } + + if (!event_types[i].cache_fn || !event_types[i].message_type_fn) { + ast_rwlock_unlock(&event_types_lock); + continue; + } + + messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(), + event_types[i].message_type_fn(), + &ast_eid_default); ast_rwlock_unlock(&event_types_lock); - event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL); - ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID, - &ast_eid_default, sizeof(ast_eid_default)); - ast_event_dump_cache(event_sub); - ast_event_sub_destroy(event_sub); + ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL); + + ao2_t_ref(messages, -1, "Dispose of dumped cache"); } } @@ -231,13 +476,13 @@ static void *dispatch_thread_handler(void *data) if (pfd[0].revents & POLLIN) { if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) { - ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err); + ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err); } } if (pfd[1].revents & POLLIN) { if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) { - ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err); + ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err); } } @@ -287,37 +532,6 @@ static void *dispatch_thread_handler(void *data) return NULL; } -static void ast_event_cb(const struct ast_event *event, void *data) -{ - cs_error_t cs_err; - struct iovec iov = { - .iov_base = (void *) event, - .iov_len = ast_event_get_size(event), - }; - - if (ast_event_get_type(event) == AST_EVENT_PING) { - const struct ast_eid *eid; - char buf[128] = ""; - - eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); - ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid); - ast_log(LOG_NOTICE, "(ast_event_cb) Got event PING from server with EID: '%s'\n", buf); - } - - if (ast_eid_cmp(&ast_eid_default, - ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) { - /* If the event didn't originate from this server, don't send it back out. */ - return; - } - - /* The ast_event subscription will only exist if we are configured to publish - * these events, so just send away. */ - - if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { - ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err); - } -} - static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) { cs_error_t cs_err; @@ -368,7 +582,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_ continue; } - ast_cli(a->fd, "=== Node %u\n", i); + ast_cli(a->fd, "=== Node %d\n", i); ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value); for (j = 0; j < num_addrs; j++) { @@ -378,7 +592,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_ getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST); - ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf); + ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf); } } @@ -421,7 +635,9 @@ static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args return CLI_FAILURE; } - ast_event_queue(event); + ast_rwlock_rdlock(&event_types_lock); + event_types[AST_EVENT_PING].publish_to_stasis(event); + ast_rwlock_unlock(&event_types_lock); return CLI_SUCCESS; } @@ -532,11 +748,16 @@ static int load_general_config(struct ast_config *cfg) for (i = 0; i < ARRAY_LEN(event_types); i++) { if (event_types[i].publish && !event_types[i].sub) { - event_types[i].sub = ast_event_subscribe(i, - ast_event_cb, "Corosync", NULL, - AST_EVENT_IE_END); + event_types[i].sub = stasis_forward_all(event_types[i].topic_fn(), + corosync_topic()); + stasis_message_router_add(stasis_router, + event_types[i].message_type_fn(), + stasis_message_cb, + NULL); } else if (!event_types[i].publish && event_types[i].sub) { - event_types[i].sub = ast_event_unsubscribe(event_types[i].sub); + event_types[i].sub = stasis_forward_cancel(event_types[i].sub); + stasis_message_router_remove(stasis_router, + event_types[i].message_type_fn()); } } @@ -577,14 +798,32 @@ static void cleanup_module(void) cs_error_t cs_err; unsigned int i; - for (i = 0; i < ARRAY_LEN(event_types); i++) { - if (event_types[i].sub) { - event_types[i].sub = ast_event_unsubscribe(event_types[i].sub); + if (stasis_router) { + + /* Unsubscribe all topic forwards and cancel all message routes */ + ast_rwlock_wrlock(&event_types_lock); + for (i = 0; i < ARRAY_LEN(event_types); i++) { + if (event_types[i].sub) { + event_types[i].sub = stasis_forward_cancel(event_types[i].sub); + stasis_message_router_remove(stasis_router, + event_types[i].message_type_fn()); + } + event_types[i].publish = 0; + event_types[i].subscribe = 0; } - event_types[i].publish = 0; - event_types[i].subscribe = 0; + ast_rwlock_unlock(&event_types_lock); + + stasis_message_router_unsubscribe_and_join(stasis_router); + stasis_router = NULL; + } + + if (corosync_aggregate_topic) { + ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup"); + corosync_aggregate_topic = NULL; } + STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type); + if (dispatch_thread.id != AST_PTHREADT_NULL) { char meepmeep = 'x'; dispatch_thread.stop = 1; @@ -623,13 +862,30 @@ static int load_module(void) enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE; struct cpg_name name; + corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic"); + if (!corosync_aggregate_topic) { + ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n"); + goto failed; + } + + stasis_router = stasis_message_router_create(corosync_aggregate_topic); + if (!stasis_router) { + ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n"); + goto failed; + } + + if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) { + ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n"); + goto failed; + } + if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err); - return AST_MODULE_LOAD_DECLINE; + ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err); + goto failed; } if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err); + ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err); goto failed; } @@ -637,7 +893,7 @@ static int load_module(void) name.length = strlen(name.value); if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err); + ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err); goto failed; }