/** Pass-through pipe for tracing: prints `prompt> element` for every element
  * and re-emits the element unchanged.
  *
  * @param prompt label printed in front of each element
  */
def log[A](prompt: String): Pipe[Task,A,A] =
  _.evalMap { a => Task.delay { println(s"$prompt> $a"); a } }
//> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]

/** Pass-through pipe that holds every element back for a random whole number
  * of milliseconds in `[0, max)` before re-emitting it unchanged.
  *
  * @param max exclusive upper bound of the per-element delay; anything
  *            shorter than one millisecond results in no delay at all
  */
def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a =>
  // Random.nextInt rejects a non-positive bound, and toMillis.toInt can wrap
  // for very long durations, so clamp the bound into [1, Int.MaxValue].
  // A bound of 1 always yields 0, i.e. "no delay".
  val bound: Int = math.max(1L, math.min(max.toMillis, Int.MaxValue.toLong)).toInt
  val delay: Task[Int] = Task.delay { scala.util.Random.nextInt(bound) }
  delay.flatMap { d => Task.now(a).schedule(d.millis) }
}
//> randomDelay: [A](max: scala.concurrent.duration.FiniteDuration)fs2.Pipe[fs2.Task,A,A]
// Byte streams over the three source files used throughout the examples.
// The second argument of readAll is the chunk size: each read requests up to
// 1024 bytes. The worksheet output below shows the values are only stream
// descriptions at this point.
def bytesOf(file: String): Stream[Task, Byte] =
  io.file.readAll[Task](java.nio.file.Paths.get(file), 1024)

val s1 = bytesOf("/Users/tiger-macpro/basic/BasicBackend.scala")
//> s1 : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)
val s2 = bytesOf("/Users/tiger-macpro/basic/DatabaseConfig.scala")
//> s2 : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)
val s3 = bytesOf("/Users/tiger-macpro/basic/BasicProfile.scala")
//> s3 : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)
def readAll[F[_]](path: Path, chunkSize: Int)(implicit F: Effect[F]): Stream[F, Byte] ={...}
readAll分批(by chunks)从文件中读取Byte类型数据(当返回数据量小于chunkSize代表完成读取),返回结果类型是Stream[F,Byte]。我们需要进行Byte>>>String转换及分行等处理。fs2在text对象里提供了相关函数:
object text { private val utf8Charset = Charset.forName("UTF-8") /** Converts UTF-8 encoded byte stream to a stream of `String`. */ def utf8Decode[F[_]]: Pipe[F, Byte, String] = _.chunks.through(utf8DecodeC) /** Converts UTF-8 encoded `Chunk[Byte]` inputs to `String`. */ def utf8DecodeC[F[_]]: Pipe[F, Chunk[Byte], String] = { /** * Returns the number of continuation bytes if `b` is an ASCII byte or a * leading byte of a multi-byte sequence, and -1 otherwise. */ def continuationBytes(b: Byte): Int = { if ((b & 0x80) == 0x00) 0 // ASCII byte else if ((b & 0xE0) == 0xC0) 1 // leading byte of a 2 byte seq else if ((b & 0xF0) == 0xE0) 2 // leading byte of a 3 byte seq else if ((b & 0xF8) == 0xF0) 3 // leading byte of a 4 byte seq else -1 // continuation byte or garbage }.../** Encodes a stream of `String` in to a stream of bytes using the UTF-8 charset. */ def utf8Encode[F[_]]: Pipe[F, String, Byte] = _.flatMap(s => Stream.chunk(Chunk.bytes(s.getBytes(utf8Charset)))) /** Encodes a stream of `String` in to a stream of `Chunk[Byte]` using the UTF-8 charset. */ def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] = _.map(s => Chunk.bytes(s.getBytes(utf8Charset))) /** Transforms a stream of `String` such that each emitted `String` is a line from the input. */ def lines[F[_]]: Pipe[F, String, String] = {...
// Sequential baseline: the three files are processed one after another and
// each run is timed individually, then the overall elapsed time is reported.

// Decodes a byte stream as UTF-8, splits it into lines, passes every line
// through randomDelay(10.millis) and returns the number of lines seen.
def countLines(src: Stream[Task, Byte]): Int =
  src
    .through(text.utf8Decode)
    .through(text.lines)
    .through(randomDelay(10.millis))
    .runFold(0)((seen, _) => seen + 1)
    .unsafeRun

val startTime = System.currentTimeMillis
//> startTime : Long = 1472444756321
val s1lines = countLines(s1)
//> s1lines : Int = 479
println(s"reading s1 $s1lines lines in ${System.currentTimeMillis - startTime}ms")
//> reading s1 479 lines in 5370ms

val startTime2 = System.currentTimeMillis
//> startTime2 : Long = 1472444761691
val s2lines = countLines(s2)
//> s2lines : Int = 174
println(s"reading s2 $s2lines lines in ${System.currentTimeMillis - startTime2}ms")
//> reading s2 174 lines in 1923ms

val startTime3 = System.currentTimeMillis
//> startTime3 : Long = 1472444763614
val s3lines = countLines(s3)
//> s3lines : Int = 174
println(s"reading s3 $s3lines lines in ${System.currentTimeMillis - startTime3}ms")
//> reading s3 174 lines in 1928ms

println(s"reading all three files ${s1lines+s2lines+s3lines} total lines in ${System.currentTimeMillis - startTime}ms")
//> reading all three files 827 total lines in 9221ms
def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}
join运算的对象outer是个两层Stream(Stream of Streams):Stream[F,Stream[F,O]],所以我们需要先把数据源调整成这种类型:
// Line streams for the three files, bundled into the stream-of-streams shape
// that fs2.concurrent.join takes as its `outer` argument.

// UTF-8 decodes a byte stream, splits it into lines and passes every line
// through randomDelay(10.millis).
def delayedLines(src: Stream[Task, Byte]): Stream[Task, String] =
  src
    .through(text.utf8Decode)
    .through(text.lines)
    .through(randomDelay(10.millis))

val lines1 = delayedLines(s1)
//> lines1 : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>)
val lines2 = delayedLines(s2)
//> lines2 : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>)
val lines3 = delayedLines(s3)
//> lines3 : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>)

// Outer stream whose three elements are the per-file line streams.
val ss: Stream[Task,Stream[Task,String]] = Stream(lines1, lines2, lines3)
//> ss : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,String]] = Segment(Emit(Chunk(evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>), evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>), evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>))))
// Parallel variant: join the three line streams with maxOpen = 3 and count
// every line of the merged output, timing the whole run.
val ss_start = System.currentTimeMillis
//> ss_start : Long = 1472449962698
val ss_lines = {
  val merged = fs2.concurrent.join(3)(ss)
  merged.runFold(0)((seen, _) => seen + 1).unsafeRun
}
//> ss_lines : Int = 827
println(s"parallel reading all files ${ss_lines} total lines in ${System.currentTimeMillis - ss_start}ms")
//> parallel reading all files 827 total lines in 5173ms
/** True when `c` is one of the upper-case vowels A, E, I, O, U.
  * Lower-case letters are not matched; callers upper-case their text first.
  */
def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c)
//> vowls: (c: Char)Boolean

/** Pipe that turns every incoming string into a map from vowel to the number
  * of times that vowel occurs in it (case-insensitive, keys are upper-case).
  * Implemented directly with the Scala standard library.
  *
  * Each count is scheduled `text.length / 10` milliseconds (integer division)
  * later, so longer strings take proportionally longer to process.
  */
def pipeVowlsCount: Pipe[Task,String,Map[Char,Int]] =
  _.evalMap { text =>
    Task.delay {
      // Strict map instead of mapValues: mapValues only builds a lazy view
      // whose sizes would be recomputed on every later lookup.
      text.toUpperCase.toList
        .filter(vowls)
        .groupBy(identity)
        .map { case (vowel, hits) => vowel -> hits.size }
    }.schedule((text.length / 10).millis)
  }
//> pipeVowlsCount: => fs2.Pipe[fs2.Task,String,Map[Char,Int]]
注意我们使用了text => Task.delay{...}.schedule(d)。实际上我们完全可以用 text => Thread.sleep(d) 来模拟运算耗时,但是这样会产生不纯代码(带副作用并且阻塞线程),所以我们用evalMap来实现纯代码运算。试试统计全部字串内母音出现的总数:
import scalaz.{Monoid}

/** Monoid instance for `Map[Char,Int]`, provided for use with runFold:
  * `zero` is the empty map and `append` adds the counts key by key.
  */
implicit object mapMonoid extends Monoid[Map[Char,Int]] {
  def zero: Map[Char,Int] = Map()

  def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = {
    // m2 is a by-name parameter: every mention re-evaluates the argument
    // expression, so force it exactly once before using it.
    val right = m2
    right.foldLeft(m1) { case (acc, (k, n)) =>
      acc.updated(k, acc.getOrElse(k, 0) + n)
    }
  }
}

// The files are read in parallel (join with maxOpen = 3), but the vowel
// counting runs as a single pipe behind the merged stream.
val vc_start = System.currentTimeMillis
//> vc_start : Long = 1472464772465
val vowlsLine = fs2.concurrent.join(3)(ss).through(pipeVowlsCount)
  .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun
//> vowlsLine : scala.collection.immutable.Map[Char,Int] = Map(E -> 3381, U -> 838, A -> 2361, I -> 2031, O -> 1824)
println(s"parallel reading all files and counted vowls sequencially in ${System.currentTimeMillis - vc_start}ms")
//> parallel reading all files and counted vowls sequencially in 10466ms
那么我们又如何实现统计功能的并行运算呢? fs2.concurrent.join(maxOpen)(...)函数能同时运行外层Stream里最多maxOpen个子Stream,并把它们的输出合并成一个Stream。那么我们又如何得到Stream[F,Stream[F,O]]类型呢?我们必须把Stream[F,O]的O升格成Stream[F,O]。我们先用一个函数来把O转换成Map[Char,Int],然后把这个函数的运算结果升格成Stream[Task,Map[Char,Int]],这个可以用Stream.eval实现:
/** Counts the vowels of `text`, case-insensitively.
  *
  * @param text the string to analyse
  * @return map from upper-case vowel to its number of occurrences; vowels
  *         that do not occur have no entry
  */
def fVowlsCount(text: String): Map[Char,Int] =
  // Strict map instead of mapValues: mapValues only builds a lazy view whose
  // sizes would be recomputed on every later lookup.
  text.toUpperCase.toList
    .filter(vowls)
    .groupBy(identity)
    .map { case (vowel, hits) => vowel -> hits.size }
//> fVowlsCount: (text: String)Map[Char,Int]

// Every line of the merged input becomes its own single-element stream that
// evaluates the vowel count, scheduled text.length / 10 ms later. The result
// is a stream of streams, i.e. the shape join takes as its `outer` argument.
val parVowlsLine: Stream[Task,Stream[Task,Map[Char,Int]]] = fs2.concurrent.join(3)(ss)
  .map { text => Stream.eval(Task { fVowlsCount(text) }.schedule((text.length / 10).millis)) }
//> parVowlsLine : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Map[Char,Int]]] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>).mapChunks(<function1>)
// Fully parallel variant: the per-line counting streams are joined with
// maxOpen = 8 and the partial counts are folded together with mapMonoid.
val parvc_start = System.currentTimeMillis
//> parvc_start : Long = 1472465844694
fs2.concurrent.join(8)(parVowlsLine)
  .runFold(mapMonoid.zero)((total, counts) => mapMonoid.append(total, counts))
  .unsafeRun
//> res0: scala.collection.immutable.Map[Char,Int] = Map(E -> 3381, U -> 838, A-> 2361, I -> 2031, O -> 1824)
println(s"parallel reading all files and counted vowls in ${System.currentTimeMillis - parvc_start}ms")
//> parallel reading all files and counted vowls in 4984ms
