123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- /*
- Copyright 2018 Google LLC
- Use of this source code is governed by an MIT-style
- license that can be found in the LICENSE file or at
- https://opensource.org/licenses/MIT.
- */
- import {logger} from 'workbox-core/_private/logger.mjs';
- import {assert} from 'workbox-core/_private/assert.mjs';
- import './_version.mjs';
- /**
- * Takes either a Response, a ReadableStream, or a
- * [BodyInit](https://fetch.spec.whatwg.org/#bodyinit) and returns the
- * ReadableStreamReader object associated with it.
- *
- * @param {workbox.streams.StreamSource} source
- * @return {ReadableStreamReader}
- * @private
- */
- function _getReaderFromSource(source) {
- if (source.body && source.body.getReader) {
- return source.body.getReader();
- }
- if (source.getReader) {
- return source.getReader();
- }
- // TODO: This should be possible to do by constructing a ReadableStream, but
- // I can't get it to work. As a hack, construct a new Response, and use the
- // reader associated with its body.
- return new Response(source).body.getReader();
- }
- /**
- * Takes multiple source Promises, each of which could resolve to a Response, a
- * ReadableStream, or a [BodyInit](https://fetch.spec.whatwg.org/#bodyinit).
- *
- * Returns an object exposing a ReadableStream with each individual stream's
- * data returned in sequence, along with a Promise which signals when the
- * stream is finished (useful for passing to a FetchEvent's waitUntil()).
- *
- * @param {Array<Promise<workbox.streams.StreamSource>>} sourcePromises
- * @return {Object<{done: Promise, stream: ReadableStream}>}
- *
- * @memberof workbox.streams
- */
- function concatenate(sourcePromises) {
- if (process.env.NODE_ENV !== 'production') {
- assert.isArray(sourcePromises, {
- moduleName: 'workbox-streams',
- funcName: 'concatenate',
- paramName: 'sourcePromises',
- });
- }
- const readerPromises = sourcePromises.map((sourcePromise) => {
- return Promise.resolve(sourcePromise).then((source) => {
- return _getReaderFromSource(source);
- });
- });
- let fullyStreamedResolve;
- let fullyStreamedReject;
- const done = new Promise((resolve, reject) => {
- fullyStreamedResolve = resolve;
- fullyStreamedReject = reject;
- });
- let i = 0;
- const logMessages = [];
- const stream = new ReadableStream({
- pull(controller) {
- return readerPromises[i]
- .then((reader) => reader.read())
- .then((result) => {
- if (result.done) {
- if (process.env.NODE_ENV !== 'production') {
- logMessages.push(['Reached the end of source:',
- sourcePromises[i]]);
- }
- i++;
- if (i >= readerPromises.length) {
- // Log all the messages in the group at once in a single group.
- if (process.env.NODE_ENV !== 'production') {
- logger.groupCollapsed(
- `Concatenating ${readerPromises.length} sources.`);
- for (const message of logMessages) {
- if (Array.isArray(message)) {
- logger.log(...message);
- } else {
- logger.log(message);
- }
- }
- logger.log('Finished reading all sources.');
- logger.groupEnd();
- }
- controller.close();
- fullyStreamedResolve();
- return;
- }
- return this.pull(controller);
- } else {
- controller.enqueue(result.value);
- }
- }).catch((error) => {
- if (process.env.NODE_ENV !== 'production') {
- logger.error('An error occurred:', error);
- }
- fullyStreamedReject(error);
- throw error;
- });
- },
- cancel() {
- if (process.env.NODE_ENV !== 'production') {
- logger.warn('The ReadableStream was cancelled.');
- }
- fullyStreamedResolve();
- },
- });
- return {done, stream};
- }
- export {concatenate};
|