博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
创建自定义线程池
阅读量:6290 次
发布时间:2019-06-22

本文共 9087 字,大约阅读时间需要 30 分钟。

1. 创建一个单例模式的自定义线程池类

public class CustomThreadPool{    //#region configurable items - for demo let's have these as constants    private const int MAX = 8; // maximum no of threads in pool    private const int MIN = 3; // minimum no of threads in pool    private const int MIN_WAIT = 10; // milliseconds    private const int MAX_WAIT = 15000; // milliseconds - threshold for simple task    private const int CLEANUP_INTERVAL = 60000; // millisecond - to free waiting threads in pool    private const int SCHEDULING_INTERVAL = 10; // millisecond - look for task in queue in loop    //#endregion    //#region singleton instance of threadpool    private static readonly CustomThreadPool _instance = new CustomThreadPool();    private CustomThreadPool() {        InitializeThreadPool();    }    public static CustomThreadPool Instance    {        get        {            return _instance;        }    }    //#endregion    private void InitializeThreadPool() {     //TODO: write initialization code here     }}

 

2. 定义一些基本类型来和线程池进行通讯

public delegate void UserTask();public class ClientHandle{    public Guid ID;    public bool IsSimpleTask = false;}public class TaskStatus{    public bool Success = true;    public Exception InnerException = null;}

看到上面的代码了吗?UserTask是一个代理,代表了线程池中线程将要执行的任务。

3. 接下来我们给我们的自定义线程池类增加一些公共接口的方法

public ClientHandle QueueUserTask(UserTask task, Action
callback){ throw new Exception("not implemented yet.");}public static void CancelUserTask(ClientHandle handle){ //TODO: write implementation code here}

 4. 下面是线程池需要使用的一些内部类和类型

//#region nested private typesenum TaskState // to represent current state of a usertask{    notstarted,    processing,    completed,    aborted}class TaskHandle // Item in waiting queue{    public ClientHandle Token; // generate this everytime an usertask is queued and return to the caller as a reference.     public UserTask task; // the item to be queued - supplied by the caller    public Action
callback; // optional - in case user want's a notification of completion}class TaskItem // running items in the pool - TaskHandle gets a thread to execute it { public TaskHandle taskHandle; public Thread handler; public TaskState taskState = TaskState.notstarted; public DateTime startTime = DateTime.MaxValue;}//#endregion

5. 下面我们需要做的是线程池的初始化工作,并且初始化任务队列

//private instance membersprivate Queue
ReadyQueue = null;private List
Pool = null;private Thread taskScheduler = null;private void InitializeThreadPool(){ ReadyQueue = new Queue
(); Pool = new List
(); taskScheduler = new Thread(() => { //TODO: write scheduling logic here }); taskScheduler.Start();}

  上面需要特别注意的是,taskScheduler这个线程类对象。

  这是始终贯穿线程池生命周期的一个额外的线程,也可以说是主线程。

它的任务是监视用户任务队列,并且尽快地把它们带去执行,另外它还负责强制最大和最小

线程数的限制,做一些清理工作。

6. 接着就是实现线程初始化了,我们使用的是就近完成算法

private void InitializeThreadPool(){    ReadyQueue = new Queue
(); Pool = new List
(); InitPoolWithMinCapacity(); // initialize Pool with Minimum capacity - that much thread must be kept ready DateTime LastCleanup = DateTime.Now; // monitor this time for next cleaning activity taskScheduler = new Thread(() => { do { while (ReadyQueue.Count > 0 && ReadyQueue.Peek().task == null) ReadyQueue.Dequeue(); // remove cancelled item/s - cancelled item will have it's task set to null int itemCount = ReadyQueue.Count; for (int i = 0; i < itemCount; i++) { TaskHandle readyItem = ReadyQueue.Peek(); // the Top item of queue bool Added = false; foreach (TaskItem ti in Pool) { if (ti.taskState == TaskState.completed) { // if in the Pool task state is completed then a different // task can be handed over to that thread ti.taskHandle = readyItem; ti.taskState = TaskState.notstarted; Added = true; ReadyQueue.Dequeue(); break; } } if (!Added && Pool.Count < MAX) { // if all threads in pool are busy and the count is still less than the // Max limit set then create a new thread and add that to pool TaskItem ti = new TaskItem() { taskState = TaskState.notstarted }; ti.taskHandle = readyItem; // add a new TaskItem in the pool AddTaskToPool(ti); Added = true; ReadyQueue.Dequeue(); } if (!Added) break; // It's already crowded so try after sometime } if ((DateTime.Now - LastCleanup) > TimeSpan.FromMilliseconds(CLEANUP_INTERVAL)) // It's long time - so try to cleanup Pool once. { CleanupPool(); LastCleanup = DateTime.Now; } else { // either of these two can work - the combination is also fine for our demo. Thread.Yield(); Thread.Sleep(SCHEDULING_INTERVAL); // Dont run madly in a loop - wait for sometime for things to change. // the wait should be minimal - close to zero } } while (true); }); taskScheduler.Priority = ThreadPriority.AboveNormal; taskScheduler.Start();}private void InitPoolWithMinCapacity(){ for (int i = 0; i <= MIN; i++) { TaskItem ti = new TaskItem() { taskState = TaskState.notstarted }; ti.taskHandle = new TaskHandle() { task = () => { } }; ti.taskHandle.callback = (taskStatus) => { }; ti.taskHandle.Token = new ClientHandle() { ID = Guid.NewGuid() }; AddTaskToPool(ti); }}private void AddTaskToPool(TaskItem taskItem){ taskItem.handler = new Thread(() => { do { bool Enter = false; // if aborted then allow it to exit the loop so that it can complete and free-up thread resource. // this state means it has been removed from Pool already. if (taskItem.taskState == TaskState.aborted) break; if (taskItem.taskState == TaskState.notstarted) { taskItem.taskState = TaskState.processing; taskItem.startTime = DateTime.Now; Enter = true; } if (Enter) { TaskStatus taskStatus = new TaskStatus(); try { taskItem.taskHandle.task.Invoke(); // execute the UserTask taskStatus.Success = true; } catch (Exception ex) { taskStatus.Success = false; taskStatus.InnerException = ex; } if (taskItem.taskHandle.callback != null && taskItem.taskState != TaskState.aborted) { try { taskItem.taskState = TaskState.completed; taskItem.startTime = DateTime.MaxValue; taskItem.taskHandle.callback(taskStatus); // notify callback with task-status } catch { } } } // give other thread a chance to execute as it's current execution completed already Thread.Yield(); Thread.Sleep(MIN_WAIT); //TODO: need to see if Sleep is required here } while (true); // it's a continuous loop until task gets abort request }); taskItem.handler.Start(); Pool.Add(taskItem);}private void CleanupPool(){ throw new NotImplementedException();}

7. 用户任务队列实现

public ClientHandle QueueUserTask(UserTask task, Action
callback){ TaskHandle th = new TaskHandle() { task = task, Token = new ClientHandle() { ID = Guid.NewGuid() }, callback = callback }; ReadyQueue.Enqueue(th); return th.Token;}

8. 最后便是测试代码

CustomThreadPool MyPool;private void Form1_Load(object sender, EventArgs e){    MyPool = CustomThreadPool.Instance;}void showMessage(string message){    MessageBox.Show(message);}int x = 0;private void btnStart_Click(object sender, EventArgs e){    x++;    int arg = x;    MyPool.QueueUserTask(() =>         {             showMessage(arg.ToString());         },         (ts) =>         {             showMessage(ts.Success.ToString());         });}

 

 

 

 

 

 

 

 

 

 

 

转载于:https://www.cnblogs.com/davidgu/archive/2013/03/12/2955561.html

你可能感兴趣的文章
使用Gradle打RPM包
查看>>
“我意识到”的意义
查看>>
淘宝天猫上新辅助工具-新品填表
查看>>
再学 GDI+[43]: 文本输出 - 获取已安装的字体列表
查看>>
nginx反向代理
查看>>
操作系统真实的虚拟内存是什么样的(一)
查看>>
hadoop、hbase、zookeeper集群搭建
查看>>
python中一切皆对象------类的基础(五)
查看>>
modprobe
查看>>
android中用ExpandableListView实现三级扩展列表
查看>>
%Error opening tftp://255.255.255.255/cisconet.cfg
查看>>
java读取excel、txt 文件内容,传到、显示到另一个页面的文本框里面。
查看>>
《从零开始学Swift》学习笔记(Day 51)——扩展构造函数
查看>>
python多线程队列安全
查看>>
[汇编语言学习笔记][第四章第一个程序的编写]
查看>>
android 打开各种文件(setDataAndType)转:
查看>>
补交:最最原始的第一次作业(当时没有选上课,所以不知道)
查看>>
Vue实例初始化的选项配置对象详解
查看>>
PLM产品技术的发展趋势 来源:e-works 作者:清软英泰 党伟升 罗先海 耿坤瑛
查看>>
vue part3.3 小案例ajax (axios) 及页面异步显示
查看>>