From 4c089efc4a67574cbd1016a0b4111c372b627099 Mon Sep 17 00:00:00 2001 From: Tom Yan Date: Tue, 26 Feb 2019 22:50:47 +0800 Subject: [PATCH] libpulseaudio: render via process_msg/asyncmsgq --- packages/libpulseaudio/module-aaudio-sink.c | 240 ++++++++++++-------- packages/libpulseaudio/module-sles-sink.c | 106 ++++++--- 2 files changed, 211 insertions(+), 135 deletions(-) diff --git a/packages/libpulseaudio/module-aaudio-sink.c b/packages/libpulseaudio/module-aaudio-sink.c index 45852d0e9..f8276c4e6 100644 --- a/packages/libpulseaudio/module-aaudio-sink.c +++ b/packages/libpulseaudio/module-aaudio-sink.c @@ -52,11 +52,17 @@ PA_MODULE_USAGE( "sink_properties= " "rate= " "latency= " + "pm= " "no_close_hack= " ); #define DEFAULT_SINK_NAME "AAudio sink" +enum { + SINK_MESSAGE_RENDER = PA_SINK_MESSAGE_MAX, + SINK_MESSAGE_OPEN_STREAM +}; + struct userdata { pa_core *core; pa_module *module; @@ -65,9 +71,12 @@ struct userdata { pa_thread *thread; pa_thread_mq thread_mq; pa_rtpoll *rtpoll; + pa_rtpoll_item *rtpoll_item; + pa_asyncmsgq *aaudio_msgq; - uint32_t latency; uint32_t rate; + uint32_t latency; + uint32_t pm; bool no_close; pa_memchunk memchunk; @@ -83,55 +92,42 @@ static const char* const valid_modargs[] = { "sink_properties", "rate", "latency", + "pm", "no_close_hack", NULL }; -static void update_latency(struct userdata *u) { - pa_usec_t block_usec; +static int process_render(struct userdata *u, void *audioData, int64_t numFrames) { + pa_assert(u->sink->thread_info.state != PA_SINK_INIT); - if(!u->latency) { - block_usec = PA_USEC_PER_SEC * AAudioStream_getBufferSizeInFrames(u->stream) / u->sink->sample_spec.rate / 2; - if(!pa_thread_mq_get()) - pa_sink_set_fixed_latency(u->sink, block_usec); - else - pa_sink_set_fixed_latency_within_thread(u->sink, block_usec); - } -} + /* a render message could be queued after a set state message */ + if (!PA_SINK_IS_LINKED(u->sink->thread_info.state)) + return AAUDIO_CALLBACK_RESULT_STOP; -static aaudio_data_callback_result_t buffer_callback(AAudioStream *stream, void *userdata, void *audioData, int32_t numFrames) { - struct userdata* u = userdata; - - pa_assert(u); - - if (!pa_thread_mq_get()) { - pa_log_debug("Thread starting up"); - pa_thread_mq_install(&u->thread_mq); - } - - u->memchunk.memblock = pa_memblock_new_fixed(u->core->mempool, audioData, numFrames * u->frame_size, false); - u->memchunk.length = numFrames * u->frame_size; + u->memchunk.memblock = pa_memblock_new_fixed(u->core->mempool, audioData, u->frame_size * numFrames, false); + u->memchunk.length = pa_memblock_get_length(u->memchunk.memblock); pa_sink_render_into_full(u->sink, &u->memchunk); pa_memblock_unref_fixed(u->memchunk.memblock); return AAUDIO_CALLBACK_RESULT_CONTINUE; } +static aaudio_data_callback_result_t data_callback(AAudioStream *stream, void *userdata, void *audioData, int32_t numFrames) { + struct userdata* u = userdata; + + pa_assert(u); + + return pa_asyncmsgq_send(u->aaudio_msgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RENDER, audioData, numFrames, NULL); +} + static void error_callback(AAudioStream *stream, void *userdata, aaudio_result_t error) { struct userdata* u = userdata; pa_assert(u); if (error == AAUDIO_ERROR_DISCONNECTED) { - if (!pa_thread_mq_get()) { - pa_sink_suspend(u->sink, true, PA_SUSPEND_UNAVAILABLE); - pa_sink_suspend(u->sink, false, PA_SUSPEND_UNAVAILABLE); - } else { - AAudioStream_requestStop(u->stream); - AAudioStream_requestStart(u->stream); - update_latency(u); - pa_log("Failed to reconfigure sink for new device.\n"); - } + pa_sink_suspend(u->sink, true, PA_SUSPEND_UNAVAILABLE); + pa_sink_suspend(u->sink, false, PA_SUSPEND_UNAVAILABLE); } } @@ -150,8 +146,8 @@ static int pa_open_aaudio_stream(struct userdata *u) pa_sample_spec *ss = &u->ss; CHK(AAudio_createStreamBuilder(&u->builder)); - AAudioStreamBuilder_setPerformanceMode(u->builder, AAUDIO_PERFORMANCE_MODE_LOW_LATENCY); - AAudioStreamBuilder_setDataCallback(u->builder, buffer_callback, u); + AAudioStreamBuilder_setPerformanceMode(u->builder, AAUDIO_PERFORMANCE_MODE_NONE + u->pm); + AAudioStreamBuilder_setDataCallback(u->builder, data_callback, u); AAudioStreamBuilder_setErrorCallback(u->builder, error_callback, u); want_float = ss->format > PA_SAMPLE_S16BE; @@ -178,68 +174,44 @@ fail: #undef CHK -static void thread_func(void *userdata) { - struct userdata *u = userdata; +static pa_usec_t get_latency(struct userdata *u) { + if(!u->latency) { + return PA_USEC_PER_SEC * AAudioStream_getBufferSizeInFrames(u->stream) / u->ss.rate / 2; + } else { + return PA_USEC_PER_MSEC * u->latency; + } +} + +static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *memchunk) { + struct userdata* u = PA_SINK(o)->userdata; pa_assert(u); - pa_log_debug("Thread starting up"); - pa_thread_mq_install(&u->thread_mq); - - for (;;) { - int ret; - - if (PA_SINK_IS_LINKED(u->sink->thread_info.state)) { - AAudioStream_requestStart(u->stream); - update_latency(u); + switch (code) { + case SINK_MESSAGE_RENDER: + return process_render(u, data, offset); + case SINK_MESSAGE_OPEN_STREAM: + if (pa_open_aaudio_stream(u) < 0) { + pa_log("pa_open_aaudio_stream() failed."); + return -1; + } + code = PA_SINK_MESSAGE_SET_FIXED_LATENCY; + offset = get_latency(u); break; - } - - /* Hmm, nothing to do. Let's sleep */ - if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) - goto fail; - - if (ret == 0) - goto finish; } - for (;;) { - int ret; + return pa_sink_process_msg(o, code, data, offset, memchunk); +}; - /* Hmm, nothing to do. Let's sleep */ - if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) - goto fail; - - if (ret == 0) - goto finish; - } - -fail: - /* If this was no regular exit from the loop we have to continue - * processing messages until we received PA_MESSAGE_SHUTDOWN */ - pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); - pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); - -finish: - pa_log_debug("Thread shutting down"); -} - -static int state_func(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) { +static int state_func_main(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) { struct userdata *u = s->userdata; - int r = 0; uint32_t idx; pa_sink_input *i; pa_idxset *inputs; - if ((PA_SINK_IS_OPENED(s->state) && state == PA_SINK_SUSPENDED) || - (PA_SINK_IS_LINKED(s->state) && state == PA_SINK_UNLINKED)) { - if (u->no_close) { - AAudioStream_requestStop(u->stream); - } else { - AAudioStream_close(u->stream); - } - } else if (s->state == PA_SINK_SUSPENDED && PA_SINK_IS_OPENED(state)) { - pa_open_aaudio_stream(u); + if (s->state == PA_SINK_SUSPENDED && PA_SINK_IS_OPENED(state)) { + if (pa_asyncmsgq_send(u->aaudio_msgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_OPEN_STREAM, NULL, 0, NULL) < 0) + return -1; inputs = pa_idxset_copy(s->inputs, NULL); PA_IDXSET_FOREACH(i, inputs, idx) { @@ -260,11 +232,31 @@ static int state_func(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t susp PA_IDXSET_FOREACH(i, inputs, idx) pa_sink_input_cork(i, false); pa_idxset_free(inputs, NULL); - - AAudioStream_requestStart(u->stream); - update_latency(u); } - return r; + return 0; +} + +static int state_func_io(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) { + struct userdata *u = s->userdata; + + if ((PA_SINK_IS_OPENED(s->thread_info.state) && state == PA_SINK_SUSPENDED) || + (PA_SINK_IS_LINKED(s->thread_info.state) && state == PA_SINK_UNLINKED)) { + AAudioStream_requestStop(u->stream); + if (!u->no_close) + AAudioStream_close(u->stream); + } else if (s->thread_info.state == PA_SINK_SUSPENDED && PA_SINK_IS_OPENED(state)) { + if (AAudioStream_requestStart(u->stream) < 0) + pa_log("AAudioStream_requestStart() failed."); + } else if (s->thread_info.state == PA_SINK_INIT && PA_SINK_IS_LINKED(state)) { + if (PA_SINK_IS_OPENED(state)) { + if (AAudioStream_requestStart(u->stream) < 0) + pa_log("AAudioStream_requestStart() failed."); + } else { + if (!u->no_close) + AAudioStream_close(u->stream); + } + } + return 0; } static int reconfigure_func(pa_sink *s, pa_sample_spec *ss, bool passthrough) { @@ -276,12 +268,39 @@ static void process_rewind(pa_sink *s) { pa_sink_process_rewind(s, 0); } +static void thread_func(void *userdata) { + struct userdata *u = userdata; + + pa_assert(u); + + pa_log_debug("Thread starting up"); + pa_thread_mq_install(&u->thread_mq); + + for (;;) { + int ret; + + if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) + goto fail; + + if (ret == 0) + goto finish; + } + +fail: + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); + pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); + +finish: + pa_log_debug("Thread shutting down"); +} + int pa__init(pa_module*m) { struct userdata *u = NULL; pa_channel_map map; pa_modargs *ma = NULL; pa_sink_new_data data; - pa_usec_t block_usec; pa_assert(m); @@ -290,7 +309,25 @@ int pa__init(pa_module*m) { u->core = m->core; u->module = m; u->rtpoll = pa_rtpoll_new(); - pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll); + + if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) { + pa_log("pa_thread_mq_init() failed."); + goto fail; + } + + /* The queue linking the AudioTrack thread and our RT thread */ + u->aaudio_msgq = pa_asyncmsgq_new(0); + if (!u->aaudio_msgq) { + pa_log("pa_asyncmsgq_new() failed."); + goto fail; + } + + /* The msgq from the AudioTrack RT thread should have an even higher + * priority than the normal message queues, to match the guarantee + * all other drivers make: supplying the audio device with data is + * the top priority -- and as long as that is possible we don't do + * anything else */ + u->rtpoll_item = pa_rtpoll_item_new_asyncmsgq_read(u->rtpoll, PA_RTPOLL_EARLY-1, u->aaudio_msgq); if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { pa_log("Failed to parse module arguments."); @@ -301,6 +338,11 @@ int pa__init(pa_module*m) { map = m->core->default_channel_map; pa_modargs_get_sample_rate(ma, &u->rate); + pa_modargs_get_value_u32(ma, "latency", &u->latency); + + u->pm = AAUDIO_PERFORMANCE_MODE_LOW_LATENCY - AAUDIO_PERFORMANCE_MODE_NONE; + pa_modargs_get_value_u32(ma, "pm", &u->pm); + pa_modargs_get_value_boolean(ma, "no_close_hack", &u->no_close); if (pa_open_aaudio_stream(u) < 0) @@ -330,20 +372,16 @@ int pa__init(pa_module*m) { goto fail; } - u->sink->parent.process_msg = pa_sink_process_msg; - u->sink->set_state_in_main_thread = state_func; + u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_main_thread = state_func_main; + u->sink->set_state_in_io_thread = state_func_io; u->sink->reconfigure = reconfigure_func; u->sink->request_rewind = process_rewind; u->sink->userdata = u; pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); pa_sink_set_rtpoll(u->sink, u->rtpoll); - - pa_modargs_get_value_u32(ma, "latency", &u->latency); - if (u->latency) { - block_usec = PA_USEC_PER_MSEC * u->latency; - pa_sink_set_fixed_latency(u->sink, block_usec); - } + pa_sink_set_fixed_latency(u->sink, get_latency(u)); if (!(u->thread = pa_thread_new("aaudio-sink", thread_func, u))) { pa_log("Failed to create thread."); @@ -395,6 +433,12 @@ void pa__done(pa_module*m) { if (u->sink) pa_sink_unref(u->sink); + if (u->rtpoll_item) + pa_rtpoll_item_free(u->rtpoll_item); + + if (u->aaudio_msgq) + pa_asyncmsgq_unref(u->aaudio_msgq); + if (u->rtpoll) pa_rtpoll_free(u->rtpoll); diff --git a/packages/libpulseaudio/module-sles-sink.c b/packages/libpulseaudio/module-sles-sink.c index 365de12f7..33c699221 100644 --- a/packages/libpulseaudio/module-sles-sink.c +++ b/packages/libpulseaudio/module-sles-sink.c @@ -58,6 +58,10 @@ PA_MODULE_USAGE( #define DEFAULT_SINK_NAME "OpenSL ES sink" #define BLOCK_USEC (PA_USEC_PER_MSEC * 125) +enum { + SINK_MESSAGE_RENDER = PA_SINK_MESSAGE_MAX +}; + struct userdata { pa_core *core; pa_module *module; @@ -66,6 +70,8 @@ struct userdata { pa_thread *thread; pa_thread_mq thread_mq; pa_rtpoll *rtpoll; + pa_rtpoll_item *rtpoll_item; + pa_asyncmsgq *sles_msgq; pa_usec_t block_usec; @@ -90,19 +96,36 @@ static const char* const valid_modargs[] = { NULL }; -static void process_render(SLBufferQueueItf bq, void *userdata) { +static void process_render(void *userdata) { struct userdata* u = userdata; pa_assert(u); - if (!pa_thread_mq_get()) { - pa_log_debug("Thread starting up"); - pa_thread_mq_install(&u->thread_mq); - } + /* a render message could be queued after a set state message */ + if (!PA_SINK_IS_LINKED(u->sink->thread_info.state)) + return; u->memchunk.length = u->nbytes; pa_sink_render_into(u->sink, &u->memchunk); - (*bq)->Enqueue(bq, u->buf, u->memchunk.length); + (*u->bqPlayerBufferQueue)->Enqueue(u->bqPlayerBufferQueue, u->buf, u->memchunk.length); +} + +static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *memchunk) { + switch (code) { + case SINK_MESSAGE_RENDER: + process_render(data); + return 0; + } + + return pa_sink_process_msg(o, code, data, offset, memchunk); +}; + +static void sles_callback(SLBufferQueueItf bqPlayerBufferQueue, void *userdata) { + struct userdata* u = userdata; + + pa_assert(u); + + pa_assert_se(pa_asyncmsgq_send(u->sles_msgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RENDER, u, 0, NULL) == 0); } #define CHK(stmt) { \ @@ -160,7 +183,7 @@ static int pa_init_sles_player(struct userdata *u, pa_sample_spec *ss) { CHK((*u->bqPlayerObject)->GetInterface(u->bqPlayerObject, SL_IID_PLAY, &u->bqPlayerPlay)); CHK((*u->bqPlayerObject)->GetInterface(u->bqPlayerObject, SL_IID_BUFFERQUEUE, &u->bqPlayerBufferQueue)); - CHK((*u->bqPlayerBufferQueue)->RegisterCallback(u->bqPlayerBufferQueue, process_render, u)); + CHK((*u->bqPlayerBufferQueue)->RegisterCallback(u->bqPlayerBufferQueue, sles_callback, u)); return 0; @@ -175,27 +198,12 @@ static void thread_func(void *userdata) { pa_assert(u); - for (;;) { - int ret; - - /* Render some data and drop it immediately */ - if (PA_SINK_IS_LINKED(u->sink->thread_info.state)) { - process_render(u->bqPlayerBufferQueue, u); - break; - } - - /* Hmm, nothing to do. Let's sleep */ - if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) - goto fail; - - if (ret == 0) - goto finish; - } + pa_log_debug("Thread starting up"); + pa_thread_mq_install(&u->thread_mq); for (;;) { int ret; - /* Hmm, nothing to do. Let's sleep */ if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) goto fail; @@ -215,15 +223,14 @@ finish: static int state_func(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) { struct userdata *u = s->userdata; - int r = 0; if ((PA_SINK_IS_OPENED(s->state) && state == PA_SINK_SUSPENDED) || (PA_SINK_IS_LINKED(s->state) && state == PA_SINK_UNLINKED)) - r = (*u->bqPlayerPlay)->SetPlayState(u->bqPlayerPlay, SL_PLAYSTATE_STOPPED); - else if ((s->state == PA_SINK_SUSPENDED && PA_SINK_IS_OPENED(state)) || - (s->state == PA_SINK_INIT && PA_SINK_IS_LINKED(state))) - r = (*u->bqPlayerPlay)->SetPlayState(u->bqPlayerPlay, SL_PLAYSTATE_PLAYING); - return r; + (*u->bqPlayerPlay)->SetPlayState(u->bqPlayerPlay, SL_PLAYSTATE_STOPPED); + else if ((s->state == PA_SINK_SUSPENDED || state == PA_SINK_INIT) && + PA_SINK_IS_LINKED(state)) + (*u->bqPlayerPlay)->SetPlayState(u->bqPlayerPlay, SL_PLAYSTATE_PLAYING); + return 0; } static void process_rewind(pa_sink *s) { @@ -245,7 +252,25 @@ int pa__init(pa_module*m) { u->core = m->core; u->module = m; u->rtpoll = pa_rtpoll_new(); - pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll); + + if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) { + pa_log("pa_thread_mq_init() failed."); + goto fail; + } + + /* The queue linking the AudioTrack thread and our RT thread */ + u->sles_msgq = pa_asyncmsgq_new(0); + if (!u->sles_msgq) { + pa_log("pa_asyncmsgq_new() failed."); + goto fail; + } + + /* The msgq from the AudioTrack RT thread should have an even higher + * priority than the normal message queues, to match the guarantee + * all other drivers make: supplying the audio device with data is + * the top priority -- and as long as that is possible we don't do + * anything else */ + u->rtpoll_item = pa_rtpoll_item_new_asyncmsgq_read(u->rtpoll, PA_RTPOLL_EARLY-1, u->sles_msgq); if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { pa_log("Failed to parse module arguments."); @@ -294,7 +319,7 @@ int pa__init(pa_module*m) { goto fail; } - u->sink->parent.process_msg = pa_sink_process_msg; + u->sink->parent.process_msg = sink_process_msg; u->sink->set_state_in_main_thread = state_func; u->sink->request_rewind = process_rewind; u->sink->userdata = u; @@ -319,6 +344,7 @@ int pa__init(pa_module*m) { } pa_sink_put(u->sink); + sles_callback(u->bqPlayerBufferQueue, u); pa_modargs_free(ma); @@ -355,11 +381,6 @@ void pa__done(pa_module*m) { if (u->sink) pa_sink_unlink(u->sink); - if (u->thread) { - pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); - pa_thread_free(u->thread); - } - DESTROY(bqPlayerObject); DESTROY(outputMixObject); DESTROY(engineObject); @@ -367,11 +388,22 @@ void pa__done(pa_module*m) { pa_memblock_unref_fixed(u->memchunk.memblock); free(u->buf); + if (u->thread) { + pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); + pa_thread_free(u->thread); + } + pa_thread_mq_done(&u->thread_mq); if (u->sink) pa_sink_unref(u->sink); + if (u->rtpoll_item) + pa_rtpoll_item_free(u->rtpoll_item); + + if (u->sles_msgq) + pa_asyncmsgq_unref(u->sles_msgq); + if (u->rtpoll) pa_rtpoll_free(u->rtpoll);