Skip to content

Commit

Permalink
Minor [ci fast]
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Jan 15, 2025
1 parent 14484a5 commit ff74318
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,16 @@ class BufferOp {
final listener = new DataflowEventAdapter() {

@Override
Object controlMessageArrived(final DataflowProcessor processor, final DataflowReadChannel<Object> channel, final int index, final Object message) {
Object controlMessageArrived(final DataflowProcessor dp, final DataflowReadChannel<Object> channel, final int index, final Object message) {
if( message instanceof PoisonPill && remainder && buffer.size() ) {
Op.bind(processor,target, buffer)
Op.bind(dp, target, buffer)
}
return message
}

@Override
void afterStop(DataflowProcessor processor) {
Op.bind(processor, target, Channel.STOP)
void afterStop(DataflowProcessor dp) {
Op.bind(dp, target, Channel.STOP)
}

@Override
Expand All @@ -196,17 +196,17 @@ class BufferOp {
isOpen = true
buffer << it
}
final proc = getDelegate() as DataflowProcessor
final dp = getDelegate() as DataflowProcessor
if( closeCriteria.call(it) ) {
Op.bind(proc, target, buffer)
Op.bind(dp, target, buffer)
buffer = []
// when a *startingCriteria* is defined, close the open frame flag
isOpen = (startingCriteria == null)
}
if( stopOnFirst ) {
if( remainder && buffer )
Op.bind(proc, target, buffer)
proc.terminate()
Op.bind(dp, target, buffer)
dp.terminate()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class CombineOp {
}

opts.onComplete = { DataflowProcessor dp ->
if( stopCount.decrementAndGet()==0) {
if( stopCount.decrementAndGet()==0 ) {
Op.bind(dp, target, Channel.STOP)
}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ class DistinctOp {

def previous = null
final code = {
final proc = getDelegate() as DataflowProcessor
final dp = getDelegate() as DataflowProcessor
final key = comparator.call(it)
if( key != previous ) {
previous = key
Op.bind(proc, target, it)
Op.bind(dp, target, it)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ class FilterOp {
final result = criteria instanceof Closure<Boolean>
? DefaultTypeTransformation.castToBoolean(criteria.call(it))
: discriminator.invoke(criteria, (Object)it)
final proc = getDelegate() as DataflowProcessor
final dp = getDelegate() as DataflowProcessor
if( result ) {
Op.bind(proc, target, it)
Op.bind(dp, target, it)
}
if( stopOnFirst ) {
Op.bind(proc, target, Channel.STOP)
Op.bind(dp, target, Channel.STOP)
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ class FirstOp {
}

final code = {
final proc = getDelegate() as DataflowProcessor
final dp = getDelegate() as DataflowProcessor
final accept = discriminator.invoke(criteria, it)
if( accept )
Op.bind(proc, target, it)
Op.bind(dp, target, it)
if( accept || stopOnFirst )
proc.terminate()
dp.terminate()
}

new Op()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,31 +65,31 @@ class FlatMapOp {
.withListener(stopErrorListener(source,target))
.withCode { Object item ->
final result = mapper != null ? mapper.call(item) : item
final proc = getDelegate() as DataflowProcessor
final dp = getDelegate() as DataflowProcessor

switch( result ) {
case Collection:
result.each { it -> Op.bind(proc, target,it) }
result.each { it -> Op.bind(dp, target,it) }
break

case (Object[]):
result.each { it -> Op.bind(proc, target,it) }
result.each { it -> Op.bind(dp, target,it) }
break

case Map:
result.each { it -> Op.bind(proc, target,it) }
result.each { it -> Op.bind(dp, target,it) }
break

case Map.Entry:
Op.bind(proc, target, (result as Map.Entry).key )
Op.bind(proc, target, (result as Map.Entry).value )
Op.bind(dp, target, (result as Map.Entry).key )
Op.bind(dp, target, (result as Map.Entry).value )
break

case Channel.VOID:
break

default:
Op.bind(proc, target, result)
Op.bind(dp, target, result)
}
}
.apply()
Expand Down
10 changes: 5 additions & 5 deletions modules/nextflow/src/main/groovy/nextflow/extension/op/Op.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ class Op {
obj instanceof Tracker.Msg ? obj : Tracker.Msg.of(obj)
}

static void bind(DataflowProcessor operator, DataflowWriteChannel channel, Object msg) {
static void bind(DataflowProcessor dp, DataflowWriteChannel channel, Object msg) {
try {
if( msg instanceof PoisonPill ) {
channel.bind(msg)
allContexts.remove(operator)
allContexts.remove(dp)
}
else {
final ctx = allContexts.get(operator)
final ctx = allContexts.get(dp)
if( !ctx )
throw new IllegalStateException("Cannot find any context for operator=$operator")
throw new IllegalStateException("Cannot find any context for operator=$dp")
final run = ctx.getOperatorRun()
Prov.getTracker().bindOutput(run, channel, msg)
}
Expand All @@ -81,7 +81,7 @@ class Op {
}
}

static bind(OperatorRun run, DataflowWriteChannel channel, Object msg) {
static void bind(OperatorRun run, DataflowWriteChannel channel, Object msg) {
Prov.getTracker().bindOutput(run, channel, msg)
}

Expand Down

0 comments on commit ff74318

Please sign in to comment.