diff --git a/handwritten/storage/src/bucket.ts b/handwritten/storage/src/bucket.ts index 5c796789ebd..527e7396f87 100644 --- a/handwritten/storage/src/bucket.ts +++ b/handwritten/storage/src/bucket.ts @@ -4505,13 +4505,19 @@ class Bucket extends ServiceObject { if (options.onUploadProgress) { writable.on('progress', options.onUploadProgress); } - fs.createReadStream(pathString) - .on('error', bail) + const readStream = fs.createReadStream(pathString); + readStream + .on('error', err => { + readStream.destroy(); + writable.destroy(); + bail(err); + }) .pipe(writable) .on('error', err => { + readStream.destroy(); if ( this.storage.retryOptions.autoRetry && - this.storage.retryOptions.retryableErrorFn!(err) + this.storage.retryOptions.retryableErrorFn!(err as ApiError) ) { return reject(err); } else { diff --git a/handwritten/storage/test/bucket.ts b/handwritten/storage/test/bucket.ts index 555d8e8c1c9..1f80a317afa 100644 --- a/handwritten/storage/test/bucket.ts +++ b/handwritten/storage/test/bucket.ts @@ -100,11 +100,18 @@ class FakeNotification { } let fsStatOverride: Function | null; +let fsCreateReadStreamOverride: Function | null; const fakeFs = { ...fs, stat: (filePath: string, callback: Function) => { return (fsStatOverride || fs.stat)(filePath, callback); }, + createReadStream: (filePath: string, options?: any) => { + return (fsCreateReadStreamOverride || fs.createReadStream)( + filePath, + options + ); + }, }; let pLimitOverride: Function | null; @@ -234,6 +241,7 @@ describe('Bucket', () => { beforeEach(() => { fsStatOverride = null; + fsCreateReadStreamOverride = null; pLimitOverride = null; bucket = new Bucket(STORAGE, BUCKET_NAME); }); @@ -3231,6 +3239,44 @@ describe('Bucket', () => { }); }); + it('should destroy the local read stream if write stream fails', done => { + const fakeFile = new FakeFile(bucket, 'file-name'); + const options = {destination: fakeFile, resumable: false}; + const originalCreateReadStream = fs.createReadStream; + let readStream: fs.ReadStream; + fsCreateReadStreamOverride = (path: string, opts: any) => { + readStream = originalCreateReadStream(path, opts); + return readStream; + }; + + fakeFile.createWriteStream = (options_: CreateWriteStreamOptions) => { + const ws = new stream.Writable({ + write(chunk, encoding, callback) { + callback(new Error('write error')); + }, + }); + return ws; + }; + + const textfilepath = path.join( + getDirName(), + '../../../test/testdata/textfile.txt' + ); + + bucket.upload(textfilepath, options, (err: Error) => { + try { + assert.strictEqual(err.message, 'write error'); + assert.ok(readStream); + assert.ok(readStream.destroyed); + done(); + } catch (e) { + done(e); + } finally { + fsCreateReadStreamOverride = null; + } + }); + }); + it('should allow overriding content type', done => { const fakeFile = new FakeFile(bucket, 'file-name'); const metadata = {contentType: 'made-up-content-type'};