concatenate.mjs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. /*
  2. Copyright 2018 Google LLC
  3. Use of this source code is governed by an MIT-style
  4. license that can be found in the LICENSE file or at
  5. https://opensource.org/licenses/MIT.
  6. */
  7. import {logger} from 'workbox-core/_private/logger.mjs';
  8. import {assert} from 'workbox-core/_private/assert.mjs';
  9. import './_version.mjs';
  10. /**
  11. * Takes either a Response, a ReadableStream, or a
  12. * [BodyInit](https://fetch.spec.whatwg.org/#bodyinit) and returns the
  13. * ReadableStreamReader object associated with it.
  14. *
  15. * @param {workbox.streams.StreamSource} source
  16. * @return {ReadableStreamReader}
  17. * @private
  18. */
  19. function _getReaderFromSource(source) {
  20. if (source.body && source.body.getReader) {
  21. return source.body.getReader();
  22. }
  23. if (source.getReader) {
  24. return source.getReader();
  25. }
  26. // TODO: This should be possible to do by constructing a ReadableStream, but
  27. // I can't get it to work. As a hack, construct a new Response, and use the
  28. // reader associated with its body.
  29. return new Response(source).body.getReader();
  30. }
  31. /**
  32. * Takes multiple source Promises, each of which could resolve to a Response, a
  33. * ReadableStream, or a [BodyInit](https://fetch.spec.whatwg.org/#bodyinit).
  34. *
  35. * Returns an object exposing a ReadableStream with each individual stream's
  36. * data returned in sequence, along with a Promise which signals when the
  37. * stream is finished (useful for passing to a FetchEvent's waitUntil()).
  38. *
  39. * @param {Array<Promise<workbox.streams.StreamSource>>} sourcePromises
  40. * @return {Object<{done: Promise, stream: ReadableStream}>}
  41. *
  42. * @memberof workbox.streams
  43. */
  44. function concatenate(sourcePromises) {
  45. if (process.env.NODE_ENV !== 'production') {
  46. assert.isArray(sourcePromises, {
  47. moduleName: 'workbox-streams',
  48. funcName: 'concatenate',
  49. paramName: 'sourcePromises',
  50. });
  51. }
  52. const readerPromises = sourcePromises.map((sourcePromise) => {
  53. return Promise.resolve(sourcePromise).then((source) => {
  54. return _getReaderFromSource(source);
  55. });
  56. });
  57. let fullyStreamedResolve;
  58. let fullyStreamedReject;
  59. const done = new Promise((resolve, reject) => {
  60. fullyStreamedResolve = resolve;
  61. fullyStreamedReject = reject;
  62. });
  63. let i = 0;
  64. const logMessages = [];
  65. const stream = new ReadableStream({
  66. pull(controller) {
  67. return readerPromises[i]
  68. .then((reader) => reader.read())
  69. .then((result) => {
  70. if (result.done) {
  71. if (process.env.NODE_ENV !== 'production') {
  72. logMessages.push(['Reached the end of source:',
  73. sourcePromises[i]]);
  74. }
  75. i++;
  76. if (i >= readerPromises.length) {
  77. // Log all the messages in the group at once in a single group.
  78. if (process.env.NODE_ENV !== 'production') {
  79. logger.groupCollapsed(
  80. `Concatenating ${readerPromises.length} sources.`);
  81. for (const message of logMessages) {
  82. if (Array.isArray(message)) {
  83. logger.log(...message);
  84. } else {
  85. logger.log(message);
  86. }
  87. }
  88. logger.log('Finished reading all sources.');
  89. logger.groupEnd();
  90. }
  91. controller.close();
  92. fullyStreamedResolve();
  93. return;
  94. }
  95. return this.pull(controller);
  96. } else {
  97. controller.enqueue(result.value);
  98. }
  99. }).catch((error) => {
  100. if (process.env.NODE_ENV !== 'production') {
  101. logger.error('An error occurred:', error);
  102. }
  103. fullyStreamedReject(error);
  104. throw error;
  105. });
  106. },
  107. cancel() {
  108. if (process.env.NODE_ENV !== 'production') {
  109. logger.warn('The ReadableStream was cancelled.');
  110. }
  111. fullyStreamedResolve();
  112. },
  113. });
  114. return {done, stream};
  115. }
  116. export {concatenate};