Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
A
asterisk
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Issue analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Voice
asterisk
Commits
4534f32f
Commit
4534f32f
authored
7 years ago
by
Jenkins2
Committed by
Gerrit Code Review
7 years ago
Browse files
Options
Downloads
Plain Diff
Merge "stasis: Remove silly usage of RAII_VAR."
parents
ae723f85
f0eb00d1
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
main/stasis.c
+63
-29
63 additions, 29 deletions
main/stasis.c
with
63 additions
and
29 deletions
main/stasis.c
+
63
−
29
View file @
4534f32f
...
...
@@ -422,10 +422,10 @@ static void subscription_invoke(struct stasis_subscription *sub,
{
/* Notify that the final message has been received */
if
(
stasis_subscription_final_message
(
sub
,
message
))
{
SCOPED_AO2LOCK
(
lock
,
sub
);
ao2_lock
(
sub
);
sub
->
final_message_rxed
=
1
;
ast_cond_signal
(
&
sub
->
join_cond
);
ao2_unlock
(
sub
);
}
/* Since sub is mostly immutable, no need to lock sub */
...
...
@@ -433,10 +433,10 @@ static void subscription_invoke(struct stasis_subscription *sub,
/* Notify that the final message has been processed */
if
(
stasis_subscription_final_message
(
sub
,
message
))
{
SCOPED_AO2LOCK
(
lock
,
sub
);
ao2_lock
(
sub
);
sub
->
final_message_processed
=
1
;
ast_cond_signal
(
&
sub
->
join_cond
);
ao2_unlock
(
sub
);
}
}
...
...
@@ -454,7 +454,7 @@ struct stasis_subscription *internal_stasis_subscribe(
int
needs_mailbox
,
int
use_thread_pool
)
{
RAII_VAR
(
struct
stasis_subscription
*
,
sub
,
NULL
,
ao2_cleanup
)
;
struct
stasis_subscription
*
sub
;
if
(
!
topic
)
{
return
NULL
;
...
...
@@ -486,6 +486,8 @@ struct stasis_subscription *internal_stasis_subscribe(
sub
->
mailbox
=
ast_taskprocessor_get
(
tps_name
,
TPS_REF_DEFAULT
);
}
if
(
!
sub
->
mailbox
)
{
ao2_ref
(
sub
,
-
1
);
return
NULL
;
}
ast_taskprocessor_set_local
(
sub
->
mailbox
,
sub
);
...
...
@@ -500,11 +502,12 @@ struct stasis_subscription *internal_stasis_subscribe(
ast_cond_init
(
&
sub
->
join_cond
,
NULL
);
if
(
topic_add_subscription
(
topic
,
sub
)
!=
0
)
{
ao2_ref
(
sub
,
-
1
);
return
NULL
;
}
send_subscription_subscribe
(
topic
,
sub
);
ao2_ref
(
sub
,
+
1
);
return
sub
;
}
...
...
@@ -535,18 +538,21 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
/* The subscription may be the last ref to this topic. Hold
* the topic ref open until after the unlock. */
RAII_VAR
(
struct
stasis_topic
*
,
topic
,
ao2_bump
(
sub
?
sub
->
topic
:
NULL
),
ao2_cleanup
);
struct
stasis_topic
*
topic
;
if
(
!
sub
)
{
return
NULL
;
}
topic
=
ao2_bump
(
sub
->
topic
);
/* We have to remove the subscription first, to ensure the unsubscribe
* is the final message */
if
(
topic_remove_subscription
(
sub
->
topic
,
sub
)
!=
0
)
{
ast_log
(
LOG_ERROR
,
"Internal error: subscription has invalid topic
\n
"
);
ao2_cleanup
(
topic
);
return
NULL
;
}
...
...
@@ -560,6 +566,8 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
/* Unsubscribing unrefs the subscription */
ao2_cleanup
(
sub
);
ao2_cleanup
(
topic
);
return
NULL
;
}
...
...
@@ -578,22 +586,26 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr
void
stasis_subscription_join
(
struct
stasis_subscription
*
subscription
)
{
if
(
subscription
)
{
SCOPED_AO2LOCK
(
lock
,
subscription
);
ao2_lock
(
subscription
);
/* Wait until the processed flag has been set */
while
(
!
subscription
->
final_message_processed
)
{
ast_cond_wait
(
&
subscription
->
join_cond
,
ao2_object_get_lockaddr
(
subscription
));
}
ao2_unlock
(
subscription
);
}
}
int
stasis_subscription_is_done
(
struct
stasis_subscription
*
subscription
)
{
if
(
subscription
)
{
SCOPED_AO2LOCK
(
lock
,
subscription
)
;
int
ret
;
return
subscription
->
final_message_rxed
;
ao2_lock
(
subscription
);
ret
=
subscription
->
final_message_rxed
;
ao2_unlock
(
subscription
);
return
ret
;
}
/* Null subscription is about as done as you can get */
...
...
@@ -621,13 +633,15 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
if
(
sub
)
{
size_t
i
;
struct
stasis_topic
*
topic
=
sub
->
topic
;
SCOPED_AO2LOCK
(
lock_topic
,
topic
);
ao2_lock
(
topic
);
for
(
i
=
0
;
i
<
AST_VECTOR_SIZE
(
&
topic
->
subscribers
);
++
i
)
{
if
(
AST_VECTOR_GET
(
&
topic
->
subscribers
,
i
)
==
sub
)
{
ao2_unlock
(
topic
);
return
1
;
}
}
ao2_unlock
(
topic
);
}
return
0
;
...
...
@@ -668,8 +682,8 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st
static
int
topic_add_subscription
(
struct
stasis_topic
*
topic
,
struct
stasis_subscription
*
sub
)
{
size_t
idx
;
SCOPED_AO2LOCK
(
lock
,
topic
);
ao2_lock
(
topic
);
/* The reference from the topic to the subscription is shared with
* the owner of the subscription, which will explicitly unsubscribe
* to release it.
...
...
@@ -682,6 +696,7 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
topic_add_subscription
(
AST_VECTOR_GET
(
&
topic
->
upstream_topics
,
idx
),
sub
);
}
ao2_unlock
(
topic
);
return
0
;
}
...
...
@@ -689,15 +704,18 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
static
int
topic_remove_subscription
(
struct
stasis_topic
*
topic
,
struct
stasis_subscription
*
sub
)
{
size_t
idx
;
SCOPED_AO2LOCK
(
lock_topic
,
topic
)
;
int
res
;
ao2_lock
(
topic
);
for
(
idx
=
0
;
idx
<
AST_VECTOR_SIZE
(
&
topic
->
upstream_topics
);
++
idx
)
{
topic_remove_subscription
(
AST_VECTOR_GET
(
&
topic
->
upstream_topics
,
idx
),
sub
);
}
return
AST_VECTOR_REMOVE_ELEM_UNORDERED
(
&
topic
->
subscribers
,
sub
,
res
=
AST_VECTOR_REMOVE_ELEM_UNORDERED
(
&
topic
->
subscribers
,
sub
,
AST_VECTOR_ELEM_CLEANUP_NOOP
);
ao2_unlock
(
topic
);
return
res
;
}
/*!
...
...
@@ -1214,25 +1232,25 @@ static void multi_object_blob_dtor(void *obj)
struct
ast_multi_object_blob
*
ast_multi_object_blob_create
(
struct
ast_json
*
blob
)
{
int
type
;
RAII_VAR
(
struct
ast_multi_object_blob
*
,
multi
,
ao2_alloc
(
sizeof
(
*
multi
),
multi_object_blob_dtor
),
ao2_cleanup
);
struct
ast_multi_object_blob
*
multi
;
ast_assert
(
blob
!=
NULL
);
multi
=
ao2_alloc
(
sizeof
(
*
multi
),
multi_object_blob_dtor
);
if
(
!
multi
)
{
return
NULL
;
}
for
(
type
=
0
;
type
<
STASIS_UMOS_MAX
;
++
type
)
{
if
(
AST_VECTOR_INIT
(
&
multi
->
snapshots
[
type
],
0
))
{
ao2_ref
(
multi
,
-
1
);
return
NULL
;
}
}
multi
->
blob
=
ast_json_ref
(
blob
);
ao2_ref
(
multi
,
+
1
);
return
multi
;
}
...
...
@@ -1249,9 +1267,9 @@ void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
void
ast_multi_object_blob_single_channel_publish
(
struct
ast_channel
*
chan
,
struct
stasis_message_type
*
type
,
struct
ast_json
*
blob
)
{
RAII_VAR
(
struct
stasis_message
*
,
message
,
NULL
,
ao2_cleanup
)
;
RAII_VAR
(
struct
ast_channel_snapshot
*
,
channel_snapshot
,
NULL
,
ao2_cleanup
)
;
RAII_VAR
(
struct
ast_multi_object_blob
*
,
multi
,
NULL
,
ao2_cleanup
)
;
struct
stasis_message
*
message
;
struct
ast_channel_snapshot
*
channel_snapshot
;
struct
ast_multi_object_blob
*
multi
;
if
(
!
type
)
{
return
;
...
...
@@ -1263,13 +1281,20 @@ void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
}
channel_snapshot
=
ast_channel_snapshot_create
(
chan
);
ao2_ref
(
channel_snapshot
,
+
1
);
if
(
!
channel_snapshot
)
{
ao2_ref
(
multi
,
-
1
);
return
;
}
/* this call steals the channel_snapshot reference */
ast_multi_object_blob_add
(
multi
,
STASIS_UMOS_CHANNEL
,
channel_snapshot
);
message
=
stasis_message_create
(
type
,
multi
);
ao2_ref
(
multi
,
-
1
);
if
(
message
)
{
/* app_userevent still publishes to channel */
stasis_publish
(
ast_channel_topic
(
chan
),
message
);
ao2_ref
(
message
,
-
1
);
}
}
...
...
@@ -1278,7 +1303,7 @@ static struct ast_json *multi_user_event_to_json(
struct
stasis_message
*
message
,
const
struct
stasis_message_sanitizer
*
sanitize
)
{
RAII_VAR
(
struct
ast_json
*
,
out
,
NULL
,
ast_json_unref
)
;
struct
ast_json
*
out
;
struct
ast_multi_object_blob
*
multi
=
stasis_message_data
(
message
);
struct
ast_json
*
blob
=
multi
->
blob
;
const
struct
timeval
*
tv
=
stasis_message_timestamp
(
message
);
...
...
@@ -1320,7 +1345,8 @@ static struct ast_json *multi_user_event_to_json(
}
}
}
return
ast_json_ref
(
out
);
return
out
;
}
/*! \internal \brief convert multi object blob to ami string */
...
...
@@ -1513,17 +1539,19 @@ static void *stasis_config_alloc(void)
int
stasis_message_type_declined
(
const
char
*
name
)
{
RAII_VAR
(
struct
stasis_config
*
,
cfg
,
ao2_global_obj_ref
(
globals
)
,
ao2_cleanup
)
;
struct
stasis_config
*
cfg
=
ao2_global_obj_ref
(
globals
);
char
*
name_in_declined
;
int
res
;
if
(
!
cfg
||
!
cfg
->
declined_message_types
)
{
ao2_cleanup
(
cfg
);
return
0
;
}
name_in_declined
=
ao2_find
(
cfg
->
declined_message_types
->
declined
,
name
,
OBJ_SEARCH_KEY
);
res
=
name_in_declined
?
1
:
0
;
ao2_cleanup
(
name_in_declined
);
ao2_ref
(
cfg
,
-
1
);
if
(
res
)
{
ast_log
(
LOG_NOTICE
,
"Declining to allocate Stasis message type '%s' due to configuration
\n
"
,
name
);
}
...
...
@@ -1569,7 +1597,7 @@ static void stasis_cleanup(void)
int
stasis_init
(
void
)
{
RAII_VAR
(
struct
stasis_config
*
,
cfg
,
NULL
,
ao2_cleanup
)
;
struct
stasis_config
*
cfg
;
int
cache_init
;
struct
ast_threadpool_options
threadpool_opts
=
{
0
,
};
...
...
@@ -1605,11 +1633,14 @@ int stasis_init(void)
if
(
aco_set_defaults
(
&
threadpool_option
,
"threadpool"
,
default_cfg
->
threadpool_options
))
{
ast_log
(
LOG_ERROR
,
"Failed to initialize defaults on Stasis configuration object
\n
"
);
ao2_ref
(
default_cfg
,
-
1
);
return
-
1
;
}
if
(
aco_set_defaults
(
&
declined_option
,
"declined_message_types"
,
default_cfg
->
declined_message_types
))
{
ast_log
(
LOG_ERROR
,
"Failed to load stasis.conf and failed to initialize defaults.
\n
"
);
ao2_ref
(
default_cfg
,
-
1
);
return
-
1
;
}
...
...
@@ -1620,6 +1651,7 @@ int stasis_init(void)
cfg
=
ao2_global_obj_ref
(
globals
);
if
(
!
cfg
)
{
ast_log
(
LOG_ERROR
,
"Failed to obtain Stasis configuration object
\n
"
);
return
-
1
;
}
}
...
...
@@ -1630,8 +1662,10 @@ int stasis_init(void)
threadpool_opts
.
max_size
=
cfg
->
threadpool_options
->
max_size
;
threadpool_opts
.
idle_timeout
=
cfg
->
threadpool_options
->
idle_timeout_sec
;
pool
=
ast_threadpool_create
(
"stasis-core"
,
NULL
,
&
threadpool_opts
);
ao2_ref
(
cfg
,
-
1
);
if
(
!
pool
)
{
ast_log
(
LOG_ERROR
,
"Failed to create 'stasis-core' threadpool
\n
"
);
return
-
1
;
}
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment