Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
A
asterisk
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container Registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Issue analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Voice
asterisk
Commits
4aa137f0
Commit
4aa137f0
authored
8 years ago
by
zuul
Committed by
Gerrit Code Review
8 years ago
Browse files
Options
Downloads
Plain Diff
Merge "res_pjsip_outbound_publish: Use a serializer shutdown group for unload."
parents
9042ad40
54869e48
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
res/res_pjsip_outbound_publish.c
+61
-82
61 additions, 82 deletions
res/res_pjsip_outbound_publish.c
with
61 additions
and
82 deletions
res/res_pjsip_outbound_publish.c
+
61
−
82
View file @
4aa137f0
...
...
@@ -32,6 +32,7 @@
#include
"asterisk/res_pjsip_outbound_publish.h"
#include
"asterisk/module.h"
#include
"asterisk/taskprocessor.h"
#include
"asterisk/threadpool.h"
#include
"asterisk/datastore.h"
/*** DOCUMENTATION
...
...
@@ -204,6 +205,8 @@ struct sip_outbound_publisher {
struct
sip_outbound_publish_message
*
sending
;
/*! \brief Publish client should be destroyed */
unsigned
int
destroy
;
/*! \brief Serializer for stuff and things */
struct
ast_taskprocessor
*
serializer
;
/*! \brief User, if any, associated with the publisher */
char
user
[
0
];
};
...
...
@@ -242,13 +245,11 @@ AST_RWLOCK_DEFINE_STATIC(load_lock);
AO2_STRING_FIELD_HASH_FN
(
sip_outbound_publisher
,
user
);
AO2_STRING_FIELD_CMP_FN
(
sip_outbound_publisher
,
user
);
/*! \brief Unloading data */
struct
unloading_data
{
int
is_unloading
;
int
count
;
ast_mutex_t
lock
;
ast_cond_t
cond
;
}
unloading
;
/*! Time needs to be long enough for a transaction to timeout if nothing replies. */
#define MAX_UNLOAD_TIMEOUT_TIME 35
/* Seconds */
/*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */
static
struct
ast_serializer_shutdown_group
*
shutdown_group
;
/*! \brief Default number of client state container buckets */
#define DEFAULT_STATE_BUCKETS 31
...
...
@@ -772,7 +773,7 @@ fatal:
ast_free
(
message
);
service:
if
(
ast_sip_push_task
(
NULL
,
sip_publisher_service_queue
,
ao2_bump
(
publisher
)))
{
if
(
ast_sip_push_task
(
publisher
->
serializer
,
sip_publisher_service_queue
,
ao2_bump
(
publisher
)))
{
ao2_ref
(
publisher
,
-
1
);
}
return
-
1
;
...
...
@@ -815,7 +816,7 @@ static int publisher_client_send(void *obj, void *arg, void *data, int flags)
AST_LIST_INSERT_TAIL
(
&
publisher
->
queue
,
message
,
entry
);
*
res
=
ast_sip_push_task
(
NULL
,
sip_publisher_service_queue
,
ao2_bump
(
publisher
));
*
res
=
ast_sip_push_task
(
publisher
->
serializer
,
sip_publisher_service_queue
,
ao2_bump
(
publisher
));
if
(
*
res
)
{
ao2_ref
(
publisher
,
-
1
);
}
...
...
@@ -1008,25 +1009,17 @@ static void sip_outbound_publisher_destroy(void *obj)
}
ao2_cleanup
(
publisher
->
owner
);
ast_free
(
publisher
->
from_uri
);
ast_free
(
publisher
->
to_uri
);
/* if unloading the module and all objects have been unpublished
send the signal to finish unloading */
if
(
unloading
.
is_unloading
)
{
ast_mutex_lock
(
&
unloading
.
lock
);
if
(
--
unloading
.
count
==
0
)
{
ast_cond_signal
(
&
unloading
.
cond
);
}
ast_mutex_unlock
(
&
unloading
.
lock
);
}
ast_taskprocessor_unreference
(
publisher
->
serializer
);
}
static
struct
sip_outbound_publisher
*
sip_outbound_publisher_alloc
(
struct
ast_sip_outbound_publish_client
*
client
,
const
char
*
user
)
{
struct
sip_outbound_publisher
*
publisher
;
char
tps_name
[
AST_TASKPROCESSOR_MAX_NAME
+
1
];
publisher
=
ao2_alloc
(
sizeof
(
*
publisher
)
+
(
user
?
strlen
(
user
)
:
0
)
+
1
,
sip_outbound_publisher_destroy
);
...
...
@@ -1054,6 +1047,16 @@ static struct sip_outbound_publisher *sip_outbound_publisher_alloc(
*
publisher
->
user
=
'\0'
;
}
ast_taskprocessor_build_name
(
tps_name
,
sizeof
(
tps_name
),
"pjsip/outpub/%s"
,
ast_sorcery_object_get_id
(
client
->
publish
));
publisher
->
serializer
=
ast_sip_create_serializer_group
(
tps_name
,
shutdown_group
);
if
(
!
publisher
->
serializer
)
{
ao2_ref
(
publisher
,
-
1
);
return
NULL
;
}
if
(
ast_sip_push_task_synchronous
(
NULL
,
sip_outbound_publisher_init
,
publisher
))
{
ast_log
(
LOG_ERROR
,
"Unable to create publisher for outbound publish '%s'
\n
"
,
ast_sorcery_object_get_id
(
client
->
publish
));
...
...
@@ -1079,7 +1082,7 @@ static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
* No need to bump the reference here. The task will take care of
* removing the reference.
*/
if
(
ast_sip_push_task
(
NULL
,
cancel_refresh_timer_task
,
publisher
))
{
if
(
ast_sip_push_task
(
publisher
->
serializer
,
cancel_refresh_timer_task
,
publisher
))
{
ao2_ref
(
publisher
,
-
1
);
}
return
NULL
;
...
...
@@ -1142,13 +1145,13 @@ static int cancel_and_unpublish(void *obj, void *arg, int flags)
/* If the publisher was never started, there's nothing to unpublish, so just
* destroy the publication and remove its reference to the publisher.
*/
if
(
ast_sip_push_task
(
NULL
,
explicit_publish_destroy
,
ao2_bump
(
publisher
)))
{
if
(
ast_sip_push_task
(
publisher
->
serializer
,
explicit_publish_destroy
,
ao2_bump
(
publisher
)))
{
ao2_ref
(
publisher
,
-
1
);
}
return
0
;
}
if
(
ast_sip_push_task
(
NULL
,
cancel_refresh_timer_task
,
ao2_bump
(
publisher
)))
{
if
(
ast_sip_push_task
(
publisher
->
serializer
,
cancel_refresh_timer_task
,
ao2_bump
(
publisher
)))
{
ast_log
(
LOG_WARNING
,
"Could not stop refresh timer on outbound publish '%s'
\n
"
,
ast_sorcery_object_get_id
(
client
->
publish
));
ao2_ref
(
publisher
,
-
1
);
...
...
@@ -1156,7 +1159,7 @@ static int cancel_and_unpublish(void *obj, void *arg, int flags)
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
if
(
!
publisher
->
sending
)
{
if
(
ast_sip_push_task
(
NULL
,
send_unpublish_task
,
ao2_bump
(
publisher
)))
{
if
(
ast_sip_push_task
(
publisher
->
serializer
,
send_unpublish_task
,
ao2_bump
(
publisher
)))
{
ast_log
(
LOG_WARNING
,
"Could not send unpublish message on outbound publish '%s'
\n
"
,
ast_sorcery_object_get_id
(
client
->
publish
));
ao2_ref
(
publisher
,
-
1
);
...
...
@@ -1255,7 +1258,7 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
if
(
publisher
->
sending
)
{
publisher
->
sending
=
NULL
;
if
(
!
ast_sip_push_task
(
NULL
,
send_unpublish_task
,
ao2_bump
(
publisher
)))
{
if
(
!
ast_sip_push_task
(
publisher
->
serializer
,
send_unpublish_task
,
ao2_bump
(
publisher
)))
{
return
;
}
ast_log
(
LOG_WARNING
,
"Could not send unpublish message on outbound publish '%s'
\n
"
,
...
...
@@ -1336,7 +1339,7 @@ end:
ast_free
(
message
);
}
}
else
{
if
(
ast_sip_push_task
(
NULL
,
sip_publisher_service_queue
,
ao2_bump
(
publisher
)))
{
if
(
ast_sip_push_task
(
publisher
->
serializer
,
sip_publisher_service_queue
,
ao2_bump
(
publisher
)))
{
ao2_ref
(
publisher
,
-
1
);
}
}
...
...
@@ -1431,6 +1434,7 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
ao2_ref
(
state
,
-
1
);
return
NULL
;
}
state
->
client
->
publish
=
ao2_bump
(
publish
);
strcpy
(
state
->
id
,
id
);
...
...
@@ -1580,6 +1584,32 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab
return
ast_sip_auth_vector_init
(
&
publish
->
outbound_auths
,
var
->
value
);
}
static
int
unload_module
(
void
)
{
int
remaining
;
ast_sorcery_object_unregister
(
ast_sip_get_sorcery
(),
"outbound-publish"
);
ao2_global_obj_release
(
current_states
);
/* Wait for publication serializers to get destroyed. */
ast_debug
(
2
,
"Waiting for publication to complete for unload.
\n
"
);
remaining
=
ast_serializer_shutdown_group_join
(
shutdown_group
,
MAX_UNLOAD_TIMEOUT_TIME
);
if
(
remaining
)
{
ast_log
(
LOG_WARNING
,
"Unload incomplete. Could not stop %d outbound publications. Try again later.
\n
"
,
remaining
);
return
-
1
;
}
ast_debug
(
2
,
"Successful shutdown.
\n
"
);
ao2_cleanup
(
shutdown_group
);
shutdown_group
=
NULL
;
return
0
;
}
static
int
load_module
(
void
)
{
CHECK_PJSIP_MODULE_LOADED
();
...
...
@@ -1587,12 +1617,18 @@ static int load_module(void)
/* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */
ast_pjproject_get_buildopt
(
"PJSIP_MAX_URL_SIZE"
,
"%d"
,
&
pjsip_max_url_size
);
shutdown_group
=
ast_serializer_shutdown_group_alloc
();
if
(
!
shutdown_group
)
{
return
AST_MODULE_LOAD_FAILURE
;
}
ast_sorcery_apply_config
(
ast_sip_get_sorcery
(),
"res_pjsip_outbound_publish"
);
ast_sorcery_apply_default
(
ast_sip_get_sorcery
(),
"outbound-publish"
,
"config"
,
"pjsip.conf,criteria=type=outbound-publish"
);
if
(
ast_sorcery_object_register
(
ast_sip_get_sorcery
(),
"outbound-publish"
,
sip_outbound_publish_alloc
,
NULL
,
sip_outbound_publish_apply
))
{
ast_log
(
LOG_ERROR
,
"Unable to register 'outbound-publish' type with sorcery
\n
"
);
unload_module
();
return
AST_MODULE_LOAD_DECLINE
;
}
...
...
@@ -1629,63 +1665,6 @@ static int reload_module(void)
return
0
;
}
static
int
current_publishing_count
(
void
*
obj
,
void
*
arg
,
int
flags
)
{
struct
ast_sip_outbound_publish_state
*
state
=
obj
;
unloading
.
count
+=
ao2_container_count
(
state
->
client
->
publishers
);
return
0
;
}
static
int
unload_module
(
void
)
{
struct
timeval
start
=
ast_tvnow
();
struct
timespec
end
=
{
.
tv_sec
=
start
.
tv_sec
+
10
,
.
tv_nsec
=
start
.
tv_usec
*
1000
};
int
res
=
0
;
struct
ao2_container
*
states
=
ao2_global_obj_ref
(
current_states
);
if
(
!
states
)
{
return
0
;
}
unloading
.
count
=
0
;
ao2_callback
(
states
,
OBJ_NODATA
,
current_publishing_count
,
NULL
);
ao2_ref
(
states
,
-
1
);
if
(
!
unloading
.
count
)
{
return
0
;
}
ast_mutex_init
(
&
unloading
.
lock
);
ast_cond_init
(
&
unloading
.
cond
,
NULL
);
ast_mutex_lock
(
&
unloading
.
lock
);
unloading
.
is_unloading
=
1
;
ao2_global_obj_release
(
current_states
);
/* wait for items to unpublish */
ast_verb
(
5
,
"Waiting to complete unpublishing task(s)
\n
"
);
while
(
unloading
.
count
&&
!
res
)
{
res
=
ast_cond_timedwait
(
&
unloading
.
cond
,
&
unloading
.
lock
,
&
end
);
}
ast_mutex_unlock
(
&
unloading
.
lock
);
ast_mutex_destroy
(
&
unloading
.
lock
);
ast_cond_destroy
(
&
unloading
.
cond
);
if
(
res
)
{
ast_verb
(
5
,
"At least %d items were unable to unpublish "
"in the allowed time
\n
"
,
unloading
.
count
);
}
else
{
ast_verb
(
5
,
"All items successfully unpublished
\n
"
);
ast_sorcery_object_unregister
(
ast_sip_get_sorcery
(),
"outbound-publish"
);
}
return
res
;
}
AST_MODULE_INFO
(
ASTERISK_GPL_KEY
,
AST_MODFLAG_GLOBAL_SYMBOLS
|
AST_MODFLAG_LOAD_ORDER
,
"PJSIP Outbound Publish Support"
,
.
load
=
load_module
,
.
reload
=
reload_module
,
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment