diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md index 038c7cb29dd2d1..246cd2ffa29dc5 100644 --- a/doc/api/diagnostics_channel.md +++ b/doc/api/diagnostics_channel.md @@ -1576,6 +1576,20 @@ Unlike `http.client.request.start`, this event is emitted before the request has Emitted when client starts a request. +##### Event: `'http.client.request.bodyChunkSent'` + +* `request` {http.ClientRequest} +* `chunk` {string|Buffer|TypedArray|DataView} +* `encoding` {string|null} + +Emitted when a chunk of the client request body is being sent. + +##### Event: `'http.client.request.bodySent'` + +* `request` {http.ClientRequest} + +Emitted after the client request body has been fully sent. + ##### Event: `'http.client.request.error'` * `request` {http.ClientRequest} @@ -1583,6 +1597,14 @@ Emitted when client starts a request. Emitted when an error occurs during a client request. +##### Event: `'http.client.response.bodyChunkReceived'` + +* `request` {http.ClientRequest} +* `response` {http.IncomingMessage} +* `chunk` {Buffer} + +Emitted when a chunk of the client response body is received. + ##### Event: `'http.client.response.finish'` * `request` {http.ClientRequest} diff --git a/lib/_http_client.js b/lib/_http_client.js index c14e899dabbf04..c402aa7d1f3b53 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -60,6 +60,7 @@ const { Buffer } = require('buffer'); const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); const { URL, urlToHttpOptions, isURL } = require('internal/url'); const { + kIsClientRequest, kOutHeaders, kNeedDrain, isTraceHTTPEnabled, @@ -192,6 +193,7 @@ function rewriteForProxiedHttp(req, reqOptions) { function ClientRequest(input, options, cb) { OutgoingMessage.call(this); + this[kIsClientRequest] = true; if (typeof input === 'string') { const urlStr = input; diff --git a/lib/_http_common.js b/lib/_http_common.js index 3c389ba054decc..39a8e56aab9d50 100644 --- a/lib/_http_common.js +++ b/lib/_http_common.js @@ -27,6 +27,7 @@ const { Uint8Array, } = primordials; const { setImmediate } = require('timers'); +const dc = require('diagnostics_channel'); const { methods, allMethods, HTTPParser } = internalBinding('http_parser'); const { getOptionValue } = require('internal/options'); @@ -50,6 +51,9 @@ const kOnMessageComplete = HTTPParser.kOnMessageComplete | 0; const kOnExecute = HTTPParser.kOnExecute | 0; const kOnTimeout = HTTPParser.kOnTimeout | 0; +const onClientResponseBodyChunkReceivedChannel = + dc.channel('http.client.response.bodyChunkReceived'); + const MAX_HEADER_PAIRS = 2000; // Only called in the slow case where slow means @@ -120,6 +124,7 @@ function parserOnHeadersComplete(versionMajor, versionMinor, headers, method, // client only incoming.statusCode = statusCode; incoming.statusMessage = statusMessage; + incoming.req = socket?._httpMessage; } return parser.onIncoming(incoming, shouldKeepAlive); @@ -134,6 +139,13 @@ function parserOnBody(b) { // Pretend this was the result of a stream._read call. if (!stream._dumped) { + if (stream.req && onClientResponseBodyChunkReceivedChannel.hasSubscribers) { + onClientResponseBodyChunkReceivedChannel.publish({ + request: stream.req, + response: stream, + chunk: b, + }); + } const ret = stream.push(b); if (!ret) readStop(this.socket); diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index c5702bc9eab092..0d2b77383ba401 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -36,9 +36,10 @@ const { const { getDefaultHighWaterMark } = require('internal/streams/state'); const assert = require('internal/assert'); +const dc = require('diagnostics_channel'); const EE = require('events'); const Stream = require('stream'); -const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http'); +const { kIsClientRequest, kOutHeaders, utcDate, kNeedDrain } = require('internal/http'); const { Buffer } = require('buffer'); const { _checkIsHttpToken: checkIsHttpToken, @@ -86,6 +87,12 @@ const kBytesWritten = Symbol('kBytesWritten'); const kErrored = Symbol('errored'); const kHighWaterMark = Symbol('kHighWaterMark'); const kRejectNonStandardBodyWrites = Symbol('kRejectNonStandardBodyWrites'); +const kClientRequestBodyChunksWritten = Symbol('kClientRequestBodyChunksWritten'); + +const onClientRequestBodyChunkSentChannel = + dc.channel('http.client.request.bodyChunkSent'); +const onClientRequestBodySentChannel = + dc.channel('http.client.request.bodySent'); const nop = () => {}; @@ -950,6 +957,17 @@ function write_(msg, chunk, encoding, callback, fromEnd) { } } + if (msg[kIsClientRequest]) { + msg[kClientRequestBodyChunksWritten] = true; + if (onClientRequestBodyChunkSentChannel.hasSubscribers) { + onClientRequestBodyChunkSentChannel.publish({ + request: msg, + chunk, + encoding, + }); + } + } + if (!fromEnd && msg.socket && !msg.socket.writableCorked) { msg.socket.cork(); process.nextTick(connectionCorkNT, msg.socket); @@ -1103,6 +1121,12 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { this.finished = true; + if (this[kIsClientRequest] && + this[kClientRequestBodyChunksWritten] && + onClientRequestBodySentChannel.hasSubscribers) { + onClientRequestBodySentChannel.publish({ request: this }); + } + // There is the first message on the outgoing queue, and we've sent // everything to the socket. debug('outgoing message end.'); diff --git a/lib/internal/http.js b/lib/internal/http.js index 54f1121eb712c0..ee3491278bfe16 100644 --- a/lib/internal/http.js +++ b/lib/internal/http.js @@ -262,6 +262,7 @@ function getGlobalAgent(proxyEnv, Agent) { } module.exports = { + kIsClientRequest: Symbol('kIsClientRequest'), kOutHeaders: Symbol('kOutHeaders'), kNeedDrain: Symbol('kNeedDrain'), kProxyConfig: Symbol('kProxyConfig'), diff --git a/lib/internal/inspector/network_http.js b/lib/internal/inspector/network_http.js index 8d324c8c544eea..05e81af5a91e77 100644 --- a/lib/internal/inspector/network_http.js +++ b/lib/internal/inspector/network_http.js @@ -17,7 +17,7 @@ const { sniffMimeType, } = require('internal/inspector/network'); const { Network } = require('inspector'); -const EventEmitter = require('events'); +const { Buffer } = require('buffer'); const kRequestUrl = Symbol('kRequestUrl'); @@ -95,6 +95,61 @@ function onClientRequestError({ request, error }) { }); } +/** + * When a chunk of the request body is being sent, cache it until + * `getRequestPostData` request. + * https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getRequestPostData + * @param {{ request: import('http').ClientRequest, chunk: Uint8Array | string, encoding?: string }} event + */ +function onClientRequestBodyChunkSent({ request, chunk, encoding }) { + if (typeof request[kInspectorRequestId] !== 'string') { + return; + } + + const buffer = typeof chunk === 'string' ? Buffer.from(chunk, encoding) : Buffer.from(chunk); + Network.dataSent({ + requestId: request[kInspectorRequestId], + timestamp: getMonotonicTime(), + dataLength: buffer.byteLength, + data: buffer, + }); +} + +/** + * Mark a request body as fully sent. + * @param {{ request: import('http').ClientRequest }} event + */ +function onClientRequestBodySent({ request }) { + if (typeof request[kInspectorRequestId] !== 'string') { + return; + } + + Network.dataSent({ + requestId: request[kInspectorRequestId], + finished: true, + }); +} + +/** + * When a chunk of the response body is received, cache the raw bytes until + * `getResponseBody` request. + * https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getResponseBody + * @param {{ request: import('http').ClientRequest, chunk: Uint8Array }} event + */ +function onClientResponseBodyChunkReceived({ request, chunk }) { + if (typeof request[kInspectorRequestId] !== 'string') { + return; + } + + Network.dataReceived({ + requestId: request[kInspectorRequestId], + timestamp: getMonotonicTime(), + dataLength: chunk.byteLength, + encodedDataLength: chunk.byteLength, + data: chunk, + }); +} + /** * When response headers are received, emit Network.responseReceived event. * https://chromedevtools.github.io/devtools-protocol/1-3/Network/#event-responseReceived @@ -121,17 +176,6 @@ function onClientResponseFinish({ request, response }) { }, }); - // Unlike response.on('data', ...), this does not put the stream into flowing mode. - EventEmitter.prototype.on.call(response, 'data', (chunk) => { - Network.dataReceived({ - requestId: request[kInspectorRequestId], - timestamp: getMonotonicTime(), - dataLength: chunk.byteLength, - encodedDataLength: chunk.byteLength, - data: chunk, - }); - }); - // Wait until the response body is consumed by user code. response.once('end', () => { Network.loadingFinished({ @@ -143,6 +187,9 @@ function onClientResponseFinish({ request, response }) { module.exports = registerDiagnosticChannels([ ['http.client.request.created', onClientRequestCreated], + ['http.client.request.bodyChunkSent', onClientRequestBodyChunkSent], + ['http.client.request.bodySent', onClientRequestBodySent], ['http.client.request.error', onClientRequestError], + ['http.client.response.bodyChunkReceived', onClientResponseBodyChunkReceived], ['http.client.response.finish', onClientResponseFinish], ]); diff --git a/test/parallel/test-diagnostics-channel-http.js b/test/parallel/test-diagnostics-channel-http.js index fd371a5d259f0b..ed89f876d74abd 100644 --- a/test/parallel/test-diagnostics-channel-http.js +++ b/test/parallel/test-diagnostics-channel-http.js @@ -14,6 +14,16 @@ const isError = (error) => error instanceof Error; dc.subscribe('http.client.request.start', common.mustCall(({ request }) => { assert.strictEqual(isOutgoingMessage(request), true); +}, 4)); + +dc.subscribe('http.client.request.bodyChunkSent', common.mustCall(({ request, chunk, encoding }) => { + assert.strictEqual(isOutgoingMessage(request), true); + assert.ok(typeof chunk === 'string' || chunk instanceof Uint8Array); + assert.strictEqual(typeof encoding === 'string' || encoding == null, true); +}, 3)); + +dc.subscribe('http.client.request.bodySent', common.mustCall(({ request }) => { + assert.strictEqual(isOutgoingMessage(request), true); }, 2)); dc.subscribe('http.client.request.error', common.mustCall(({ request, error }) => { @@ -21,13 +31,23 @@ dc.subscribe('http.client.request.error', common.mustCall(({ request, error }) = assert.strictEqual(isError(error), true); })); +dc.subscribe('http.client.response.bodyChunkReceived', common.mustCall(({ + request, + response, + chunk, +}) => { + assert.strictEqual(isOutgoingMessage(request), true); + assert.strictEqual(isIncomingMessage(response), true); + assert.ok(chunk instanceof Uint8Array); +}, 3)); + dc.subscribe('http.client.response.finish', common.mustCall(({ request, response }) => { assert.strictEqual(isOutgoingMessage(request), true); assert.strictEqual(isIncomingMessage(response), true); -})); +}, 3)); dc.subscribe('http.server.request.start', common.mustCall(({ request, @@ -39,7 +59,7 @@ dc.subscribe('http.server.request.start', common.mustCall(({ assert.strictEqual(isOutgoingMessage(response), true); assert.strictEqual(isNetSocket(socket), true); assert.strictEqual(isHTTPServer(server), true); -})); +}, 3)); dc.subscribe('http.server.response.finish', common.mustCall(({ request, @@ -51,7 +71,7 @@ dc.subscribe('http.server.response.finish', common.mustCall(({ assert.strictEqual(isOutgoingMessage(response), true); assert.strictEqual(isNetSocket(socket), true); assert.strictEqual(isHTTPServer(server), true); -})); +}, 3)); dc.subscribe('http.server.response.created', common.mustCall(({ request, @@ -59,16 +79,29 @@ dc.subscribe('http.server.response.created', common.mustCall(({ }) => { assert.strictEqual(isIncomingMessage(request), true); assert.strictEqual(isOutgoingMessage(response), true); -})); +}, 3)); dc.subscribe('http.client.request.created', common.mustCall(({ request }) => { assert.strictEqual(isOutgoingMessage(request), true); assert.strictEqual(isHTTPServer(server), true); -}, 2)); +}, 4)); const server = http.createServer(common.mustCall((req, res) => { - res.end('done'); -})); + const chunks = []; + req.on('data', (chunk) => chunks.push(chunk)); + req.on('end', common.mustCall(() => { + if (req.method === 'POST' && req.url === '/string-body') { + assert.strictEqual(Buffer.concat(chunks).toString(), 'foobar'); + } else if (req.method === 'POST' && req.url === '/binary-body') { + assert.deepStrictEqual(Buffer.concat(chunks), Buffer.from([0, 1, 2, 3])); + } else { + assert.strictEqual(req.method, 'GET'); + assert.strictEqual(req.url, '/'); + assert.strictEqual(Buffer.concat(chunks).byteLength, 0); + } + res.end('done'); + })); +}, 3)); server.listen(async () => { const { port } = server.address(); @@ -78,10 +111,33 @@ server.listen(async () => { await new Promise((resolve) => { invalidRequest.on('error', resolve); }); - http.get(`http://localhost:${port}`, (res) => { - res.resume(); - res.on('end', () => { - server.close(); + await new Promise((resolve, reject) => { + http.get(`http://localhost:${port}`, (res) => { + res.setEncoding('utf8'); + res.resume(); + res.on('end', resolve); + }).on('error', reject); + }); + await new Promise((resolve, reject) => { + const req = http.request(`http://localhost:${port}/string-body`, { + method: 'POST', + }, (res) => { + res.resume(); + res.on('end', resolve); + }); + req.on('error', reject); + req.write('foo'); + req.end('bar'); + }); + await new Promise((resolve, reject) => { + const req = http.request(`http://localhost:${port}/binary-body`, { + method: 'POST', + }, (res) => { + res.resume(); + res.on('end', resolve); }); + req.on('error', reject); + req.end(Buffer.from([0, 1, 2, 3])); }); + server.close(); }); diff --git a/test/parallel/test-inspector-network-http.js b/test/parallel/test-inspector-network-http.js index 3f0014f2459253..d5190c3b37bf32 100644 --- a/test/parallel/test-inspector-network-http.js +++ b/test/parallel/test-inspector-network-http.js @@ -22,6 +22,16 @@ const requestHeaders = { 'x-header1': ['value1', 'value2'] }; +const requestBodyHeaders = { + ...requestHeaders, + 'content-type': 'text/plain; charset=utf-8', +}; + +const binaryRequestBodyHeaders = { + ...requestHeaders, + 'content-type': 'application/octet-stream', +}; + const setResponseHeaders = (res) => { res.setHeader('server', 'node'); res.setHeader('etag', 12345); @@ -46,6 +56,34 @@ const handleRequest = (req, res) => { res.end('hello world\n'); }, kTimeout); break; + case '/text-body': { + const chunks = []; + req.on('data', (chunk) => chunks.push(chunk)); + req.on('end', () => { + if (Buffer.concat(chunks).toString() !== 'foobar') { + throw new Error('Unexpected text request body'); + } + setResponseHeaders(res); + res.writeHead(200); + res.end('hello world\n'); + }); + break; + } + case '/binary-body': { + const chunks = []; + req.on('data', (chunk) => chunks.push(chunk)); + req.on('end', () => { + const body = Buffer.concat(chunks); + const expectedBody = Buffer.from([0, 1, 2, 3]); + if (!body.equals(expectedBody)) { + throw new Error('Unexpected binary request body'); + } + setResponseHeaders(res); + res.writeHead(200); + res.end('hello world\n'); + }); + break; + } default: assert.fail(`Unexpected path: ${path}`); } @@ -77,12 +115,15 @@ function verifyRequestWillBeSent({ method, params }, expect) { assert.ok(params.requestId.startsWith('node-network-event-')); assert.strictEqual(params.request.url, expect.url); - assert.strictEqual(params.request.method, 'GET'); + assert.strictEqual(params.request.method, expect.method); assert.strictEqual(typeof params.request.headers, 'object'); assert.strictEqual(params.request.headers['accept-language'], 'en-US'); assert.strictEqual(params.request.headers.cookie, 'k1=v1; k2=v2'); assert.strictEqual(params.request.headers.age, '1000'); assert.strictEqual(params.request.headers['x-header1'], 'value1, value2'); + if (expect.contentType) { + assert.strictEqual(params.request.headers['content-type'], expect.contentType); + } assert.strictEqual(typeof params.timestamp, 'number'); assert.strictEqual(typeof params.wallTime, 'number'); @@ -130,8 +171,11 @@ function verifyLoadingFailed({ method, params }) { assert.strictEqual(typeof params.errorText, 'string'); } -function verifyHttpResponse(response) { +function verifyHttpResponse(response, expectedBody = '\nhello world\n', responseEncoding) { assert.strictEqual(response.statusCode, 200); + if (responseEncoding) { + response.setEncoding(responseEncoding); + } const chunks = []; // Verifies that the inspector does not put the response into flowing mode. @@ -146,15 +190,15 @@ function verifyHttpResponse(response) { })); response.on('end', common.mustCall(() => { - const body = Buffer.concat(chunks).toString(); - assert.strictEqual(body, '\nhello world\n'); + const body = responseEncoding ? chunks.join('') : Buffer.concat(chunks).toString(); + assert.strictEqual(body, expectedBody); })); } async function testHttpGet() { const url = `http://127.0.0.1:${httpServer.address().port}/hello-world`; const requestWillBeSentFuture = once(session, 'Network.requestWillBeSent') - .then(([event]) => verifyRequestWillBeSent(event, { url })); + .then(([event]) => verifyRequestWillBeSent(event, { url, method: 'GET' })); const responseReceivedFuture = once(session, 'Network.responseReceived') .then(([event]) => verifyResponseReceived(event, { url })); @@ -186,7 +230,7 @@ async function testHttpGet() { async function testHttpsGet() { const url = `https://127.0.0.1:${httpsServer.address().port}/hello-world`; const requestWillBeSentFuture = once(session, 'Network.requestWillBeSent') - .then(([event]) => verifyRequestWillBeSent(event, { url })); + .then(([event]) => verifyRequestWillBeSent(event, { url, method: 'GET' })); const responseReceivedFuture = once(session, 'Network.responseReceived') .then(([event]) => verifyResponseReceived(event, { url })); @@ -219,7 +263,7 @@ async function testHttpsGet() { async function testHttpError() { const url = `http://${addresses.INVALID_HOST}/`; const requestWillBeSentFuture = once(session, 'Network.requestWillBeSent') - .then(([event]) => verifyRequestWillBeSent(event, { url })); + .then(([event]) => verifyRequestWillBeSent(event, { url, method: 'GET' })); session.on('Network.responseReceived', common.mustNotCall()); session.on('Network.loadingFinished', common.mustNotCall()); @@ -238,7 +282,7 @@ async function testHttpError() { async function testHttpsError() { const url = `https://${addresses.INVALID_HOST}/`; const requestWillBeSentFuture = once(session, 'Network.requestWillBeSent') - .then(([event]) => verifyRequestWillBeSent(event, { url })); + .then(([event]) => verifyRequestWillBeSent(event, { url, method: 'GET' })); session.on('Network.responseReceived', common.mustNotCall()); session.on('Network.loadingFinished', common.mustNotCall()); @@ -254,11 +298,122 @@ async function testHttpsError() { await loadingFailedFuture; } +async function makeHttpRequest( + requestModule, + options, + bodyWriter, + expectedBody = 'hello world\n', + responseEncoding, +) { + return new Promise((resolve, reject) => { + const req = requestModule.request(options, common.mustCall((res) => { + verifyHttpResponse(res, expectedBody, responseEncoding); + resolve(res); + })); + req.on('error', reject); + bodyWriter(req); + }); +} + +async function testTextBodyRequest({ requestModule, protocol, port, requestOptions }) { + const url = `${protocol}://127.0.0.1:${port}/text-body`; + requestOptions ??= {}; + const responseEncoding = protocol === 'http' ? 'utf8' : undefined; + const requestWillBeSentFuture = once(session, 'Network.requestWillBeSent') + .then(([event]) => verifyRequestWillBeSent(event, { + url, + method: 'POST', + contentType: 'text/plain; charset=utf-8', + })); + const responseReceivedFuture = once(session, 'Network.responseReceived') + .then(([event]) => verifyResponseReceived(event, { url })); + const loadingFinishedFuture = once(session, 'Network.loadingFinished') + .then(([event]) => verifyLoadingFinished(event)); + + await makeHttpRequest(requestModule, { + host: '127.0.0.1', + port, + path: '/text-body', + method: 'POST', + ...requestOptions, + headers: requestBodyHeaders, + }, (req) => { + req.write('foo'); + req.end('bar'); + }, 'hello world\n', responseEncoding); + + await requestWillBeSentFuture; + const responseReceived = await responseReceivedFuture; + const loadingFinished = await loadingFinishedFuture; + + assert.ok(loadingFinished.timestamp >= responseReceived.timestamp); + + const requestBody = await session.post('Network.getRequestPostData', { + requestId: responseReceived.requestId, + }); + assert.strictEqual(requestBody.postData, 'foobar'); + + const responseBody = await session.post('Network.getResponseBody', { + requestId: responseReceived.requestId, + }); + assert.strictEqual(responseBody.base64Encoded, false); + assert.strictEqual(responseBody.body, 'hello world\n'); +} + +async function testBinaryBodyRequest() { + const url = `http://127.0.0.1:${httpServer.address().port}/binary-body`; + const requestWillBeSentFuture = once(session, 'Network.requestWillBeSent') + .then(([event]) => verifyRequestWillBeSent(event, { + url, + method: 'POST', + contentType: 'application/octet-stream', + })); + const responseReceivedFuture = once(session, 'Network.responseReceived') + .then(([event]) => verifyResponseReceived(event, { url })); + const loadingFinishedFuture = once(session, 'Network.loadingFinished') + .then(([event]) => verifyLoadingFinished(event)); + + await makeHttpRequest(http, { + host: '127.0.0.1', + port: httpServer.address().port, + path: '/binary-body', + method: 'POST', + headers: binaryRequestBodyHeaders, + }, (req) => { + req.end(Buffer.from([0, 1, 2, 3])); + }, 'hello world\n'); + + await requestWillBeSentFuture; + const responseReceived = await responseReceivedFuture; + await loadingFinishedFuture; + + await assert.rejects(session.post('Network.getRequestPostData', { + requestId: responseReceived.requestId, + }), { + code: 'ERR_INSPECTOR_COMMAND', + }); +} + const testNetworkInspection = async () => { await testHttpGet(); session.removeAllListeners(); await testHttpsGet(); session.removeAllListeners(); + await testTextBodyRequest({ + requestModule: http, + protocol: 'http', + port: httpServer.address().port, + }); + session.removeAllListeners(); + await testTextBodyRequest({ + requestModule: https, + protocol: 'https', + port: httpsServer.address().port, + requestOptions: { rejectUnauthorized: false }, + }); + session.removeAllListeners(); + await testBinaryBodyRequest(); + session.removeAllListeners(); await testHttpError(); session.removeAllListeners(); await testHttpsError();