From 606d4e109bb9764e077d4212bdccfd429fb712c8 Mon Sep 17 00:00:00 2001 From: Anton Mitrofanov Date: Wed, 7 Apr 2010 12:15:54 +0300 Subject: [PATCH] Thread-pool patch --- common/common.h | 20 ++++-- encoder/encoder.c | 198 +++++++++++++++++++++++++++++++++++++++------------ encoder/lookahead.c | 4 +- 3 files changed, 168 insertions(+), 54 deletions(-) diff --git a/common/common.h b/common/common.h index 6abd42b..2eb3c1e 100644 --- a/common/common.h +++ b/common/common.h @@ -350,12 +350,20 @@ struct x264_t /* encoder parameters */ x264_param_t param; - x264_t *thread[X264_THREAD_MAX+1]; - x264_pthread_t thread_handle; - int b_thread_active; - int i_thread_phase; /* which thread to use for the next frame */ - int i_threadslice_start; /* first row in this thread slice */ - int i_threadslice_end; /* row after the end of this thread slice */ + x264_t *thread[X264_THREAD_MAX+1]; /* contexts for each frame in progress + lookahead */ + x264_pthread_t *thread_handle; + x264_pthread_cond_t thread_queue_cv; + x264_pthread_mutex_t thread_queue_mutex; + x264_t **thread_queue; /* frames that have been prepared but not yet claimed by a worker thread */ + x264_pthread_cond_t thread_active_cv; + x264_pthread_mutex_t thread_active_mutex; + int thread_active; + int b_thread_active; + int i_thread_phase; /* which thread to use for the next frame */ + int thread_exit; + int thread_error; + int i_threadslice_start; /* first row in this thread slice */ + int i_threadslice_end; /* row after the end of this thread slice */ /* bitstream output */ struct diff --git a/encoder/encoder.c b/encoder/encoder.c index e25c9db..cae2c1d 100644 --- a/encoder/encoder.c +++ b/encoder/encoder.c @@ -45,6 +45,53 @@ static int x264_encoder_frame_end( x264_t *h, x264_t *thread_current, x264_nal_t **pp_nal, int *pi_nal, x264_picture_t *pic_out ); +/* threading */ + +static void *x264_slices_write_thread( x264_t *h ); + +#ifdef HAVE_PTHREAD +static void x264_int_cond_broadcast( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ) +{ + x264_pthread_mutex_lock( mutex ); + *var = val; + x264_pthread_cond_broadcast( cv ); + x264_pthread_mutex_unlock( mutex ); +} + +static void x264_int_cond_wait( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ) +{ + x264_pthread_mutex_lock( mutex ); + while( *var != val ) + x264_pthread_cond_wait( cv, mutex ); + x264_pthread_mutex_unlock( mutex ); +} + +#else +static void x264_int_cond_broadcast( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ) +{} +static void x264_int_cond_wait( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ) +{} +#endif + +static void x264_thread_pool_push( x264_t *h ) +{ + assert( h->thread_active == 0 ); + h->thread_active = 1; + assert( h->b_thread_active == 0 ); + h->b_thread_active = 1; + x264_pthread_mutex_lock( &h->thread[0]->thread_queue_mutex ); + x264_frame_push( (void*)h->thread_queue, (void*)h ); + x264_pthread_cond_broadcast( &h->thread[0]->thread_queue_cv ); + x264_pthread_mutex_unlock( &h->thread[0]->thread_queue_mutex ); +} + +static int x264_thread_pool_wait( x264_t *h ) +{ + x264_int_cond_wait( &h->thread_active_cv, &h->thread_active_mutex, &h->thread_active, 0 ); + h->b_thread_active = 0; + return h->thread_error; +} + /**************************************************************************** * ******************************* x264 libs ********************************** @@ -1019,6 +1066,16 @@ x264_t *x264_encoder_open( x264_param_t *param ) for( int i = 1; i < h->param.i_threads + !!h->param.i_sync_lookahead; i++ ) CHECKED_MALLOC( h->thread[i], sizeof(x264_t) ); + if( h->param.i_threads > 1 ) + { + CHECKED_MALLOCZERO( h->thread_handle, (h->param.i_threads + 1) * sizeof(x264_pthread_t) ); + CHECKED_MALLOCZERO( h->thread_queue, (h->param.i_threads + 1) * sizeof(x264_t*) ); + if( x264_pthread_cond_init( &h->thread_queue_cv, NULL ) ) + goto fail; + if( x264_pthread_mutex_init( &h->thread_queue_mutex, NULL ) ) + goto fail; + } + if( x264_lookahead_init( h, i_slicetype_length ) ) goto fail; @@ -1043,6 +1100,20 @@ x264_t *x264_encoder_open( x264_param_t *param ) CHECKED_MALLOC( h->thread[i]->out.nal, init_nal_count*sizeof(x264_nal_t) ); h->thread[i]->out.i_nals_allocated = init_nal_count; + if( h->param.i_threads > 1 ) + { + if( x264_pthread_cond_init( &h->thread[i]->thread_active_cv, NULL ) ) + goto fail; + if( x264_pthread_mutex_init( &h->thread[i]->thread_active_mutex, NULL ) ) + goto fail; + } + +#ifdef HAVE_VISUALIZE + if( h->param.b_visualize ) + if( x264_visualize_init( h->thread[i] ) ) + goto fail; +#endif + if( allocate_threadlocal_data && x264_macroblock_cache_init( h->thread[i] ) < 0 ) goto fail; } @@ -1093,6 +1164,11 @@ x264_t *x264_encoder_open( x264_param_t *param ) h->sps->i_profile_idc == PROFILE_HIGH ? "High" : "High 4:4:4 Predictive", h->sps->i_level_idc/10, h->sps->i_level_idc%10 ); + if( h->param.i_threads > 1 ) + for( int i = 0; i < h->param.i_threads; i++ ) + if( x264_pthread_create( &h->thread_handle[i], NULL, (void*)x264_slices_write_thread, h ) ) + return NULL; + return h; fail: x264_free( h ); @@ -1984,24 +2060,10 @@ static void x264_thread_sync_stat( x264_t *dst, x264_t *src ) memcpy( &dst->stat.i_frame_count, &src->stat.i_frame_count, sizeof(dst->stat) - sizeof(dst->stat.frame) ); } -static void *x264_slices_write( x264_t *h ) +static int x264_slices_write_internal( x264_t *h ) { int i_slice_num = 0; int last_thread_mb = h->sh.i_last_mb; - if( h->param.i_sync_lookahead ) - x264_lower_thread_priority( 10 ); - -#ifdef HAVE_MMX - /* Misalign mask has to be set separately for each thread. */ - if( h->param.cpu&X264_CPU_SSE_MISALIGN ) - x264_cpu_mask_misalign_sse(); -#endif - -#ifdef HAVE_VISUALIZE - if( h->param.b_visualize ) - if( x264_visualize_init( h ) ) - return (void *)-1; -#endif /* init stats */ memset( &h->stat.frame, 0, sizeof(h->stat.frame) ); @@ -2020,24 +2082,69 @@ static void *x264_slices_write( x264_t *h ) } h->sh.i_last_mb = X264_MIN( h->sh.i_last_mb, last_thread_mb ); if( x264_stack_align( x264_slice_write, h ) ) - return (void *)-1; + return -1; h->sh.i_first_mb = h->sh.i_last_mb + 1; } #ifdef HAVE_VISUALIZE if( h->param.b_visualize ) - { x264_visualize_show( h ); - x264_visualize_close( h ); - } #endif + return 0; +} + +static int x264_slices_write( x264_t *h ) +{ +#ifdef HAVE_MMX + /* Misalign mask has to be set separately for each thread. */ + if( h->param.cpu&X264_CPU_SSE_MISALIGN ) + x264_cpu_mask_misalign_sse(); +#endif + + if( x264_slices_write_internal( h ) ) + return -1; + + return 0; +} + +static void *x264_slices_write_thread( x264_t *h ) +{ + if( h->param.i_sync_lookahead ) + x264_lower_thread_priority( 10 ); + +#ifdef HAVE_MMX + /* Misalign mask has to be set separately for each thread. */ + if( h->param.cpu&X264_CPU_SSE_MISALIGN ) + x264_cpu_mask_misalign_sse(); +#endif + + for(;;) + { + int b_exit; + x264_t *t = NULL; + + // get one frame from the queue + x264_pthread_mutex_lock( &h->thread_queue_mutex ); + while( !h->thread_queue[0] && !h->thread_exit ) + x264_pthread_cond_wait( &h->thread_queue_cv, &h->thread_queue_mutex ); + b_exit = h->thread_exit; + if( !b_exit ) + t = (void*)x264_frame_shift( (void*)h->thread_queue ); + x264_pthread_mutex_unlock( &h->thread_queue_mutex ); + if( b_exit ) + break; + + t->thread_error = x264_slices_write_internal( t ); + + x264_int_cond_broadcast( &t->thread_active_cv, &t->thread_active_mutex, &t->thread_active, 0 ); + } + return (void *)0; } static int x264_threaded_slices_write( x264_t *h ) { - void *ret = NULL; /* set first/last mb and sync contexts */ for( int i = 0; i < h->param.i_threads; i++ ) { @@ -2060,18 +2167,10 @@ static int x264_threaded_slices_write( x264_t *h ) /* dispatch */ for( int i = 0; i < h->param.i_threads; i++ ) - { - if( x264_pthread_create( &h->thread[i]->thread_handle, NULL, (void*)x264_slices_write, (void*)h->thread[i] ) ) - return -1; - h->thread[i]->b_thread_active = 1; - } + x264_thread_pool_push( h->thread[i] ); for( int i = 0; i < h->param.i_threads; i++ ) - { - x264_pthread_join( h->thread[i]->thread_handle, &ret ); - h->thread[i]->b_thread_active = 0; - if( (intptr_t)ret ) - return (intptr_t)ret; - } + if( x264_thread_pool_wait( h->thread[i] ) ) + return -1; /* deblocking and hpel filtering */ for( int i = 0; i <= h->sps->i_mb_height; i++ ) @@ -2448,18 +2547,14 @@ int x264_encoder_encode( x264_t *h, h->i_threadslice_start = 0; h->i_threadslice_end = h->sps->i_mb_height; if( h->i_thread_frames > 1 ) - { - if( x264_pthread_create( &h->thread_handle, NULL, (void*)x264_slices_write, h ) ) - return -1; - h->b_thread_active = 1; - } + x264_thread_pool_push( h ); else if( h->param.b_sliced_threads ) { if( x264_threaded_slices_write( h ) ) return -1; } else - if( (intptr_t)x264_slices_write( h ) ) + if( x264_slices_write( h ) ) return -1; return x264_encoder_frame_end( thread_oldest, thread_current, pp_nal, pi_nal, pic_out ); @@ -2472,13 +2567,8 @@ static int x264_encoder_frame_end( x264_t *h, x264_t *thread_current, char psz_message[80]; if( h->b_thread_active ) - { - void *ret = NULL; - x264_pthread_join( h->thread_handle, &ret ); - h->b_thread_active = 0; - if( (intptr_t)ret ) - return (intptr_t)ret; - } + if( x264_thread_pool_wait( h ) ) + return -1; if( !h->out.i_nal ) { pic_out->i_type = X264_TYPE_AUTO; @@ -2749,9 +2839,21 @@ void x264_encoder_close ( x264_t *h ) if( h->param.i_threads > 1 ) { // don't strictly have to wait for the other threads, but it's simpler than canceling them + x264_pthread_mutex_lock( &h->thread_queue_mutex ); + h->thread_exit = 1; + x264_pthread_cond_broadcast( &h->thread_queue_cv ); + x264_pthread_mutex_unlock( &h->thread_queue_mutex ); for( int i = 0; i < h->param.i_threads; i++ ) - if( h->thread[i]->b_thread_active ) - x264_pthread_join( h->thread[i]->thread_handle, NULL ); + x264_pthread_join( h->thread_handle[i], NULL ); + for( int i = 0; i < h->param.i_threads; i++ ) + { + x264_pthread_cond_destroy( &h->thread[i]->thread_active_cv ); + x264_pthread_mutex_destroy( &h->thread[i]->thread_active_mutex ); + } + x264_pthread_cond_destroy( &h->thread_queue_cv ); + x264_pthread_mutex_destroy( &h->thread_queue_mutex ); + x264_free( h->thread_handle ); + x264_free( h->thread_queue ); if( h->i_thread_frames > 1 ) { for( int i = 0; i < h->i_thread_frames; i++ ) @@ -3053,6 +3155,10 @@ void x264_encoder_close ( x264_t *h ) x264_frame_delete( *frame ); x264_macroblock_cache_end( h->thread[i] ); } +#ifdef HAVE_VISUALIZE + if( h->param.b_visualize ) + x264_visualize_close( h->thread[i] ); +#endif x264_free( h->thread[i]->scratch_buffer ); x264_free( h->thread[i]->out.p_bitstream ); x264_free( h->thread[i]->out.nal); diff --git a/encoder/lookahead.c b/encoder/lookahead.c index 7a0c6d3..108f4cf 100644 --- a/encoder/lookahead.c +++ b/encoder/lookahead.c @@ -151,7 +151,7 @@ int x264_lookahead_init( x264_t *h, int i_slicetype_length ) if( x264_macroblock_cache_init( look_h ) ) goto fail; - if( x264_pthread_create( &look_h->thread_handle, NULL, (void *)x264_lookahead_thread, look_h ) ) + if( x264_pthread_create( &h->thread_handle[h->param.i_threads], NULL, (void *)x264_lookahead_thread, look_h ) ) goto fail; look->b_thread_active = 1; @@ -169,7 +169,7 @@ void x264_lookahead_delete( x264_t *h ) h->lookahead->b_exit_thread = 1; x264_pthread_cond_broadcast( &h->lookahead->ifbuf.cv_fill ); x264_pthread_mutex_unlock( &h->lookahead->ifbuf.mutex ); - x264_pthread_join( h->thread[h->param.i_threads]->thread_handle, NULL ); + x264_pthread_join( h->thread_handle[h->param.i_threads], NULL ); x264_macroblock_cache_end( h->thread[h->param.i_threads] ); x264_free( h->thread[h->param.i_threads]->scratch_buffer ); x264_free( h->thread[h->param.i_threads] ); -- 1.7.0.2.msysgit.0