Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -1576,13 +1576,35 @@ Unlike `http.client.request.start`, this event is emitted before the request has

Emitted when client starts a request.

##### Event: `'http.client.request.bodyChunkSent'`

* `request` {http.ClientRequest}
* `chunk` {string|Buffer|TypedArray|DataView}
* `encoding` {string|null}

Emitted when a chunk of the client request body is being sent.

##### Event: `'http.client.request.bodySent'`

* `request` {http.ClientRequest}

Emitted after the client request body has been fully sent.

##### Event: `'http.client.request.error'`

* `request` {http.ClientRequest}
* `error` {Error}

Emitted when an error occurs during a client request.

##### Event: `'http.client.response.bodyChunkReceived'`

* `request` {http.ClientRequest}
* `response` {http.IncomingMessage}
* `chunk` {Buffer}

Emitted when a chunk of the client response body is received.

##### Event: `'http.client.response.finish'`

* `request` {http.ClientRequest}
Expand Down
2 changes: 2 additions & 0 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { URL, urlToHttpOptions, isURL } = require('internal/url');
const {
kIsClientRequest,
kOutHeaders,
kNeedDrain,
isTraceHTTPEnabled,
Expand Down Expand Up @@ -192,6 +193,7 @@ function rewriteForProxiedHttp(req, reqOptions) {

function ClientRequest(input, options, cb) {
OutgoingMessage.call(this);
this[kIsClientRequest] = true;

if (typeof input === 'string') {
const urlStr = input;
Expand Down
12 changes: 12 additions & 0 deletions lib/_http_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
Uint8Array,
} = primordials;
const { setImmediate } = require('timers');
const dc = require('diagnostics_channel');

const { methods, allMethods, HTTPParser } = internalBinding('http_parser');
const { getOptionValue } = require('internal/options');
Expand All @@ -50,6 +51,9 @@ const kOnMessageComplete = HTTPParser.kOnMessageComplete | 0;
const kOnExecute = HTTPParser.kOnExecute | 0;
const kOnTimeout = HTTPParser.kOnTimeout | 0;

const onClientResponseBodyChunkReceivedChannel =
dc.channel('http.client.response.bodyChunkReceived');

const MAX_HEADER_PAIRS = 2000;

// Only called in the slow case where slow means
Expand Down Expand Up @@ -120,6 +124,7 @@ function parserOnHeadersComplete(versionMajor, versionMinor, headers, method,
// client only
incoming.statusCode = statusCode;
incoming.statusMessage = statusMessage;
incoming.req = socket?._httpMessage;
}

return parser.onIncoming(incoming, shouldKeepAlive);
Expand All @@ -134,6 +139,13 @@ function parserOnBody(b) {

// Pretend this was the result of a stream._read call.
if (!stream._dumped) {
if (stream.req && onClientResponseBodyChunkReceivedChannel.hasSubscribers) {
onClientResponseBodyChunkReceivedChannel.publish({
request: stream.req,
response: stream,
chunk: b,
});
}
const ret = stream.push(b);
if (!ret)
readStop(this.socket);
Expand Down
26 changes: 25 additions & 1 deletion lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ const {

const { getDefaultHighWaterMark } = require('internal/streams/state');
const assert = require('internal/assert');
const dc = require('diagnostics_channel');
const EE = require('events');
const Stream = require('stream');
const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
const { kIsClientRequest, kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
const { Buffer } = require('buffer');
const {
_checkIsHttpToken: checkIsHttpToken,
Expand Down Expand Up @@ -86,6 +87,12 @@ const kBytesWritten = Symbol('kBytesWritten');
const kErrored = Symbol('errored');
const kHighWaterMark = Symbol('kHighWaterMark');
const kRejectNonStandardBodyWrites = Symbol('kRejectNonStandardBodyWrites');
const kClientRequestBodyChunksWritten = Symbol('kClientRequestBodyChunksWritten');

const onClientRequestBodyChunkSentChannel =
dc.channel('http.client.request.bodyChunkSent');
const onClientRequestBodySentChannel =
dc.channel('http.client.request.bodySent');

const nop = () => {};

Expand Down Expand Up @@ -950,6 +957,17 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
}
}

if (msg[kIsClientRequest]) {
msg[kClientRequestBodyChunksWritten] = true;
if (onClientRequestBodyChunkSentChannel.hasSubscribers) {
onClientRequestBodyChunkSentChannel.publish({
request: msg,
chunk,
encoding,
});
}
}

if (!fromEnd && msg.socket && !msg.socket.writableCorked) {
msg.socket.cork();
process.nextTick(connectionCorkNT, msg.socket);
Expand Down Expand Up @@ -1103,6 +1121,12 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {

this.finished = true;

if (this[kIsClientRequest] &&
this[kClientRequestBodyChunksWritten] &&
onClientRequestBodySentChannel.hasSubscribers) {
onClientRequestBodySentChannel.publish({ request: this });
}

// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
debug('outgoing message end.');
Expand Down
1 change: 1 addition & 0 deletions lib/internal/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ function getGlobalAgent(proxyEnv, Agent) {
}

module.exports = {
kIsClientRequest: Symbol('kIsClientRequest'),
kOutHeaders: Symbol('kOutHeaders'),
kNeedDrain: Symbol('kNeedDrain'),
kProxyConfig: Symbol('kProxyConfig'),
Expand Down
71 changes: 59 additions & 12 deletions lib/internal/inspector/network_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const {
sniffMimeType,
} = require('internal/inspector/network');
const { Network } = require('inspector');
const EventEmitter = require('events');
const { Buffer } = require('buffer');

const kRequestUrl = Symbol('kRequestUrl');

Expand Down Expand Up @@ -95,6 +95,61 @@ function onClientRequestError({ request, error }) {
});
}

/**
* When a chunk of the request body is being sent, cache it until
* `getRequestPostData` request.
* https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getRequestPostData
* @param {{ request: import('http').ClientRequest, chunk: Uint8Array | string, encoding?: string }} event
*/
function onClientRequestBodyChunkSent({ request, chunk, encoding }) {
if (typeof request[kInspectorRequestId] !== 'string') {
return;
}

const buffer = typeof chunk === 'string' ? Buffer.from(chunk, encoding) : Buffer.from(chunk);
Network.dataSent({
requestId: request[kInspectorRequestId],
timestamp: getMonotonicTime(),
dataLength: buffer.byteLength,
data: buffer,
});
}

/**
* Mark a request body as fully sent.
* @param {{ request: import('http').ClientRequest }} event
*/
function onClientRequestBodySent({ request }) {
if (typeof request[kInspectorRequestId] !== 'string') {
return;
}

Network.dataSent({
requestId: request[kInspectorRequestId],
finished: true,
});
}

/**
* When a chunk of the response body is received, cache the raw bytes until
* `getResponseBody` request.
* https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getResponseBody
* @param {{ request: import('http').ClientRequest, chunk: Uint8Array }} event
*/
function onClientResponseBodyChunkReceived({ request, chunk }) {
if (typeof request[kInspectorRequestId] !== 'string') {
return;
}

Network.dataReceived({
requestId: request[kInspectorRequestId],
timestamp: getMonotonicTime(),
dataLength: chunk.byteLength,
encodedDataLength: chunk.byteLength,
data: chunk,
});
}

/**
* When response headers are received, emit Network.responseReceived event.
* https://chromedevtools.github.io/devtools-protocol/1-3/Network/#event-responseReceived
Expand All @@ -121,17 +176,6 @@ function onClientResponseFinish({ request, response }) {
},
});

// Unlike response.on('data', ...), this does not put the stream into flowing mode.
EventEmitter.prototype.on.call(response, 'data', (chunk) => {
Network.dataReceived({
requestId: request[kInspectorRequestId],
timestamp: getMonotonicTime(),
dataLength: chunk.byteLength,
encodedDataLength: chunk.byteLength,
data: chunk,
});
});

// Wait until the response body is consumed by user code.
response.once('end', () => {
Network.loadingFinished({
Expand All @@ -143,6 +187,9 @@ function onClientResponseFinish({ request, response }) {

module.exports = registerDiagnosticChannels([
['http.client.request.created', onClientRequestCreated],
['http.client.request.bodyChunkSent', onClientRequestBodyChunkSent],
['http.client.request.bodySent', onClientRequestBodySent],
['http.client.request.error', onClientRequestError],
['http.client.response.bodyChunkReceived', onClientResponseBodyChunkReceived],
['http.client.response.finish', onClientResponseFinish],
]);
78 changes: 67 additions & 11 deletions test/parallel/test-diagnostics-channel-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,40 @@ const isError = (error) => error instanceof Error;

dc.subscribe('http.client.request.start', common.mustCall(({ request }) => {
assert.strictEqual(isOutgoingMessage(request), true);
}, 4));

dc.subscribe('http.client.request.bodyChunkSent', common.mustCall(({ request, chunk, encoding }) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.ok(typeof chunk === 'string' || chunk instanceof Uint8Array);
assert.strictEqual(typeof encoding === 'string' || encoding == null, true);
}, 3));

dc.subscribe('http.client.request.bodySent', common.mustCall(({ request }) => {
assert.strictEqual(isOutgoingMessage(request), true);
}, 2));

dc.subscribe('http.client.request.error', common.mustCall(({ request, error }) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isError(error), true);
}));

dc.subscribe('http.client.response.bodyChunkReceived', common.mustCall(({
request,
response,
chunk,
}) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isIncomingMessage(response), true);
assert.ok(chunk instanceof Uint8Array);
}, 3));

dc.subscribe('http.client.response.finish', common.mustCall(({
request,
response
}) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isIncomingMessage(response), true);
}));
}, 3));

dc.subscribe('http.server.request.start', common.mustCall(({
request,
Expand All @@ -39,7 +59,7 @@ dc.subscribe('http.server.request.start', common.mustCall(({
assert.strictEqual(isOutgoingMessage(response), true);
assert.strictEqual(isNetSocket(socket), true);
assert.strictEqual(isHTTPServer(server), true);
}));
}, 3));

dc.subscribe('http.server.response.finish', common.mustCall(({
request,
Expand All @@ -51,24 +71,37 @@ dc.subscribe('http.server.response.finish', common.mustCall(({
assert.strictEqual(isOutgoingMessage(response), true);
assert.strictEqual(isNetSocket(socket), true);
assert.strictEqual(isHTTPServer(server), true);
}));
}, 3));

dc.subscribe('http.server.response.created', common.mustCall(({
request,
response,
}) => {
assert.strictEqual(isIncomingMessage(request), true);
assert.strictEqual(isOutgoingMessage(response), true);
}));
}, 3));

dc.subscribe('http.client.request.created', common.mustCall(({ request }) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isHTTPServer(server), true);
}, 2));
}, 4));

const server = http.createServer(common.mustCall((req, res) => {
res.end('done');
}));
const chunks = [];
req.on('data', (chunk) => chunks.push(chunk));
req.on('end', common.mustCall(() => {
if (req.method === 'POST' && req.url === '/string-body') {
assert.strictEqual(Buffer.concat(chunks).toString(), 'foobar');
} else if (req.method === 'POST' && req.url === '/binary-body') {
assert.deepStrictEqual(Buffer.concat(chunks), Buffer.from([0, 1, 2, 3]));
} else {
assert.strictEqual(req.method, 'GET');
assert.strictEqual(req.url, '/');
assert.strictEqual(Buffer.concat(chunks).byteLength, 0);
}
res.end('done');
}));
}, 3));

server.listen(async () => {
const { port } = server.address();
Expand All @@ -78,10 +111,33 @@ server.listen(async () => {
await new Promise((resolve) => {
invalidRequest.on('error', resolve);
});
http.get(`http://localhost:${port}`, (res) => {
res.resume();
res.on('end', () => {
server.close();
await new Promise((resolve, reject) => {
http.get(`http://localhost:${port}`, (res) => {
res.setEncoding('utf8');
res.resume();
res.on('end', resolve);
}).on('error', reject);
});
await new Promise((resolve, reject) => {
const req = http.request(`http://localhost:${port}/string-body`, {
method: 'POST',
}, (res) => {
res.resume();
res.on('end', resolve);
});
req.on('error', reject);
req.write('foo');
req.end('bar');
});
await new Promise((resolve, reject) => {
const req = http.request(`http://localhost:${port}/binary-body`, {
method: 'POST',
}, (res) => {
res.resume();
res.on('end', resolve);
});
req.on('error', reject);
req.end(Buffer.from([0, 1, 2, 3]));
});
server.close();
});
Loading