Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/ready resource #21

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 15 additions & 23 deletions delivery.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const { Asset } = require('./asset')
const { EventEmitter } = require('events')
const { Source } = require('./source')
const { demux } = require('./demux')
const { Pool } = require('nanoresource-pool')
const Pool = require('ready-resource-map')
const assert = require('assert')
const Batch = require('batch')
const uuid = require('uuid/v4')
Expand All @@ -27,15 +27,15 @@ const DEFAULT_DEMUX_CONCURRENCY = os.cpus().length
* @class
* @extends nanoresource-pool
*/
class Delivery extends Pool {
class Delivery {

/**
* `Delivery` class constructor.
* @param {(Object)} opts
* @param {(String)} opts.id
*/
constructor(opts) {
super(Source, opts)
// super(Source, opts)

if (!opts || 'object' !== typeof opts) {
opts = {}
Expand All @@ -45,29 +45,17 @@ class Delivery extends Pool {

this.date = Date.now()
this.dateIso = new Date(this.date).toISOString()
}

/**
* All assets in the delivery pool, including
* child delivery pools.
* @accessor
* @type {Array<Source>}
*/
get assets() {
return this.sources.filter(s => s instanceof Asset)
this.pool = new Pool()
}

/**
* All sources in the delivery pool, including
* child delivery pools.
* @accessor
* @type {Array<Source>}
*/
get sources() {
return this.query()
return Array.from(this.pool.m.values())
}

/**
* DOES NOT WORK YET WITH READY-RESOURCE-MAP
* MUST CONVERT ASSET TO A READYRESOURCE FIRST
* Creates and adds a new Asset from a URI. Associates it with the Delivery.
* @param {String} uri
* @param {Object} opts
Expand All @@ -89,13 +77,17 @@ class Delivery extends Pool {
* @param {Object} opts
* @return {Source}
*/
source(uri, opts) {
// Do not allow a new resource to be created if one already exists in the
// pool with the given `uri`
async source(uri, opts) {
if (this.sources.some(s => s.uri.includes(uri))) {
return this.sources.filter(s => s.uri.includes(uri))[0]
}
return this.resource(uri, opts)
async function createSource() {
const src = new Source(uri)
await src.ready()
return src
}

return await this.pool.open(uuid(), createSource)
}

/**
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
"once": "^1.4.0",
"pretty-ms": "^5.1.0",
"pump": "^3.0.0",
"ready-resource": "^1.0.0",
"ready-resource-map": "^1.1.1",
"rimraf": "^3.0.0",
"simple-get": "^3.1.0",
"smpte-timecode": "^1.2.3",
Expand Down
123 changes: 38 additions & 85 deletions source.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
const { ffmpeg } = require('./ffmpeg')
const { head } = require('simple-get')
const Resource = require('nanoresource')
const Resource = require('ready-resource')
const through = require('through2')
const assert = require('assert')
const getUri = require('get-uri')
const ready = require('nanoresource-ready')
const debug = require('debug')('little-media-box:source')
const path = require('path')
const once = require('once')
const pump = require('pump')
const uuid = require('uuid/v4')
const url = require('url')
Expand Down Expand Up @@ -162,57 +159,38 @@ class Source extends Resource {
* @protected
* @param {Function} callback
*/
_open(callback) {
async _open() {
if (this.byteLength > 0) {
return process.nextTick(callback, null)
}

// try attached stream
if (this.stream) {
ffmpeg(this.stream).ffprobe((err, info) => {
if (err) { return callback() }
this.byteLength = parseInt(info.format.size)
callback(null)
})
}

const uri = url.parse(this.uri)
if (/https?:/.test(uri.protocol)) {
head(this.uri, (err, res) => {
if (err) { return callback(err) }
this.byteLength = parseInt(res.headers['content-length'])
callback(null)
})
} else {
const pathname = path.resolve(this.cwd, uri.path)
fs.stat(pathname, (err, stats) => {
if (err) { return callback(err) }
this.uri = `file://${pathname}`
this.byteLength = stats.size
callback(null)
})
}
}

/**
* Implements the abstract `_close()` method for `nanoresource`
* Closes the source stream and resets internal state.
* @protected
* @param {Function} callback
*/
_close(callback) {
process.nextTick(callback, null)
}

/**
* Wait for source to be ready (opened) calling `callback()`
* when it is.
* @param {Function}
*/
ready(callback) {
assert('function' === typeof callback,
'Expecting callback to be a function.')
ready(this, callback)
return null
}

return new Promise((resolve, reject) => {
// try attached stream
if (this.stream) {
ffmpeg(this.stream).ffprobe((err, info) => {
if (err) reject(err)
this.byteLength = parseInt(info.format.size)
resolve(null)
})
}

const uri = url.parse(this.uri)
if (/https?:/.test(uri.protocol)) {
head(this.uri, (err, res) => {
if (err) reject(err)
this.byteLength = parseInt(res.headers['content-length'])
resolve(null)
})
} else {
const pathname = path.resolve(this.cwd, uri.path)
fs.stat(pathname, (err, stats) => {
if (err) reject(err)
this.uri = `file://${pathname}`
this.byteLength = stats.size
resolve(null)
})
}
})
}

/**
Expand Down Expand Up @@ -248,49 +226,24 @@ class Source extends Resource {
* @param {?(Object)} opts
* @param {Function} callback
*/
probe(opts, callback) {
if ('function' === typeof opts) {
callback = opts
}

if (!opts || 'object' !== typeof opts) {
opts = {}
}

assert('function' === typeof callback,
'Expecting callback to be a function.')

this.ready((err) => {
if (err) { return callback(err) }


const { uri } = this
async probe(opts) {
await this.ready()
return new Promise((resolve, reject) => {
const stream = this.uri !== null
? this.uri
: this.createReadStream(opts).on('error', callback)
: this.createReadStream(opts).on('error', reject)

this.active()
ffmpeg(stream).ffprobe((err, info) => {
this.inactive()

if (err) reject(err)
if (info && info.format) {
info.format.filename = info.format.filename === 'pipe:0'
? path.basename(this.uri)
: info.format.filename
}

callback(err, info)
resolve(info)
})
})
}

/**
* An alias for `probe()`.
* @param {Function} callback
*/
stat(callback) {
this.probe(callback)
}
}

/**
Expand Down
66 changes: 21 additions & 45 deletions track/properties.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
const constants = require('../constants')
const iso639 = require('../iso-639')
const assert = require('assert')
const mutex = require('mutexify')
const {
TrackPropertiesMissingFormatError,
TrackPropertiesMissingStreamError
Expand Down Expand Up @@ -29,7 +27,6 @@ class TrackProperties extends Map {

this.streamIndex = opts.streamIndex
this.track = track
this.lock = mutex()
}

/**
Expand Down Expand Up @@ -176,55 +173,34 @@ class TrackProperties extends Map {
return this.stream.nb_frames
}

/**
* Resets internal state calling `callback()` after it
* is cleared
* @param {Function} callback
*/
reset(callback) {
if ('function' !== typeof callback) {
callback = noop
}

this.lock((release) => {
this.clear()
process.nextTick(release, callback, null)
})
}

/**
* Updates internal map state by probing the track's
* source for stream and format information. This
* function will select the correct stream based on the track's
* stream index.
* @param {Function} callback
*/
update(callback) {
assert('function' === typeof callback,
'Expecting callback to be a function.')
this.lock((release) => {
this.track.source.probe((err, probe) => {
if (err) { return callback(err) }
const { streamIndex } = this
const { format } = probe
const stream = 1 === probe.streams.length
? probe.streams[0]
: probe.streams.find((s) => s.index === streamIndex)

if (!stream) {
release(callback, new TrackPropertiesMissingStreamError())
} else if (!format) {
release(callback, new TrackPropertiesMissingFormatError())
} else {
// clear state before updating
this.clear()
this.set('format', format)
this.set('stream', stream)
this.streamIndex = stream.index

release(callback, null)
}
})
async update() {
return new Promise(async (resolve, reject) => {
const probe = await this.track.source.probe()
const { streamIndex } = this
const { format } = probe
const stream = 1 === probe.streams.length
? probe.streams[0]
: probe.streams.find((s) => s.index === streamIndex)

if (!stream) {
reject(new TrackPropertiesMissingStreamError())
} else if (!format) {
reject(new TrackPropertiesMissingFormatError())
} else {
// clear state before updating
this.clear()
this.set('format', format)
this.set('stream', stream)
this.streamIndex = stream.index
resolve(null)
}
})
}

Expand Down
Loading