Dart 是单线程,那么怎么异步呢?或者耗时为什么不卡线程呢?
Flutter 引擎并不会创建线程,embedder提供给4个 task runner 引用给Flutter 引擎:
所有的 Dart 代码均运行在一个 isolate 的上下文环境中,该 isolate 中拥有对应 Dart 代码片段运行所需的所有内存。那么在开发中,我们经常会遇到一些耗时的操作,比如网络请求、文件读取等等,那么线程势必会阻塞,无法响应其他时间,UI 卡死,那么怎么在单线程处理耗时操作呢?
通常我们会使用一个 Future 对象用于表示异步操作的结果,这些正在处理的操作或 I/O 将会在稍后完成。那么 Future 是怎么实现单线程异步呢?
事件触发,如点击、重绘,事件循环取到事件,处理,丢弃。就像快递送到收件人手里,被拆开、拿走快递、丢掉快递袋子。而 Future 修饰的函数,类似一个冷冻箱,放在快递人手里,并不拆开,而是别人解冻处理后,然后告诉收件人可以拆了,收件人再拆开。这期间,CPU 则去调度执行其他IO,等异步处理完成,这个结果会被放入事件循环,事件循环处理这个结果。
上面说到异步处理,许多文章都一笔带过,我们不免头大,并没有解惑,为什么单线程可以异步处理?下面从 Future 的建立说起:
Future 会创建 Timer ,并将 timer_impl.dart 中 _Timer 对象的静态方法 _handleMessage() 放入到 isolate_patch.dart 中 _RawReceivePortImpld 对象的静态成员变量 _handlerMap ;并创建 ReceivePort 和 SendPort ,这里就和 Android 线程间通信的 Hander 一样, Future 会把任务交给操作系统去执行,然后自己继续执行别的任务。比如说,网络请求,Socket 本身提供了 select 模型可以异步查询;而文件 IO,操作系统也提供了基于事件的回调机制。等事件处理完,再把结果发回 ReceivePort ,事件循环去处理这个结果。
那么别的负载大的耗时操作呢?比如通用的耗时计算任务,例如求解阶乘,os 并没有异步接口给 Dart 调用,所以异步编程帮助不大,这时候就需要多线程去处理了,而 Dart 的多线程就是 isolate ,但是 isolate 并不是内存共享的,它更像是一个进程。
通常网络返回 json ,我们需要解析成 实体 bean ,如果 json 十分庞大,耗时较多,就卡顿了。所以需要放在 isolate 里处理。
import 'dart:convert';main(List<String> args) { String jsonString = '''{ "id":"123", "name":"张三", "score" : 95}'''; Student student = parseJson(jsonString); print(student.name);}Student parseJson(String json) { Map<String, dynamic> map = jsonDecode(json); return Student.fromJson(map);}class Student { String id; String name; int score; Student({this.id, this.name, this.score}); factory Student.fromJson(Map parsedJson) { return Student(id: parsedJson['id'], name: parsedJson['name'], score: parsedJson['score']); }}
我们把上面代码放入 isolate 中执行:
Future<Student> loadStudent(String json) { return compute(parseJson, json);}Student parseJson(String json) { Map<String, dynamic> map = jsonDecode(json); return Student.fromJson(map);}
compute 是 Flutter 的 api ,帮我们封装了 isolate ,使用十分简单,但是也有局限性, 它没有办法多次返回结果,也没有办法持续性的传值计算,每次调用,相当于新建一个隔离,如果同时调用过多的话反而会多次开辟内存。在某些业务下,我们可以使用compute,但是在另外一些业务下,我们只能使用dart提供的 isolate 了。
我们把上面的代码利用 isolate 实现一遍:
import 'dart:convert';import 'dart:isolate';main(List<String> args) async { await start();}Isolate isolate;start() async { //创建接收端口,用来接收子线程消息 ReceivePort receivePort = ReceivePort(); //创建并发Isolate,并传入主线程发送端口 isolate = await Isolate.spawn(entryPoint, receivePort.sendPort); //监听子线程消息 receivePort.listen((data) { print('Data:$data'); });}//并发IsolateentryPoint(SendPort sendPort) { String jsonString = '''{ "id":"123", "name":"张三", "score" : 95}'''; Student student = parseJson(jsonString); sendPort.send(student);}Student parseJson(String json) { Map<String, dynamic> map = jsonDecode(json); return Student.fromJson(map);}class Student { String id; String name; int score; Student({this.id, this.name, this.score}); factory Student.fromJson(Map parsedJson) { return Student(id: parsedJson['id'], name: parsedJson['name'], score: parsedJson['score']); }}
有时候,我们需要传参给子线程,或者像线程池一样可以管理这个 isolate ,那么我们就需要实现双向通信:
import 'dart:isolate';main(List<String> args) async { await start(); await Future.delayed(Duration(seconds: 1), () { threadPort.send('我来自主线程'); print('1'); }); await Future.delayed(Duration(seconds: 1), () { threadPort.send('我也来自主线程'); print('2'); }); await Future.delayed(Duration(seconds: 1), () { threadPort.send('end'); print('3'); });}Isolate isolate;//子线程发送端口SendPort threadPort;start() async { //创建主线程接收端口,用来接收子线程消息 ReceivePort receivePort = ReceivePort(); //创建并发Isolate,并传入主线程发送端口 isolate = await Isolate.spawn(entryPoint, receivePort.sendPort); //监听子线程消息 receivePort.listen((data) { print('主线程收到来自子线程的消息$data'); if (data is SendPort) { threadPort = data; } });}//并发IsolateentryPoint(dynamic message) { //创建子线程接收端口,用来接收主线程消息 ReceivePort receivePort = ReceivePort(); SendPort sendPort; print('==entryPoint==$message'); if (message is SendPort) { sendPort = message; print('子线程开启'); sendPort.send(receivePort.sendPort); //监听子线程消息 receivePort.listen((data) { print('子线程收到来自主线程的消息$data'); assert(data is String); if (data == 'end') { isolate?.kill(); isolate = null; print('子线程结束'); return; } }); return; }}
==entryPoint==SendPort子线程开启主线程收到来自子线程的消息SendPort1子线程收到来自主线程的消息我来自主线程2子线程收到来自主线程的消息我也来自主线程3子线程收到来自主线程的消息end子线程结束
双向通信比较复杂,所以我们需要封装下,通过 api 让外部调用:
import 'dart:async';import 'dart:isolate';main(List<String> args) async { var worker = Worker(); worker.reuqest('发送消息1').then((data) { print('子线程处理后的消息:$data'); }); Future.delayed(Duration(seconds: 2), () { worker.reuqest('发送消息2').then((data) { print('子线程处理后的消息:$data'); }); });}class Worker { SendPort _sendPort; Isolate _isolate; final _isolateReady = Completer<void>(); final Map<Capability, Completer> _completers = {}; Worker() { init(); } void dispose() { _isolate.kill(); } Future reuqest(dynamic message) async { await _isolateReady.future; final completer = new Completer(); final requestId = new Capability(); _completers[requestId] = completer; _sendPort.send(new _Request(requestId, message)); return completer.future; } Future<void> init() async { final receivePort = ReceivePort(); final errorPort = ReceivePort(); errorPort.listen(print); receivePort.listen(_handleMessage); _isolate = await Isolate.spawn( _isolateEntry, receivePort.sendPort, onError: errorPort.sendPort, ); } void _handleMessage(message) { if (message is SendPort) { _sendPort = message; _isolateReady.complete(); return; } if (message is _Response) { final completer = _completers[message.requestId]; if (completer == null) { print("Invalid request ID received."); } else if (message.success) { completer.complete(message.message); } else { completer.completeError(message.message); } return; } throw UnimplementedError("Undefined behavior for message: $message"); } static void _isolateEntry(dynamic message) { SendPort sendPort; final receivePort = ReceivePort(); receivePort.listen((dynamic message) async { if (message is _Request) { print('子线程收到:${message.message}'); sendPort.send(_Response.ok(message.requestId, '处理后的消息')); return; } }); if (message is SendPort) { sendPort = message; sendPort.send(receivePort.sendPort); return; } }}class _Request { /// The ID of the request so the response may be associated to the request's future completer. final Capability requestId; /// The actual message of the request. final dynamic message; const _Request(this.requestId, this.message);}class _Response { /// The ID of the request this response is meant to. final Capability requestId; /// Indicates if the request succeeded. final bool success; /// If [success] is true, holds the response message. /// Otherwise, holds the error that occured. final dynamic message; const _Response.ok(this.requestId, this.message) : success = true; const _Response.error(this.requestId, this.message) : success = false;}