This commit is contained in:
Lukian 2023-06-20 15:25:19 +02:00
parent 13ec9babde
commit 68f4b60012
1429 changed files with 2481 additions and 272836 deletions

View file

@ -5,19 +5,17 @@ const diagnosticsChannel = require('diagnostics_channel')
const { uid, states } = require('./constants')
const {
kReadyState,
kResponse,
kExtensions,
kProtocol,
kSentClose,
kByteParser,
kReceivedClose
} = require('./symbols')
const { fireEvent, failWebsocketConnection } = require('./util')
const { CloseEvent } = require('./events')
const { ByteParser } = require('./receiver')
const { makeRequest } = require('../fetch/request')
const { fetching } = require('../fetch/index')
const { getGlobalDispatcher } = require('../..')
const { Headers } = require('../fetch/headers')
const { getGlobalDispatcher } = require('../global')
const { kHeadersList } = require('../core/symbols')
const channels = {}
channels.open = diagnosticsChannel.channel('undici:websocket:open')
@ -29,8 +27,10 @@ channels.socketError = diagnosticsChannel.channel('undici:websocket:socket_error
* @param {URL} url
* @param {string|string[]} protocols
* @param {import('./websocket').WebSocket} ws
* @param {(response: any) => void} onEstablish
* @param {Partial<import('../../types/websocket').WebSocketInit>} options
*/
function establishWebSocketConnection (url, protocols, ws) {
function establishWebSocketConnection (url, protocols, ws, onEstablish, options) {
// 1. Let requestURL be a copy of url, with its scheme set to "http", if urls
// scheme is "ws", and to "https" otherwise.
const requestURL = url
@ -51,6 +51,13 @@ function establishWebSocketConnection (url, protocols, ws) {
redirect: 'error'
})
// Note: undici extension, allow setting custom headers.
if (options.headers) {
const headersList = new Headers(options.headers)[kHeadersList]
request.headersList = headersList
}
// 3. Append (`Upgrade`, `websocket`) to requests header list.
// 4. Append (`Connection`, `Upgrade`) to requests header list.
// Note: both of these are handled by undici currently.
@ -91,7 +98,7 @@ function establishWebSocketConnection (url, protocols, ws) {
const controller = fetching({
request,
useParallelQueue: true,
dispatcher: getGlobalDispatcher(),
dispatcher: options.dispatcher ?? getGlobalDispatcher(),
processResponse (response) {
// 1. If response is a network error or its status is not 101,
// fail the WebSocket connection.
@ -173,67 +180,25 @@ function establishWebSocketConnection (url, protocols, ws) {
return
}
// processResponse is called when the "responses header list has been received and initialized."
// once this happens, the connection is open
ws[kResponse] = response
const parser = new ByteParser(ws)
response.socket.ws = ws // TODO: use symbol
ws[kByteParser] = parser
whenConnectionEstablished(ws)
response.socket.on('data', onSocketData)
response.socket.on('close', onSocketClose)
response.socket.on('error', onSocketError)
parser.on('drain', onParserDrain)
if (channels.open.hasSubscribers) {
channels.open.publish({
address: response.socket.address(),
protocol: secProtocol,
extensions: secExtension
})
}
onEstablish(response)
}
})
return controller
}
/**
* @see https://websockets.spec.whatwg.org/#feedback-from-the-protocol
* @param {import('./websocket').WebSocket} ws
*/
function whenConnectionEstablished (ws) {
const { [kResponse]: response } = ws
// 1. Change the ready state to OPEN (1).
ws[kReadyState] = states.OPEN
// 2. Change the extensions attributes value to the extensions in use, if
// it is not the null value.
// https://datatracker.ietf.org/doc/html/rfc6455#section-9.1
const extensions = response.headersList.get('sec-websocket-extensions')
if (extensions !== null) {
ws[kExtensions] = extensions
}
// 3. Change the protocol attributes value to the subprotocol in use, if
// it is not the null value.
// https://datatracker.ietf.org/doc/html/rfc6455#section-1.9
const protocol = response.headersList.get('sec-websocket-protocol')
if (protocol !== null) {
ws[kProtocol] = protocol
}
// 4. Fire an event named open at the WebSocket object.
fireEvent('open', ws)
if (channels.open.hasSubscribers) {
channels.open.publish({
address: response.socket.address(),
protocol,
extensions
})
}
}
/**
* @param {Buffer} chunk
*/
@ -243,10 +208,6 @@ function onSocketData (chunk) {
}
}
function onParserDrain () {
this.ws[kResponse].socket.resume()
}
/**
* @see https://websockets.spec.whatwg.org/#feedback-from-the-protocol
* @see https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.4