首页 > 代码库 > Nodejs - 框架类库 - Nodejs异步流程控制Async
Nodejs - 框架类库 - Nodejs异步流程控制Async
简介
Async是一个流程控制工具包,提供了直接而强大的异步功能
应用场景
业务流程逻辑复杂,适应异步编程,减少回调的嵌套
安装
npm insatll async
函数介绍
- Collections
each: 如果想对同一个集合中的所有元素都执行同一个异步操作。
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 /** 7 * 8 * async提供了三种方式: 9 * 1. 集合中所有元素并行执行 10 * 2. 一个一个顺序执行 11 * 3. 分批执行,同一批内并行,批与批之间按顺序 12 * 13 * 如果中途出错,则错误将上传给最终的callback处理。其它已经启动的任务继续执行,未启动的忽略。 14 */ 15 // each(arr, iterator(item, callback), callback(err)) 16 17 18 var arr = [{name:‘Jack‘, delay: 200}, 19 {name:‘Mike‘, delay: 100}, 20 {name:‘Freewind‘, delay: 300}]; 21 22 /** 23 * 所有操作并发执行,且全部未出错,最终得到的err为undefined。注意最终callback只有一个参数err。 24 */ 25 // 1.1 26 async.each(arr, function(item, callback) { 27 log(‘1.1 enter: ‘ + item.name); 28 setTimeout(function(){ 29 log(‘1.1 handle: ‘ + item.name); 30 callback(null, item.name); 31 }, item.delay); 32 }, function(err) { 33 log(‘1.1 err: ‘ + err); 34 }); 35 // 输出如下: 36 // 42.244> 1.1 enter: Jack 37 // 42.245> 1.1 enter: Mike 38 // 42.245> 1.1 enter: Freewind 39 // 42.350> 1.1 handle: Mike 40 // 42.445> 1.1 handle: Jack 41 // 42.554> 1.1 handle: Freewind 42 // 42.554> 1.1 err: undefined 43 44 45 /** 46 * 如果中途出错,则出错后马上调用最终的callback。其它未执行完的任务继续执行。 47 */ 48 async.each(arr,function(item, callback) { 49 log(‘1.2 enter: ‘ +item.name); 50 setTimeout(function() { 51 log(‘1.2 handle: ‘ + item.name); 52 if(item.name===‘Jack‘) { 53 callback(‘myerr‘); 54 } 55 }, item.delay); 56 }, function(err) { 57 log(‘1.2 err: ‘ + err); 58 }); 59 // 输出如下: 60 // 42.246> 1.2 enter: Jack 61 // 42.246> 1.2 enter: Mike 62 // 42.246> 1.2 enter: Freewind 63 // 42.350> 1.2 handle: Mike 64 // 42.445> 1.2 handle: Jack 65 // 42.446> 1.2 err: myerr 66 // 42.555> 1.2 handle: Freewind 67 68 /** 69 * 与each相似,但不是并行执行。而是一个个按顺序执行。 70 */ 71 async.eachSeries(arr, function(item, callback) { 72 log(‘1.3 enter: ‘ + item.name); 73 setTimeout(function(){ 74 log(‘1.3 handle: ‘ + item.name); 75 callback(null, item.name); 76 }, item.delay); 77 }, function(err) { 78 log(‘1.3 err: ‘ + err); 79 }); 80 // 42.247> 1.3 enter: Jack 81 // 42.459> 1.3 handle: Jack 82 // 42.459> 1.3 enter: Mike 83 // 42.569> 1.3 handle: Mike 84 // 42.569> 1.3 enter: Freewind 85 // 42.883> 1.3 handle: Freewind 86 // 42.883> 1.3 err: undefined 87 88 /** 89 * 如果中途出错,则马上把错误传给最终的callback,还未执行的不再执行。 90 */ 91 async.eachSeries(arr,function(item, callback) { 92 log(‘1.4 enter: ‘ +item.name); 93 setTimeout(function() { 94 log(‘1.4 handle: ‘ + item.name); 95 if(item.name===‘Jack‘) { 96 callback(‘myerr‘); 97 } 98 }, item.delay); 99 }, function(err) {100 log(‘1.4 err: ‘ + err);101 });102 // 42.247> 1.4 enter: Jack103 // 42.460> 1.4 handle: Jack104 // 42.460> 1.4 err: myerr105 106 /**107 * 分批执行,第二个参数是每一批的个数。每一批内并行执行,但批与批之间按顺序执行。108 */109 async.eachLimit(arr, 2, function(item, callback) {110 log(‘1.5 enter: ‘ + item.name);111 setTimeout(function(){112 log(‘1.5 handle: ‘ + item.name);113 callback(null, item.name);114 }, item.delay);115 }, function(err) {116 log(‘1.5 err: ‘ + err);117 });118 // 42.247> 1.5 enter: Jack119 // 42.248> 1.5 enter: Mike120 // 42.351> 1.5 handle: Mike121 // 42.352> 1.5 enter: Freewind122 // 42.461> 1.5 handle: Jack123 // 42.664> 1.5 handle: Freewind124 // 42.664> 1.5 err: undefined125 126 /**127 * 如果中途出错,错误将马上传给最终的callback。同一批中的未执行完的任务还将继续执行,但下一批及以后的不再执行。128 */129 async.eachLimit(arr,2,function(item, callback) {130 log(‘1.6 enter: ‘ +item.name);131 setTimeout(function() {132 log(‘1.6 handle: ‘ + item.name);133 if(item.name===‘Jack‘) {134 callback(‘myerr‘);135 }136 }, item.delay);137 }, function(err) {138 log(‘1.6 err: ‘ + err);139 });140 // 42.248> 1.6 enter: Jack141 // 42.248> 1.6 enter: Mike142 // 42.352> 1.6 handle: Mike143 // 42.462> 1.6 handle: Jack144 // 42.462> 1.6 err: myerr
map: 对集合中的每一个元素,执行某个异步操作,得到结果。所有的结果将汇总到最终的callback里。与each的区别是,each只关心操作不管最后的值,而map关心的最后产生的值。
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 /** 7 8 * 提供了两种方式: 9 * 1. 并行执行。同时对集合中所有元素进行操作,结果汇总到最终callback里。如果出错,则立刻返回错误以及已经执行完的任务的结果,未执行完的占个空位 10 * 2. 顺序执行。对集合中的元素一个一个执行操作,结果汇总到最终callback里。如果出错,则立刻返回错误以及已经执行完的结果,未执行的被忽略。 11 */ 12 // map(arr, iterator(item, callback), callback(err, results)) 13 14 var arr = [{name:‘Jack‘, delay:200}, {name:‘Mike‘, delay: 100}, {name:‘Freewind‘, delay:300}, {name:‘Test‘, delay: 50}]; 15 16 /** 17 * 所有操作均正确执行,未出错。所有结果按元素顺序汇总给最终的callback。 18 */ 19 // 1.1 20 async.map(arr, function(item, callback) { 21 log(‘1.1 enter: ‘ + item.name); 22 setTimeout(function() { 23 log(‘1.1 handle: ‘ + item.name); 24 callback(null, item.name + ‘!!!‘); 25 }, item.delay); 26 }, function(err,results) { 27 log(‘1.1 err: ‘, err); 28 log(‘1.1 results: ‘, results); 29 }); 30 // 54.569> 1.1 enter: Jack 31 // 54.569> 1.1 enter: Mike 32 // 54.569> 1.1 enter: Freewind 33 // 54.569> 1.1 enter: Test 34 // 54.629> 1.1 handle: Test 35 // 54.679> 1.1 handle: Mike 36 // 54.789> 1.1 handle: Jack 37 // 54.879> 1.1 handle: Freewind 38 // 54.879> 1.1 err: 39 // 54.879> 1.1 results: [ ‘Jack!!!‘, ‘Mike!!!‘, ‘Freewind!!!‘, ‘Test!!!‘ ] 40 41 /** 42 * 如果中途出错,立刻将错误、以及已经执行完成的结果汇总给最终callback。未执行完的将会在结果数组中用占个空位。 43 */ 44 async.map(arr, function(item, callback) { 45 log(‘1.2 enter: ‘ + item.name); 46 setTimeout(function() { 47 log(‘1.2 handle: ‘ + item.name); 48 if(item.name===‘Jack‘) callback(‘myerr‘); 49 else callback(null, item.name+‘!!!‘); 50 }, item.delay); 51 }, function(err, results) { 52 log(‘1.2 err: ‘, err); 53 log(‘1.2 results: ‘, results); 54 }); 55 // 54.569> 1.2 enter: Jack 56 // 54.569> 1.2 enter: Mike 57 // 54.569> 1.2 enter: Freewind 58 // 54.569> 1.2 enter: Test 59 // 54.629> 1.2 handle: Test 60 // 54.679> 1.2 handle: Mike 61 // 54.789> 1.2 handle: Jack 62 // 54.789> 1.2 err: myerr 63 // 54.789> 1.2 results: [ undefined, ‘Mike!!!‘, , ‘Test!!!‘ ] 64 // 54.879> 1.2 handle: Freewind 65 66 /** 67 * 顺序执行,一个完了才执行下一个。 68 */ 69 async.mapSeries(arr, function(item, callback) { 70 log(‘1.3 enter: ‘ + item.name); 71 setTimeout(function() { 72 log(‘1.3 handle: ‘ + item.name); 73 callback(null, item.name+‘!!!‘); 74 }, item.delay); 75 }, function(err,results) { 76 log(‘1.3 err: ‘, err); 77 log(‘1.3 results: ‘, results); 78 }); 79 // 54.569> 1.3 enter: Jack 80 // 54.789> 1.3 handle: Jack 81 // 54.789> 1.3 enter: Mike 82 // 54.899> 1.3 handle: Mike 83 // 54.899> 1.3 enter: Freewind 84 // 55.209> 1.3 handle: Freewind 85 // 55.209> 1.3 enter: Test 86 // 55.269> 1.3 handle: Test 87 // 55.269> 1.3 err: 88 // 55.269> 1.3 results: [ ‘Jack!!!‘, ‘Mike!!!‘, ‘Freewind!!!‘, ‘Test!!!‘ ] 89 90 /** 91 * 顺序执行过程中出错,只把错误以及执行完的传给最终callback,未执行的忽略。 92 */ 93 async.mapSeries(arr, function(item, callback) { 94 log(‘1.4 enter: ‘ + item.name); 95 setTimeout(function() { 96 log(‘1.4 handle: ‘ + item.name); 97 if(item.name===‘Mike‘) callback(‘myerr‘); 98 else callback(null, item.name+‘!!!‘); 99 }, item.delay);100 }, function(err, results) {101 log(‘1.4 err: ‘, err);102 log(‘1.4 results: ‘, results);103 });104 // 47.616> 1.4 enter: Jack105 // 47.821> 1.4 handle: Jack106 // 47.821> 1.4 enter: Mike107 // 47.931> 1.4 handle: Mike108 // 47.931> 1.4 err: myerr109 // 47.932> 1.4 results: [ ‘Jack!!!‘, undefined ]110 111 /**112 * 并行执行,同时最多2个函数并行,传给最终callback。113 */114 //1.5115 async.mapLimit(arr,2, function(item, callback) {116 log(‘1.5 enter: ‘ + item.name);117 setTimeout(function() {118 log(‘1.5 handle: ‘ + item.name);119 if(item.name===‘Jack‘) callback(‘myerr‘);120 else callback(null, item.name+‘!!!‘);121 }, item.delay);122 }, function(err, results) {123 log(‘1.5 err: ‘, err);124 log(‘1.5 results: ‘, results);125 });126 //57.797> 1.5 enter: Jack127 //57.800> 1.5 enter: Mike128 //57.900> 1.5 handle: Mike129 //57.900> 1.5 enter: Freewind130 //58.008> 1.5 handle: Jack131 //58.009> 1.5 err: myerr132 //58.009> 1.5 results: [ undefined, ‘Mike!!!‘ ]133 //58.208> 1.5 handle: Freewind134 //58.208> 1.5 enter: Test135 //58.273> 1.5 handle: Test
filter: 使用异步操作对集合中的元素进行筛选, 需要注意的是,iterator的callback只有一个参数,只能接收true或false。
reject: reject跟filter正好相反,当测试为true时则抛弃
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 /** 7 * 对于出错,该函数没有做出任何处理,直接由nodejs抛出。所以需要注意对Error的处理。 8 * 9 * async提供了两种方式: 10 * 1. 并行执行:filter 11 * 2. 顺序执行:filterSereis 12 */ 13 // filter(arr, iterator(item, callback(test)), callback(results)) 14 15 var arr = [1,2,3,4,5]; 16 17 /** 18 * 并行执行,对arr进行筛选。 19 */ 20 async.filter(arr, function(item, callback) { 21 log(‘1.1 enter: ‘ + item); 22 setTimeout(function() { 23 log(‘1.1 test: ‘ + item); 24 callback(item>=3); 25 }, 200); 26 }, function(results) { 27 log(‘1.1 results: ‘, results); 28 }); 29 //16.739> 1.1 enter: 1 30 //16.749> 1.1 enter: 2 31 //16.749> 1.1 enter: 3 32 //16.749> 1.1 enter: 4 33 //16.749> 1.1 enter: 5 34 //16.749> 1.3 enter: 1 35 //16.949> 1.1 test: 1 36 //16.949> 1.1 test: 2 37 //16.949> 1.1 test: 3 38 //16.949> 1.1 test: 4 39 //16.949> 1.1 test: 5 40 //16.949> 1.1 results: [ 3, 4, 5 ] 41 42 43 /** 44 * 串行执行,对arr进行筛选。 45 */ 46 // 1.3 47 async.filterSeries(arr, function(item, callback) { 48 log(‘1.3 enter: ‘ + item); 49 setTimeout(function() { 50 log(‘1.3 handle: ‘ + item); 51 callback(item>=3); 52 }, 200); 53 }, function(results) { 54 log(‘1.3 results: ‘, results); 55 }); 56 // 16.749> 1.3 enter: 1 57 // 16.949> 1.3 handle: 1 58 // 16.949> 1.3 enter: 2 59 // 17.149> 1.3 handle: 2 60 // 17.149> 1.3 enter: 3 61 // 17.369> 1.3 handle: 3 62 // 17.369> 1.3 enter: 4 63 // 17.589> 1.3 handle: 4 64 // 17.589> 1.3 enter: 5 65 // 17.789> 1.3 handle: 5 66 // 17.789> 1.3 results: [ 3, 4, 5 ] 67 68 69 /* 70 * 并行reject 71 */ 72 // reject(arr, iterator(item, callback(test)), callback(results) 73 async.reject(arr, function(item, callback) { 74 log(‘1.4 enter: ‘ + item); 75 setTimeout(function() { 76 log(‘1.4 test: ‘ + item); 77 callback(item>=3); 78 }, 200); 79 }, function(results) { 80 log(‘1.4 results: ‘, results); 81 }); 82 // 31.359> 1.4 enter: 1 83 // 31.359> 1.4 enter: 2 84 // 31.359> 1.4 enter: 3 85 // 31.359> 1.4 enter: 4 86 // 31.359> 1.4 enter: 5 87 // 31.559> 1.4 test: 1 88 // 31.559> 1.4 test: 2 89 // 31.559> 1.4 test: 3 90 // 31.559> 1.4 test: 4 91 // 31.559> 1.4 test: 5 92 // 31.569> 1.4 results: [ 1, 2 ] 93 94 95 /** 96 * 串行执行,对arr进行筛选。 97 */ 98 // 1.3 99 async.rejectSeries(arr, function(item, callback) {100 log(‘1.5 enter: ‘ + item);101 setTimeout(function() {102 log(‘1.5 handle: ‘ + item);103 callback(item>=3);104 }, 200);105 }, function(results) {106 log(‘1.5 results: ‘, results);107 });108 //43.592> 1.5 enter: 1109 //43.799> 1.5 handle: 1110 //43.800> 1.5 enter: 2111 //44.004> 1.5 handle: 2112 //44.007> 1.5 enter: 3113 //44.210> 1.5 handle: 3114 //44.211> 1.5 enter: 4115 //44.412> 1.5 handle: 4116 //44.413> 1.5 enter: 5117 //44.614> 1.5 handle: 5118 //44.616> 1.5 results: [ 1, 2 ]
reduce: 可以让我们给定一个初始值,用它与集合中的每一个元素做运算,最后得到一个值。reduce从左向右来遍历元素,如果想从右向左,可使用reduceRight。
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 7 //reduce(arr, memo, iterator(memo,item,callback), callback(err,result)) 8 //alias: inject, foldl 9 10 var arr = [1,3,5];11 12 /**13 * 顺序执行14 */15 async.reduce(arr, 100, function(memo, item, callback) {16 log(‘1.1 enter: ‘ + memo +‘, ‘ + item);17 setTimeout(function() {18 callback(null, memo+item);19 }, 100);20 },function(err, result) {21 log(‘1.1 err: ‘, err);22 log(‘1.1 result: ‘, result);23 });24 // 28.789> 1.1 enter: 100, 125 // 28.889> 1.1 enter: 101, 326 // 28.999> 1.1 enter: 104, 527 // 29.109> 1.1 err:28 // 29.109> 1.1 result: 10929 30 /**31 * 顺序执行过程中出错,只把错误传给最终callback,结果是null32 */33 async.reduce(arr, 100, function(memo, item, callback) {34 log(‘1.2 enter: ‘ + memo +‘, ‘ + item);35 setTimeout(function() {36 if(item===3) callback(‘myerr‘);37 else callback(null, memo+item);38 }, 100);39 },function(err, result) {40 log(‘1.2 err: ‘, err);41 log(‘1.2 result: ‘, result);42 });43 // 05.541> 1.2 enter: 100, 144 // 05.649> 1.2 enter: 101, 345 // 05.760> 1.2 err: myerr46 // 05.760> 1.2 result:47 48 /**49 * 顺序执行从右向左50 *51 * alias: foldr52 */53 async.reduceRight(arr, 100, function(memo, item, callback) {54 log(‘1.3 enter: ‘ + memo +‘, ‘ + item);55 setTimeout(function() {56 callback(null, memo+item);57 }, 100);58 },function(err, result) {59 log(‘1.3 err: ‘, err);60 log(‘1.3 result: ‘, result);61 });62 // 28.789> 1.3 enter: 100, 563 // 28.889> 1.3 enter: 105, 364 // 28.999> 1.3 enter: 108, 165 // 29.109> 1.3 err:66 // 29.109> 1.3 result: 109
detect: 用于取得集合中满足条件的第一个元素。
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 // detect(array, iterator(item,callback(test)), callback(result) 7 8 var arr = [ 9 {value:1,delay:500},10 {value:2,delay:200},11 {value:3,delay:300}12 ];13 14 /**15 * 并行执行,通过t.inc做一个累加器,得到第一个满足条件的结果对象16 */17 async.detect(arr, function(item,callback){18 log(‘1.1 enter: ‘, item.value);19 t.inc(item.value, function(err,n) {20 log(‘1.1 handle: ‘, item.value);21 callback(n%2===0);22 }, item.delay);23 }, function(result) {24 log(‘1.1 result: ‘, result);25 });26 // 09.928> 1.1 enter: 127 // 09.928> 1.1 enter: 228 // 09.928> 1.1 enter: 329 // 10.138> 1.1 handle: 230 // 10.228> 1.1 handle: 331 // 10.228> 1.1 result: { value: 3, delay: 300 }32 // 10.438> 1.1 handle: 133 // 10.438> 1.1 handle: 134 35 /**36 * 串行执行,通过t.inc做一个累加器,得到第一个满足条件的结果对象37 */38 async.detectSeries(arr, function(item,callback) {39 log(‘1.2 enter: ‘, item.value);40 t.inc(item.value, function(err,n) {41 log(‘1.1 handle: ‘, item.value);42 callback(n%2===0);43 }, item.delay);44 }, function(result) {45 log(‘1.2 result: ‘, result);46 });47 // 09.928> 1.2 enter: 148 // 10.438> 1.2 result: { value: 1, delay: 500 }
sortBy: 对集合内的元素进行排序,依据每个元素进行某异步操作后产生的值,从小到大排序。
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 // sortBy(array, iterator(item,callback(err,result)), callback(err,results)) 7 8 var arr = [3,6,1]; 9 10 /**11 * 通过异步迭代器,对集合进行排序12 */13 async.sortBy(arr, function(item, callback) {14 setTimeout(function() {15 callback(null,item);16 }, 200);17 }, function(err,results) {18 log(‘1.1 err: ‘, err);19 log(‘1.1 results: ‘, results);20 });21 // 26.562> 1.1 err: null22 // 26.562> 1.1 results: [ 1, 3, 6 ]23 24 /**25 * 迭代出错,callback返回err,没有results26 */27 async.sortBy(arr, function(item, callback) {28 setTimeout(function() {29 if(item===6) callback(‘myerr‘);30 else callback(null,item);31 }, 200);32 }, function(err,results) {33 log(‘1.2 err: ‘, err);34 log(‘1.2 results: ‘, results);35 });36 // 26.572> 1.2 err: myerr37 // 26.572> 1.2 results:
some: 当集合中是否有至少一个元素满足条件时,即最终callback得到的值为true,否则为false.
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 // some(arr, iterator(item,callback(test)), callback(result)) 7 //alias: any 8 9 var arr = [1,2,3,6];10 11 /**12 * 串行执行,集合中至少有一个元素<=3,所以结果为true13 */14 // 1.115 async.some(arr, function(item,callback){16 log(‘1.1 enter: ‘,item);17 setTimeout(function(){18 log(‘1.1 handle: ‘,item);19 callback(item<=3);20 },100);21 }, function(result) {22 log(‘1.1 result: ‘, result);23 });24 // 36.165> 1.1 enter: 125 // 36.165> 1.1 enter: 226 // 36.165> 1.1 enter: 327 // 36.165> 1.1 enter: 628 // 36.275> 1.1 handle: 129 // 36.275> 1.1 result: true30 // 36.275> 1.1 handle: 231 // 36.275> 1.1 handle: 332 // 36.275> 1.1 handle: 633 34 35 /**36 * 串行执行,集合中没有一个元素>10,所以结果为false37 */38 async.some(arr, function(item,callback){39 log(‘1.2 enter: ‘,item);40 setTimeout(function(){41 log(‘1.2 handle: ‘,item);42 callback(item>10);43 },100); 44 }, function(result) {45 log(‘1.2 result: ‘, result);46 });47 // 36.165> 1.2 enter: 148 // 36.165> 1.2 enter: 249 // 36.165> 1.2 enter: 350 // 36.165> 1.2 enter: 651 // 36.275> 1.2 handle: 152 // 36.275> 1.2 handle: 253 // 36.275> 1.2 handle: 354 // 36.275> 1.2 handle: 655 // 36.275> 1.2 result: false
every: 如果集合里每一个元素都满足条件,则传给最终回调的result为true,否则为false
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 // every(arr, iterator(item,callback), callback(result)) 7 //alias: all 8 9 var arr = [1,2,3,6];10 11 /**12 * 串行执行,集合中所有的元素都<=10,所以为true13 */14 async.every(arr, function(item,callback){15 log(‘1.1 enter: ‘,item);16 setTimeout(function(){17 log(‘1.1 handle: ‘,item);18 callback(item<=10);19 },100); 20 }, function(result) {21 log(‘1.1 result: ‘, result);22 });23 // 32.113> 1.1 enter: 124 // 32.123> 1.1 enter: 225 // 32.123> 1.1 enter: 326 // 32.123> 1.1 enter: 627 // 32.233> 1.1 handle: 128 // 32.233> 1.1 handle: 229 // 32.233> 1.1 handle: 330 // 32.233> 1.1 handle: 631 // 32.233> 1.1 result: true32 33 /**34 * 串行执行,集合中至少有一个元素不大于2,所以为false35 */36 async.every(arr, function(item,callback){37 log(‘1.2 enter: ‘,item);38 setTimeout(function(){39 log(‘1.2 handle: ‘,item);40 callback(item>2);41 },100); 42 }, function(result) {43 log(‘1.2 result: ‘, result);44 });45 // 32.123> 1.2 enter: 146 // 32.123> 1.2 enter: 247 // 32.123> 1.2 enter: 348 // 32.123> 1.2 enter: 649 // 32.233> 1.2 handle: 150 // 32.233> 1.2 result: false51 // 32.233> 1.2 handle: 252 // 32.233> 1.2 handle: 353 // 32.233> 1.2 handle: 6
concat: 将多个异步操作的结果合并为一个数组。
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 // concat(arr, iterator(item,callback(err,result)), callback(err,result)) 7 8 var data =http://www.mamicode.com/ { 9 aaa: [11,22,33],10 bbb: [44,55],11 ccc: 6612 };13 14 var keys = [15 {name: ‘aaa‘, delay: 300},16 {name: ‘bbb‘, delay: 100},17 {name: ‘ccc‘, delay: 200}18 ];19 20 /**21 * 以并行方式对集合中各元素进行异步操作,然后把得到的结果合并为一个数组,传给最后的callback。22 */23 // 1.124 async.concat(keys, function(key,callback) {25 setTimeout(function() {26 callback(null, data[key.name]);27 }, key.delay);28 }, function(err, values) {29 log(‘1.1 err: ‘, err);30 log(‘1.1 values: ‘, values); 31 });32 // 13.539> 1.1 err:33 // 13.539> 1.1 values: [ 44, 55, 66, 11, 22, 33 ]34 35 /**36 * 如果中途出错,则把错误以及已经完成的操作的结果交给最后callback。未执行完的则忽略。37 */38 // 1.239 async.concat(keys, function(key,callback) {40 setTimeout(function() {41 if(key.name===‘ccc‘) callback(‘myerr‘);42 else callback(null, data[key.name]);43 }, key.delay);44 }, function(err, values) {45 log(‘1.2 err: ‘, err);46 log(‘1.2 values: ‘, values); 47 });48 // 13.439> 1.2 err: myerr49 // 13.439> 1.2 values: [ 44, 55 ]50 51 /**52 * 按数组中的元素顺序来执行异步操作,一个完成后才对下一个进行操作。所有结果会汇集成一个数组交给最后的callback。53 */54 // concatSeries(arr, iterator, callback)55 56 // 1.357 async.concatSeries(keys, function(key,callback) {58 setTimeout(function() {59 callback(null, data[key.name]);60 }, key.delay);61 }, function(err, values) {62 log(‘1.3 err: ‘, err);63 log(‘1.3 values: ‘, values); 64 });65 // 13.859> 1.3 err:66 // 13.859> 1.3 values: [ 11, 22, 33, 44, 55, 66 ]
- ControlFlow
series: 串行执行,一个函数数组中的每个函数,每一个函数执行完成之后才能执行下一个函数。
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 /** 7 * 如果任何一个函数向它的回调函数中传了一个error,则后面的函数都不会被执行,并且会立刻将该error以及已经执行了的函数的结果,传给series中最后那个callback。 8 * 当所有的函数执行完后(没有出错),则会把每个函数传给其回调函数的结果合并为一个数组,传给series最后的那个callback。 9 * 还可以json的形式来提供tasks。每一个属性都会被当作函数来执行,并且结果也会以json形式传给series最后的那个callback。这种方式可读性更高一些。10 */11 // series(tasks, [callback])12 13 /**14 * 全部函数都正常执行。每个函数产生的值将按顺序合并为一个数组,传给最终的callback。15 */16 // 1.117 async.series([18 function(cb) { t.inc(3, cb); },19 function(cb) { t.inc(8, cb); },20 function(cb) { t.inc(2, cb); }21 ], function(err, results) {22 log(‘1.1 err: ‘, err);23 log(‘1.1 results: ‘, results);24 });25 //05.155> 1.1 err: null26 //05.156> 1.1 results: [ 4, 9, 3 ]27 28 /**29 * 中间有函数出错。出错之后的函数不会执行,错误及之前正常执行的函数结果将传给最终的callback。30 */31 // 1.232 async.series([33 function(cb) { t.inc(3, cb); },34 function(cb) { t.err(‘test_err‘, cb); },35 function(cb) { t.inc(8, cb); }36 ], function (err, results) {37 log(‘1.2 err: ‘, err);38 log(‘1.2 results: ‘, results);39 });40 //04.964> 1.2 err: test_err41 //04.973> 1.2 results: [ 4, undefined ]42 43 /**44 * 如果某个函数传的数据是undefined, null, {}, []等,它们会原样传给最终callback。45 */46 // 1.347 async.series([48 function(cb) { t.fire(3, cb);},49 function(cb) { t.fire(undefined, cb); },50 function(cb) { t.fire(null, cb); },51 function(cb) { t.fire({}, cb); },52 function(cb) { t.fire([], cb); },53 function(cb) { t.fire(‘abc‘, cb) }54 ], function(err, results) {55 log(‘1.3 err: ‘, err);56 log(‘1.3 results: ‘, results);57 });58 //05.794> 1.3 err: null59 //05.795> 1.3 results: [ 3, undefined, null, {}, [], ‘abc‘ ]60 61 /**62 * 以json形式传入tasks。其结果也将以json形式传给最终callback。63 */64 async.series({65 a: function(cb) { t.inc(3, cb); },66 b: function(cb) { t.fire(undefined, cb); },67 c: function (cb) { t.err(‘myerr‘, cb); },68 d: function (cb) { t.inc(8, cb); }69 }, function (err, results) {70 log(‘1.4 err: ‘, err);71 log(‘1.4 results: ‘, results);72 });73 //05.178> 1.4 err: myerr74 //05.179> 1.4 results: { a: 4, b: undefined, c: undefined }
parallel: 并行执行多个函数,每个函数都是立即执行,不需要等待其它函数先执行。传给最终callback的数组中的数据按照tasks中声明的顺序,而不是执行完成的顺序。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 /** 6 * 如果某个函数出错,则立刻将err和已经执行完的函数的结果值传给parallel最终的callback。其它未执行完的函数的值不会传到最终数据,但要占个位置。 7 * 同时支持json形式的tasks,其最终callback的结果也为json形式。 8 */ 9 // parallel(tasks, [callback])10 11 /**12 * 并行执行多个函数,每个函数的值将按函数声明的先后顺序汇成一个数组,传给最终callback。13 */14 // 1.115 async.parallel([16 function(cb) { t.fire(‘a400‘, cb, 400) },17 function(cb) { t.fire(‘a200‘, cb, 200) },18 function(cb) { t.fire(‘a300‘, cb, 300) }19 ], function (err, results) {20 log(‘1.1 err: ‘, err);21 log(‘1.1 results: ‘, results);22 });23 //36.929> 1.1 err: null24 //36.930> 1.1 results: [ ‘a400‘, ‘a200‘, ‘a300‘ ]25 26 /**27 * 如果中途有个函数出错,则将该err和已经完成的函数值汇成一个数组,传给最终的callback。还没有执行完的函数的值将被忽略,但要在最终数组中占个位置28 */29 // 1.230 async.parallel([31 function(cb) { log(‘1.2.1: ‘, ‘start‘); t.fire(‘a400‘, cb, 400) }, // 该函数的值不会传给最终callback,但要占个位置32 function(cb) { log(‘1.2.2: ‘, ‘start‘); t.err(‘e200‘, cb, 200) },33 function(cb) { log(‘1.2.3: ‘, ‘start‘); t.fire(‘a100‘, cb, 100) }34 ], function(err, results) {35 log(‘1.2 err: ‘, err);36 log(‘1.2 results: ‘, results);37 });38 //36.537> 1.2.1: start39 //36.540> 1.2.2: start40 //36.541> 1.2.3: start41 //36.741> 1.2 err: e20042 //36.744> 1.2 results: [ , undefined, ‘a100‘ ]43 44 /**45 * 以json形式传入tasks,最终results也为json46 */47 // 1.348 async.parallel({49 a: function(cb) { t.fire(‘a400‘, cb, 400) },50 b: function(cb) { t.fire(‘c300‘, cb, 300) }51 }, function(err, results) {52 log(‘1.3 err: ‘, err);53 log(‘1.3 results: ‘, results);54 });55 //36.943> 1.3 err: null56 //36.944> 1.3 results: { b: ‘c300‘, a: ‘a400‘ }57 58 /**59 * 如果中途出错,会将err与已经完成的函数值(汇成一个json)传给最终callback。未执行完成的函数值被忽略,不会出现在最终json中。60 */61 // 1.462 async.parallel({63 a: function(cb) { t.fire(‘a400‘, cb, 400) }, // 该函数的值不会传给最终的callback64 b: function(cb) { t.err(‘e300‘, cb, 300) },65 c: function(cb) { t.fire(‘c200‘, cb, 200) }66 }, function(err, results) {67 log(‘1.4 err: ‘, err);68 log(‘1.4 results: ‘, results);69 });70 //36.853> 1.4 err: e30071 //36.854> 1.4 results: { c: ‘c200‘, b: undefined }72 73 /**74 * 并行执行,同时最多2个函数并行,传给最终callback。75 */76 //1.577 async.parallelLimit({78 a:function(cb) { log(‘a start‘); t.fire(‘a400‘, cb, 200) },79 b:function(cb) { log(‘b start‘); t.fire(‘b200‘, cb, 200) },80 c:function(cb) { log(‘c start‘); t.fire(‘c100‘, cb, 100) },81 d:function(cb) { log(‘d start‘); t.fire(‘d600‘, cb, 600) },82 e:function(cb) { log(‘e start‘); t.fire(‘e300‘, cb, 300) }83 },2, function(err, results) {84 log(‘1.5 err: ‘, err);85 log(‘1.5 results: ‘, results);86 });87 //26.993> a start88 //26.996> b start89 //27.200> c start90 //27.202> d start91 //27.313> e start92 //27.809> 1.5 err:93 //27.810> 1.5 results: { a: ‘a400‘, b: ‘b200‘, c: ‘c100‘, e: ‘e300‘, d: ‘d600‘ }
whilst: 相当于while,但其中的异步调用将在完成后才会进行下一次循环。
doWhilst: 相当于do…while, doWhilst交换了fn,test的参数位置,先执行一次循环,再做test判断。
until: until与whilst正好相反,当test为false时循环,与true时跳出。其它特性一致。
doUntil: doUntil与doWhilst正好相反,当test为false时循环,与true时跳出。其它特性一致。
forever: 无论条件循环执行,如果不出错,callback永远不被执行。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 /** 6 * 该函数的功能比较简单,条件变量通常定义在外面,可供每个函数访问。在循环中,异步调用时产生的值实际上被丢弃了,因为最后那个callback只能传入错误信息。 7 * 8 * 另外,第二个函数fn需要能接受一个函数cb,这个cb最终必须被执行,用于表示出错或正常结束。 9 */ 10 // whilst(test, fn, callback) 11 12 /** 13 * 正常情况,没有出错。第二个函数虽然是异步调用,但被同步执行。所以第三个函数被调用时,已经过了3秒。 14 */ 15 // 1.1 16 var count1 = 0; 17 async.whilst( 18 function() { return count1 < 3 }, 19 function(cb) { 20 log(‘1.1 count: ‘, count1); 21 count1++; 22 setTimeout(cb, 1000); 23 }, 24 function(err) { 25 // 3s have passed 26 log(‘1.1 err: ‘, err); 27 } 28 ); 29 //10.318> 1.1 count: 0 30 //11.330> 1.1 count: 1 31 //12.342> 1.1 count: 2 32 //13.356> 1.1 err: 33 34 35 /** 36 * 中途出错。出错后立刻调用第三个函数。 37 */ 38 // 1.2 39 var count2 = 0; 40 async.whilst( 41 function() { return count2 < 3 }, 42 function(cb) { 43 log(‘1.2 count: ‘, count2); 44 if(count2===1) { 45 t.err(‘myerr‘, cb, 200); 46 } else { 47 count2++; 48 setTimeout(cb, 1000); 49 } 50 }, 51 function(err) { 52 // 2s have passed 53 log(‘1.2 err: ‘, err); 54 } 55 ); 56 //12.805> 1.2 count: 0 57 //13.824> 1.2 count: 1 58 //14.026> 1.2 err: myerr 59 60 /** 61 * 第二个函数即使产生值,也会被忽略。第三个函数只能得到err。 62 */ 63 // 1.3 64 var count3 = 0; 65 async.whilst( 66 function() {return count3 < 3 }, 67 function(cb) { 68 log(‘1.3 count: ‘, count3); 69 t.inc(count3++, cb); 70 }, 71 function(err,result){ // result没有用 72 log(‘1.3 err: ‘, err); 73 log(‘1.3 result: ‘, result); 74 } 75 ); 76 //45.311> 1.3 count: 0 77 //45.514> 1.3 count: 1 78 //45.718> 1.3 count: 2 79 //45.920> 1.3 err: 80 //45.923> 1.3 result: 81 82 /** 83 * doWhilst交换了fn,test的参数位置,先执行一次循环,再做test判断。 和javascript中do..while语法一致。 84 */ 85 // doWhilst(fn, test, callback) 86 //1.4 87 var count4 = 0; 88 async.doWhilst( 89 function(cb) { 90 log(‘1.4 count: ‘, count4); 91 t.inc(count4++, cb); 92 }, 93 function() { log("1.4 test"); return count4 < 3 }, 94 function(err,result){ // result没有用 95 log(‘1.4 err: ‘, err); 96 log(‘1.4 result: ‘, result); 97 } 98 ); 99 //33.643> 1.4 count: 0100 //33.848> 1.4 test101 //33.850> 1.4 count: 1102 //34.054> 1.4 test103 //34.057> 1.4 count: 2104 //34.269> 1.4 test105 //34.270> 1.4 err:106 //34.270> 1.4 result:107 108 /**109 * until与whilst正好相反,当test为false时循环,与true时跳出。其它特性一致。110 */111 // 1.5112 var count5 = 0;113 async.until(114 function() { return count5>3 },115 function(cb) {116 log(‘1.5 count: ‘, count5);117 count5++;118 setTimeout(cb, 200);119 },120 function(err) {121 // 4s have passed122 log(‘1.5 err: ‘,err);123 }124 );125 //42.498> 1.5 count: 0126 //42.701> 1.5 count: 1127 //42.905> 1.5 count: 2128 //43.107> 1.5 count: 3129 //43.313> 1.5 err:130 131 /**132 * doUntil与doWhilst正好相反,当test为false时循环,与true时跳出。其它特性一致。133 */134 // doUntil(fn, test, callback)135 // 1.6136 var count6 = 0;137 async.doUntil(138 function(cb) {139 log(‘1.6 count: ‘, count6);140 count6++;141 setTimeout(cb, 200);142 },143 function() { log(‘1.6 test‘);return count6>3 },144 function(err) {145 // 4s have passed146 log(‘1.6 err: ‘,err);147 }148 );149 //41.831> 1.6 count: 0150 //42.035> 1.6 test151 //42.037> 1.6 count: 1152 //42.241> 1.6 test153 //42.244> 1.6 count: 2154 //42.456> 1.6 test155 //42.457> 1.6 count: 3156 //42.660> 1.6 test157 //42.661> 1.6 err:158 159 /**160 * forever,无论条件循环执行,如果不出错,callback永远不被执行161 */162 //forever(fn, callback)163 //1.7164 var count7 = 0;165 async.forever(166 function(cb) {167 log(‘1.7 count: ‘, count7);168 count7++;169 setTimeout(cb, 200);170 },171 function(err) {172 log(‘1.7 err: ‘,err);173 }174 );175 //52.770> 1.7 count: 0176 //52.973> 1.7 count: 1177 //53.175> 1.7 count: 2178 //53.377> 1.7 count: 3179 //53.583> 1.7 count: 4180 //53.785> 1.7 count: 5181 //53.987> 1.7 count: 6182 //54.189> 1.7 count: 7183 //54.391> 1.7 count: 8184 //54.593> 1.7 count: 9185 //54.795> 1.7 count: 10186 //54.997> 1.7 count: 11187 //55.199> 1.7 count: 12
waterfall: 按顺序依次执行一组函数。每个函数产生的值,都将传给下一个。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 /** 6 * 这个函数名为waterfall(瀑布),可以想像瀑布从上到下,中途冲过一层层突起的石头。 7 * 8 * 注意,该函数不支持json格式的tasks 9 */10 // async.waterfall(tasks, [callback]);11 12 /**13 * 所有函数正常执行,每个函数的结果都将变为下一个函数的参数。14 *15 * 注意,所有的callback都必须形如callback(err, result),但err参数在前面各函数中无需声明,它被自动处理。16 */17 // 1.118 async.waterfall([19 function(cb) { log(‘1.1.1: ‘, ‘start‘); cb(null, 3); },20 function(n, cb) { log(‘1.1.2: ‘,n); t.inc(n, cb); },21 function(n, cb) { log(‘1.1.3: ‘,n); t.fire(n*n, cb); }22 ], function (err, result) {23 log(‘1.1 err: ‘, err);24 log(‘1.1 result: ‘, result);25 });26 //31.749> 1.1.1: start27 //31.752> 1.1.2: 328 //31.953> 1.1.3: 429 //32.156> 1.1 err: null30 //32.159> 1.1 result: 1631 32 /**33 * 中途有函数出错,其err直接传给最终callback,结果被丢弃,后面的函数不再执行。34 */35 // 1.236 async.waterfall([37 function(cb) { log(‘1.2.1: ‘, ‘start‘); cb(null, 3); },38 function(n, cb) { log(‘1.2.2: ‘, n); t.inc(n, cb); },39 function(n, cb) { log(‘1.2.3: ‘, n); t.err(‘myerr‘, cb); },40 function(n, cb) { log(‘1.2.4: ‘, n); t.fire(n, cb); }41 ], function (err, result) {42 log(‘1.2 err: ‘, err);43 log(‘1.2 result: ‘, result);44 });45 //44.935> 1.2.1: start46 //44.939> 1.2.2: 347 //45.140> 1.2.3: 448 //45.344> 1.2 err: myerr49 //45.348> 1.2 result:50 51 /**52 * 注意: 以json形式传入tasks,将不会被执行!!53 */54 async.waterfall({55 a: function(cb) { log(‘1.3.1: ‘, ‘start‘); cb(null, 3); },56 b: function(n, cb) { log(‘1.3.2: ‘, n); t.inc(n, cb); },57 c: function(n, cb) { log(‘1.3.3: ‘, n); t.fire(n*n, cb); }58 }, function (err, result) {59 log(‘1.3 err: ‘, err);60 log(‘1.3 result: ‘, result);61 });62 //49.222> 1.3 err: [Error: First argument to waterfall must be an array of functions]63 //49.228> 1.3 result:
compose: 创建一个包括一组异步函数的函数集合,每个函数会消费上一次函数的返回值。把f(),g(),h()异步函数,组合成f(g(h()))的形式,通过callback得到返回值。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 // compose(fn1, fn2...) 6 7 /** 8 * 通过compose组合,f(g(h()))的形式,从内层到外层的执行的顺序。 9 */10 //1.111 function f(n,callback){12 log(‘1.1.f enter: ‘,n);13 setTimeout(function () {14 callback(null, n + 1);15 }, 10);16 }17 function g(n, callback) {18 log(‘1.1.g enter: ‘,n);19 setTimeout(function () {20 callback(null, n * 2);21 }, 10);22 }23 function h(n, callback) {24 log(‘1.1.h enter: ‘,n);25 setTimeout(function () {26 callback(null, n - 10);27 }, 10);28 }29 var fgh = async.compose(f,g,h);30 fgh(4,function(err,result){31 log(‘1.1 err: ‘, err);32 log(‘1.1 result: ‘, result);33 });34 //05.307> 1.1.h enter: 435 //05.329> 1.1.g enter: -636 //05.341> 1.1.f enter: -1237 //05.361> 1.1 err: null38 //05.362> 1.1 result: -11
applyEach: 实现给一数组中每个函数传相同参数,通过callback返回。如果只传第一个参数,将返回一个函数对象,我可以传参调用。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 // applyEach(fns, args..., callback) 6 7 /** 8 * 异步执行,给数组中的函数,他们有相同的参数。 9 */10 //1.111 async.applyEach([12 function (name,cb) {13 setTimeout(function () {14 log("1.1 handler: " + name + " A");15 cb(null, name);16 }, 500);17 }, function (name,cb) {18 setTimeout(function () {19 log("1.1 handler: " + name + " B");20 cb(null, name);21 }, 150);22 }23 ], ‘Hello‘, function (err) {24 log(‘1.1 err: ‘, err);25 });26 //06.739> 1.1 handler: Hello B27 //07.079> 1.1 handler: Hello A28 //07.080> 1.1 err: null29 30 /**31 * 异步执行,当只设置第一参数后,得到函数对象,再传参调用这个函数。32 */33 //1.234 var fn = async.applyEach([35 function (name,cb) {36 setTimeout(function () {37 log("1.2 handler: " + name + " A");38 }, 500);39 }, function (name,cb) {40 setTimeout(function () {41 log("1.2 handler: " + name + " B");42 }, 150);43 }44 ]);45 fn("simgle",function(err){46 log(‘err: ‘,err);47 });48 //29.351> 1.2 handler: simgle B49 //29.688> 1.2 handler: simgle A50 51 /**52 * applyEachSeries与applyEach唯一不同的是,数组的函数同步执行。53 */54 //applyEachSeries(arr, args..., callback)55 //1.356 async.applyEachSeries([57 function (name,cb) {58 setTimeout(function () {59 log("1.3 handler: " + name + " A");60 cb(null, name);61 }, 500);62 }, function (name,cb) {63 setTimeout(function () {64 log("1.3 handler: " + name + " B");65 cb(null, name);66 }, 150);67 }68 ], "aaa", function (err) {69 log(‘1.3 err: ‘, err);70 });71 //10.669> 1.3 handler: aaa A72 //10.831> 1.3 handler: aaa B73 //10.834> 1.3 err: null
queue: 是一个串行的消息队列,通过限制了worker数量,不再一次性全部执行。当worker数量不够用时,新加入的任务将会排队等候,直到有新的worker可用。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 /* 6 * 该函数有多个点可供回调,如worker用完时、无等候任务时、全部执行完时等。 7 */ 8 // queue(worker, concurrency) 9 10 /** 11 * 定义一个queue,设worker数量为2 12 */ 13 var q = async.queue(function(task, callback) { 14 log(‘worker is processing task: ‘, task.name); 15 task.run(callback); 16 }, 2); 17 18 /** 19 * 监听:如果某次push操作后,任务数将达到或超过worker数量时,将调用该函数 20 */ 21 q.saturated = function() { 22 log(‘all workers to be used‘); 23 } 24 25 /** 26 * 监听:当最后一个任务交给worker时,将调用该函数 27 */ 28 q.empty = function() { 29 log(‘no more tasks wating‘); 30 } 31 32 /** 33 * 监听:当所有任务都执行完以后,将调用该函数 34 */ 35 q.drain = function() { 36 log(‘all tasks have been processed‘); 37 } 38 39 /** 40 * 独立加入2个任务 41 */ 42 q.push({name:‘t1‘, run: function(cb){ 43 log(‘t1 is running, waiting tasks: ‘, q.length()); 44 t.fire(‘t1‘, cb, 400); // 400ms后执行 45 }}, function(err) { 46 log(‘t1 executed‘); 47 }); 48 log(‘pushed t1, waiting tasks: ‘, q.length()); 49 50 q.push({name:‘t2‘,run: function(cb){ 51 log(‘t2 is running, waiting tasks: ‘, q.length()); 52 t.fire(‘t2‘, cb, 200); // 200ms后执行 53 }}, function(err) { 54 log(‘t2 executed‘); 55 }); 56 log(‘pushed t2, waiting tasks: ‘, q.length()); 57 //54.448> pushed t1, waiting tasks: 1 58 //54.451> all workers to be used 59 //54.452> pushed t2, waiting tasks: 2 60 //54.452> worker is processing task: t1 61 //54.453> t1 is running, waiting tasks: 1 62 //54.455> no more tasks wating 63 //54.455> worker is processing task: t2 64 //54.455> t2 is running, waiting tasks: 0 65 //54.656> t2 executed 66 //54.867> t1 executed 67 //54.868> all tasks have been processed 68 69 70 // 同时加入多个任务 71 q.push([ 72 { 73 name:‘t3‘, run: function(cb){ 74 log(‘t3 is running, waiting tasks: ‘, q.length()); 75 t.fire(‘t3‘, cb, 300); // 300ms后执行 76 } 77 },{ 78 name:‘t4‘, run: function(cb){ 79 log(‘t4 is running, waiting tasks: ‘, q.length()); 80 t.fire(‘t4‘, cb, 500); // 500ms后执行 81 } 82 },{ 83 name:‘t5‘, run: function(cb){ 84 log(‘t5 is running, waiting tasks: ‘, q.length()); 85 t.fire(‘t5‘, cb, 100); // 100ms后执行 86 } 87 },{ 88 name:‘t6‘, run: function(cb){ 89 log(‘t6 is running, waiting tasks: ‘, q.length()); 90 t.fire(‘t6‘, cb, 400); // 400ms后执行 91 } 92 } 93 ], function(err) { 94 log(‘err: ‘,err); 95 }); 96 log(‘pushed t3,t4,t5,t6 into queue, waiting tasks: ‘, q.length()); 97 //53.755> all workers to be used 98 //53.758> pushed t3,t4,t5,t6 into queue, waiting tasks: 4 99 //53.759> worker is processing task: t3100 //53.760> t3 is running, waiting tasks: 3101 //53.762> worker is processing task: t4102 //53.762> t4 is running, waiting tasks: 2103 //54.073> err: null104 //54.074> worker is processing task: t5105 //54.076> t5 is running, waiting tasks: 1106 //54.183> err: null107 //54.184> no more tasks wating108 //54.185> worker is processing task: t6109 //54.186> t6 is running, waiting tasks: 0110 //54.265> err: null111 //54.588> err: null112 //54.589> all tasks have been processed
cargo: 一个串行的消息队列,类似于queue,通过限制了worker数量,不再一次性全部执行。不同之处在于,cargo每次会加载满额的任务做为任务单元,只有任务单元中全部执行完成后,才会加载新的任务单元。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 // cargo(worker, [payload]) 6 7 /** 8 * 创建cargo实例 9 */10 var cargo = async.cargo(function (tasks, callback) {11 for(var i=0; i<tasks.length; i++){12 log(‘start ‘ + tasks[i].name);13 }14 callback();15 }, 2);16 17 18 /**19 * 监听:如果某次push操作后,任务数将达到或超过worker数量时,将调用该函数20 */21 cargo.saturated = function() {22 log(‘all workers to be used‘);23 }24 25 /**26 * 监听:当最后一个任务交给worker时,将调用该函数27 */28 cargo.empty = function() {29 log(‘no more tasks wating‘);30 }31 32 /**33 * 监听:当所有任务都执行完以后,将调用该函数34 */35 cargo.drain = function() {36 log(‘all tasks have been processed‘);37 }38 39 /**40 * 增加新任务41 */42 cargo.push({name: ‘A‘}, function (err) {43 t.wait(300);44 log(‘finished processing A‘);45 });46 cargo.push({name: ‘B‘}, function (err) {47 t.wait(600);48 log(‘finished processing B‘);49 });50 cargo.push({name: ‘C‘}, function (err) {51 t.wait(500);52 log(‘finished processing C‘);53 });54 cargo.push({name: ‘D‘}, function (err) {55 t.wait(100);56 log(‘finished processing D‘);57 });58 cargo.push({name: ‘E‘}, function (err) {59 t.wait(200);60 log(‘finished processing E‘);61 });62 //40.016> all workers to be used63 //40.020> no more tasks wating64 //40.020> start A65 //40.020> start B66 //40.322> finished processing A67 //40.923> finished processing B68 //40.923> no more tasks wating69 //40.924> start C70 //40.924> start D71 //41.425> finished processing C72 //41.526> finished processing D73 //41.526> no more tasks wating74 //41.527> start E75 //41.728> finished processing E76 //41.728> all tasks have been processed77 //41.729> all tasks have been processed78 //41.729> all tasks have been processed79 //41.729> all tasks have been processed80 //41.730> all tasks have been processed
auto: 用来处理有依赖关系的多个任务的执行。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 /** 6 * 比如某些任务之间彼此独立,可以并行执行;但某些任务依赖于其它某些任务,只能等那些任务完成后才能执行。 7 * 虽然我们可以使用parallel和series结合起来实现该功能,但如果任务之间关系复杂,则代码会相当复杂,以后如果想添加一个新任务,也会很麻烦。 8 * 这时使用auto,则会事半功倍。 9 * 10 * 如果有任务中途出错,则会把该错误传给最终callback,所有任务(包括已经执行完的)产生的数据将被忽略。 11 * 如果不关心错误和最终数据,可以不用写最后那个callback。 12 */ 13 // async.auto(tasks, [callback]) 14 15 /** 16 * 我要写一个程序,它要完成以下几件事: 17 * 1. 从某处取得数据 18 * 2. 在硬盘上建立一个新的目录 19 * 3. 将数据写入到目录下某文件 20 * 4. 发送邮件,将文件以附件形式发送给其它人。 21 * 22 * 分析该任务,可以知道1与2可以并行执行,3需要等1和2完成,4要等3完成。 23 * 可以按以下方式来使用auto函数。 24 */ 25 // 1.1 26 async.auto({ 27 getData: function (callback) { 28 setTimeout(function(){ 29 console.log(‘1.1: got data‘); 30 callback(null, ‘mydata‘); 31 }, 300); 32 }, 33 makeFolder: function (callback) { 34 setTimeout(function(){ 35 console.log(‘1.1: made folder‘); 36 callback(null, ‘myfolder‘); 37 }, 200); 38 }, 39 writeFile: [‘getData‘, ‘makeFolder‘, function(callback) { 40 setTimeout(function(){ 41 console.log(‘1.1: wrote file‘); 42 callback(null, ‘myfile‘); 43 }, 300); 44 }], 45 emailFiles: [‘writeFile‘, function(callback, results) { 46 log(‘1.1: emailed file: ‘, results.writeFile); 47 callback(null, results.writeFile); 48 }] 49 }, function(err, results) { 50 log(‘1.1: err: ‘, err); 51 log(‘1.1: results: ‘, results); 52 }); 53 //1.1: made folder 54 //1.1: got data 55 //1.1: wrote file 56 //20.120> 1.1: emailed file: myfile 57 //20.125> 1.1: err: null 58 //20.127> 1.1: results: { makeFolder: ‘myfolder‘, 59 // getData: ‘mydata‘, 60 // writeFile: ‘myfile‘, 61 // emailFiles: ‘myfile‘ } 62 63 64 65 /** 66 * 如果中途出错,则会把错误交给最终callback,执行完任务的传给最终callback。未执行完成的函数值被忽略 67 */ 68 // 1.2 69 async.auto({ 70 getData: function (callback) { 71 setTimeout(function(){ 72 console.log(‘1.2: got data‘); 73 callback(null, ‘mydata‘); 74 }, 300); 75 }, 76 makeFolder: function (callback) { 77 setTimeout(function(){ 78 console.log(‘1.2: made folder‘); 79 callback(null, ‘myfolder‘); 80 }, 200); 81 }, 82 writeFile: [‘getData‘, ‘makeFolder‘, function(callback, results) { 83 setTimeout(function(){ 84 console.log(‘1.2: wrote file‘); 85 callback(‘myerr‘); 86 }, 300); 87 }], 88 emailFiles: [‘writeFile‘, function(callback, results) { 89 console.log(‘1.2: emailed file: ‘ + results.writeFile); 90 callback(‘err sending email‘, results.writeFile); 91 }] 92 }, function(err, results) { 93 log(‘1.2 err: ‘, err); 94 log(‘1.2 results: ‘, results); 95 }); 96 //1.2: made folder 97 //1.2: got data 98 //1.2: wrote file 99 //51.399> 1.2 err: myerr100 //51.401> 1.2 results: { makeFolder: ‘myfolder‘,101 // getData: ‘mydata‘,102 // writeFile: undefined }
iterator: 将一组函数包装成为一个iterator,初次调用此iterator时,会执行定义中的第一个函数并返回第二个函数以供调用。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 /** 6 * 也可通过手动调用 next() 得到以下一个函数为起点的新的iterator。 7 * 该函数通常由async在内部使用,但如果需要时,也可在我们的代码中使用它。 8 */ 9 // async.iterator(tasks)10 11 var iter = async.iterator([12 function () {log(‘I am 111‘)},13 function () {log(‘I am 222‘)},14 function () {log(‘I am 333‘)}15 ]);16 17 /**18 * 直接调用(),会执行当前函数,并返回一个由下个函数为起点的新的iterator19 */20 //1.121 log(‘1.1 iter()‘);22 var it1 = iter();23 it1();24 it1();25 //28.368> 1.1 iter()26 //28.371> I am 11127 //28.372> I am 22228 //28.372> I am 22229 30 /**31 * 通过iter()来调用下一个函数32 */33 log(‘1.2 iter()‘);34 var it2 = iter();35 var it3 = it2();36 var it4 = it3();37 //it4(); // 这句代码执行会报错38 log(it4); // => ‘null‘39 //32.449> 1.2 iter()40 //32.452> I am 11141 //32.452> I am 22242 //32.453> I am 33343 //32.454> null44 45 /**46 * 调用next(),不会执行当前函数,直接返回由下个函数为起点的新iterator47 * 对于同一个iterator,多次调用next(),不会影响自己48 */49 //1.350 log(‘1.3 iter()‘);51 var it5 = iter.next();52 it5();53 var it6 = iter.next().next();54 it6();55 iter();56 //39.895> 1.3 iter()57 //39.898> I am 22258 //39.899> I am 33359 //39.899> I am 111
apply: 可以让我们给一个函数预绑定多个参数并生成一个可直接调用的新函数,简化代码。
1 var async = require(‘async‘); 2 3 var t = require(‘./t‘); 4 var log = t.log; 5 6 /** 7 * function(callback) { t.inc(3, callback); } 8 * 等价于: 9 * async.apply(t.inc, 3);10 */11 // apply(function, arguments..)12 13 /**14 * 通过名字绑定函数t.inc, t.fire,作为新函数给parallel调用15 */16 //1.117 async.parallel([18 async.apply(t.inc, 3),19 async.apply(t.fire, 100)20 ], function (err, results) {21 log(‘1.1 err: ‘, err);22 log(‘1.1 results: ‘, results);23 });24 //58.605> 1.1 err: null25 //58.613> 1.1 results: [ 4, 100 ]26 27 /**28 * 构造一个加法函数,通过apply简化代码29 */30 //1.231 function inc(a,b,callback,timeout){32 var timeout = timeout || 200;33 t.wait(200);34 setTimeout(function() {35 callback(null, a+b);36 }, timeout);37 }38 var fn = async.apply(inc, 1, 2);39 fn(function(err, n){40 log(‘1.2 inc: ‘ + n);41 });42 //58.616> 1.2 inc: 3
nextTick: 与nodejs的nextTick一样,再最后调用函数。
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 /** 6 * 但在浏览器端,只能使用setTimeout(callback,0),但这个方法有时候会让其它高优先级的任务插到前面去。 7 * 所以提供了这个nextTick,让同样的代码在服务器端和浏览器端表现一致。 8 */ 9 // nextTick(callback)10 11 var calls = [];12 async.nextTick(function() {13 calls.push(‘two‘);14 });15 async.nextTick(function() {16 log(‘1.1‘,calls);17 });18 calls.push(‘one‘);19 log(‘1.2‘,calls);20 async.nextTick(function() {21 log(‘1.3‘,calls);22 });23 //09.838> 1.2[ ‘one‘ ]24 //09.842> 1.1[ ‘one‘, ‘two‘ ]25 //09.843> 1.3[ ‘one‘, ‘two‘ ]
times: 异步运行,times可以指定调用几次,并把结果合并到数组中返回
timesSeries: 与time类似,唯一不同的是同步执行
1 var async = require(‘async‘); 2 var t = require(‘./t‘); 3 var log = t.log; 4 5 // times(n, callback) 6 7 function delay(n){return (n+12) % 7 *100;} 8 var createUser = function(id, callback) { 9 callback(null, {10 id: ‘user‘ + id,11 delay:delay(id)12 })13 }14 15 /**16 * 异步执行,调用3次createUser函数,结果被合并到数组返回17 */18 //1.119 async.times(3, function(n, callback){20 log("1.1 enter: "+ n);21 setTimeout(function(){22 log(‘1.1 handler: ‘,n);23 createUser(n, function(err, user) {24 callback(err, user)25 })26 },delay(n));27 }, function(err, users) {28 log(‘1.1 err: ‘,err);29 log(‘1.1 result: ‘,users);30 });31 //07.397> 1.1 enter: 032 //07.400> 1.1 enter: 133 //07.401> 1.1 enter: 234 //07.412> 1.1 handler: 235 //07.912> 1.1 handler: 036 //08.009> 1.1 handler: 137 //08.010> 1.1 err: null38 //08.011> 1.1 result: [ { id: ‘user0‘, delay: 500 },39 // { id: ‘user1‘, delay: 600 },40 // { id: ‘user2‘, delay: 0 } ]41 42 /**43 * timesSeries与time唯一不同的是,同步执行44 */45 //timesSeries(n, callback)46 47 /**48 * 同步执行,调用3次createUser函数,结果被合并到数组返回49 */50 //1.251 async.timesSeries(3, function(n, callback){52 log("1.2 enter: "+ n);53 setTimeout(function(){54 log(‘1.2 handler: ‘,n);55 createUser(n, function(err, user) {56 callback(err, user)57 })58 },delay(n));59 }, function(err, users) {60 log(‘1.2 err: ‘,err);61 log(‘1.2 result: ‘,users);62 });63 //16.642> 1.2 enter: 064 //17.159> 1.2 handler: 065 //17.162> 1.2 enter: 166 //17.763> 1.2 handler: 167 //17.767> 1.2 enter: 268 //17.778> 1.2 handler: 269 //17.779> 1.2 err: null70 //17.780> 1.2 result: [ { id: ‘user0‘, delay: 500 },71 // { id: ‘user1‘, delay: 600 },72 // { id: ‘user2‘, delay: 0 } ]
- Utils
memoize: 让某一个函数在内存中缓存它的计算结果。对于相同的参数,只计算一次,下次就直接拿到之前算好的结果。
unmemoize: 让已经被缓存的函数,返回不缓存的函数引用。
log: 执行某异步函数,并记录它的返回值,日志输出。
dir: 与log类似,不同之处在于,会调用浏览器的console.dir()函数,显示为DOM视图。
fire: 直接将obj的内容返回给async
err: 模拟一个错误的产生,让async各个函数末尾的callback接收到。
wait: 刻意等待mils的时间,mils的单位是毫秒。
Nodejs - 框架类库 - Nodejs异步流程控制Async