Skip to content

Commit

Permalink
feat(map,filter): additional arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
tpluscode committed Sep 12, 2024
1 parent ad5f176 commit 41c600d
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/selfish-cougars-carry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"barnard59-base": minor
---

Adds a variadic parameter to `map` and` filter` steps, which will be forwarded to their respecitve callback functions called for each chunk
11 changes: 7 additions & 4 deletions packages/base/filter.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { obj } from 'through2'

/**
* @typedef {(this: import('barnard59-core').Context, chunk: T, encoding: string) => boolean | Promise<boolean>} Filter<T>
* @typedef {(this: import('barnard59-core').Context, chunk: T, encoding: string, ...args: A) => boolean | Promise<boolean>} Filter<T, A>
* @template T
* @template {Array<unknown>} A
*/

/**
* @template T
* @template {Array<unknown>} A
* @this {import('barnard59-core').Context}
* @param {Filter<T>} func
* @param {Filter<T, A>} func
* @param {A} args
* @return {import('stream').Transform}
*/
function filter(func) {
function filter(func, ...args) {
return obj((chunk, encoding, callback) => {
Promise.resolve().then(() => {
return func.call(this, chunk, encoding)
return func.call(this, chunk, encoding, ...args)
}).then(result => {
if (result) {
return callback(null, chunk)
Expand Down
18 changes: 11 additions & 7 deletions packages/base/map.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
import transform from 'parallel-transform'

/**
* @typedef {(this: import('barnard59-core').Context, chunk: From) => Promise<To> | To} MapCallback
* @typedef {(this: import('barnard59-core').Context, chunk: From, ...args: Args) => Promise<To> | To} MapCallback
* @template From, To
* @template {Array<unknown>} Args
*/

/**
* @typedef {{
* map: MapCallback<From, To>
* map: MapCallback<From, To, Args>
* concurrency?: number
* ordered?: boolean
* objectMode?: boolean
* }|MapCallback<From, To>} MapOptions
* }|MapCallback<From, To, Args>} MapOptions
* @template From, To
* @template {Array<unknown>} Args
*/

/**
* Processes chunks with a transform function
*
* @this {import('barnard59-core').Context}
* @param {MapOptions<From, To>} options Transform function or complex options
* @param {MapOptions<From, To, Args>} options Transform function or complex options
* @param {Args} args Additional arguments to pass to the transform function
* @return {import('stream').Transform}
* @template From, To
* @template {Array<unknown>} Args
*/
export default function map(options) {
export default function map(options, ...args) {
/**
* @type {MapCallback<*, *>}
* @type {MapCallback<*, *, Args>}
*/
let func
let concurrency = 1
Expand All @@ -42,7 +46,7 @@ export default function map(options) {

return transform(concurrency, { ordered, objectMode }, (data, callback) => {
Promise.resolve().then(() => {
return func.call(this, data)
return func.call(this, data, ...args)
}).then(result => {
callback(null, result)
}).catch(callback)
Expand Down
24 changes: 24 additions & 0 deletions packages/base/test/filter.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import filter from '../filter.js'

describe('filter', () => {
it('should pass pipeline context to callback function', async () => {
// given
const input = new Readable({
objectMode: true,
read: () => {
Expand All @@ -17,11 +18,34 @@ describe('filter', () => {
variable: new Map().set('condition', 'a'),
}

// when
const outStream = input.pipe(filter.call(context, function (chunk) {
return this.variable.get('condition') === chunk
}))
const output = await array(outStream)

// then
deepStrictEqual(output, ['a'])
})

it('should pass additional arguments to callback function', async () => {
// given
const input = new Readable({
objectMode: true,
read: () => {
input.push(1)
input.push(2)
input.push(3)
input.push(null)
},
})

// when
const greaterThan = (chunk, _, minValue) => chunk > minValue
const outStream = input.pipe(filter.call(context, greaterThan, 2))
const output = await array(outStream)

// then
deepStrictEqual(output, [3])
})
})
34 changes: 34 additions & 0 deletions packages/base/test/map.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,40 @@ describe('map', () => {
deepStrictEqual(output, ['foo_a', 'foo_b'])
})

it('should pass optional arguments to mapper function', async () => {
const input = new Readable({
read: () => {
input.push('a')
input.push('b')
input.push(null)
},
})

const prependPrefix = (chunk, prefix) => prefix + chunk
const outStream = input.pipe(map.call(context, prependPrefix, 'foo_'))
const output = await array(outStream)

deepStrictEqual(output, ['foo_a', 'foo_b'])
})

it('should pass optional arguments to mapper function when used with options object', async () => {
const input = new Readable({
read: () => {
input.push('a')
input.push('b')
input.push(null)
},
})

const prependPrefix = (chunk, prefix) => prefix + chunk
const outStream = input.pipe(map.call(context, {
map: prependPrefix,
}, 'foo_'))
const output = await array(outStream)

deepStrictEqual(output, ['foo_a', 'foo_b'])
})

it('accepts a function as parameter', async () => {
// given
const transform = letter => letter.toUpperCase()
Expand Down

0 comments on commit 41c600d

Please sign in to comment.