diff --git a/common/common.h b/common/common.h index 81c7b00..b41249f 100644 --- a/common/common.h +++ b/common/common.h @@ -296,10 +296,18 @@ struct x264_t /* encoder parameters */ x264_param_t param; - x264_t *thread[X264_THREAD_MAX+1]; - x264_pthread_t thread_handle; - int b_thread_active; - int i_thread_phase; /* which thread to use for the next frame */ + x264_t *thread[X264_THREAD_MAX+1]; /* contexts for each frame in progress + lookahead */ + x264_pthread_t *thread_handle; + x264_pthread_cond_t thread_queue_cv; + x264_pthread_mutex_t thread_queue_mutex; + x264_t **thread_queue; /* frames that have been prepared but not yet claimed by a worker thread */ + x264_pthread_cond_t thread_active_cv; + x264_pthread_mutex_t thread_active_mutex; + int thread_active; + int b_thread_active; + int i_thread_phase; /* which thread to use for the next frame */ + int thread_exit; + int thread_error; /* bitstream output */ struct diff --git a/encoder/encoder.c b/encoder/encoder.c index 56e5d5b..efd9aa6 100644 --- a/encoder/encoder.c +++ b/encoder/encoder.c @@ -46,6 +46,34 @@ static int x264_encoder_frame_end( x264_t *h, x264_t *thread_current, x264_nal_t **pp_nal, int *pi_nal, x264_picture_t *pic_out ); +/* threading */ + +static void *x264_slices_write_thread( x264_t *h ); + +#ifdef HAVE_PTHREAD +void x264_int_cond_broadcast( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ) +{ + x264_pthread_mutex_lock( mutex ); + *var = val; + x264_pthread_cond_broadcast( cv ); + x264_pthread_mutex_unlock( mutex ); +} + +void x264_int_cond_wait( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ) +{ + x264_pthread_mutex_lock( mutex ); + while( *var != val ) + x264_pthread_cond_wait( cv, mutex ); + x264_pthread_mutex_unlock( mutex ); +} + +#else +void x264_int_cond_broadcast( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ) +{} +void x264_int_cond_wait( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int 
val ) +{} +#endif + /**************************************************************************** * ******************************* x264 libs ********************************** @@ -879,6 +907,16 @@ x264_t *x264_encoder_open( x264_param_t *param ) for( i = 1; i < h->param.i_threads + !!h->param.i_sync_lookahead; i++ ) CHECKED_MALLOC( h->thread[i], sizeof(x264_t) ); + if( h->param.i_threads > 1 ) + { + CHECKED_MALLOCZERO( h->thread_handle, (h->param.i_threads + 1) * sizeof(x264_pthread_t) ); + CHECKED_MALLOCZERO( h->thread_queue, (h->param.i_threads + 1) * sizeof(x264_t*) ); + if( x264_pthread_cond_init( &h->thread_queue_cv, NULL ) ) + goto fail; + if( x264_pthread_mutex_init( &h->thread_queue_mutex, NULL ) ) + goto fail; + } + for( i = 0; i < h->param.i_threads; i++ ) { if( i > 0 ) @@ -890,6 +928,13 @@ x264_t *x264_encoder_open( x264_param_t *param ) /* Start each thread with room for 8 NAL units; it'll realloc later if needed. */ CHECKED_MALLOC( h->thread[i]->out.nal, 8*sizeof(x264_nal_t) ); h->thread[i]->out.i_nals_allocated = 8; + if( h->param.i_threads > 1 ) + { + if( x264_pthread_cond_init( &h->thread[i]->thread_active_cv, NULL ) ) + goto fail; + if( x264_pthread_mutex_init( &h->thread[i]->thread_active_mutex, NULL ) ) + goto fail; + } if( x264_macroblock_cache_init( h->thread[i] ) < 0 ) goto fail; } @@ -922,6 +967,13 @@ x264_t *x264_encoder_open( x264_param_t *param ) h->sps->i_profile_idc == PROFILE_HIGH ? 
"High" : "High 4:4:4 Predictive", h->sps->i_level_idc/10, h->sps->i_level_idc%10 ); + if( h->param.i_threads > 1 ) + { + for( i = 0; i < h->param.i_threads; i++ ) + if( x264_pthread_create( &h->thread_handle[i], NULL, (void*)x264_slices_write_thread, h ) ) + return NULL; + } + return h; fail: x264_free( h ); @@ -1406,7 +1458,7 @@ static int x264_slice_write( x264_t *h ) h->mb.b_reencode_mb = 0; #if VISUALIZE - if( h->param.b_visualize ) + if( h->param.i_threads == 1 && h->param.b_visualize ) /* visualizer is only initialised on the single-threaded path */ x264_visualize_mb( h ); #endif @@ -1517,24 +1569,10 @@ static void x264_thread_sync_stat( x264_t *dst, x264_t *src ) memcpy( &dst->stat.i_frame_count, &src->stat.i_frame_count, sizeof(dst->stat) - sizeof(dst->stat.frame) ); } -static void *x264_slices_write( x264_t *h ) +static int x264_slices_write_internal( x264_t *h ) { int i_frame_size = 0; int i_slice_num = 0; - if( h->param.i_sync_lookahead ) - x264_lower_thread_priority( 10 ); - -#ifdef HAVE_MMX - /* Misalign mask has to be set separately for each thread. */ - if( h->param.cpu&X264_CPU_SSE_MISALIGN ) - x264_cpu_mask_misalign_sse(); -#endif - -#if VISUALIZE - if( h->param.b_visualize ) - if( x264_visualize_init( h ) ) - return (void *)-1; -#endif /* init stats */ memset( &h->stat.frame, 0, sizeof(h->stat.frame) ); @@ -1554,10 +1592,31 @@ static void *x264_slices_write( x264_t *h ) } h->sh.i_last_mb = X264_MIN( h->sh.i_last_mb, h->mb.i_mb_count - 1 ); if( x264_stack_align( x264_slice_write, h ) ) - return (void *)-1; + return -1; h->sh.i_first_mb = h->sh.i_last_mb + 1; i_frame_size += h->out.nal[h->out.i_nal-1].i_payload + NALU_OVERHEAD; } + h->out.i_frame_size = i_frame_size; + + return 0; +} + +static void *x264_slices_write( x264_t *h ) +{ +#ifdef HAVE_MMX + /* Misalign mask has to be set separately for each thread. 
*/ + if( h->param.cpu&X264_CPU_SSE_MISALIGN ) + x264_cpu_mask_misalign_sse(); +#endif + +#if VISUALIZE + if( h->param.b_visualize ) + if( x264_visualize_init( h ) ) + return (void *)-1; +#endif + + if( x264_slices_write_internal( h ) ) + return (void *)-1; #if VISUALIZE if( h->param.b_visualize ) @@ -1567,7 +1626,41 @@ static void *x264_slices_write( x264_t *h ) } #endif - h->out.i_frame_size = i_frame_size; + return (void *)0; +} + +static void *x264_slices_write_thread( x264_t *h ) +{ + if( h->param.i_sync_lookahead ) + x264_lower_thread_priority( 10 ); + +#ifdef HAVE_MMX + /* Misalign mask has to be set separately for each thread. */ + if( h->param.cpu&X264_CPU_SSE_MISALIGN ) + x264_cpu_mask_misalign_sse(); +#endif + + for(;;) + { + x264_t *t = NULL; + + // get one frame from the queue + x264_pthread_mutex_lock( &h->thread_queue_mutex ); + while( !h->thread_queue[0] && !h->thread_exit ) + x264_pthread_cond_wait( &h->thread_queue_cv, &h->thread_queue_mutex ); + if( h->thread_queue[0] ) + t = (void*)x264_frame_shift( (void*)h->thread_queue ); + x264_pthread_mutex_unlock( &h->thread_queue_mutex ); + if( h->thread_exit ) + return (void *)0; + if( !t ) + continue; + + t->thread_error = x264_slices_write_internal( t ); + + x264_int_cond_broadcast( &t->thread_active_cv, &t->thread_active_mutex, &t->thread_active, 0 ); + } + return (void *)0; } @@ -1811,9 +1904,14 @@ int x264_encoder_encode( x264_t *h, /* Write frame */ if( h->param.i_threads > 1 ) { - if( x264_pthread_create( &h->thread_handle, NULL, (void*)x264_slices_write, h ) ) - return -1; + assert( h->thread_active == 0 ); + h->thread_active = 1; + assert( h->b_thread_active == 0 ); h->b_thread_active = 1; + x264_pthread_mutex_lock( &h->thread[0]->thread_queue_mutex ); + x264_frame_push( (void*)h->thread_queue, (void*)h ); + x264_pthread_cond_broadcast( &h->thread[0]->thread_queue_cv ); + x264_pthread_mutex_unlock( &h->thread[0]->thread_queue_mutex ); } else if( (intptr_t)x264_slices_write( h ) ) @@ -1831,11 
+1929,10 @@ static int x264_encoder_frame_end( x264_t *h, x264_t *thread_current, if( h->b_thread_active ) { - void *ret = NULL; - x264_pthread_join( h->thread_handle, &ret ); - if( (intptr_t)ret ) - return (intptr_t)ret; + x264_int_cond_wait( &h->thread_active_cv, &h->thread_active_mutex, &h->thread_active, 0 ); h->b_thread_active = 0; + if( h->thread_error ) + return -1; } if( !h->out.i_nal ) { @@ -2015,15 +2112,29 @@ void x264_encoder_close ( x264_t *h ) x264_lookahead_delete( h ); - for( i=0; i<h->param.i_threads; i++ ) + if( h->param.i_threads > 1 ) { // don't strictly have to wait for the other threads, but it's simpler than canceling them - if( h->thread[i]->b_thread_active ) + x264_pthread_mutex_lock( &h->thread_queue_mutex ); + h->thread_exit = 1; + x264_pthread_cond_broadcast( &h->thread_queue_cv ); + x264_pthread_mutex_unlock( &h->thread_queue_mutex ); + for( i=0; i < h->param.i_threads; i++ ) + x264_pthread_join( h->thread_handle[i], NULL ); + for( i=0; i < h->param.i_threads; i++ ) { - x264_pthread_join( h->thread[i]->thread_handle, NULL ); - assert( h->thread[i]->fenc->i_reference_count == 1 ); - x264_frame_delete( h->thread[i]->fenc ); + x264_pthread_cond_destroy( &h->thread[i]->thread_active_cv ); + x264_pthread_mutex_destroy( &h->thread[i]->thread_active_mutex ); + if( h->thread[i]->b_thread_active ) + { + assert( h->thread[i]->fenc->i_reference_count == 1 ); + x264_frame_delete( h->thread[i]->fenc ); + } } + x264_pthread_cond_destroy( &h->thread_queue_cv ); + x264_pthread_mutex_destroy( &h->thread_queue_mutex ); + x264_free( h->thread_handle ); + x264_free( h->thread_queue ); } /* Slices used and PSNR */ diff --git a/encoder/lookahead.c b/encoder/lookahead.c index 35ed729..069630c 100644 --- a/encoder/lookahead.c +++ b/encoder/lookahead.c @@ -153,7 +153,7 @@ int x264_lookahead_init( x264_t *h, int i_slicetype_length ) if( x264_macroblock_cache_init( look_h ) ) goto fail; - if( x264_pthread_create( &look_h->thread_handle, NULL, (void 
*)x264_lookahead_thread, look_h ) ) + if( x264_pthread_create( &h->thread_handle[h->param.i_threads], NULL, (void *)x264_lookahead_thread, look_h ) ) goto fail; return 0; @@ -168,7 +168,7 @@ void x264_lookahead_delete( x264_t *h ) { h->lookahead->b_exit_thread = 1; x264_pthread_cond_broadcast( &h->lookahead->ifbuf.cv_fill ); - x264_pthread_join( h->thread[h->param.i_threads]->thread_handle, NULL ); + x264_pthread_join( h->thread_handle[h->param.i_threads], NULL ); x264_macroblock_cache_end( h->thread[h->param.i_threads] ); x264_free( h->thread[h->param.i_threads] ); }