import { Readable } from 'stream';

import type { Document, ObjectId } from '../bson';
import type { Collection } from '../collection';
import type { FindCursor } from '../cursor/find_cursor';
import {
  MongoGridFSChunkError,
  MongoGridFSStreamError,
  MongoInvalidArgumentError,
  MongoRuntimeError
} from '../error';
import type { FindOptions } from '../operations/find';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import type { Callback } from '../utils';
import type { GridFSChunk } from './upload';

/** @public */
export interface GridFSBucketReadStreamOptions {
  sort?: Sort;
  skip?: number;
  /** 0-based offset in bytes to start streaming from */
  start?: number;
  /** 0-based offset in bytes to stop streaming before */
  end?: number;
}

/** @public */
export interface GridFSBucketReadStreamOptionsWithRevision extends GridFSBucketReadStreamOptions {
  /** The revision number relative to the oldest file with the given filename. 0
   * gets you the oldest file, 1 gets you the 2nd oldest, -1 gets you the
   * newest. */
  revision?: number;
}

/** @public */
export interface GridFSFile {
  _id: ObjectId;
  length: number;
  chunkSize: number;
  filename: string;
  contentType?: string;
  aliases?: string[];
  metadata?: Document;
  uploadDate: Date;
}

/** @internal */
export interface GridFSBucketReadStreamPrivate {
  bytesRead: number;
  bytesToTrim: number;
  bytesToSkip: number;
  chunks: Collection;
  cursor?: FindCursor;
  expected: number;
  files: Collection;
  filter: Document;
  init: boolean;
  expectedEnd: number;
  file?: GridFSFile;
  options: {
    sort?: Sort;
    skip?: number;
    start: number;
    end: number;
  };
  readPreference?: ReadPreference;
}

/**
 * A readable stream that enables you to read buffers from GridFS.
 *
 * Do not instantiate this class directly. Use `openDownloadStream()` instead.
 * @public
 */
export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableStream {
  /** @internal */
  s: GridFSBucketReadStreamPrivate;

  /**
   * An error occurred
   * @event
   */
  static readonly ERROR = 'error' as const;
  /**
   * Fires when the stream loaded the file document corresponding to the provided id.
   * @event
   */
  static readonly FILE = 'file' as const;
  /**
   * Emitted when a chunk of data is available to be consumed.
   * @event
   */
  static readonly DATA = 'data' as const;
  /**
   * Fired when the stream is exhausted (no more data events).
   * @event
   */
  static readonly END = 'end' as const;
  /**
   * Fired when the stream is exhausted and the underlying cursor is killed
   * @event
   */
  static readonly CLOSE = 'close' as const;

  /**
   * @param chunks - Handle for chunks collection
   * @param files - Handle for files collection
   * @param readPreference - The read preference to use
   * @param filter - The filter to use to find the file document
   * @param options - Optional sort/skip for the file lookup and start/end byte offsets.
   *   `start` and `end` both default to 0; an `end` of 0 means "read to the end of the file".
   * @internal
   */
  constructor(
    chunks: Collection,
    files: Collection,
    readPreference: ReadPreference | undefined,
    filter: Document,
    options?: GridFSBucketReadStreamOptions
  ) {
    super();
    this.s = {
      bytesToTrim: 0,
      bytesToSkip: 0,
      bytesRead: 0,
      chunks,
      expected: 0,
      files,
      filter,
      init: false,
      expectedEnd: 0,
      options: {
        start: 0,
        end: 0,
        ...options
      },
      readPreference
    };
  }

  /**
   * Reads from the cursor and pushes to the stream.
   * Private Impl, do not call directly
   * @internal
   */
  override _read(): void {
    if (this.destroyed) return;
    waitForFile(this, () => doRead(this));
  }

  /**
   * Sets the 0-based offset in bytes to start streaming from. Throws
   * an error if this stream has entered flowing mode
   * (e.g. if you've already called `on('data')`)
   *
   * @param start - 0-based offset in bytes to start streaming from
   */
  start(start = 0): this {
    throwIfInitialized(this);
    this.s.options.start = start;
    return this;
  }

  /**
   * Sets the 0-based offset in bytes to stop streaming before. Throws
   * an error if this stream has entered flowing mode
   * (e.g. if you've already called `on('data')`)
   *
   * @param end - Offset in bytes to stop reading at. 0 (the default) means
   *   "read to the end of the file".
   */
  end(end = 0): this {
    throwIfInitialized(this);
    this.s.options.end = end;
    return this;
  }

  /**
   * Marks this stream as aborted (will never push another `data` event)
   * and kills the underlying cursor. Will emit the 'end' event, and then
   * the 'close' event once the cursor is successfully killed.
   *
   * @param callback - called when the cursor is successfully closed or an error occurred.
   */
  abort(callback?: Callback): void {
    this.push(null);
    this.destroyed = true;
    if (this.s.cursor) {
      this.s.cursor.close(error => {
        this.emit(GridFSBucketReadStream.CLOSE);
        callback && callback(error);
      });
    } else {
      if (!this.s.init) {
        // If not initialized, fire close event because we will never
        // get a cursor
        this.emit(GridFSBucketReadStream.CLOSE);
      }
      callback && callback();
    }
  }
}

/**
 * Guards option setters: throws once the file lookup has been kicked off.
 *
 * @throws MongoGridFSStreamError if `stream.s.init` is already set
 */
function throwIfInitialized(stream: GridFSBucketReadStream): void {
  if (stream.s.init) {
    throw new MongoGridFSStreamError('Options cannot be changed after the stream is initialized');
  }
}

/**
 * Fetches the next chunk document from the cursor, validates its sequence
 * number and size against the file document, trims it to the requested byte
 * range and pushes it to the stream. When the cursor is exhausted the stream
 * is ended and the cursor is closed on the next tick.
 *
 * Does nothing if the stream is destroyed or the cursor / file document are
 * not available yet.
 */
function doRead(stream: GridFSBucketReadStream): void {
  if (stream.destroyed) return;
  if (!stream.s.cursor) return;
  if (!stream.s.file) return;

  stream.s.cursor.next((error, doc) => {
    // The stream may have been destroyed while the fetch was in flight.
    if (stream.destroyed) {
      return;
    }
    if (error) {
      stream.emit(GridFSBucketReadStream.ERROR, error);
      return;
    }
    if (!doc) {
      // Cursor exhausted: signal end-of-stream first, then close the cursor
      // and report 'close' (or the close error).
      stream.push(null);

      process.nextTick(() => {
        if (!stream.s.cursor) return;
        stream.s.cursor.close(error => {
          if (error) {
            stream.emit(GridFSBucketReadStream.ERROR, error);
            return;
          }

          stream.emit(GridFSBucketReadStream.CLOSE);
        });
      });

      return;
    }

    if (!stream.s.file) return;

    const bytesRemaining = stream.s.file.length - stream.s.bytesRead;
    const expectedN = stream.s.expected++;
    const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining);
    if (doc.n > expectedN) {
      return stream.emit(
        GridFSBucketReadStream.ERROR,
        new MongoGridFSChunkError(
          `ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`
        )
      );
    }

    if (doc.n < expectedN) {
      return stream.emit(
        GridFSBucketReadStream.ERROR,
        new MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`)
      );
    }

    // Chunk data is either already a Buffer or a wrapper exposing one via `.buffer`.
    let buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;

    if (buf.byteLength !== expectedLength) {
      if (bytesRemaining <= 0) {
        return stream.emit(
          GridFSBucketReadStream.ERROR,
          new MongoGridFSChunkError(
            `ExtraChunk: Got unexpected n: ${doc.n}, expected file length ${stream.s.file.length} bytes but already read ${stream.s.bytesRead} bytes`
          )
        );
      }

      return stream.emit(
        GridFSBucketReadStream.ERROR,
        new MongoGridFSChunkError(
          `ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}`
        )
      );
    }

    stream.s.bytesRead += buf.byteLength;

    if (buf.byteLength === 0) {
      return stream.push(null);
    }

    let sliceStart = null;
    let sliceEnd = null;

    // bytesToSkip only applies to the first chunk delivered; it is consumed here.
    if (stream.s.bytesToSkip != null) {
      sliceStart = stream.s.bytesToSkip;
      stream.s.bytesToSkip = 0;
    }

    const atEndOfStream = expectedN === stream.s.expectedEnd - 1;
    const bytesLeftToRead = stream.s.options.end - stream.s.bytesToSkip;
    if (atEndOfStream && stream.s.bytesToTrim != null) {
      sliceEnd = stream.s.file.chunkSize - stream.s.bytesToTrim;
    } else if (stream.s.options.end && bytesLeftToRead < buf.byteLength) {
      // Compare against the normalised buffer: `doc.data` may be a wrapper
      // object that has no `byteLength` of its own.
      sliceEnd = bytesLeftToRead;
    }

    if (sliceStart != null || sliceEnd != null) {
      buf = buf.slice(sliceStart || 0, sliceEnd || buf.byteLength);
    }

    stream.push(buf);
    return;
  });
}

/**
 * Looks up the file document matching the stream's filter, validates the
 * start/end options against it, creates the sorted chunks cursor and emits
 * the 'file' event. Emits 'error' if the lookup fails, no file matches or the
 * options are invalid.
 */
function init(stream: GridFSBucketReadStream): void {
  const findOneOptions: FindOptions = {};
  if (stream.s.readPreference) {
    findOneOptions.readPreference = stream.s.readPreference;
  }
  if (stream.s.options && stream.s.options.sort) {
    findOneOptions.sort = stream.s.options.sort;
  }
  if (stream.s.options && stream.s.options.skip) {
    findOneOptions.skip = stream.s.options.skip;
  }

  stream.s.files.findOne(stream.s.filter, findOneOptions, (error, doc) => {
    if (error) {
      return stream.emit(GridFSBucketReadStream.ERROR, error);
    }

    if (!doc) {
      const identifier = stream.s.filter._id
        ? stream.s.filter._id.toString()
        : stream.s.filter.filename;
      const errmsg = `FileNotFound: file ${identifier} was not found`;
      // TODO(NODE-3483)
      const err = new MongoRuntimeError(errmsg);
      err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor
      return stream.emit(GridFSBucketReadStream.ERROR, err);
    }

    // If document is empty, kill the stream immediately and don't
    // execute any reads
    if (doc.length <= 0) {
      stream.push(null);
      return;
    }

    if (stream.destroyed) {
      // If user destroys the stream before we have a cursor, wait
      // until the query is done to say we're 'closed' because we can't
      // cancel a query.
      stream.emit(GridFSBucketReadStream.CLOSE);
      return;
    }

    try {
      stream.s.bytesToSkip = handleStartOption(stream, doc, stream.s.options);
    } catch (error) {
      return stream.emit(GridFSBucketReadStream.ERROR, error);
    }

    const filter: Document = { files_id: doc._id };

    // Currently (MongoDB 3.4.4) skip function does not support the index,
    // it needs to retrieve all the documents first and then skip them. (CS-25811)
    // As work around we use $gte on the "n" field.
    if (stream.s.options && stream.s.options.start != null) {
      const skip = Math.floor(stream.s.options.start / doc.chunkSize);
      if (skip > 0) {
        filter['n'] = { $gte: skip };
      }
    }
    stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });

    if (stream.s.readPreference) {
      stream.s.cursor.withReadPreference(stream.s.readPreference);
    }

    stream.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
    stream.s.file = doc as GridFSFile;

    try {
      stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, stream.s.options);
    } catch (error) {
      return stream.emit(GridFSBucketReadStream.ERROR, error);
    }

    stream.emit(GridFSBucketReadStream.FILE, doc);
    return;
  });
}

/**
 * Invokes `callback` once the file document is available: immediately if it
 * has already been loaded, otherwise after the next 'file' event. Starts the
 * file lookup the first time it is called.
 */
function waitForFile(stream: GridFSBucketReadStream, callback: Callback): void {
  if (stream.s.file) {
    return callback();
  }

  if (!stream.s.init) {
    init(stream);
    stream.s.init = true;
  }

  stream.once('file', () => {
    callback();
  });
}

/**
 * Validates the `start` option against the file document and positions the
 * stream's counters (`bytesRead`, `expected`) at the chunk containing `start`.
 *
 * @returns the number of bytes to skip at the beginning of that first chunk
 * @throws MongoInvalidArgumentError if `start` is undefined, negative, beyond
 *   the file length, or greater than an explicitly requested `end`
 */
function handleStartOption(
  stream: GridFSBucketReadStream,
  doc: Document,
  options: GridFSBucketReadStreamOptions
): number {
  if (options && options.start != null) {
    if (options.start > doc.length) {
      throw new MongoInvalidArgumentError(
        `Stream start (${options.start}) must not be more than the length of the file (${doc.length})`
      );
    }
    if (options.start < 0) {
      throw new MongoInvalidArgumentError(`Stream start (${options.start}) must not be negative`);
    }
    // `end` defaults to 0, which means "read to the end of the file". Only an
    // explicitly requested (non-zero) end can conflict with start; otherwise
    // every start > 0 without an end would be rejected.
    if (options.end != null && options.end !== 0 && options.end < options.start) {
      throw new MongoInvalidArgumentError(
        `Stream start (${options.start}) must not be greater than stream end (${options.end})`
      );
    }

    stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) * doc.chunkSize;
    stream.s.expected = Math.floor(options.start / doc.chunkSize);

    return options.start - stream.s.bytesRead;
  }
  throw new MongoInvalidArgumentError('Start option must be defined');
}

/**
 * Validates the `end` option against the file document, limits the chunks
 * cursor to the chunks covering the requested range and records the index
 * one past the last expected chunk in `stream.s.expectedEnd`.
 *
 * @returns the number of bytes to trim from the end of the last chunk
 * @throws MongoInvalidArgumentError if `end` is undefined, negative or beyond
 *   the file length
 */
function handleEndOption(
  stream: GridFSBucketReadStream,
  doc: Document,
  cursor: FindCursor,
  options: GridFSBucketReadStreamOptions
): number {
  if (options && options.end != null) {
    if (options.end > doc.length) {
      throw new MongoInvalidArgumentError(
        `Stream end (${options.end}) must not be more than the length of the file (${doc.length})`
      );
    }
    if (options.end < 0) {
      throw new MongoInvalidArgumentError(`Stream end (${options.end}) must not be negative`);
    }

    const startChunk = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;
    const endChunk = Math.ceil(options.end / doc.chunkSize);

    // An end of 0 means "no end": keep the cursor unlimited (a limit of 0 is
    // "no limit"). Without this guard a non-zero start would yield a negative
    // limit, which is not "unlimited" and would truncate the download.
    cursor.limit(options.end === 0 ? 0 : endChunk - startChunk);

    stream.s.expectedEnd = endChunk;

    return endChunk * doc.chunkSize - options.end;
  }
  throw new MongoInvalidArgumentError('End option must be defined');
}