【澳门金沙vip】(转)线程池总结(二)

 

MySQL数据库Connection
Manager
的相关知识是本文我们主要要介绍到内容,接下来就让我们一起来了解一下吧。

原文:

1.连接的线程数

1.连接的线程数

上一篇主要讲了线程池的原理和实现,感觉有点意犹未尽,这篇文章是对上篇文章的一个补充,主要围绕以下两点展开,one-connection-per-thread的实现方式以及线程池中epoll的使用。

 

MySQL支持单线程和多线程两种连接线程数。如果是单线程,则在同一时刻,只能有一个connection连接到MySQL,其他的连接会被挂起。如果是多线程,则同一时刻可以支持多个connection同时连接到服务器。可以通过设置服务器的启动参数来设定连接的线程数:

one-connection-per-thread

      
Mysql支持单线程和多线程两种连接线程数。如果是单线程,则在同一时刻,只能有一个connection连接到Mysql,

mysqld.exe --thread-handling=no-threads  mysqld.exe --thread-handling=one-thread-per-connection 

根据scheduler_functions的模板,我们也可以列出one-connection-per-thread方式的几个关键函数。

 

服务器如何通过参数来选择使用哪种方式的呢?且看服务器中的分支代码:

static scheduler_functions con_per_functions=

其他的连接会被挂起。如果是多线程,则同一时刻可以支持多个connection同时连接到服务器。

#ifdef EMBEDDED_LIBRARY  one_thread_scheduler(&thread_scheduler);  #else  if (global_system_variables.thread_handling <=  SCHEDULER_ONE_THREAD_PER_CONNECTION)  one_thread_per_connection_scheduler(&thread_scheduler);  else if (global_system_variables.thread_handling == SCHEDULER_NO_THREADS)  one_thread_scheduler(&thread_scheduler);  else  pool_of_threads_scheduler(&thread_scheduler); /* purecov: tested */  #endif 

{     max_connection+1,      // max_threads   

 

这段代码出现在get_options函数中,此函数是根据传入的服务器参数,设置相应参数的模式。由这段代码可以看出,如果定义了EMBEDDED_LIBRARY宏定义(估计应该是嵌入式使用),则调用one_thread_scheduler,即使用单线程。如果参数小于等于SCHEDULER_ONE_THREAD_PER_CONNECTION,则调用one_thread_per_connection_scheduler,即每个连接一个线程,即多线程。

     NULL,   

     可以通过设置服务器的启动参数来设定连接的线程数:

至于global_system_variables.thread_handling是如何进行设置的呢?其实就是根据我们传递给服务器的参数–thread-handling进行设置的,参数的设置统一在函数get_options中,其调用mysqld_get_one_option,其中有个分支,代码如下:

     NULL,   

 

case OPT_THREAD_HANDLING:  {  global_system_variables.thread_handling=  find_type_or_exit(argument, &thread_handling_typelib, opt->name)-1;  break;  } 

     NULL,                             // init   

mysqld.exe –thread-handling=no-threads

对参数初始化有兴趣的可以具体的看下get_options这个函数,这里就不详细讲解了。
我们来看下one_thread_scheduler和one_thread_per_connection_scheduler的源代码,看下他们都做了些什么?

     Init_new_connection_handler_thread,  //
init_new_connection_thread

mysqld.exe –thread-handling=one-thread-per-connection

void one_thread_scheduler(scheduler_functions* func)  {  func->max_threads= 1;  #ifndef EMBEDDED_LIBRARY  func->add_connection= handle_connection_in_main_thread;  #endif  func->init_new_connection_thread= init_dummy;  func->end_thread= no_threads_end;  }   void one_thread_per_connection_scheduler(scheduler_functions* func)  {  func->max_threads= max_connections;  func->add_connection= create_thread_to_handle_connection;  func->end_thread= one_thread_per_connection_end;  }  

      create_thread_to_handle_connection,  // add_connection   

 

原来就是设置了一个结构体中scheduler_functions的参数,只不过这些参数是一些函数指针罢了,也就是说在具体的调用中,

     NULL,                                   // thd_wait_begin   

  

只需要调用add_connection或end_thread即可,不需要知道到底是调用了哪个函数,这大概就是一种变形的多态性吧。

     NULL,                                  // thd_wait_end   

 

2.初始化网络配置

     NULL,                                  //
post_kill_notification   

服务器如何通过参数来选择使用哪种方式的呢?且看服务器中的分支代码:

网络配置比较简单,就是设置端口,创建套接字,绑定端口,监听端口。实现全部集中在network_init函数中,由于这个函数确

     one_thread_per_connection_end,      // end_thread    

 

实没什么好讲的,如果对socket不熟悉的话,可以去网上搜索下相关知识。这里直接给出相应的伪代码:

     NULL                             // end

#ifdef EMBEDDED_LIBRARY

network_init  {  set_ports; //设置端口号,#define MYSQL_PORT 3306  socket; //创建套接字  bind; //绑定端口号  listen; //监听端口号  } 

};

  one_thread_scheduler(&thread_scheduler);

3.连接的方式

1.init_new_connection_handler_thread

#else

进程间通信的方式不止是SOCKET,还有其他很多方式。Mysql支持三种连接方式:namepipe、socket和shared
memory,即命名管道、套接字和共享内存的方式。这三种方式是可以共存的。默认只使用套接字。

这个接口比较简单,主要是调用pthread_detach,将线程设置为detach状态,线程结束后自动释放所有资源。

  if (global_system_variables.thread_handling <=

TCP/IP套接字方式是MySQL在任何平台下都提供的连接方式,也是网络中使用得最多的一种方式。这种方式在TCP/IP连接上建立一个基于网络的连接请求,一般情况下客户端在一台服务器上,而MySQL实例在另一台服务器上,这两台机器通过一个TCP/IP网络连接。

2.create_thread_to_handle_connection

      SCHEDULER_ONE_THREAD_PER_CONNECTION)

例如,我可以在Windows服务器下请求一台远程Linux服务器下的MySQL实例。在Windows
2000、Windows XP、Windows 2003和Windows Vista以及在此之后的
Windows操作系统中,如果两个需要通信的进程在同一台服务器上,那么可以使用命名管道,SQL
Server数据库默认安装后的本地连接也使用命名管道。在MySQL数据库中,需在配置文件中启用–enable-named-pipe选项。

这个接口是处理新连接的接口,对于线程池而言,会从thread_id%group_size对应的group中获取一个线程来处理,而one-connection-per-thread方式则会判断是否有thread_cache可以使用,如果没有则新建线程来处理。具体逻辑如下:

    one_thread_per_connection_scheduler(&thread_scheduler);

在MySQL
4.1之后的版本中,MySQL还提供了共享内存的连接方式,在配置文件中添加–shared-memory。如果想使用共享内存的方式,在连接时,Mysql客户端还必须使用-protocol=memory选项。启动时可以通过下面的参数进行设置。

(1).判断缓存的线程数是否使用完(比较blocked_pthread_count
和wake_pthread大小)

  else if (global_system_variables.thread_handling ==
SCHEDULER_NO_THREADS)

mysqld.exe --enable-named-pipe  mysqld.exe --shared-memory 

(2).若还有缓存线程,将thd加入waiting_thd_list的队列,唤醒一个等待COND_thread_cache的线程

    one_thread_scheduler(&thread_scheduler);

除了在启动时进行参数设置外,也可以通过修改MY.INI文件进行设置。我们来看下源码中选择连接方式的分支函数handle_connections_methods:

(3).若没有,创建一个新的线程处理,线程的入口函数是do_handle_one_connection

  else

handle_connections_methods()  {  if (hPipe != INVALID_HANDLE_VALUE)  {  handler_count++;  if (pthread_create(&hThread,&connection_attrib,  handle_connections_namedpipes, 0))  {  sql_print_warning("Can't create thread to handle named pipes");  handler_count--;  }  }  if (have_tcpip && !opt_disable_networking)  {  handler_count++;  if (pthread_create(&hThread,&connection_attrib,  handle_connections_sockets, 0))  {  sql_print_warning("Can't create thread to handle TCP/IP");  handler_count--;  }  }   if (opt_enable_shared_memory)  {  handler_count++;  if (pthread_create(&hThread,&connection_attrib,  handle_connections_shared_memory, 0))  {  sql_print_warning("Can't create thread to handle shared memory");  handler_count--;  }  }  } 

(4).调用add_global_thread加入thd数组。

    pool_of_threads_scheduler(&thread_scheduler);  /* purecov:
tested */

由于对于namepipe和memory
share的通信方式不太了解,这里只研究socket的通信方式。从代码中可以看出,handle_connections_sockets便是socket的设置,我们就来看下它。

 3.do_handle_one_connection

#endif

4.socket管理创建新线程

这个接口被create_thread_to_handle_connection调用,处理请求的主要实现接口。

 

socket管理其实比较简单,直接给出其伪代码:

(1).循环调用do_command,从socket中读取网络包,并且解析执行;

  

handle_connections_sockets  {  select; //监视socket文件描述符  new_socket = accept;//处理到来的客户端连接  thd = new THD;创建THD类  vio_tmp = vio_new(new_socket,VIO_TYPE_TCPIP, 0); //初始化VIO结构体  my_net_init(&thd->net, vio_tmp);//初始化thd的net结构体  create_new_thread(thd);//为这个连接创建一个新的线程,如果是单线程模式的话,就不会创建一个新线程  } 

(2).
当远程客户端发送关闭连接COMMAND(比如COM_QUIT,COM_SHUTDOWN)时,退出循环

 

首先是select函数进行监视socket端口,如果监控到有连接,则通过accept函数接受客户端的连接,然后新建一个THD类,将连接参数全部设置到THD类的参数上,最后调用create_new_thread函数,这个函数便是重点。
我们进入这个函数,看下做了啥。

(3).调用close_connection关闭连接(thd->disconnect());

这段代码出现在get_options函数中,此函数是根据传入的服务器参数,设置相应参数的模式。由这段代码可以看出,如果定义了EMBEDDED_LIBRARY宏定义(估计应该是嵌入式使用),则调用one_thread_scheduler,即使用单线程。如果参数小于等于SCHEDULER_ONE_THREAD_PER_CONNECTION,则调用one_thread_per_connection_scheduler,即每个连接一个线程,即多线程。
至于global_system_variables.thread_handling是如何进行设置的呢?其实就是根据我们传递给服务器的参数–thread-handling进行设置的,参数的设置统一在函数get_options中,其调用mysqld_get_one_option,其中有个分支,代码如下:

create_new_thread  {  ++connection_count;//全局连接数自增  thread_count++; //全局线程数自增  thread_scheduler.add_connection(thd);//真正创建线程  } 

(4).调用one_thread_per_connection_end函数,确认是否可以复用线程

 

So
easy,首先将全局连接数+1,全局线程数+1,然后调用add_connection函数,这个函数就是我们在上面第一步设置连接的

(5).根据返回结果,确定退出工作线程还是继续循环执行命令。

case OPT_THREAD_HANDLING:

线程数中,one_thread_scheduler和one_thread_per_connection_scheduler中设置的一个参数。这两者的区别便是是否创建了

4.one_thread_per_connection_end

  {

一个新的线程来处理到来的连接。one_thread_scheduler是单线程方式,木有新建线程。我们重点研究one_thread_per_connection_scheduler,其设置的add_connection函数为create_thread_to_handle_connection:

判断是否可以复用线程(thread_cache)的主要函数,逻辑如下:

    global_system_variables.thread_handling=

create_thread_to_handle_connection(THD *thd)  {  thread_created++;  threads.append(thd); // 创建线程数自增,并加入到threads链表上  pthread_create(&thd->real_id,&connection_attrib,  handle_one_connection,  (void*) thd);//这就是真正创建线程的地方了,函数便是handle_one_connection  } 

(1).调用remove_global_thread,移除线程对应的thd实例

      find_type_or_exit(argument, &thread_handling_typelib,
opt->name)-1;

可见,最后调用了pthread_create函数,这个函数便是创建一个新的线程,新线程的处理函数为handle_one_connection.

(2).调用block_until_new_connection判断是否可以重用thread

    break;

5.新线程处理流程

(3).判断缓存的线程是否超过阀值,若没有,则blocked_pthread_count++;

  }

新线程处理函数为handle_one_connection,到此位置,一个新的connection被一个新创建的线程所单独处理。我们看下其中

(4).阻塞等待条件变量COND_thread_cache

 

是如何进行处理的。

(5).被唤醒后,表示有新的thd需要重用线程,将thd从waiting_thd_list中移除,使用thd初始化线程的thd->thread_stack

  

handle_one_connection(void *arg)  {  for (;;)  {  lex_start(thd); //初始化词法分析结构体  login_connection(thd); //用户认证,失败报错  prepare_new_connection_state(THD* thd);//Initialize THD to handle queries  while (!net->error && net->vio != 0 && //循环处理command  !(thd->killed == THD::KILL_CONNECTION))  {  if (do_command(thd))  break; //处理失败跳出  }  end_connection(thd); //关闭连接  close_connection(thd, 0, 1);  thread_scheduler.end_thread(thd,1);//结束线程  return 0;  }  } 

(6).调用add_global_thread加入thd数组。

 

首先进行了词法分析结构体的初始化,然后进行用户认证,认证成功后通过do_command循环执行客户端发过来的命令。

(7).如果可以重用,返回false,否则返回ture

对参数初始化有兴趣的可以具体的看下get_options这个函数,这里就不详细讲解了。
我们来看下one_thread_scheduler和one_thread_per_connection_scheduler的源代码,看下他们都做了些什么?

6.总结

线程池与epoll

 

整个connection
manager的流程十分清晰,单线程的连接一般很少使用,大多使用多线程方式。多线程连接中其实还涉及到线程缓冲池的概念,即如果一个连接断开后,其所创建的线程不会被销毁掉,而是放到缓冲池中,等待下一个新的connection到来时,首先去线程缓冲池查找是否有空闲的线程,有的话直接使用,木有的话才去创建新的线程来管理这个connection。

     
在引入线程池之前,server层只有一个监听线程,负责监听mysql端口和本地unixsocket的请求,对于每个新的连接,都会分配一个独立线程来处理,因此监听线程的任务比较轻松,mysql通过poll或select方式来实现IO的多路复用。引入线程池后,除了server层的监听线程,每个group都有一个监听线程负责监听group内的所有连接socket的连接请求,工作线程不负责监听,只处理请求。对于overscribe为1000的线程池设置,每个监听线程需要监听1000个socket的请求,监听线程采用epoll方式来实现监听。

void one_thread_scheduler(scheduler_functions* func)

关于MySQL数据库Connection
Manager的相关知识就介绍到这里了,希望本次的介绍能够对您有所收获!

   
Select,poll,epoll都是IO多路复用机制,IO多路复用通过一种机制,可以监听多个fd(描述符),比如socket,一旦某个fd就绪(读就绪或写就绪),能够通知程序进行相应的读写操作。epoll相对于select和poll有了很大的改进,首先epoll通过epoll_ctl函数注册,注册时,将所有fd拷贝进内核,只拷贝一次不需要重复拷贝,而每次调用poll或select时,都需要将fd集合从用户空间拷贝到内核空间(epoll通过epoll_wait进行等待);其次,epoll为每个描述符指定了一个回调函数,当设备就绪时,唤醒等待者,通过回调函数将描述符加入到就绪链表,无需像select,poll方式采用轮询方式;最后select默认只支持1024个fd,epoll则没有限制,具体数字可以参考cat
/proc/sys/fs/file-max的设置。epoll贯穿在线程池使用的过程中,下面我就epoll的创建,使用和销毁生命周期来描述epoll在线程中是如何使用的。

{

数据库 Connection Manager
的相关知识是本文我们主要要介绍到内容,接下来就让我们一起来了解一下吧。
1.连接的线程数 MySQL支持单线程…

  1. 线程池初始化,epoll通过epoll_create函数创建epoll文件描述符,实现函数是thread_group_init;
  2. 端口监听线程监听到请求后,创建socket,并创建THD和connection对象,放在对应的group队列中;
  3. 工作线程获取该connection对象时,若还未登录,则进行登录验证
  4. 若socket还未注册到epoll,则调用epoll_ctl进行注册,注册方式是EPOLL_CTL_ADD,并将connection对象放入epoll_event结构体中
  5. 若是老连接的请求,仍然需要调用epoll_ctl注册,注册方式是EPOLL_CTL_MOD
  6. group内的监听线程调用epoll_wait来监听注册的fd,epoll是一种同步IO方式,所以会进行等待
  7. 请求到来时,获取epoll_event结构体中的connection,放入到group中的队列
  8. 线程池销毁时,调用thread_group_close将epoll关闭。

  func->max_threads= 1;

备注:

#ifndef EMBEDDED_LIBRARY

1.注册在epoll的fd,若请求就绪,则将对应的event放入到events数组,并将该fd的事务类型清空,因此对于老的连接请求,依然需要调用epoll_ctl(pollfd,
EPOLL_CTL_MOD,  fd, &ev)来注册。

  func->add_connection= handle_connection_in_main_thread;

线程池函数调用关系

#endif

(1)创建epoll

  func->init_new_connection_thread= init_dummy;

tp_init->thread_group_init->tp_set_threadpool_size->io_poll_create->epoll_create

  func->end_thread= no_threads_end;

(2)关闭epoll

}

tp_end->thread_group_close->thread_group_destroy->close(pollfd)

 

(3)关联socket描述符

void one_thread_per_connection_scheduler(scheduler_functions*
func)

handle_event->start_io->io_poll_associate_fd->io_poll_start_read->epoll_ctl

{

(4)处理连接请求

  func->max_threads= max_connections;

handle_event->threadpool_process_request->do_command->dispatch_command->mysql_parse->mysql_execute_command

  func->add_connection= create_thread_to_handle_connection;

(5)工作线程空闲时

  func->end_thread= one_thread_per_connection_end;

worker_main->get_event->pthread_cond_timedwait

}

等待thread_pool_idle_timeout后,退出。

 

(6)监听epoll

  

worker_main->get_event->listener->io_poll_wait->epoll_wait

 

(7)端口监听线程

  
原来就是设置了一个结构体中scheduler_functions的参数,只不过这些参数是一些函数指针罢了,也就是说在具体的调用中,

main->mysqld_main->handle_connections_sockets->poll 

 

one-connection-per-thread函数调用关系

只需要调用add_connection或end_thread即可,不需要知道到底是调用了哪个函数,这大概就是一种变形的多态性吧。

(1) 工作线程等待请求

 

handle_one_connection->do_handle_one_connection->do_command->
my_net_read->net_read_packet->net_read_packet_header->net_read_raw_loop->
vio_read->vio_socket_io_wait->vio_io_wait->poll

2.初始化网络配置

备注:与线程池的工作线程有监听线程帮助其监听请求不同,one-connection-per-thread方式的工作线程在空闲时,会调用poll阻塞等待网络包过来;

 

而线程池的工作线程只需要专心处理请求即可,所以使用也更充分。

  
网络配置比较简单,就是设置端口,创建套接字,绑定端口,监听端口。实现全部集中在network_init函数中,由于这个函数确

(2)端口监听线程
与线程池的(7)相同 

 

参考文档

实没什么好讲的,如果对socket不熟悉的话,可以去网上搜索下相关知识。这里直接给出相应的伪代码:

 

network_init

{

    set_ports; //设置端口号,#define MYSQL_PORT            3306

    socket;//创建套接字

    bind;       //绑定端口号

    listen;//监听端口号

}