#include <yaz/nmem.h>
#include <unistd.h>
#include <stdlib.h>
-#include <pthread.h>
+#include <yaz/thread_create.h>
+#include <yaz/mutex.h>
#include <assert.h>
struct work_item {
struct sel_thread {
int fd[2];
NMEM nmem;
- pthread_t *thread_id;
- pthread_mutex_t mutex;
- pthread_cond_t input_data;
+ yaz_thread_t *thread_id;
+ YAZ_MUTEX mutex;
+ YAZ_COND input_data;
int stop_flag;
int no_threads;
struct work_item *input_queue;
{
struct work_item *work_this = 0;
/* wait for some work */
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
while (!p->stop_flag && !p->input_queue)
- pthread_cond_wait(&p->input_data, &p->mutex);
+ yaz_cond_wait(p->input_data, p->mutex, 0);
/* see if we were waken up because we're shutting down */
if (p->stop_flag)
break;
yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
assert(work_this);
- pthread_mutex_unlock(&p->mutex);
+ yaz_mutex_leave(p->mutex);
/* work on this item */
p->work_handler(work_this->data);
/* put it back into output queue */
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
work_this->next = p->output_queue;
p->output_queue = work_this;
- pthread_mutex_unlock(&p->mutex);
+ yaz_mutex_leave(p->mutex);
/* wake up select/poll with a single byte */
(void) write(p->fd[1], "", 1);
}
- pthread_mutex_unlock(&p->mutex);
+ yaz_mutex_leave(p->mutex);
return 0;
}
p->free_queue = 0;
p->work_handler = work_handler;
p->work_destroy = work_destroy;
-
+ p->no_threads = 0; /* we if need to destroy */
p->stop_flag = 0;
- p->no_threads = no_of_threads;
- pthread_mutex_init(&p->mutex, 0);
- pthread_cond_init(&p->input_data, 0);
+ p->mutex = 0;
+ yaz_mutex_create(&p->mutex);
+ yaz_cond_create(&p->input_data);
+ if (p->input_data == 0) /* condition variable could not be created? */
+ {
+ sel_thread_destroy(p);
+ return 0;
+ }
+ p->no_threads = no_of_threads;
p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
for (i = 0; i < p->no_threads; i++)
- pthread_create(p->thread_id + i, 0, sel_thread_handler, p);
+ p->thread_id[i] = yaz_thread_create(sel_thread_handler, p);
return p;
}
void sel_thread_destroy(sel_thread_t p)
{
int i;
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
p->stop_flag = 1;
- pthread_cond_broadcast(&p->input_data);
- pthread_mutex_unlock(&p->mutex);
+ yaz_cond_broadcast(p->input_data);
+ yaz_mutex_leave(p->mutex);
for (i = 0; i< p->no_threads; i++)
- pthread_join(p->thread_id[i], 0);
+ yaz_thread_join(&p->thread_id[i], 0);
if (p->work_destroy)
{
close(p->fd[0]);
close(p->fd[1]);
- pthread_cond_destroy(&p->input_data);
- pthread_mutex_destroy(&p->mutex);
+ yaz_cond_destroy(&p->input_data);
+ yaz_mutex_destroy(&p->mutex);
nmem_destroy(p->nmem);
}
{
struct work_item *work_p;
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
if (p->free_queue)
{
p->input_queue = work_p;
input_queue_length++;
yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
- pthread_cond_signal(&p->input_data);
- pthread_mutex_unlock(&p->mutex);
+ yaz_cond_signal(p->input_data);
+ yaz_mutex_leave(p->mutex);
}
void *sel_thread_result(sel_thread_t p)
void *data = 0;
char read_buf[1];
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
/* got something. Take the last one out of output_queue */
work_this = queue_remove_last(&p->output_queue);
data = work_this->data;
(void) read(p->fd[0], read_buf, 1);
}
- pthread_mutex_unlock(&p->mutex);
+ yaz_mutex_leave(p->mutex);
return data;
}