// src/demux/transmuxer-interface.ts
import * as work from 'webworkify-webpack';
import { Events } from '../events';
import Transmuxer, { TransmuxConfig, TransmuxState, isPromise } from '../demux/transmuxer';
import { logger } from '../utils/logger';
import { ErrorTypes, ErrorDetails } from '../errors';
import { getMediaSource } from '../utils/mediasource-helper';
import { EventEmitter } from 'eventemitter3';
import Fragment, { Part } from '../loader/fragment';
import { ChunkMetadata, TransmuxerResult } from '../types/transmuxer';
import type Hls from '../hls';
import type { HlsEventEmitter } from '../events';
import type { PlaylistLevelType } from '../types/loader';
// Evaluated once at module load: use whatever getMediaSource() returns, and when that is falsy
// substitute a stub whose isTypeSupported() answers `false` for every type.
const unsupportedMediaSource = { isTypeSupported: () => false };
const MediaSource = getMediaSource() || unsupportedMediaSource;
/**
 * Front end to the transmuxer for one playlist type (`id`).
 *
 * When `hls.config.enableWorker` is set and `Worker` exists, transmuxing is delegated to a Web Worker:
 * commands are posted to it and its replies are handled in `onWorkerMessage`. Otherwise (or when the
 * worker cannot be set up) a `Transmuxer` instance is driven inline. In both modes results are reported
 * through the `onTransmuxComplete` and `onFlush` callbacks passed to the constructor.
 */
export default class TransmuxerInterface {
  private hls: Hls;
  private id: PlaylistLevelType;
  // Emitter handed to the inline Transmuxer; its events are re-triggered on `hls`
  private observer: HlsEventEmitter;
  // Fragment / part given to the most recent push(); used to detect session changes
  private frag: Fragment | null = null;
  private part: Part | null = null;
  private worker: any;
  // Bound `onWorkerMessage`, kept so the exact same listener can be removed again
  private onwmsg?: (ev: any) => void;
  private transmuxer: Transmuxer | null = null;
  private onTransmuxComplete: (transmuxResult: TransmuxerResult) => void;
  private onFlush: (chunkMeta: ChunkMetadata) => void;

  /**
   * @param hls - owning Hls instance; its `config` is read and events are triggered on it
   * @param id - playlist type stamped onto every forwarded event
   * @param onTransmuxComplete - called with every transmux result
   * @param onFlush - called once a flush has finished
   */
  constructor (hls: Hls, id: PlaylistLevelType, onTransmuxComplete: (transmuxResult: TransmuxerResult) => void, onFlush: (chunkMeta: ChunkMetadata) => void) {
    this.hls = hls;
    this.id = id;
    this.onTransmuxComplete = onTransmuxComplete;
    this.onFlush = onFlush;

    const config = hls.config;

    // Re-trigger an observer event on hls, tagged with the current fragment and this interface's id
    const forwardMessage = (ev: any, data: any) => {
      data = data || {};
      data.frag = this.frag;
      data.id = this.id;
      hls.trigger(ev, data);
    };

    // forward events to main thread
    this.observer = new EventEmitter() as HlsEventEmitter;
    this.observer.on(Events.FRAG_DECRYPTED, forwardMessage);
    this.observer.on(Events.ERROR, forwardMessage);

    const typeSupported = {
      mp4: MediaSource.isTypeSupported('video/mp4'),
      mpeg: MediaSource.isTypeSupported('audio/mpeg'),
      mp3: MediaSource.isTypeSupported('audio/mp4; codecs="mp3"')
    };
    // navigator.vendor is not always available in Web Worker
    // refer to https://developer.mozilla.org/en-US/docs/Web/API/WorkerGlobalScope/navigator
    const vendor = navigator.vendor;

    if (config.enableWorker && (typeof (Worker) !== 'undefined')) {
      logger.log('demuxing in webworker');
      let worker;
      try {
        worker = this.worker = work(require.resolve('../demux/transmuxer-worker.ts'));
        this.onwmsg = this.onWorkerMessage.bind(this);
        worker.addEventListener('message', this.onwmsg);
        worker.onerror = (event: any) => {
          hls.trigger(Events.ERROR, { type: ErrorTypes.OTHER_ERROR, details: ErrorDetails.INTERNAL_EXCEPTION, fatal: true, event: 'demuxerWorker', err: { message: event.message + ' (' + event.filename + ':' + event.lineno + ')' } });
        };
        worker.postMessage({ cmd: 'init', typeSupported: typeSupported, vendor: vendor, id: id, config: JSON.stringify(config) });
      } catch (err) {
        logger.warn('Error in worker:', err);
        logger.error('Error while initializing DemuxerWorker, fallback to inline');
        // A worker may already exist at this point (e.g. postMessage threw). Detach its handlers,
        // terminate it and revoke its Object URL so it neither leaks nor reports a fatal error
        // after the inline fallback has taken over.
        this.resetWorker();
        this.transmuxer = new Transmuxer(this.observer, typeSupported, config, vendor);
      }
    } else {
      this.transmuxer = new Transmuxer(this.observer, typeSupported, config, vendor);
    }
  }

  /**
   * Releases the worker (or the inline transmuxer) and detaches every observer listener.
   */
  destroy (): void {
    if (this.worker) {
      this.resetWorker();
    } else {
      const transmuxer = this.transmuxer;
      if (transmuxer) {
        transmuxer.destroy();
        this.transmuxer = null;
      }
    }
    const observer = this.observer;
    if (observer) {
      observer.removeAllListeners();
    }
    // @ts-ignore
    this.observer = null;
  }

  /**
   * Hands one chunk of fragment data to the transmuxer.
   *
   * Before the data is forwarded, the chunk is compared with the previously pushed fragment/part. When it
   * is not contiguous with it, or the discontinuity counter (`cc`) differs, the transmuxer is reconfigured
   * with a fresh `TransmuxConfig` / `TransmuxState`. Parsing start timestamps on `frag.stats` and
   * `part.stats` are (re)set as part of that comparison.
   *
   * @param data - chunk payload; in worker mode an ArrayBuffer is transferred, not copied
   * @param initSegmentData - init segment bytes, copied into the transmux configuration
   * @param audioCodec - audio codec passed to the transmux configuration
   * @param videoCodec - video codec passed to the transmux configuration
   * @param frag - fragment the chunk belongs to
   * @param part - part the chunk belongs to, or null
   * @param duration - duration passed to the transmux configuration
   * @param accurateTimeOffset - flag passed to the transmux state
   * @param chunkMeta - chunk metadata; `transmuxing.start` is stamped here
   * @param defaultInitPTS - optional initial PTS passed to the transmux configuration
   */
  push (data: ArrayBuffer, initSegmentData: Uint8Array, audioCodec: string | undefined, videoCodec: string | undefined, frag: Fragment, part: Part | null, duration: number, accurateTimeOffset: boolean, chunkMeta: ChunkMetadata, defaultInitPTS?: number): void {
    chunkMeta.transmuxing.start = self.performance.now();
    const { transmuxer, worker } = this;
    const timeOffset = part ? part.start : frag.start;
    const decryptdata = frag.decryptdata;
    const lastFrag = this.frag;
    const discontinuity = !(lastFrag && (frag.cc === lastFrag.cc));
    const trackSwitch = !(lastFrag && (chunkMeta.level === lastFrag.level));
    // Without a previous fragment / part the differences default to -1 / 1
    const snDiff = lastFrag ? chunkMeta.sn - (lastFrag.sn as number) : -1;
    const partDiff = this.part ? (chunkMeta.part - this.part.index) : 1;
    // Contiguous: same level and either the next fragment, or the next part of the same fragment
    const contiguous = !trackSwitch && (snDiff === 1 || (snDiff === 0 && partDiff === 1));
    const now = self.performance.now();

    if (trackSwitch || snDiff || frag.stats.parsing.start === 0) {
      frag.stats.parsing.start = now;
    }
    if (part && (partDiff || !contiguous)) {
      part.stats.parsing.start = now;
    }

    if (!contiguous || discontinuity) {
      logger.log([
        `[transmuxer-interface, ${frag.type}]: Starting new transmux session for sn: ${chunkMeta.sn} p: ${chunkMeta.part} level: ${chunkMeta.level} id: ${chunkMeta.id}`,
        `discontinuity: ${discontinuity}`,
        `trackSwitch: ${trackSwitch}`,
        `contiguous: ${contiguous}`,
        `accurateTimeOffset: ${accurateTimeOffset}`,
        `timeOffset: ${timeOffset}`
      ].join('\n'));
      const config = new TransmuxConfig(audioCodec, videoCodec, new Uint8Array(initSegmentData), duration, defaultInitPTS);
      const state = new TransmuxState(discontinuity, contiguous, accurateTimeOffset, trackSwitch, timeOffset);
      this.configureTransmuxer(config, state);
    }

    this.frag = frag;
    this.part = part;

    // Frags with sn of 'initSegment' are not transmuxed
    if (worker) {
      // post fragment payload as transferable objects for ArrayBuffer (no copy)
      worker.postMessage({
        cmd: 'demux',
        data,
        decryptdata,
        chunkMeta
      }, data instanceof ArrayBuffer ? [data] : []);
    } else if (transmuxer) {
      const transmuxResult = transmuxer.push(data, decryptdata, chunkMeta);
      if (isPromise(transmuxResult)) {
        transmuxResult
          .then((result) => {
            this.handleTransmuxComplete(result);
          })
          .catch((error: unknown) => {
            // Report the failure instead of leaving an unhandled rejection
            this.handleTransmuxError(error, 'push');
          });
      } else {
        this.handleTransmuxComplete(transmuxResult as TransmuxerResult);
      }
    }
  }

  /**
   * Asks the transmuxer to flush. In worker mode completion is signalled later by a `flush` message; inline,
   * every returned result is reported and then `onFlush` is called.
   *
   * @param chunkMeta - chunk metadata; `transmuxing.start` is stamped here
   */
  flush (chunkMeta: ChunkMetadata): void {
    chunkMeta.transmuxing.start = self.performance.now();
    const { transmuxer, worker } = this;
    if (worker) {
      worker.postMessage({
        cmd: 'flush',
        chunkMeta
      });
    } else if (transmuxer) {
      const transmuxResult = transmuxer.flush(chunkMeta);
      if (isPromise(transmuxResult)) {
        transmuxResult
          .then((results) => {
            this.handleFlushResult(results, chunkMeta);
          })
          .catch((error: unknown) => {
            // Report the failure instead of leaving an unhandled rejection
            this.handleTransmuxError(error, 'flush');
          });
      } else {
        this.handleFlushResult(transmuxResult as Array<TransmuxerResult>, chunkMeta);
      }
    }
  }

  /** Reports every flushed result, then signals that the flush has completed. */
  private handleFlushResult (results: Array<TransmuxerResult>, chunkMeta: ChunkMetadata) {
    results.forEach(result => {
      this.handleTransmuxComplete(result);
    });
    this.onFlush(chunkMeta);
  }

  /**
   * Handles a message posted by the worker. `init`, `transmuxComplete` and `flush` are handled here; any
   * other event name is re-triggered on hls, tagged with the current fragment and this interface's id.
   */
  private onWorkerMessage (ev: any): void {
    const data = ev.data;
    const hls = this.hls;
    switch (data.event) {
      case 'init': {
        // revoke the Object URL that was used to create transmuxer worker, so as not to leak it
        self.URL.revokeObjectURL(this.worker.objectURL);
        break;
      }
      case 'transmuxComplete': {
        this.handleTransmuxComplete(data.data);
        break;
      }
      case 'flush': {
        this.onFlush(data.data);
        break;
      }
      default: {
        data.data = data.data || {};
        data.data.frag = this.frag;
        data.data.id = this.id;
        hls.trigger(data.event, data.data);
        break;
      }
    }
  }

  /** Applies a new configuration and state to the worker or to the inline transmuxer. */
  private configureTransmuxer (config: TransmuxConfig, state: TransmuxState) {
    const { worker, transmuxer } = this;
    if (worker) {
      worker.postMessage({
        cmd: 'configure',
        config,
        state
      });
    } else if (transmuxer) {
      transmuxer.configure(config, state);
    }
  }

  /** Stamps the transmuxing end time on the result and passes it to the completion callback. */
  private handleTransmuxComplete (result: TransmuxerResult) {
    result.chunkMeta.transmuxing.end = self.performance.now();
    this.onTransmuxComplete(result);
  }

  /**
   * Reports a failed asynchronous inline transmux operation as a non-fatal internal exception.
   * Nothing is reported once destroy() has run (the observer is null by then).
   */
  private handleTransmuxError (error: unknown, operation: string): void {
    if (!this.observer) {
      return;
    }
    const message = error instanceof Error ? error.message : String(error);
    logger.error(`[transmuxer-interface]: inline transmuxer ${operation} failed: ${message}`);
    this.hls.trigger(Events.ERROR, { type: ErrorTypes.OTHER_ERROR, details: ErrorDetails.INTERNAL_EXCEPTION, fatal: false, err: { message } });
  }

  /**
   * Detaches the handlers from the current worker (if any), terminates it and revokes the Object URL it
   * was created from. Revoking a URL that was already revoked on `init` is harmless.
   */
  private resetWorker (): void {
    const worker = this.worker;
    if (worker) {
      if (this.onwmsg) {
        worker.removeEventListener('message', this.onwmsg);
      }
      worker.onerror = null;
      worker.terminate();
      if (worker.objectURL) {
        // revoke the Object URL that was used to create transmuxer worker, so as not to leak it
        self.URL.revokeObjectURL(worker.objectURL);
      }
    }
    this.worker = null;
    this.onwmsg = undefined;
  }
}