Mysql源碼學習——Thread Manager

 

一、前言

    上篇的Connection Manager中,曾提及對於一個新到來的Connection,服務器會創建一個新的線程來處理這個連接。

其實沒那麼簡單,為瞭提高系統效率,減少頻繁創建線程和中止線程的系統消耗,Mysql使用瞭線程緩沖區的概念,即如果

一個連接斷開,則並不銷毀承載其的線程,而是將此線程放入線程緩沖區,並處於掛起狀態,當下一個新的Connection到來

時,首先去線程緩沖區去查找是否有空閑的線程,如果有,則使用之,如果沒有則新建線程。本問主要介紹這個線程緩沖區,

首先介紹下基本的概念。

 

二、基本概念

    1.線程創建函數

    大傢知道,Mysql現在是插件式的存儲引擎,隻要實現規定的接口,就可實現自己的存儲引擎。故Mysql的線程創建除瞭

出現在主服務器框架外,存儲引擎也可能會進行線程的創建。通過設置斷點,在我調試的版本中,發現瞭兩個創建線程的函數。

pthread_create:Mysql自用的創建線程函數

os_thread_create:存儲引擎innobase的創建線程的函數

    os_thread_create是存儲引擎innobase的線程函數,先擱淺不研究瞭,重點看下pthread_create,首先看下其源碼。

 

int pthread_create(pthread_t *thread_id, pthread_attr_t *attr,

           pthread_handler func, void *param)

{

  HANDLE hThread;

  struct pthread_map *map;

  DBUG_ENTER("pthread_create");

 

  if (!(map=malloc(sizeof(*map))))

    DBUG_RETURN(-1);

map->func=func; map->param=param;

  pthread_mutex_lock(&THR_LOCK_thread);

#ifdef __BORLANDC__

  hThread=(HANDLE)_beginthread((void(_USERENTRY *)(void *)) pthread_start,

                   attr->dwStackSize ? attr->dwStackSize :

                   65535, (void*) map);

#else

hThread=(HANDLE)_beginthread((void( __cdecl *)(void *)) pthread_start, attr->dwStackSize ? attr->dwStackSize : 65535, (void*) map);

#endif

  DBUG_PRINT("info", ("hThread=%lu",(long) hThread));

  *thread_id=map->pthreadself=hThread;

  pthread_mutex_unlock(&THR_LOCK_thread);

 

  if (hThread == (HANDLE) -1)

  {

    int error=errno;

    DBUG_PRINT("error",

           ("Can't create thread to handle request (error %d)",error));

    DBUG_RETURN(error ? error : -1);

  }

  VOID(SetThreadPriority(hThread, attr->priority)) ;

  DBUG_RETURN(0);

}

上面代碼首先構造瞭一個map結構體,成員分別是函數地址和傳入參數。然後調用操作系統的接口,_beginthread,但是執行函數並不是傳入的函數——func,而是pthread_start,參數為map。繼續跟蹤pthread_start。

pthread_handler_t pthread_start(void *param)

{

  pthread_handler

func=((struct pthread_map *) param)->func

;

  void *func_param=((struct pthread_map *) param)->param;

  my_thread_init();         /* Will always succeed in windows */

  pthread_mutex_lock(&THR_LOCK_thread);   /* Wait for beginthread to return */

  win_pthread_self=((struct pthread_map *) param)->pthreadself;

  pthread_mutex_unlock(&THR_LOCK_thread);

  free((char*) param);            /* Free param from create */

  pthread_exit((void*) (*func)(func_param));

  return 0;               /* Safety */

}

   可以看出,pthread_start中調用瞭map的func元素,作為真正執行的函數體。OK,創建線程的函數跟蹤到此!

   2.服務器啟動時創建瞭哪些函數?

   通過在兩個創建線程的地方設置斷點,總結瞭下,在服務器啟動時,創建瞭如下的線程。

pthread_create創建的線程

創建線程函數 線程執行函數

create_shutdown_thread

handle_shutdown

start_handle_manager

handle_manager

handle_connections_methods

handle_connections_sockets

 

innobase的os_thread_create創建的線程:

 

創建線程函數 線程執行函數

innobase_start_or_create_for_mysql

io_handler_thread(4個)

recv_recovery_from_checkpoint_finish

trx_rollback_or_clean_all_without_sess

innobase_start_or_create_for_mysql

srv_lock_timeout_thread

 

srv_error_monitor_thread

 

srv_monitor_thread

 

srv_master_thread

 

    還可以在調試過程中,通過暫停來看此時服務器中的線程,如下圖:

1

   

三、線程緩沖池

        Mysql支持線程緩存,在多線程連接模式下,如果連接斷開後,將這個線程放入空閑線程緩沖區,在下次有連接到來時,

先去緩沖池中查找是否有空閑線程,有則用之,無則創建。啟動時可以設置線程緩沖池的數目:

Mysqld.exe --thread_cache_size=10

      在一個連接斷開時,會調用cache_thread函數,將空閑的線程加入到cache中,以備後用。源碼如下:

static bool cache_thread()

{

  safe_mutex_assert_owner(&LOCK_thread_count);

  if (

cached_thread_count < thread_cache_size

&&

     ! abort_loop && !kill_cached_threads)

 {

   /* Don't kill the thread, just put it in cache for reuse */

   DBUG_PRINT("info", ("Adding thread to cache"));

   cached_thread_count++;

   while (!abort_loop && ! wake_thread && ! kill_cached_threads)

     (void) pthread_cond_wait(&COND_thread_cache, &LOCK_thread_count);

   cached_thread_count--;

   if (kill_cached_threads)

pthread_cond_signal(&COND_flush_thread_cache);

  if (wake_thread)

  {

    THD *thd;

    wake_thread--;

    thd= thread_cache.get();

    thd->thread_stack= (char*) &thd;          // For store_globals

    (void) thd->store_globals();

    /*

      THD::mysys_var::abort is associated with physical thread rather

      than with THD object. So we need to reset this flag before using

      this thread for handling of new THD object/connection.

    */

    thd->mysys_var->abort= 0;

    thd->thr_create_utime= my_micro_time();

    threads.append(thd);

    return(1);

  }

}

return(0);

}

    上面我們的啟動參數設置線程緩沖區為10,此時對應代碼裡面的thread_cache_size = 10,cached_thread_count記錄

瞭此刻cache中的空閑線程數目,隻有在cache未滿的情況下,才會將新的空閑線程加入緩沖池中。加入到緩沖區其實就是將線

程掛起,pthread_cond_wait函數便是線程等待函數,在此函數中,會調用WaitForMultipleObjects進行事件等待。具體源碼

如下:

int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,

                           struct timespec *abstime)

{

  int result;

  long timeout;

  union ft64 now;

 

  if( abstime != NULL )

  {

    GetSystemTimeAsFileTime(&now.ft);

 

    /*

      Calculate time left to abstime

      - subtract start time from current time(values are in 100ns units)

      - convert to millisec by piding with 10000

    */

    timeout= (long)((abstime->tv.i64 - now.i64) / 10000);

    

    /* Don't allow the timeout to be negative */

    if (timeout < 0)

      timeout= 0L;

 

    /*

      Make sure the calucated timeout does not exceed original timeout

      value which could cause "wait for ever" if system time changes

    */

    if (timeout > abstime->max_timeout_msec)

      timeout= abstime->max_timeout_msec;

 

  }

  else

  {

    /* No time specified; don't expire */

    timeout= INFINITE;

  }

 

  /*

    Block access if previous broadcast hasn't finished.

    This is just for safety and should normally not

    affect the total time spent in this function.

  */

  WaitForSingleObject(cond->broadcast_block_event, INFINITE);

 

  EnterCriticalSection(&cond->lock_waiting);

  cond->waiting++;

  LeaveCriticalSection(&cond->lock_waiting);

 

  LeaveCriticalSection(mutex);

result= WaitForMultipleObjects(2, cond->events, FALSE, timeout);

  EnterCriticalSection(&cond->lock_waiting);

  cond->waiting--;

  

  if (cond->waiting == 0)

  {

    /*

      We're the last waiter to be notified or to stop waiting, so

      reset the manual event.

    */

    /* Close broadcast gate */

    ResetEvent(cond->events[BROADCAST]);

    /* Open block gate */

    SetEvent(cond->broadcast_block_event);

  }

  LeaveCriticalSection(&cond->lock_waiting);

  

  EnterCriticalSection(mutex);

 

  return result == WAIT_TIMEOUT ? ETIMEDOUT : 0;

}

    此處是等待時間,何處進行事件通知呢?我們再次來到上篇所提及的為新的連接創建線程的代碼中:

void create_thread_to_handle_connection(THD *thd)

{

  if (cached_thread_count > wake_thread)

  {

    /* Get thread from cache */

    thread_cache.append(thd);

    wake_thread++;

pthread_cond_signal(&COND_thread_cache);

  }

  Else

...

}

    上篇文章我們其實隻講瞭ELSE分支,而忽略瞭IF分支。wake_thread代表瞭喚醒的線程數,即在線程緩沖區中被再次使用的

線程,如果cache中的總數>被重新使用的數目,說明還有空閑的線程,此時進入if分支,調用phtread_cond_signal喚醒上面掛起

的空閑線程。

 

線程管理就到此為止瞭,這裡隻是介紹瞭下線程緩沖區的工作原理,並沒有具體去介紹如何利用EVENT進行線程的掛起和喚醒,這些都是借助瞭操作系統的特性,有興趣的可以自己研究下。這篇就到此為止,下節會介紹Mysql的用戶身份認證原理和實現。

 

PS. 男怕入錯行,夜半三更忙,一行又一行

 

 

摘自 心中無碼

發佈留言