libpulseaudio: rewrite sles sink

This should work way better than the old code, as it makes use of
the buffer queue callback to do Enqueue(), which might be the only
nice/right way to use OpenSLES on Android. CPU usage is low and
RAM usage seems reasonable. No memory leak noticed. Tested with
mpv on my Oreo phone (wired and Bluetooth).

The latency was chosen base on Bluetooth audio requirement on Oreo.
Shouldn't be hard to make it configurable as a module param in the
future.

The new code has a known downside though, that is it doesn't really
support sink suspension, as in, silence will kept being written
to the audio device even when the sink is suspended, which may have
certain impact to battery time.

It's probably possible to catch the state change of the sink and
notify the buffer queue about it. It's just I don't want to bother
digging further at the moment.
This commit is contained in:
Tom Yan 2018-07-23 21:23:58 +08:00 committed by Fredrik Fornwall
parent f06727719f
commit fd69382e37

View File

@ -42,54 +42,38 @@
#include <pulsecore/rtpoll.h> #include <pulsecore/rtpoll.h>
#include <SLES/OpenSLES.h> #include <SLES/OpenSLES.h>
#include <SLES/OpenSLES_Android.h>
//Only certain interfaces are supported by the fast mixer. These are:
//SL_IID_ANDROIDSIMPLEBUFFERQUEUE
//SL_IID_VOLUME
//SL_IID_MUTESOLO
#define USE_ANDROID_SIMPLE_BUFFER_QUEUE #define USE_ANDROID_SIMPLE_BUFFER_QUEUE
#ifdef USE_ANDROID_SIMPLE_BUFFER_QUEUE #ifdef USE_ANDROID_SIMPLE_BUFFER_QUEUE
#include <SLES/OpenSLES_Android.h> #define DATALOCATOR_BUFFERQUEUE SL_DATALOCATOR_ANDROIDSIMPLEBUFFERQUEUE
#define DATALOCATOR_BUFFERQUEUE SL_DATALOCATOR_ANDROIDSIMPLEBUFFERQUEUE #define IID_BUFFERQUEUE SL_IID_ANDROIDSIMPLEBUFFERQUEUE
#define IID_BUFFERQUEUE SL_IID_ANDROIDSIMPLEBUFFERQUEUE #define BufferQueueItf SLAndroidSimpleBufferQueueItf
#define BufferQueueItf SLAndroidSimpleBufferQueueItf #define BufferQueueState SLAndroidSimpleBufferQueueState
#define BufferQueueState SLAndroidSimpleBufferQueueState #define IID_BUFFERQUEUE_USED SL_IID_ANDROIDSIMPLEBUFFERQUEUE
#define IID_BUFFERQUEUE_USED SL_IID_ANDROIDSIMPLEBUFFERQUEUE #define INDEX index
#define INDEX index
#else #else
#define DATALOCATOR_BUFFERQUEUE SL_DATALOCATOR_BUFFERQUEUE #define DATALOCATOR_BUFFERQUEUE SL_DATALOCATOR_BUFFERQUEUE
#define IID_BUFFERQUEUE SL_IID_BUFFERQUEUE #define IID_BUFFERQUEUE SL_IID_BUFFERQUEUE
#define BufferQueueItf SLBufferQueueItf #define BufferQueueItf SLBufferQueueItf
#define BufferQueueState SLBufferQueueState #define BufferQueueState SLBufferQueueState
#define IID_BUFFERQUEUE_USED IID_BUFFERQUEUE #define IID_BUFFERQUEUE_USED IID_BUFFERQUEUE
#define INDEX playIndex #define INDEX playIndex
#endif #endif
#define checkResult(r) do { \
if ((r) != SL_RESULT_SUCCESS) { \
if ((r) == SL_RESULT_PARAMETER_INVALID) fprintf(stderr, "error SL_RESULT_PARAMETER_INVALID at %s:%d\n", __FILE__, __LINE__); \
else if ((r) == SL_RESULT_PRECONDITIONS_VIOLATED ) fprintf(stderr, "error SL_RESULT_PRECONDITIONS_VIOLATED at %s:%d\n", __FILE__, __LINE__); \
else fprintf(stderr, "error %d at %s:%d\n", (int) r, __FILE__, __LINE__); \
} \
} while (0)
PA_MODULE_AUTHOR("Lennart Poettering, Nathan Martynov"); PA_MODULE_AUTHOR("Lennart Poettering, Nathan Martynov");
PA_MODULE_DESCRIPTION("Android OpenSL ES sink"); PA_MODULE_DESCRIPTION("Android OpenSL ES sink");
PA_MODULE_VERSION(PACKAGE_VERSION); PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(false); PA_MODULE_LOAD_ONCE(false);
PA_MODULE_USAGE( PA_MODULE_USAGE(
"sink_name=<name for the sink> " "sink_name=<name for the sink> "
"sink_properties=<properties for the sink> " "sink_properties=<properties for the sink> "
"rate=<sampling rate> "); "rate=<sampling rate> "
);
#define DEFAULT_SINK_NAME "OpenSL ES sink" #define DEFAULT_SINK_NAME "OpenSL ES sink"
#define BLOCK_USEC (PA_USEC_PER_SEC * 2) #define BLOCK_USEC (PA_USEC_PER_MSEC * 125)
typedef struct pa_memblock_queue_t {
pa_memblock *memblock;
struct pa_memblock_queue_t* next;
} pa_memblock_queue;
struct userdata { struct userdata {
pa_core *core; pa_core *core;
@ -101,23 +85,19 @@ struct userdata {
pa_rtpoll *rtpoll; pa_rtpoll *rtpoll;
pa_usec_t block_usec; pa_usec_t block_usec;
pa_usec_t timestamp;
pa_memchunk memchunk; pa_memchunk memchunk;
SLObjectItf engineObject; SLObjectItf engineObject;
SLEngineItf engineEngine; SLEngineItf engineEngine;
// output mix interfaces // output mix interfaces
SLObjectItf outputMixObject; SLObjectItf outputMixObject;
// buffer queue player interfaces // buffer queue player interfaces
SLObjectItf bqPlayerObject; SLObjectItf bqPlayerObject;
SLPlayItf bqPlayerPlay; SLPlayItf bqPlayerPlay;
BufferQueueItf bqPlayerBufferQueue; BufferQueueItf bqPlayerBufferQueue;
pa_memblock_queue* current;
pa_memblock_queue* last;
}; };
static const char* const valid_modargs[] = { static const char* const valid_modargs[] = {
@ -127,174 +107,111 @@ static const char* const valid_modargs[] = {
NULL NULL
}; };
static int sink_process_msg( static void process_render(BufferQueueItf bq, void *context) {
pa_msgobject *o, struct userdata* u = (struct userdata*) context;
int code, void *p;
void *data,
int64_t offset,
pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata; pa_assert(u);
//pa_log_debug("Called\n");
switch (code) { if (!pa_thread_mq_get()) {
case PA_SINK_MESSAGE_SET_STATE: pa_log_debug("Thread starting up");
pa_thread_mq_install(&u->thread_mq);
if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || pa_sink_get_state(u->sink) == PA_SINK_INIT) {
if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING || PA_PTR_TO_UINT(data) == PA_SINK_IDLE)
u->timestamp = pa_rtclock_now();
}
break;
case PA_SINK_MESSAGE_GET_LATENCY: {
pa_usec_t now;
now = pa_rtclock_now();
*((pa_usec_t*) data) = u->timestamp > now ? u->timestamp - now : 0ULL;
return 0;
}
} }
return pa_sink_process_msg(o, code, data, offset, chunk); if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) {
//pa_log_debug("Rewinded\n");
pa_sink_process_rewind(u->sink, 0);
}
pa_sink_render(u->sink, u->sink->thread_info.max_request, &u->memchunk);
p = pa_memblock_acquire_chunk(&u->memchunk);
(*bq)->Enqueue(bq, p, u->memchunk.length);
//pa_log_debug("Written: %zu\n", u->memchunk.length);
pa_memblock_release(u->memchunk.memblock);
pa_memblock_unref(u->memchunk.memblock);
} }
static void sink_update_requested_latency_cb(pa_sink *s) { #define CHK(stmt) { \
struct userdata *u; SLresult res = stmt; \
size_t nbytes; if (res != SL_RESULT_SUCCESS) { \
fprintf(stderr, "error %d at %s:%d\n", res, __FILE__, __LINE__); \
pa_sink_assert_ref(s); goto fail; \
pa_assert_se(u = s->userdata); } \
u->block_usec = pa_sink_get_requested_latency_within_thread(s);
if (u->block_usec == (pa_usec_t) -1)
u->block_usec = s->thread_info.max_latency;
nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
pa_sink_set_max_rewind_within_thread(s, nbytes);
pa_sink_set_max_request_within_thread(s, nbytes);
}
static void pa_sles_callback(BufferQueueItf bq, void *context){
struct userdata* s = (struct userdata*) context;
pa_memblock_queue* next;
if (s->current != NULL){
if (s->current->memblock != NULL) pa_memblock_unref(s->current->memblock);
next = s->current->next;
free(s->current);
s->current = next;
}
} }
static int pa_init_sles_player(struct userdata *s, SLint32 sl_rate) static int pa_init_sles_player(struct userdata *s, SLint32 sl_rate)
{ {
if (s == NULL) return -1; if (s == NULL) return -1;
SLresult result;
// create engine
// create engine CHK(slCreateEngine(&(s->engineObject), 0, NULL, 0, NULL, NULL));
result = slCreateEngine(&(s->engineObject), 0, NULL, 0, NULL, NULL); checkResult(result); CHK((*s->engineObject)->Realize(s->engineObject, SL_BOOLEAN_FALSE));
result = (*s->engineObject)->Realize(s->engineObject, SL_BOOLEAN_FALSE); checkResult(result);
CHK((*s->engineObject)->GetInterface(s->engineObject, SL_IID_ENGINE, &(s->engineEngine)));
result = (*s->engineObject)->GetInterface(s->engineObject, SL_IID_ENGINE, &(s->engineEngine)); checkResult(result);
// create output mix
// create output mix CHK((*s->engineEngine)->CreateOutputMix(s->engineEngine, &(s->outputMixObject), 0, NULL, NULL));
result = (*s->engineEngine)->CreateOutputMix(s->engineEngine, &(s->outputMixObject), 0, NULL, NULL); checkResult(result); CHK((*s->outputMixObject)->Realize(s->outputMixObject, SL_BOOLEAN_FALSE));
result = (*s->outputMixObject)->Realize(s->outputMixObject, SL_BOOLEAN_FALSE); checkResult(result);
// create audio player
// create audio player
SLDataLocator_OutputMix locator_outputmix;
SLDataLocator_OutputMix locator_outputmix; locator_outputmix.locatorType = SL_DATALOCATOR_OUTPUTMIX;
locator_outputmix.locatorType = SL_DATALOCATOR_OUTPUTMIX; locator_outputmix.outputMix = s->outputMixObject;
locator_outputmix.outputMix = s->outputMixObject;
SLDataLocator_BufferQueue locator_bufferqueue;
SLDataLocator_BufferQueue locator_bufferqueue; locator_bufferqueue.locatorType = DATALOCATOR_BUFFERQUEUE;
locator_bufferqueue.locatorType = DATALOCATOR_BUFFERQUEUE; locator_bufferqueue.numBuffers = 1;
locator_bufferqueue.numBuffers = 50;
if (sl_rate < 8000 || sl_rate > 192000) {
if (sl_rate < SL_SAMPLINGRATE_8 || sl_rate > SL_SAMPLINGRATE_192) { pa_log("Incompatible sample rate");
pa_log("Incompatible sample rate"); return -1;
return -1; }
}
SLDataFormat_PCM pcm;
SLDataFormat_PCM pcm; pcm.formatType = SL_DATAFORMAT_PCM;
pcm.formatType = SL_DATAFORMAT_PCM; pcm.numChannels = 2;
pcm.numChannels = 2; pcm.samplesPerSec = sl_rate * 1000;
pcm.samplesPerSec = sl_rate; pcm.bitsPerSample = SL_PCMSAMPLEFORMAT_FIXED_16;
pcm.bitsPerSample = SL_PCMSAMPLEFORMAT_FIXED_16; pcm.containerSize = 16;
pcm.containerSize = 16; pcm.channelMask = SL_SPEAKER_FRONT_LEFT | SL_SPEAKER_FRONT_RIGHT;
pcm.channelMask = SL_SPEAKER_FRONT_LEFT | SL_SPEAKER_FRONT_RIGHT; pcm.endianness = SL_BYTEORDER_LITTLEENDIAN;
pcm.endianness = SL_BYTEORDER_LITTLEENDIAN;
SLDataSource audiosrc;
SLDataSource audiosrc; audiosrc.pLocator = &locator_bufferqueue;
audiosrc.pLocator = &locator_bufferqueue; audiosrc.pFormat = &pcm;
audiosrc.pFormat = &pcm;
SLDataSink audiosnk;
SLDataSink audiosnk; audiosnk.pLocator = &locator_outputmix;
audiosnk.pLocator = &locator_outputmix; audiosnk.pFormat = NULL;
audiosnk.pFormat = NULL;
SLInterfaceID ids[1] = {IID_BUFFERQUEUE};
SLInterfaceID ids[1] = {IID_BUFFERQUEUE}; SLboolean flags[1] = {SL_BOOLEAN_TRUE};
SLboolean flags[1] = {SL_BOOLEAN_TRUE}; CHK((*s->engineEngine)->CreateAudioPlayer(s->engineEngine, &s->bqPlayerObject, &audiosrc, &audiosnk, 1, ids, flags));
result = (*s->engineEngine)->CreateAudioPlayer(s->engineEngine, &s->bqPlayerObject, &audiosrc, &audiosnk, 1, ids, flags); checkResult(result); CHK((*s->bqPlayerObject)->Realize(s->bqPlayerObject, SL_BOOLEAN_FALSE));
result = (*s->bqPlayerObject)->Realize(s->bqPlayerObject, SL_BOOLEAN_FALSE); checkResult(result);
CHK((*s->bqPlayerObject)->GetInterface(s->bqPlayerObject, SL_IID_PLAY, &s->bqPlayerPlay));
result = (*s->bqPlayerObject)->GetInterface(s->bqPlayerObject, SL_IID_PLAY, &s->bqPlayerPlay); checkResult(result); CHK((*s->bqPlayerObject)->GetInterface(s->bqPlayerObject, IID_BUFFERQUEUE_USED, &s->bqPlayerBufferQueue));
result = (*s->bqPlayerObject)->GetInterface(s->bqPlayerObject, IID_BUFFERQUEUE_USED, &s->bqPlayerBufferQueue); checkResult(result);
CHK((*s->bqPlayerBufferQueue)->RegisterCallback(s->bqPlayerBufferQueue, process_render, s));
result = (*s->bqPlayerBufferQueue)->RegisterCallback(s->bqPlayerBufferQueue, pa_sles_callback, s); checkResult(result);
CHK((*s->bqPlayerPlay)->SetPlayState(s->bqPlayerPlay, SL_PLAYSTATE_PLAYING));
result = (*s->bqPlayerPlay)->SetPlayState(s->bqPlayerPlay, SL_PLAYSTATE_PLAYING); checkResult(result);
return 0;
return 0;
fail:
return -1;
} }
#undef CHK
static void pa_destroy_sles_player(struct userdata *s){ static void pa_destroy_sles_player(struct userdata *s){
if (s == NULL) return; if (s == NULL) return;
(*s->bqPlayerPlay)->SetPlayState(s->bqPlayerPlay, SL_PLAYSTATE_STOPPED); (*s->bqPlayerPlay)->SetPlayState(s->bqPlayerPlay, SL_PLAYSTATE_STOPPED);
(*s->bqPlayerObject)->Destroy(s->bqPlayerObject); (*s->bqPlayerObject)->Destroy(s->bqPlayerObject);
(*s->outputMixObject)->Destroy(s->outputMixObject); (*s->outputMixObject)->Destroy(s->outputMixObject);
(*s->engineObject)->Destroy(s->engineObject); (*s->engineObject)->Destroy(s->engineObject);
}
static void process_render(struct userdata *u, pa_usec_t now) {
pa_memblock_queue* current_block;
size_t ate = 0;
pa_assert(u);
/* This is the configured latency. Sink inputs connected to us
might not have a single frame more than the maxrequest value
queued. Hence: at maximum read this many bytes from the sink
inputs. */
/* Fill the buffer up the latency size */
while (u->timestamp < now + u->block_usec) {
void *p;
pa_sink_render(u->sink, u->sink->thread_info.max_request, &u->memchunk);
p = pa_memblock_acquire(u->memchunk.memblock);
(*u->bqPlayerBufferQueue)->Enqueue(u->bqPlayerBufferQueue, (uint8_t*) p + u->memchunk.index, u->memchunk.length);
pa_memblock_release(u->memchunk.memblock);
u->timestamp += pa_bytes_to_usec(u->memchunk.length, &u->sink->sample_spec);
ate += u->memchunk.length;
current_block = malloc(sizeof(pa_memblock_queue));
memset(current_block, 0, sizeof(pa_memblock_queue));
current_block->memblock = u->memchunk.memblock;
if (u->current == NULL) { u->current = current_block; }
if (u->last == NULL) { u->last = current_block; }
else {
u->last->next = current_block;
u->last = current_block;
}
//pa_memblock_unref(u->memchunk.memblock);
pa_memchunk_reset(&u->memchunk);
if (ate >= u->sink->thread_info.max_request) break;
}
} }
static void thread_func(void *userdata) { static void thread_func(void *userdata) {
@ -302,30 +219,25 @@ static void thread_func(void *userdata) {
pa_assert(u); pa_assert(u);
pa_log_debug("Thread starting up");
pa_thread_mq_install(&u->thread_mq);
u->timestamp = pa_rtclock_now();
for (;;) { for (;;) {
pa_usec_t now = 0;
int ret; int ret;
if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
now = pa_rtclock_now();
if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
pa_sink_process_rewind(u->sink, 0);
/* Render some data and drop it immediately */ /* Render some data and drop it immediately */
if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) { if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
if (u->timestamp <= now) process_render(u->bqPlayerBufferQueue, u);
process_render(u, now); break;
}
pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp); /* Hmm, nothing to do. Let's sleep */
} else if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
pa_rtpoll_set_timer_disabled(u->rtpoll); goto fail;
if (ret == 0)
goto finish;
}
for (;;) {
int ret;
/* Hmm, nothing to do. Let's sleep */ /* Hmm, nothing to do. Let's sleep */
if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
@ -345,13 +257,6 @@ finish:
pa_log_debug("Thread shutting down"); pa_log_debug("Thread shutting down");
} }
static int getenv_int(const char * env, size_t min_len){
char * got_env = getenv(env);
int ret = 0;
if (got_env != NULL && strlen(got_env) >= min_len) ret = atoi(got_env); //"8000" is 4 symbols
return ret;
}
int pa__init(pa_module*m) { int pa__init(pa_module*m) {
struct userdata *u = NULL; struct userdata *u = NULL;
pa_sample_spec ss; pa_sample_spec ss;
@ -367,12 +272,6 @@ int pa__init(pa_module*m) {
goto fail; goto fail;
} }
// High rate causes glitches on some devices, this is needed to prevent it
//ss.rate = 32000;
//ss.channels = 2;
//ss.format = PA_SAMPLE_S16LE;
//OK. That will allow users to define sampling rate under his responsibility
ss = m->core->default_sample_spec; ss = m->core->default_sample_spec;
map = m->core->default_channel_map; map = m->core->default_channel_map;
if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) { if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
@ -380,28 +279,18 @@ int pa__init(pa_module*m) {
goto fail; goto fail;
} }
//Needed. Don't touch ss.channels = 2;
ss.channels = 2;
ss.format = PA_SAMPLE_S16LE; ss.format = PA_SAMPLE_S16LE;
m->userdata = u = pa_xnew0(struct userdata, 1); m->userdata = u = pa_xnew0(struct userdata, 1);
int forceFormat = getenv_int("PROPERTY_OUTPUT_SAMPLE_RATE", 4); //"8000" is 4 symbols
if (forceFormat >= 8000 && forceFormat <= 192000) {
ss.rate = forceFormat;
pa_log_info("Sample rate was forced to be %u\n", ss.rate);
}
u->core = m->core; u->core = m->core;
u->module = m; u->module = m;
u->rtpoll = pa_rtpoll_new(); u->rtpoll = pa_rtpoll_new();
pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll); pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
//Pulseaudio uses samples per sec but OpenSL ES uses samples per ms if (pa_init_sles_player(u, ss.rate) < 0)
if (pa_init_sles_player(u, ss.rate * 1000) < 0) goto fail;
goto fail;
//int buff[2] = {0, 0};
//(*u->bqPlayerBufferQueue)->Enqueue(u->bqPlayerBufferQueue, buff, 1);
pa_sink_new_data_init(&data); pa_sink_new_data_init(&data);
data.driver = __FILE__; data.driver = __FILE__;
@ -418,7 +307,7 @@ int pa__init(pa_module*m) {
goto fail; goto fail;
} }
u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY); u->sink = pa_sink_new(m->core, &data, 0);
pa_sink_new_data_done(&data); pa_sink_new_data_done(&data);
if (!u->sink) { if (!u->sink) {
@ -426,8 +315,7 @@ int pa__init(pa_module*m) {
goto fail; goto fail;
} }
u->sink->parent.process_msg = sink_process_msg; u->sink->parent.process_msg = pa_sink_process_msg;
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->userdata = u; u->sink->userdata = u;
pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
@ -443,7 +331,7 @@ int pa__init(pa_module*m) {
goto fail; goto fail;
} }
pa_sink_set_latency_range(u->sink, 0, BLOCK_USEC); pa_sink_set_fixed_latency(u->sink, u->block_usec);
pa_sink_put(u->sink); pa_sink_put(u->sink);
@ -484,10 +372,10 @@ void pa__done(pa_module*m) {
pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
pa_thread_free(u->thread); pa_thread_free(u->thread);
} }
if (u->engineObject){ if (u->engineObject){
pa_destroy_sles_player(u); pa_destroy_sles_player(u);
} }
pa_thread_mq_done(&u->thread_mq); pa_thread_mq_done(&u->thread_mq);