fetch-event-source库使用源码学习

 更新时间:2023年09月21日 08:45:25   作者:河豚学前端  
这篇文章主要为大家介绍了fetch-event-source库源码学习,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

终于遇到一个简单的库来学习它的源码了。这个项目只有2个主要文件,代码加起来不到500行,是真的很mini了。

客户端向服务端发起请求用xhrfetch,客户端与服务端双向通信用websocket,而服务端主动发起请求用ssechatGPT就是用sse回复提问的。

window中有一个叫EventSource的构造函数。一个EventSource实例会对服务器开启一个持久化的连接,以text/event-stream格式发送事件,此连接会一直保持开启直到通过调用EventSource.close()关闭。但使用EventSource时只能把参数加到url后面,而且也不能像fetch请求那样设置header等参数。借助fetch-event-source这个库就可以像发起fetch请求一样发起服务器单向通信请求。

目录结构

入口文件

index.ts是入口文件,里面只有2行代码,导出了fetch.tsparse.ts中部分变量和方法。

export { fetchEventSource, FetchEventSourceInit, EventStreamContentType } from './fetch';
export { EventSourceMessage } from './parse';

export {...} from 'xx'其实是import + export的缩写。

上面的代码其实就是下面代码的缩写:

import { fetchEventSource, FetchEventSourceInit, EventStreamContentType } from './fetch';
import { EventSourceMessage } from './parse';
export {
    fetchEventSource,
    FetchEventSourceInit,
    EventStreamContentType,
    EventSourceMessage,
}

发起请求

  • 首先定义了变量EventStreamContentType,它的值是sseMIME Type。它在2个地方使用。第一处是发起请求时设置headers.accept,告诉服务器只接受text/event-stream格式的数据。第2处是在连接建立时判断response.headers.get('content-type')是否等于EventStreamContentType的值,如果不是的话就抛出一个错误,声明期待的类型是text/event-stream
  • 接下来定义了变量DefaultRetryIntervalsse自动重连机制,这里定义了每次重连的默认间隔为1s。然后定义了变量LastEventId,表示上一次事件的id,添加在headers中发送到服务端。

接下来定义了一个类型FetchEventSourceInit,它声明了fetchEventSource的第2个参数的类型。参数一共有7个。
headers 请求头。

onopen 连接建立时的回调函数,如果没有设置会调用默认的defaultOnOpen,这个默认回调里进行了返回值类型判断。
onmessage每次收到消息时的回调函数,参数是消息对象,它的类型就是parse.ts中定义的EventSourceMessage

onclose 连接关闭时的回调函数。

onerror连接发送错误时的回调函数,如果没有指定这个回调或返回undefined就会发起重新连接请求。

openWhenHidden 默认为false,监听visibilitychange,当页面不可见时关闭连接,当页面重新可见时重新打开连接。

fetch发起请求的方法,默认为window.fetch

Record<string, string>等价于{[key: string]: string}Promise<void>定义了一个异步函数,返回值是voidtypeof fetch 获取fetch的类型,typeof后面跟的是变量,表示类型定义

接下来就是最重要的fetchEventSource,它是一个异步函数,接受2个参数:urlFetchEventSourceInit类型的对象。

在这个方法中,首先定义了接受的媒体类型。然后添加监听visibilitychange事件,然后添加监听abort事件供使用者可以手动打断连接,然后发起连接,拿到返回值后将返回值传递给onopen,然后调用getBytes解析返回值,解析之后关闭连接。

try...catch包裹发起连接和解析返回值以及关闭连接的过程,如果捕获到错误且不是主动打断的就发起重连。

再说一下主动打断连接这里,fetchEventSource的第2个参数可以传入一个信号signal,这个属性在FetchEventSourceInit中没有定义。借助AbortController中断连接,具体信息可以看AbortController-MDN

??类似||,相同点在于根据前面的值判断返回前面的还是后面的,不同点在于??的第一个值为nullundefined时返回第二个值,||会将第一个值先转换为布尔值。比如

0 ?? 2 // 0
0 || 2 // 2

defaultOnOpen定义默认onopen回调,主要是检查返回值类型。

解析消息

首先使用response.body来获取响应的主体内容,并通过getBytes函数将其转换为字节数组。然后,使用getLines函数将字节数组拆分成行,并使用getMessages函数将每行解析为事件消息。

处理ReadableStream数据

// 创建了一个数据读取器
const reader = response.getReader();
// 创建了一个文本解码器
const decoder = new TextDecoder();
reader.read().then(function processText({ done, value }) {
  // Result 对象包含了两个属性:
  // done  - 当 stream 传完所有数据时则变成 true
  // value - 数据片段。当 done 为 true 时始终为 undefined
  if (done) {
 return;
  }
  // 将字节流转换为字符
  const text = decoder.decode(value)
  // 内容
  console.log(text);
  // 再次调用这个函数以读取更多数据
  return reader.read().then(processText);
});

处理过程分析

await getBytes(response.body!, getLines(getMessages(id => {
 if (id) {
     // store the id and send it back on the next retry:
     headers[LastEventId] = id;
 } else {
     // don't send the last-event-id header anymore:
     delete headers[LastEventId];
 }
  }, retry => {
 retryInterval = retry;
}, onmessage)));

首先执行的是getBytes方法,它创建一个读取器,用while循环读取流数据,每读取一段就执行onChunk解析流数据,onChunk就是在fetch.tsgetLines的返回值。

onChunk将字节块按行分割,并将每行的字节子数组和字段长度传递给onLine回调函数。onLine则是getMessages的返回值。
getMessages创建了一个解码器,返回一个名为onLine的函数,用于处理每个传入的行数据。它将行的字节子数组解码为字符串,并根据字段的类型进行相应的处理。比如,如果字段是data,它会将值追加到message.data中,如果message.data已经存在,则在原有值的基础上添加新值,并使用换行符分隔。

将字节流先按行分割再解析是为了更好的处理数据,因为数据都是field:value格式的。

TextDecoder表示一个文本解码器,可以将字节流数据转换成指定码位流,默认是utf-8。

问题

在调试这个库的时候,在html中引入打包后的esm文件会报错文件找不到,因为文件名没有添加后缀。

接口返回值的类型必须是text/event-stream类型的,就算是流数据也不行。

总结

getBytes的第2个参数是getLines的返回值,getLines的参数又是getMessages的返回值,嵌套的比较深。
onChunk将字节块切割成一行一行的字节,涉及字节数据的知识。

以上就是fetch-event-source库源码学习的详细内容,更多关于fetch-event-source的资料请关注脚本之家其它相关文章!

相关文章

最新评论