In order to be a streaming workflow engine, we need to support the ability to pipe between tasks. This does not add much over the "traditional" pipeline, which mostly reads/writes files between tasks, but it opens up interesting use cases:

- a stream of query responses from bionode-ncbi can be piped into pipelines
- ease of incorporating Node transform streams into pipelines
- tools stay separate (no need for a container with both `A` and `B` just to run `A | B`, for example)
- cool things with `fork`, something like `A | fork(B1, B2) | C` (however, this will be tricky to implement as we will need to create duplicate streams; see the sketch below)
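The duplicate-stream problem in the `fork` item could, for instance, be handled by piping the source into one `PassThrough` per branch. A minimal sketch, assuming branches are plain stream-returning functions (a hypothetical shape, not the engine's current API):

```js
// Hypothetical helper: duplicate one readable source into independent branches
// by giving each branch its own PassThrough copy of the data.
const { PassThrough } = require('stream')

const fork = (...branches) => (source) =>
  branches.map((branch) => {
    const copy = new PassThrough()
    source.pipe(copy) // each branch reads its own copy of the source data
    return branch(copy)
  })
```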
Dump from docs:
If either `input` or `output` is not provided, the task is assumed to be a streaming task - i.e., it is a duplex stream with writable and/or readable portions. Consider:
```js
const fs = require('fs')
// through2 is a helper to create through (transform) streams
const through = require('through2')

const throughCapitalize = through(function (chunk, enc, next) {
  // takes a chunk, its encoding, and a callback to notify when complete pushing
  // push a chunk to the readable portion of this through stream with
  this.push(chunk.toString().toUpperCase())
  // then call next so that the next chunk can be handled
  next()
})
```

You could connect `capitalize` to a readable (`readFile`) and writable (`writeFile`) file stream with:

```js
const capitalize = task({
  name: 'Capitalize Through Stream'
},
// Here, input is a readable stream that came from the previous task
// Let's return a through stream that takes the input and capitalizes it
({ input }) => input.pipe(throughCapitalize))

const readFile = task({
  input: '*.lowercase',
  name: 'Read from *.lowercase'
}, ({ input }) => {
  const rs = fs.createReadStream(input)
  // Add file information to stream object so we have it later
  rs.inFile = input
  // return the readable stream so it can be piped downstream
  return rs
})

const writeFile = task({
  output: '*.uppercase',
  name: 'Write to *.uppercase'
}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase')))

// Can now connect the three:
join(readFile, capitalize, writeFile)
```
Of course, this could be written as one single task. This is somewhat simpler,
but the power of splitting up the read, transform, and write portions of a task
will become apparent once we can provide multiple sets of parameters to the
transform and observe the effect, without having to manually rewire input and
output filenames. As a single task the above would become:
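For illustration, a hedged sketch of that single-task version, reusing `throughCapitalize` from above and the hypothetical `swapExt` filename helper the docs use:

```js
// Sketch only: the same read -> capitalize -> write pipeline collapsed into
// one task, with the input/output globs and filename handling hard-wired.
const capitalizeFile = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Capitalize *.lowercase into *.uppercase'
}, ({ input }) =>
  fs.createReadStream(input)
    .pipe(throughCapitalize)
    .pipe(fs.createWriteStream(input.swapExt('uppercase')))
)
```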
Tied to this issue, we should also be able to pass variables, functions, etc. between tasks and still be able to call them in downstream tasks (see the sketch below).
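One possible shape, purely hypothetical and only to illustrate the idea: attach the extra values to the object handed downstream, so the next task can both consume the stream and call what was attached.

```js
// Hypothetical sketch: the upstream task decorates the stream it returns,
// and the downstream task reads both the data and the attached function.
const annotate = task({ name: 'Annotate' }, ({ input }) => {
  input.meta = { sample: 'A1', shout: (s) => s.toUpperCase() }
  return input
})

const consume = task({ name: 'Consume' }, ({ input }) =>
  input.pipe(through((chunk, enc, next) => {
    // call the function passed along from the upstream task
    next(null, input.meta.shout(chunk.toString()))
  }))
)
```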