自定义线程池来实现文档转码

背景

我司在很久之前,一位很久之前的同事写过一个文档转图片的服务,具体业务如下:

  1. 用户在客户端上传文档,可以是ppt,word,pdf 等格式,用户上传完成可以在客户端预览上传的文档,预览的时候采用的是图片形式w [ N q d / D(不要和我说用别的方式预览,现在已经来不及了)
  2. 当用户把文档上传到云端之后(阿里云),把文档相关的信息记录在数据库,然后等待转码完成
  3. 服务器有一个转码服务(其实就是一个windows service)不停的在轮训待转码的数据,如果有待转码的数据,则从数据库取出来,然后根据文档的网络地址下载到本地进行转码(转成多张图片)
  4. 当文档转码完毕,把转码出来的图片上传到云端,并把云端图片的信息记录到数据库
  5. 客户端有预览需求的时候,根据数据库来判断有没有转码成功,如果成功,则获取数据来显示。

文档预览的整体过程如以上所说,老p / 3的转码服务现在什么问题呢?

  1. 由于一个文档同时只能被一个线程进行转码操作,所以老的服务采用了把待转码数据划分管道的思想,一共有六个管道,映射到数据库大体就是 I# z 6 J h gd=》管道ID 这个样子。
  2. 一个控制台程序,根据配置文件信息,读取某一个管道待转码的文档,然后单线程进行转码操作
  3. 一共有六个管道,所以服务器上起了六个cmd的# = ) F E黑窗口......
  4. 有的时候个别文档由于| k 1 r K I格式问题或者其他问题 转码过程中会卡住,具体的表现为:停止了转码操作。
  5. 如果程序卡住了,需要运维人员重新启动转码cmd窗口(这种维护比较蛋疼)

后来机缘巧合,这个程序的维护落到的菜菜头上,维护了一周左右,大约重启了10多次,终于忍受不了了,重新搞一个吧。仔细分析过后,刨除实际文档转码的核心操作之外,整个转码程其实还有很多注意点

  1. 需要保证转码服务不被卡住,如果和以前一样就没有必要重新设计了
  2. 尽量避免开多个进程的方式,其实在这个业务场景下,多个进程和多个线程作用是一致的。
  3. 每个文档只能被转码一次,如果一个文档被转码多次,不仅| v m m T 浪费V / / L a D d g i了服务~ l h z* B Y J U资源,而且还有可能会有数据不一致的情况发生
  4. 转码失败的文档需要有一定次数的重试,因为一次失败不代表第二D c e d R ?次失败,所以一定要给失败的文档再次被操作的机会9 i 2 E R C
  5. 因为程序不停的把文档转码成本地图片,所以需要保证这些文件在转码完成在服务器上删除,不然的话,时间长了会生成很多无用的文件

说了这么多,其实需要注意的点还是很多的。以整个的转码程来说,本质上是一个任务池的生产和消费j 3 a | (问题,任务池中的任务就是待转码的文档,生产者不停的把待转码文档丢进任务池,消费者不停的把任务池中文档S W 8 V 5 :转码完成。

线程池

这很显然和线程池类似,菜菜之前就写过一个线程池的文章,有兴趣的同学可以去翻翻历史。今天我们就以这个线程池 , / g g / ) : )来解决这个转码问题。线程池的本质是初始化一定数目的线程,不停的? Z V R执行任务。

 //线程池定义
public class LXThreadPool:IDisposaJ z K W lble
{
bool PoolEnable = true; //线程池是否可用
Li6 S 5 ^st<Thread> ThreadContainer = nulH a 0 @ $ e U 1l; /C o g o A  l J/线程的容器
ConcurrentQueue<ActionData> JobContaine1 4 N - 8 1 v Ar = null; //任务的容器
int _maxJobNumr ~ X 5 ber; //线程池最大job容/ G V量
ConcurrentDictionary<string, DateTime> JobIdList = new ConcurrentDictionary<string R , DateTime>(); //job的副本,用于排除某个j? A 8 5 N i K ~ tob 是否在运行中
p% * { % ? G C P (ublic LXThread& - 6 E x Y j $ ,Pool(i. 3 G 3 U U q :nt threadNumber,int maxJobNumber=1000)
{
if(threadNumber<=0 || maxJobNumber <= 0)
{
throw new Exception("线程池初始化失败l 9 o n l O ` a h");
}
_maxJobNumber = maxJobNumber;
ThreadContainer = ne_ ( ;  X Pw List<Thread>(threadNuP . x 5 a %mber);
JobContainer = new ConcurrentQueue<ActionDatS 1 H } m . $ :a>();
for (int i = 0; i < threadNumber; i++)
{
var t = new Thread(RunJobY + m);
t.Name = $"转码线程{i}";
T & W I D . B KhreadContainer.) | * IAdd(t);
t.Start();
}
//清除超时任务的线程
var tTimeOutJob = new Thread(CheckTimeOutJob);
tTimeOutJob.Name = $"清理超时任务线程";
tTimeOutJob.Start();
}
//往线程池添加一个线程,返回线程池的新线程数
public int AddThread(int number=1)
{
if(!PoolEnable || ThreadContainez ) 2 w | $ Hr==null || !ThreadContainer.Any() || JobContainer==null|| !J! y U F @ h 5 IobContainer.Any()8 l ~ M)
{: Y o 2 4 T p
return 0;( v 4 A
}
while (number <= 0)
{
var t = new Thread* ~ d t e s ] v(RunJob1 d c # ?);
ThreadContainer.Add(t);
t.Start();
number -= number;
}
return ThreadContainer?.Count ?? 0;
}
//向线程池添加一个任务,返回0:添加任务失败   1:成功
public int AddTask(Action<object> job, object obj,string actionId, Action<Exception> err2 $ 0 w BorCal} ` t _ , - 3 ? *lBack = null)
{
if (JobContainer != null)
{
if(Job[ !  u T D  bContainer.Count>= _maxJobNumber)
{
return 0;
}
//首先排除10分钟还没转完的
var timeoOutJobList = JobIdList.Where(s =>w ] ^ q o 3 G s.Value.AddMinutes(10) < DateTime.Now);
if(timeoOutJobList!=null&& timeoOutJobList.Any(V d G !))
{
foreach (var timeoutJob in timeoOutJobList)
{
JobJ | x E = ~ N mIdList.TryRemove(timeoutJob.Key,out DateTime v);
}
}
if (!JobIdList.Any(s => s.Key == as ] ActionId))
{
if(JobIdList.TryAdd(actionId, DateTime.Now))
{
JobContainer.Enqueue(new ActionData { Job = job, Data = obj, ActionId = actionId, Errow ,  / % ? 6 @ 2rC= { x 7 C = Z R ?allBack = errorCallBack });
return 1;
}
else
{
return 101;
}
}
else
{
retu_ N Hrn 100;
}
}
r_ M # Seturn 0;
}
private void Runz N ] . rJob()
{
whiD M *le (JobContainer != null  &&amS q T k y ; @ W Rp; PoolEnable)
{
//任务列表取任务
ActionData job = null;
JobCo@ 9 6 + Q m 9 U :ntainer?.TryDequU x w _eue(out job);
if (job == null)
{
//如果没有任务则休眠
Thr[ M y ( ~ ; jead.Sleep(20);
cP J +ontinue;
}
try
{
//执行任务
job.Job.Invoke(job.Data);
}
catch (Exception error)
{
//异常回调
if (job != null&& jR ? / % Y ) 8ob.Error&  L ] { iCallBa% a i W A -ck!=null)
{
job?.ErrorCallBack(error);
}
}
final f @ - X A * qly
{
if (!JobIdList.TryRemove(jobz 7 ] Y D ; s.7 x G  |ActionId,out DateTime v))
{
}
}
}
}
//终止线程@ _ d q L池
public vo+ I ` * id Dispose()
{
PoolEnable = false;
JobContainer = null;
if (ThreadContainer !=4 ` r } P  V N null)
{
foreach (var t in ThreadCon3 r ` c [ Ctainer)
{
//强制线程退出并不好,会有异常
t.Join();
}
ThreadContainer =b [ } D - : k null;
}
}
//清理超时的任务
private void CheckTimeOutJob()
{
//首z c { e G先排除1y 0 1 2 L r0分钟L 6 o / M i ~ Z B还没转完的
var timeoOutJj y 3 y % @obList = JobIdList.Where(s =&gE + % 4 d  :t; sO x E C e _  i.Value.AddMinutes(10) < DateTime.Now);
if (timeoO$ U )utJobList != nl ( d = 1 6 V [ (ull && timeoOutJobList.Any^ ^ m b Y())
{
foreach (var% 4 E O { m z q 3 timeoutJob in timeoOutJobList)
{
JobIdList.TryRemove(timeoutJob.Key, out DateTime v! E /);
}
}
System.Threading.Thread.Sleep(60000);
}
}
public cll % V * ! J x Dasc T 3 {s ActionData
{
//任务的id,用于排重
public string ActionId { get; set; }
//执行任务的参数
public object Data { get; s% j s }et; }
//执行的任务
public Action<object> Job { get; set; }
//发生异常时候的回调方法
public Action<Exceph , ^tion> ErrP q a r ` _ l q WorCallBack { get; set; }
}

以上就是一个线程池的具体实现,和具体的业务无关,完全可Y e c以用于任何适用于线程池的场景,其中有一个注意点,我新加了任务的标示,主要用于排除重复的任务被投放多次(只排除正在运行中的任务)。当然代码不是@ N q @ D ` A最优的,有需要的同学9 d C可以自己去优化

使用线程池

接下来,我们利用以上的线程池来完成我们的文档转码任务,首先我们启动的时候初始5 x & L R y m化一个线程池,并启动一个独立线程来不停的往线程池来输送任务,顺便起了H | k . i r一个监控6 l w . x J B线程去监视发送任务的线程

       string lastResId = null;
string lastErrorResId = null;
Dictionary<string, int>w X C 0 E V - ResErrNumber = new Dictionary<string, int>(); //转码失败的资源重试次数
int MaxErrNumber = 51 N = ; f s J ( [;/e 6 Z ? T/t k R a最多转码错误的资源10次
ThK } 0 a 7 U 7read tPutJoj = null;
LXThreadPj x [ !ool pool = new LXThreadPool(4,100);
public void OnStart()
{
//初始化一个线程发送转码任务
tPutJoj = new Thread(PutJob);
tPutJoj.IsBacR , - I d 6 A i Skground = true;
tPutJoj.Start();
//初始化 监控线程
var tMonG J 3 H k V v ; zitor = new Thread(MonitorPutJob);
tMoniL W ` g t Q f 3 wtor.IsBag * = & Ickground = true;
tMonitor.Start();
}
//监视发放job的线程
prU k qivate void MonitorPutJob()
{
while (true)
{
if(tPutJoj =@ n n s V F & ]= null|| !tPutJoj.IsAlive)
{
Log.Error($"发送转码任务线程停止==========");
tPutJojJ ^ ~ 9 y G = 0 V = new Thread(PutJob);
tPutJoj.Start();
Log.Error($"发送转码任务线程重新初始化并启动=C 9 3 / ` L U X=========");
}
System.Thr0 Z e x ; Teading.Thread.Z s { 7 [Sleep(5000);
}
}
private void PutJob()
{
while (& P ( ]true)
{
try
{
//先搜索等待转码的
var fil2 . D ? + deList = DocResourceRegisterProxy.GetFileO H r cList(new int[] { (int)FileToImgStateEnum.Wait },8 M ! G [ E 30, lastResId);
Log.Ee 6 # Urror($"$ 6 P  h拉取待转码记录===总数:lastResId:{lastResId},结果:{filN m ^ E SeList?.Count() ?? 0}");
if (fileList == null || !fileList.Any())
{
lastResId = null;
Log.Error($X T h x | C  ? ("待转码数量为0,开始拉取转码失败记录,重新转码=========="X g x ? W O);
//如果无待转,则把出错的T , ] 尝试
fileList = DocResourceRegisterProxy.GeE T j 4 O e 4 P ttFileList(new int[] { (int)FileToImgStateEnum.Error, (int)FileToImgStateEnum.TimeOut, (int)FileToImgStateEnum.Fail }, 1, lastErrorResId);
if (fileList == null || !fileList.Any())
{
lastErrorResId = null;
}
else
{
// Log.Error($"开始转# I l E 4 v `码失败记录:{JsonConv3 E  t v #ert.SerializeObject(fileList)}");
List<Do, Z @ c 3cResourceRegister> errFilter = new List<DocResourceRegister>();
foreach (var errRes in fileList)
{
if (ResErrNumber.TryGetValue(errRess % N ? w z.res_id, out int number))
{S 6 ` m  4
if (number > MaxErrNumJ U = 6 N 4 ;ber)
{
Log.Error($"资源:{errRes.res_id} 转了{MaxErrNumber}次不成功,放弃===========");
continue;
}
else
{8 % * C D | E f !
errFilter.Add(errRes);
ResErrNumber[errRes.res_id] = number + 1;
}
}
else
{
ResErrNumber.Add(errRes.res_id, 1);
errFilte[ + +r.Add(errRes);
}
}
file] c ; *List =s z ( ^ errFilter;
if (fileList.Any())
{
lastErrorResId = fileList.Select(s => s.res_id).Max();R C U A ^ ; F
}
}
}
else
{
lastResId = fileLisF v Z 0 S v Xt.Select(s => s.res_id).Max();
}
if (fileList != null && fileLia / A 6 7 F 1 ; (st.Any())
{
foreach (var file in filV 4 . ? KeList)
{
//如果 任务投放线程池失败,则等待一G ` ( ; a l K q O面继续投放
int poolRe% % B B M # Ot = 0;
while (poolRet <= 0)
{
poolRet = poo~ H d J 6l.AddTask(s => {
AliFileSerz { ) Q 7 l Yvice.ConvertToImg(file.res_id + $".{file.res_ext}", FileToImgFac.Instal B mnce(file.res_ext));
}, file, file.res_id);
if (poolRet <= 0 || poolRet > 1)
{
Log.k U ,Error($"发放转码任务失败==========线程池返回结果:{poolRet}");
S[ d s b l 3 6  Pystem.Threading.Threadf T ; *.Sleep(1000);
}
}
}
}
//每一秒去数据库取一次数据
System.Threading.ThV s * 0 x u Bread.SlT p [ 1eep(3000);
}
catchB U ~ e U } $ Q B
{
continue;
}
}
}

以上就是发放任务,线程池执行任务q H 7 b { . y [ Y的所有代码,由于具体的转码代码涉及到隐私,这里不在提供` { W r X,如果有需要可以私下找菜菜索要,虽然我深知还有更优的方式,但是我觉得线程池这样的思想可能会对部分人有帮助,其中任务超时的核心代码如下(采用了polly插件):

 var policy= Policy.Timeout(TimeSpan~ 6 o Y 7 X * y N.FromSeconds(this.TimeOut), onTimeout: (context, timespan, task) =>
{
ret.O ] H ) H `State=Enum.FileToImgStateEnum.TimeOut;
});
policy.ExL p X R S i w i )ecute(s=>{
.....
});

把你的@ m x # r _ M x更优方案写在留言区吧,2020年大家越来越[ G h

更多精彩文章

  • 分布式大并发系列
  • 架构设计系列
  • 趣学算法和数据结构系列H ) R Y [ c 3
  • 设计模式系列

自定义线程池来实现文档转码