Linux进程池实现的详细指南

1.为什么要有进程池

如果你了解过STL的底层设计,你会发现在其中会有一个叫做内存池的设计。其作用就是先申请出一片空间,如果后续你需要对你的容器进行扩容,所扩展的空间就从内存池里取的。这样可以提高扩容的效率。
比如你要扩容100次,如果每次扩容都向系统申请空间的话,效率就很低,因为向系统申请空间也是需要时间的,所以内存池的作用就是一次性申请一篇空间,当需要扩容时就向内存池要,可以提高扩容的效率。
既然这样,我们也可以如此理解进程池,一次性创建一批进程,如果有任务要执行就交给进程池中空闲的进程来做,而不是一有任务就创建一个新的进程,进程池的目的也是为了提供效率,节省创建进程的时间消耗。
通过预先创建和复用进程,进程池能够提高任务执行效率,避免频繁创建和销毁进程带来的系统开销。

2.进程池的工作原理

进程池的核心思想是创建固定数量的进程,然后将需要执行的任务分配给这些进程来处理。当某个任务完成后,该进程可以继续处理下一个任务,而不是销毁。这样可以减少频繁创建和销毁进程带来的资源浪费。

2.1 进程池的工作流程

  • 初始化:预处理创建一定数量的进程,形成进程池。
  • 任务分配:当有任务需要处理时,将任务分配给某个空闲进程。
  • 任务处理:空闲进程接受任务并执行。
  • 复用进程:任务执行完成后,进程回到池中,等待新的任务。
  • 退出:当没有新的任务且需要关闭进程池,池中进程将逐个退出。

3. 进程池的实现(重点)

本文将着重讲解进程池的实现步骤。

初始化

通过运行可执行文件时传入的参数来创建一定数量的子进程。
如:创建5个子进程

./a.out 5

代码实现:

enum
{
    ArgcError = 1,
    ArgvError,
    PipeError
};

void Usage(const char* tip)
{
    cout<<"Usage:"<<tip<<" sub_process_num"<<endl;
}

int main(int argc,char* argv[])
{
    if(argc != 2)
    {//格式不对
        Usage(argv[0]);
        return ArgcError;
    }
    int sub_process_number = stoi(argv[1]);
    if(sub_process_number<=0)
    {//子进程数量不能小于1
        return ArgvError;
    }
    //创建子进程...
    return 0;
}

现在我们来分析下,我们要实现进程池的功能:创建子进程,发送任务给子进程执行,子进程的轮询,杀死进程,等待进程,一些Debug功能。这样的话我们完全可以创建一个类来封装这些功能。除此之外,我们还需要描述一下子进程,为此也需要创建一个描述管道的类。

3.1 Channel类

Channel类的功能主要是来描述管道的,具有的属性有该管道对应的子进程的id,名字,写端描述符。
Channel类的实现比较简单,直接看代码吧:

class Channel
{
public:
    Channel(int wfd,pid_t sub_process_id,string name)
        : wfd_(wfd),sub_process_id_(sub_process_id),name_(name)
        {}
     void printDebug()
    {
        cout<<"name:"<<name_<<endl;
        cout<<"wfd:"<<wfd_<<endl;
        cout<<"pid:"<<sub_process_id_<<endl;
    }
    string getName()
    {
        return name_;
    }
    pid_t getPid()
    {
        return sub_process_id_;
    }
    int getWfd()
    {
        return wfd_;
    }
    void Close()
    {
        close(wfd_);
    }
private:
    int wfd_;
    pid_t sub_process_id_;
    string name_;
};

3.2 ProcessPool类

ProcessPool类的功能主要是来描述进程池的,具有的属性有该管道对应的子进程的数量,所有的管道。
类似于这样:

Linux进程池实现的详细指南

ProcessPool类的框架:

class ProcessPool
{
public:
	ProcessPool(int sub_process_num)
		: sub_process_num_(sub_process_num)
		{}
	//...
private:
	int sub_process_num_;
	vector<Channel> channels;
};

3.2.1 创建子进程

因为我们需要创建指定数目的进程,用一个循环来写就可以了。在循环中,父进程每次都会创建一个子进程出来,然后用管道于它们链接,注意因为是父进程给子进程分配任务,所以需要把父进程的读端关闭,子进程的写端关闭。
初版:

int CreateProcess()
{
	for(int i = 0;i<sub_process_num_;++i)
	{
		int pipefd[2];
		int n = pipe(pipefd);
		if(n == -1)
		{
			return PipeError;
		}
		pid_t id = fork();
		if(id == 0)
		{
			//子进程,关闭写端
			close(pipefd[1]);
			//work...

		}
		//父进程,关闭读端
		close(pipefd[0]);
		string cname = "channel-"+to_string(i);
		channels.push_back(Channel(pipefd[1],id,cname));
	}
	return 0;
}

为了让子进程执行相应的任务,我们还可以添加一个回调函数workerworker函数主要作用是选择要执行的任务,具体的任务,我们还需要自己创建,为此还可以创建3个测试用的任务,用一个函数指针数组去保存这些函数。
代码如下:

typedef void(* work_t)(int);
typedef void(* task_t)(int,pid_t);

void PrintLog(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task\n" << endl;
}

void ReloadConf(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task\n" << endl;
}

void ConnectMysql(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : connect mysql task\n" << endl;
}

task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};

void worker(int fd)
{
    while(true)
    {
        uint32_t command_code = 0;
        ssize_t n = read(0,&command_code,sizeof(command_code));
        if(n == sizeof(command_code))
        {
            if(command_code>=3)
            {
                continue;
            }
            tasks[command_code](fd,getpid());
        }
        else
        {
            cout<<"sub process:"<<getpid()<<"quit now..."<<endl;
            break;
        }
    }
}

第二版:
第二版相对第一版,多了个回调函数,这个回调函数可以让我实现相对应的工作。同时也多个重定向功能,把原本标准输入的功能给到了pipefd[0],也就是说当子进程去读标准输入内的数据时,会去读管道中的数据。
这是一个典型的标准输入重定向操作,将管道的读端作为当前进程的输入来源

int CreateProcess(work_t work)
{
	//vector<int> fds;
	for(int i = 0;i<sub_process_num_;++i)
	{
		int pipefd[2];
		int n = pipe(pipefd);
		if(n == -1)
		{
			return PipeError;
		}
		pid_t id = fork();
		if(id == 0)
		{
			//子进程,关闭写端
			close(pipefd[1]);
			dup2(pipefd[0],0);
			work(pipefd[0]);
			exit(0);

		}
		//父进程,关闭读端
		close(pipefd[0]);
		string cname = "channel-"+to_string(i);
		channels.push_back(Channel(pipefd[1],id,cname));
	}
	return 0;
}

其实该代码中还存在bug,有个魔鬼细节存在!!!
第三版:
其实对于子进程来说,它的写端并没有全部关闭。下面我们来画图:
创建第一个管道,这个图如果看过我讲匿名管道的那篇的话,还是比较熟悉的。

Linux进程池实现的详细指南

现在我们来创建第二个管道,我们知道文件描述符的创建是遵循当前没有直接使用的最小的一个下标,作为新的文件描述符。所以呢,新创建的管道的pipefd[0]依旧是在先前的位置,可是写端就不是了,原先的写端并没有被关闭,我们新管道创建的pipefd[1]会在其下方被创建。

然后要知道的是,子进程是由父进程创建的,它的各项数据是由父进程复制而来,也就会把上一个管道的写端给复制过了,但是子进程可是关闭不了它的,因为它只能拿到新创建管道的写端pipefd[1]的位置。具体情况如图:

Linux进程池实现的详细指南

所以为了关闭子进程的所有写端,我们需要用有个数组去保存父进程中的写端,然后再子进程中把它们一一关闭。
代码如下:

int CreateProcess(work_t work)
{
	vector<int> fds;
	for(int i = 0;i<sub_process_num_;++i)
	{
		int pipefd[2];
		int n = pipe(pipefd);
		if(n == -1)
		{
			return PipeError;
		}
		pid_t id = fork();
		if(id == 0)
		{
			//子进程,关闭写端
			if(!fds.empty())
			{
				cout<<"close w fd:";
				for(auto fd:fds)
				{
					close(fd);
					cout<<fd<<" ";
				}
				cout<<endl;
			}
			close(pipefd[1]);
			dup2(pipefd[0],0);
			work(pipefd[0]);
			exit(0);

		}
		//父进程,关闭读端
		close(pipefd[0]);
		string cname = "channel-"+to_string(i);
		channels.push_back(Channel(pipefd[1],id,cname));
		fds.push_back(pipefd[1]);
	}
	return 0;
}

3.2.2 杀死所有进程

进程池也有不需要的时候,当进程池不需要了,我们就要回收子进程了,怎么回收呢?当然是进程等待了
杀死子进程也就是等待子进程。
要注意的是别忘了关闭文件描述符
进程等待是必须的,不然的话子进程会变成僵尸进程的。

void KillAllProcess()
{
	//在回收子进程前,我们需要把pipefd[1]全部关闭
	for(auto&channel:channels)
	{
		channel.Close();
		//关闭完文件描述符后,开始等待。等待进程需要子进程的pid,恰巧我们的Channel中存有子进程的pid
		pid_t pid = channel.getPid();
		pid_t rid = waitpid(pid,nullptr,0);
		if(rid == pid)
		{
			//回收成功
			cout<<"wait sub process:"<<pid<<" success..."<<endl;
		}
		cout<<channel.getName()<<"close done"<<" sub process quit now:"<<channel.getPid()<<endl;
	}
}

3.2.3 其他功能

因为这些功能都比较简单就一块讲了吧。
子进程的轮询,我能总不能让一个子进程一直跑任务吧,为了合理利用子进程,我们可以设计也该轮询函数,让子进程的任务分配"雨露均沾"。

int NextChannel()
{
	static int next = 0;//static修饰的变量只会初始化一次。
	int c = next;
	next = (next+1)%sub_process_num_;
	return c;
}

发送任务代码:

void SendTaskCode(int index,uint32_t code)
{
	cout << "send code: " << code << " to " << channels[index].getName() << " sub prorcess id: " << channels[index].getPid() << endl;
	write(channels[index].getPid(), &code, sizeof(code));
}

debug:

void Debug()
{
	for(auto&channel:channels)
	{
		channel.printDebug();
		cout<<endl;
	}
}

3.3 控制进程池

完成上面的功能就需要我们去控制进程池的子进程了。
主要包括创建进程池,控制子进程,回收子进程。

void CtrlSubProcess(ProcessPool* processpool,int cnt)
{
    while(cnt--)
    {
        //选择一个进程和通道
        int channel = processpool->NextChannel();
        //选择一个任务
        uint32_t code = NextTask();
        processpool->SendTaskCode(channel,code);
        sleep(1);
    }
}

int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return ArgcError;
    }
    int sub_process_num = stoi(argv[1]);
    if(sub_process_num<1)
    {
        return ArgvError;
    }
    srand((unsigned int)time(nullptr));//生成随机种子
    //创建进程池
    ProcessPool* processpool = new ProcessPool(sub_process_num);
    processpool->CreateProcess(worker);
    processpool->Debug();
    //sleep(2);
    //控制子进程
    CtrlSubProcess(processpool,10);
    //sleep(2);
    //回收子进程
    processpool->KillAllProcess();
    delete processpool;

    return 0;
}

运行结果:

Linux进程池实现的详细指南

4. 完整代码

///processpool.cc//
#include <iostream>
#include <vector>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include "bolg.hpp"
using namespace std;
enum
{
    ArgcError = 1,
    ArgvError,
    PipeError
};

class Channel
{
public:
    Channel(int wfd,pid_t sub_process_id,string name)
        : wfd_(wfd),sub_process_id_(sub_process_id),name_(name)
        {}
    void printDebug()
    {
        cout<<"name:"<<name_<<endl;
        cout<<"wfd:"<<wfd_<<endl;
        cout<<"pid:"<<sub_process_id_<<endl;
    }
    string getName()
    {
        return name_;
    }
    pid_t getPid()
    {
        return sub_process_id_;
    }
    int getWfd()
    {
        return wfd_;
    }
    void Close()
    {
        close(wfd_);
    }
private:
    int wfd_;
    pid_t sub_process_id_;
    string name_;
};

class ProcessPool
{
public:
    ProcessPool(int sub_process_num)
        : sub_process_num_(sub_process_num)
        {}
    int CreateProcess(work_t work)
    {
        vector<int> fds;
        for(int i = 0;i<sub_process_num_;++i)
        {
            int pipefd[2];
            int n = pipe(pipefd);
            if(n == -1)
            {
                return PipeError;
            }
            pid_t id = fork();
            if(id == 0)
            {
                //子进程,关闭写端
                if(!fds.empty())
                {
                    cout<<"close w fd:";
                    for(auto fd:fds)
                    {
                        close(fd);
                        cout<<fd<<" ";
                    }
                    cout<<endl;
                }
                close(pipefd[1]);
                dup2(pipefd[0],0);
                work(pipefd[0]);
                exit(0);

            }
            //父进程,关闭读端
            close(pipefd[0]);
            string cname = "channel-"+to_string(i);
            channels.push_back(Channel(pipefd[1],id,cname));
            fds.push_back(pipefd[1]);
        }
        return 0;
    }
    void KillAllProcess()
    {
        //在回收子进程前,我们需要把pipefd[1]全部关闭
        for(auto&channel:channels)
        {
            channel.Close();
            //关闭完文件描述符后,开始等待。等待进程需要子进程的pid,恰巧我们的Channel中存有子进程的pid
            pid_t pid = channel.getPid();
            pid_t rid = waitpid(pid,nullptr,0);
            if(rid == pid)
            {
                //回收成功
                cout<<"wait sub process:"<<pid<<" success..."<<endl;
            }
            cout<<channel.getName()<<"close done"<<" sub process quit now:"<<channel.getPid()<<endl;
        }
    }
    int NextChannel()
    {
        static int next = 0;
        int c = next;
        next = (next+1)%sub_process_num_;
        return c;
    }
    void SendTaskCode(int index,uint32_t code)
    {
        cout << "send code: " << code << " to " << channels[index].getName() << " sub prorcess id: " << channels[index].getPid() << endl;
        write(channels[index].getPid(), &code, sizeof(code));
    }
    void Debug()
    {
        for(auto&channel:channels)
        {
            channel.printDebug();
            cout<<endl;
        }
    }
private:
   int sub_process_num_;
   vector<Channel> channels;
};


void Usage(const char* tip)
{
    cout<<"Usage:"<<tip<<" sub_process_num"<<endl;
}



void CtrlSubProcess(ProcessPool* processpool,int cnt)
{
    while(cnt--)
    {
        //选择一个进程和通道
        int channel = processpool->NextChannel();
        //选择一个任务
        uint32_t code = NextTask();
        processpool->SendTaskCode(channel,code);
        sleep(1);
    }
}

int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return ArgcError;
    }
    int sub_process_num = stoi(argv[1]);
    if(sub_process_num<1)
    {
        return ArgvError;
    }
    srand((unsigned int)time(nullptr));
    //创建进程池
    ProcessPool* processpool = new ProcessPool(sub_process_num);
    processpool->CreateProcess(worker);
    processpool->Debug();
    //sleep(2);
    //控制子进程
    CtrlSubProcess(processpool,10);
    //sleep(2);
    //回收子进程
    processpool->KillAllProcess();
    delete processpool;

    return 0;
}
///task.hpp
#include <iostream>
#include <vector>
#include <cstdlib>
#include <unistd.h>
using namespace std;
//创建函数指针
typedef void(* work_t)(int);
typedef void(* task_t)(int,pid_t);

void PrintLog(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task\n" << endl;
}

void ReloadConf(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task\n" << endl;
}

void ConnectMysql(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : connect mysql task\n" << endl;
}

uint32_t NextTask()
{
    return rand()%3;
}

task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};

void worker(int fd)
{
    while(true)
    {
        uint32_t command_code = 0;
        ssize_t n = read(0,&command_code,sizeof(command_code));
        if(n == sizeof(command_code))
        {
            if(command_code>=3)
            {
                continue;
            }
            tasks[command_code](fd,getpid());
        }
        else
        {
            cout<<"sub process:"<<getpid()<<"quit now..."<<endl;
            break;
        }
    }
}

5. 总结

进程池的核心思想是创建固定数量的进程,然后将需要执行的任务分配给这些进程来处理。当某个任务完成后,该进程可以继续处理下一个任务,而不是销毁。这样可以减少频繁创建和销毁进程带来的资源浪费。

以上就是Linux进程池实现的详细指南的详细内容,更多关于Linux进程池实现的资料请关注恩蓝小号其它相关文章!

原创文章,作者:KDZQA,如若转载,请注明出处:http://www.wangzhanshi.com/n/783.html

(0)
KDZQA的头像KDZQA
上一篇 2024年12月17日 17:59:29
下一篇 2024年12月17日 17:59:32

相关推荐

  • 一些经典 Linux 命令的现代替代品

    当你 开始学习 Linux 时,最初学习的是一套标准的 Linux 命令,这些命令从 UNIX 时代就已经存在了。随着你作为一个 Linux 用户的年龄增长,你会不断地掌握这套标准…

    2025年1月1日
  • linux下dhcp服务器配置教程

    1、dhcp简介 (1)dhcp(dynamic host configuration protocol,动态主机配置协议)是一个简化主机ip地址分配管理的tcp/ip标准协议,用…

    2025年1月1日
  • linux常用命令小结之yum、源码安装

    一、软件安装 rpm rpm -ivh zziplib-0.13.62-5.el7.x86_64.rpm   //安装zziplib-0.13.62-5.el7.x86_64.rp…

    Linux 2025年1月1日
  • 详解linux根目录空间不足解决方案

    1. 前言 之前新建了个ubuntu虚拟机来构建golang开发环境以及用来运行docker,跟虚拟机分配了20g的空间并挂载到了根目录,但是后来由于用到的镜像越来越多,20g的空…

    2025年1月1日
  • linux日志轮询方案

    logrotate 简介 一般来说,日志是任何故障排除过程中非常重要的一部分,但这些日志会随着时间增长。在这种情况下,我们需要手动执行日志清理以回收空间,这是一件繁琐的管理任务。为…

    Linux 2024年12月17日
  • Linux上查看用户创建日期的几种方法总结

    前言 你知道吗,如何在 Linux 系统上查看帐户的创建日期?如果知道,那么有些什么办法。 你成功了么?如果是的话,该怎么做? 基本上 Linux 系统不会跟踪这些信息,因此,获取…

    Linux 2025年1月1日
  • Linux下双网卡Firewalld的配置流程(推荐)

    实验室拟态存储的项目需要通过lvs-nat模式通过lvs服务器来区隔内外网的服务,所以安全防护的重心则落在了lvs服务器之上。笔者最终选择通过firewalld放行端口的方式来实现…

    2025年1月1日
  • Linux shell tr 命令详解

    Linux shell tr 命令详解 1. 用途 tr,translate的简写,主要用于压缩重复字符,删除文件中的控制字符以及进行字符转换操作。 2. 语法 ? tr [OPT…

    Linux 2025年1月1日
  • VIM实现文件快速跳转插件详解

    前言 日常使用 vim 的时候经常有跳转到特定目录下某个文件的需求: 使用 vimwiki 记笔记时会默认把所有笔记保存在目录 ~/vimwiki 下, 当我们想查看某个笔记时就打…

    2025年1月1日
  • Linux 中_exit和exit的区别

    Linux 中_exit和exit的区别 在Linux的标准库函数中,有一套称作高级I/O的函数,我们熟知的printf 、fopen 、fread 、fwrite都在此列,他们也…

    Linux 2025年1月1日

发表回复

登录后才能评论