MicroStream JS

Single Source, Unlimited Power

Michael Rosata

Lead Developer

Someday, you'll see an awesome quote written here.

About MicroStream JS

(since: Nov. 23 2016)

MicroStream JS is a hybrid reactive/functional utility designed to be a small library with huge potential. MicroStream JS will help you build better declarative applications. The streams provided in MicroStream JS are not the same "streams" that you may have used natively in NodeJS; they are closer to the streams you would find in reactive programming (RxJS). Where this library differs is that MicroStream JS is minimal and basic, but designed to let you write simple implementations of reusable logic for use in complex architecture.

MicroStream JS isn't trying to replace any libraries or become an end-all solution to your troubles. The mission statement at the moment for MicroStream JS is to give you (the developer) an extremely minimal Stream pattern to use in your programs, so that you can harness the power of Streams without being forced into making them the overarching theme of your application.

One of the reasons you may never have used streams is that they required such a huge shift in your thinking or app architecture. Hopefully we'll be the util that makes your functional programs easier to reason about. Maybe you'll fall in love with streams but ditch the MicroStream JS package, or maybe you'll carry us around from project to project... and you could: MicroStream JS is super small, currently weighing in at about 4 - 10KB gzipped, in beta 0.2.2 at the time of writing.

If you have used Streams before, the MicroStream JS Streams might not match exactly what you've used in the past, or they may act the same but do so in a different way. The Streams themselves resemble functional Functors and reactive Observables. Our streams are pointed, lazy, cancellable, mappable and fluent. Hopefully you enjoy!



Getting Started


Installation


MicroStream JS is about 4KB - 10KB over the wire gzipped, and it comes packaged in all the popular module formats, e.g. [UMD (Universal Module Definition)][umd-git-repo]. MicroStream JS should integrate easily into your projects using import, require, or define. If your project supports it... we support you!

Installing MicroStream JS is super simple with yarn or npm.

$ # Install using NPM
$ npm install micro-stream --save

$ # Install MicroStream JS using Yarn.
$ yarn add micro-stream

If you're using Webpack, Browserify, RequireJS, AMD or NodeJS then you should be all set to import the main Stream class in the same way you'd import any other package.

// ES2015 modules
import microStream from 'micro-stream';

// CommonJS (NodeJS, Browserify)
const microStream = require('micro-stream');
// now we have { Stream, StreamObserver, StreamReducer }

// AMD
define(['microStream'], function (microStream) {/*...code..*/});

An Introduction to MicroStream JS (The Q & A)

Hi! Welcome to MicroStream JS! How can I help you ?

How about you walk me through how these stream things work?

Alright, first, from what you've read so far, tell me what you think a stream does?

Well, every time data is emitted from the base of a stream, it is pushed from observer to observer, and they all have the opportunity to work with and change the data before they pass it to the next observer, right? Basically a digital manufacturing line on a conveyor belt, correct?

Sure, cool analogy. I'd like to mention that each observer (a worker on your manufacturing line) doesn't need to know or care what ops happened before or after it gets an item; it only worries about what is handed to it.

Ok.. So is there any way we can make a stream for my app that emits a random number every second?

I think we can do that

const myStream = Stream.of(function(hook) {
  setInterval(() => hook(Math.random()), 1000);
});

What is that "hook" argument?

Good eye! The "hook" argument is a function which, when called, will emit values to all listeners (observers) on the stream. We now have a stream that emits random numbers!

Could we multiply those randoms by 1000 and then log those values out??

myStream
  .map(num => num * 1000)
  .tap(console.log.bind(console));

You realize that nothing is happening right??

Sure! That's because the stream is lazy. If you want it to do work then you'll have to subscribe to it.

const observer = myStream.subscribe;

WHOA! My console is streaming bro! or... sis? bro-sis?!

You can just call me Mike, just as long as you call me... hehe

Ahhh, I think you went too far with that joke there Mike. Anyways, my console is full of random numbers and I think maybe some of those might cause errors in my app. Why don't we ditch the comedy and maybe teach me some trick I can use to handle errors within a stream? ¯\_(ツ)_/¯

You're the boss! Let me introduce you to trap. It takes 2 functions: 1 to do some work and another to handle any potential errors.

observer.trap(function (data) {
    if (data >= 10 && data < 100) {
      throw new Error("Double digits offend me!");
    }
    return data;
  }, function (error) {
    /** do some fixin **/
    return 101;
  });

Nice... So now any double-digit number throws an error and instead pushes 101 down the stream? (I can see how that could be useful *cough* *cough*). Maybe we could somehow filter all numbers over 500, but without the error handler? Is it possible to just have the stream throw away those values rather than change them?

Yea, "filter" you said? We have a filter method that handles exactly what your looking for.

observer.filter(num => num < 500)

Great... so to be clear: if the stream emits the value 32.5, it gets logged to the console, then since that's greater than 10 and less than 100 it throws an error, but trap catches the error and we fix it by pushing the value 101, and then since 101 is less than 500 it will be pushed downstream?

By George I think you got it!

Yea... moving on. Let's imagine that 5 more numbers are emitted from the base stream. They all get past the trap one way or the other I suppose, but say only 2 more get past the filter, for a grand total of 3 numbers pushed past the "less than 500" filter. Now suppose I said my app needs to get the product of those 3 numbers... and the next 3, then the next 3. I guess I'd finally have to write some crazy spaghetti code... correct?

Nah, that's easy peasy

observer.curry((a, b, c) => a * b * c)

Really? Whoa! Ok... I'm liking these streams. How bout this Mike? I want all those volumes (products of 3 numbers) pushed to an array, and every time that array fills with 5 values we should print them into the HTML element #output. How bout that Mike? Is my app too crazy for your streams now or can we do this too?

Lol... you can be a bit intense... but yea, we can do that. Introducing reduce.

observer.reduce(
    // The first argument to reduce is an accumulator.
    function (accum, vol) { return accum.push(vol) && accum; },
    // Then a starting base-value, in our case an empty array.
    [],
    // 3rd param is an optional completion condition function.
    function (myArray) { return myArray.length >= 5 })
  .tap(vols =>
    document.querySelector('#output').innerHTML = vols.join(',<br> '));

Alright. You won me over, I have a question and a confession. Which do you want first?

I'll take the confession I guess

I don't really have an app that needs to generate random numbers and filter out certain ones or throw errors for double digits, and I don't need to aggregate those numbers into arrays of volumes and print them into HTML. Sorry I lied to you, Mike.

Ok, I didn't think you had an app that needed to do that stuff, I thought you just wanted me to teach you MicroStream JS.

It's alright to admit when you've been fooled Mike

Hmmmm. And your question is...? Yea, so what is the difference between map and tap? It seems you use them interchangeably.

Map runs an op over a value and returns a new value to pass downstream. Tap runs an op over a value but doesn't need to worry about returning a value; that's handled internally. Use map to transform data into new data; use tap for "no-ops", which is useful for things like console logs and debugging where you aren't going to change the value, so it's better not to have to remember to return it. Otherwise you forget and pass undefined downstream. Anything else I can help you with?
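Here's a quick sketch of the difference (any subscribed observer will do here; the ops are just placeholders):

// Two ways to log a value without changing it.
observer
  .map(value => {
    console.log('via map:', value);
    return value;                  // forget this return and undefined flows downstream
  });

observer
  .tap(value => console.log('via tap:', value));  // the return is handled for you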

Where are the MicroStream JS API Docs located?

Scroll down about 10 inches or use the menu to the left.

How's about you just give me the link?

Fine...Click here to read the docs.



MicroStream JS News

11-29-2016 - v0.2.2: most of the tests have now been written. The build process has had a drastic overhaul and the NPM package has been slimmed down a lot; it now uses babel-runtime, so if your project already includes some of the features MicroStream JS requires then MicroStream JS won't load them again. In projects like those the library weighs in over the wire at 4KB gzipped, which is very nice imo. Hopefully the API docs will be done tonight or tomorrow so that I can focus on crafting some more practical examples for the site, maybe even a todo-list app (even though MicroStream JS isn't a framework, I think that would be fun).

11-27-2016 - Today I created a step by step "interview" demonstrating the basics of streams. There have been a lot of changes over the past couple of days. The biggest was choosing how to handle Stream subscriptions. Originally a stream would map to an op by changing its own output to be the result of that op on its value. So Stream x mapped to f would be Stream f(x), and if we mapped that to g we would have Stream g(f(x)). This is good, but then we are always changing the value of a Stream, and I'd rather return a new Stream subscribed to the previous Stream. That solution is great unless we want lazy streams, in which case subscribing when mapping results in an instant loss of laziness. So the solution was to have the base Stream behave as mentioned above (wrapping the old value in the new op recursively); then, once a stream is subscribed to, we are returned a sub-class "StreamObserver", and mapping an observer will return a new observer that has nothing to do with the previous one besides being subscribed to it. This should make apps easier to reason about and align more closely with functional programming paradigms.

11-25-2016 - The way Streams map objects through ops/morphisms has been changed today. Previously I had thought it would be better to change the stream as new ops were mapped to it, i.e. a stream mapped to 2 functions would be a stream that ran a value through the first function call wrapped in the second. This is no longer the case; I wanted to stick closer to the structure of a functor, so each time a new op is mapped to a stream it will now return a new stream. For the end user there isn't much difference here. It actually makes the streams a bit easier to reason about when splitting and cancelling. Before, cancelling at the end of a chain would cancel the entire chain (even if it was split midway), and that is no good as it causes side effects which make the pipeline difficult to reason about.

11-23-2016 - I'm very excited to be developing MicroStream JS. Although I'm not sure exactly how far I'll take this project, it does have me up at night thinking about Streams. There are some details that I can't decide upon yet, and hopefully if this does go public I can get some help from the community. There will be big decisions to make about how certain aspects of the Streams manage data. For now, there is lots of github.io website work and JavaScript to write!



MicroStream JS API Documentation

Terminology Used in MicroStream JS API Docs.

For API documentation purposes we should establish common definitions of some terms which may hold different meanings in other contexts such as programming or mathematics. These terms, used in the context of the MicroStream JS API, shall hold specific meaning. This is simply an attempt to set a well-defined vocabulary so that the MicroStream JS method descriptions can be succinct.

  • Input(s) - The value(s) assigned to the parameter(s) of an op.
  • Output(s) - The value(s) returned from an op.
  • Op - An "operation", read as "function", likely written by you (the developer), that performs a computation when applied over some value(s). A simple example: (n) => n + 1 is an op which receives as input the value of n and will output the value of n + 1.
  • No-op - A "no-op(eration)" is an op, most often used by MicroStream JS internally, which does not act upon its input(s). We can view the instance method Stream.tap as a no-op because its output is the same as its input: (id) => id is an op which receives as input the value of id and returns the value of id, and is therefore considered a no-op. (Tap does perform work on your behalf, but defined solely in terms of input and output, tap is a no-op.)
  • Fluent Chain - Fluent chains are patterns where instance methods return the instance rather than values from the op performed. This is what allows us to write code such as Stream.of().subscribe.map(op).tap(noOp).filter(). The methods return the instance so we may continue to call another method on it. Sometimes they actually return new instances of the same class, or a sub-class of Stream in the case of MicroStream JS.


Stream

There are 3 main Stream classes; however, you don't have to explicitly create all 3, they will be created implicitly over the course of your fluent chains. In fact you'll typically only use the main Stream class, which has methods that create sub-classes of Stream such as StreamObserver; since StreamObserver is a sub-class of Stream it supports a superset of Stream methods.

The important thing to remember about the main Stream class is that it is lazy, which means it won't run any ops until it is subscribed to. When that happens you end up working with a StreamObserver.

// Let's create a stream. (logger is just console.log bound for the example.)
const logger = console.log.bind(console);

const stream = Stream.of()
  // Let's map an op to the stream; it's lazy, so it won't run yet.
  .map(val => val + ' World')
  .tap(logger);

// Watch... Nothing will happen.
stream.push('Hello').push('   ... Hello??? ');

// Now subscribe and the stream will be able to push values.
const observer = stream.subscribe;

stream.push('Hello');
// output: "Hello World"



StreamObserver

The StreamObserver is what you get back from subscribing to a stream. From this point on, when we use methods such as map, trap and tap we get back another StreamObserver which listens to the one we mapped from. Additionally the StreamObserver gives us access to great methods such as reduce, filter, curried and curry.

// This is a Stream.
const stream = Stream.fromEvent('click', '#my-button');
// Now it will be a StreamObserver instance
const observer = stream.subscribe;
// Without StreamObservers, Streams would not flow...
observer.map(value => {/* We're mapping over a StreamObserver */})


StreamReducer

The StreamReducer is the most powerful version of a Stream in terms of how much control it offers the developer. This is the Stream version of the Array reduce method. With this stream you can actually implement different ways to push data through a stream, or even build up state inside part of a stream (to collect items in an array, perhaps). The filter, curry and curried stream methods are all actually just instances of StreamReducer. As with Stream and StreamObserver, you're advised against using the new keyword; instead use StreamReducer.of to create new Iterator Streams.

// This is a Stream that will stream 1 on each click.
const stream = Stream.fromEvent('click').as(1);
// First we make a StreamObserver.
const observer = stream.subscribe;

// Now.. We can make a StreamReducer using (reduce, filter, curry, curried).
const stReducer = observer.reduce((total, one) => total + one, 0);


.of

The Stream.of static method is the recommended way to create new streams. It should be passed 1 argument or none. If the argument is a function then MicroStream JS will pass that function a hook function which pipes data down-stream. If the argument passed to Stream.of is not a function then the stream will use it as a base value and your program will have to push data through the stream manually.

Stream.of(function(hook) {
  /**
    This could be a database connection, an event, something that will
    happen in the future 1 or more times.
  **/
  mydatabase.on('update', hook);
});

The StreamReducer.of method is similar to the Stream.of method above. The difference is that StreamReducer.of takes 3 arguments. You can rely on methods such as curry, curried, reduce and filter to create StreamReducer instances for you, but if you choose to use .of then you will need to pass an accumulator function which controls how data is collected over loops of the reducer. The baseCase is a function that is passed the current reduced value on each iteration. If the base case returns true then the reducer will push the accumulated value down-stream and then prepare the reducer all over again for the next values that get pushed down-stream.

This is good if you want to collect values that come down-stream until a certain condition is satisfied. baseValue is the starting value: if you're collecting items into an array it should be an empty array, if you're summing numbers it should be 0. Note: this baseValue is cloned, so if you pass an object you should not expect to be able to reach it from outside the iterator/reducer. The reason for this is that when the reducer restarts we need to reset the baseValue as well, and the best way to do that is to make copies of 1 original stored copy.

StreamReducer.of(accumulator, recursiveCase, baseValue);
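For example, here is a sketch of a reducer that batches values into arrays of 3 (argument order follows the signature above; the middle argument is the base-case function described earlier, and the names are only illustrative):

// Sketch: collect values coming down-stream into arrays of 3.
const batcher = StreamReducer.of(
  (accum, value) => accum.concat(value),  // accumulator: fold each value into the current batch
  (accum) => accum.length >= 3,           // base case: when true, push the batch down-stream and restart
  []                                      // base value: cloned internally each time the reducer restarts
);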


.fromEvent

Stream.fromEvent creates a stream from DOM events. It takes 1 or 2 parameters. The required param is an event type: "click", "keyup", etc. The optional 2nd parameter is a CSS selector string that targets which element in the DOM you want the stream to listen on. The default second value is "body". The example below listens on an input and console logs what the user typed. This is a simple implementation that can easily be replaced by you or your team; all it does is call document.querySelector(selector).addEventListener(type, hook, false) under the hood.

const inputStream = Stream.fromEvent('keyup', '[name="my-input"]')
  .subscribe
  .tap(console.log.bind(console, 'Hey! You Typed: '));
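If you ever want to roll your own event source, a rough sketch of the idea looks like this (this is not the library's actual source, just the shape of it):

// Sketch: roughly what Stream.fromEvent does, expressed with Stream.of.
function fromEventSketch(type, selector = 'body') {
  return Stream.of(function (hook) {
    document.querySelector(selector).addEventListener(type, hook, false);
  });
}

const clicks = fromEventSketch('click', '#my-button');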


#subscribe

Subscribe is a special property on a Stream which sets the first observer on a Stream. Just accessing the subscribe property on a Stream will return a new StreamObserver, which also means the base stream is no longer lazy. It now has someone to push its final values to, and that observer can be branched from as well, but the observer doesn't need to be subscribed to. Only base Streams need subscribing.

const myStream = Stream.of().subscribe;

const branch = myStream.map(x => x + 10).map(x => x + 100);  // 0 + 10 + 100 >> Streams 110

branch.subscribe.map(x => x + 1000)
  .tap(alert); // ALERT(1110)
branch.map(x => x + 100)
  .tap(alert); // ALERT (210)

myStream.push(0); // Explicitly push the value 0 to myStream.


#cancel

Some streams you'd like to have forever; others you might not. The cancel method will prevent a stream from continuing to pass data from the point at which it is cancelled, so we can cancel entire streams or just branches of observers. In the example below the stream emits values every 1 second and is cancelled after 3.5 seconds. The hooked function will still be running (streams don't touch the functions which hook into them), however the stream won't pass those values downstream.

let logger = console.log.bind(console, 'value: '),
    i = 0;

const stream = Stream.of(function(hook) {
  setInterval(() => hook(++i * 10), 1000);
});

stream.subscribe
  .tap(logger);

setTimeout(stream.cancel.bind(stream), 3500);
// output: 'value: 10', 'value: 20', 'value: 30'

To clarify, if you created a function that listened to a websocket for updates from your database, cancelling the stream won't disconnect the database websocket. This is normal behaviour, and it ensures that the stream will be ready for new subscribers later (also, the stream doesn't know how to properly disconnect from your database; it doesn't even realize that it's hooked up to your database, and that's the beauty of it all). So keep that in mind while creating Streams and Observers.
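For example (a sketch; mySocket stands in for whatever long-lived source you hook up yourself):

// Sketch: cancelling stops values flowing downstream, but the source stays connected.
const updates = Stream.of(hook => mySocket.on('message', hook));
updates.subscribe.tap(console.log.bind(console, 'update: '));

updates.cancel();  // observers stop receiving values...
// ...but mySocket is still connected; closing it (if you ever want to) is up to you:
// mySocket.close();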



#push

Normally, a function that hooks into our base stream manages values being pushed through our stream. However, it is possible to push values manually from outside of the stream using the push method. This is good for testing or for passing control of your stream to another source.

const stream = Stream.of().subscribe.tap(alert);

stream
  .push('Popup!')
  .push('Remember me?')
  .push('1 more!');

// alert('Popup!')... alert('Remember me?')... alert('1 more!')...


#as

Sometimes you will want a stream to resolve to a specific value no matter what. It's times like this that as becomes useful. The as method takes a single argument: the value that your stream should map to. Following up as with methods such as reduce can be very powerful.

const logger = (label) => console.log.bind(console, label);

const clickStream = Stream.fromEvent('click', '#add-one').as(1);
clickStream.subscribe.reduce((total, num) => total + num, 0)
  .tap(logger('clicks: '));

// press the #add-one button a few times, outputs:
// 'clicks: 1', 'clicks: 2', 'clicks: 3', 'clicks: 4'....


#map

Mapping over a value with an op is probably the most common method you will use with your Streams. The behaviour is depicted below in a simple to understand chain of map functions.

const myStream = Stream.of(null).subscribe
  .map(num => num + 15)  // 35... 55... 25... 1015
  .map(num => num + 25)  // 60... 80... 50... 1040
  .map(num => num * 0);  // 0 ... 0 ... 0 ... 0


myStream.push(20).push(40).push(10).push(1000);


#trap

The trap method works like map, except it additionally wraps the op in a try/catch block, so it is less performant than mapping. Trap is useful when an op might throw an error: a second op can then handle the error and return some safe value, which is passed down the stream instead. The next op in your chain won't even know that an error was thrown; this is one way in which we can write less error handling and more generic/reusable ops.

const myStream = Stream.of(null).subscribe
  .map((val, i) => [() => 10, () => 20, () => 30, undefined][i % 4]) // Every 4th value maps to undefined.
  .trap(
    fn => {
      return fn();                    // will throw an error if fn === undefined.
    },
    err => {                          // But error handling can rescue us!
      return 'oops!';
    })
  .tap(console.log.bind(console));    // 10, 20, 30, 'oops!'

myStream.push('a').push('b').push('c').push('d');


#tap

The tap method is a no-op, meaning that it doesn't have any effect on the stream: tap passes along the same value that was passed into it. Sound super useful? Well it is. If you want to log values, or even break some rules and pass them outside the stream, but you don't want to affect the stream, you could use map and just make sure to return the original value, or use tap and let MicroStream JS write the return statement for you! tap is your friend.

const logger = (label) => console.log.bind(console, label);

const stream = Stream.fromEvent('keyup', '#party')
  .path(['target', 'value']);

stream.subscribe
  .tap(logger('easy!'))    // this will log keyup values
  .map(value => {
    // It's the same as writing this:
    console.log('hard!', value);
    return value;
  })
  .tap(x => alert('tap is better and succinct!'));


#async

The async method expects a function that returns a promise. Optionally you may pass a second argument which acts as an error handler if the promise is rejected. You could handle that yourself as well, which is why it's optional. This method is much the same as trap except it works on promises. Keep in mind that streams are lazy and promises are not, which is why you must pass a function that returns a promise, not the promise itself.

This example is based on the fetch/blob example at MDN; the fetch and blob methods are experimental. Fetch is a guaranteed part of our future and there are already good polyfills out there; blob I can't speak on much.

// myImage is assumed to be an <img> element already on the page.
const myImage = document.querySelector('img');

const stream = Stream.fromEvent('click')
  .path(['target', 'dataset', 'searchUrl'])
  .subscribe;

stream.filter(url => !!url)                // ignore clicks without a data-search-url
  .async((url) => fetch(url))
  .async((response) => response.blob())
  .map((myBlob) => {
    const objectURL = URL.createObjectURL(myBlob);
    myImage.src = objectURL;
  });

If you want to send the value from a promise you already have and are waiting to resolve, then you should pass it to Stream.of instead, like so: Stream.of((hook) => promiseWeHaveAlready.then(hook));
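A quick sketch of the optional second argument (it behaves much like trap's error handler; the handler shown here just logs the failure):

// Sketch: the optional rejection handler is called if the promise rejects.
stream
  .async(
    (url) => fetch(url),                            // op that returns a promise
    (err) => console.error('request failed:', err)  // optional handler for a rejected promise
  );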



#reduce

The reduce method works much the same as the native Array.reduce method. It takes up to 3 arguments: the first is a reduction function (fold op), the second is the base (starting) value. The third, optional argument is a "base case" function which is applied over the accumulated "reduction total" after each value is passed into the accumulator; if it evaluates to true then the value is pushed down-stream and the entire reduce begins again. If you don't pass the third param then a value is passed down-stream after the "fold op" every time, but the reduction doesn't restart.

// CASE 1: 2 arguments, (so this pushes down stream each iteration)
const stream1 = Stream.of().subscribe;

stream1
  .reduce((a, b) => a + b, 0)

stream1
  .push(1).push(2).push(3)
  .push(4).push(5).push(6)
  .push(7).push(8).push(9)
//  1.. 3.. 6.. 10.. 15.. 21.. 28.. 36.. 45
// CASE 2: 3 arguments, (pushes down stream once when the base case is true then restarts).
const stream2 = Stream.of().subscribe;

stream2
  .reduce((a, b) => a + b, 0, (val) => val % 3 === 0)

stream2
  .push(1).push(2).push(3)
  .push(4).push(5).push(6)
  .push(7).push(8).push(9)
//  6.. 15.. 24..


#filter

The filter method works much the same as the native Array.filter method, except it filters the data flowing down the stream. Filter is a "no-op" in that it shouldn't affect the data in the stream. If the filter op returns false then that data never sees another op. If the filter op returns true, the data continues on down the stream.

const logger = console.log.bind(console);

const stream = Stream.of(null).subscribe;
stream
  .map((n) => n + 10)
  .filter((n) =>  n > 20).tap(logger);

stream
  .push(5).push(20).push(15).push(10);
// 30.. 25..


#path

The path method is a helper utility which will resolve a property value from an object as it passes through the stream. Using path is safe, meaning that it won't throw an error if the property doesn't exist or even if the object doesn't exist. The path method is not a filter: if it can't resolve the property value then it will simply pass undefined down the stream. You could easily couple it with filter, as sketched after the example below.

const logger = console.log.bind(console);

const stream = Stream.of(null).path(['a', 'b', 'c']);
stream.subscribe.tap(logger);
stream.push({a: {b: {c: 100}}})
      .push({a: null})
      .push({a: {b: {c: {d: 10}}}})
      .push(null);

//  100.. undefined.. {d: 10}.. undefined..
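Coupling path with filter is a small sketch away (logger here is the same console helper defined above):

// Sketch: filter out the undefined values that path passes along.
const safeStream = Stream.of(null).path(['a', 'b', 'c']);
safeStream.subscribe
  .filter(value => value !== undefined)
  .tap(logger);

safeStream.push({a: {b: {c: 100}}}).push({a: null});
//  100..   (the unresolved push is filtered out, so nothing is logged for it)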


#curried

The curried method accepts a curried function as its only param. The curried function is added to the stream similarly to using map, except that the curried function doesn't push a value downstream until its own params are filled. So if the curried function has 3 params, then it will push one value downstream for every 3 values pushed to it. If your function is not already curried then use the curry method.

const { curry } = _; // _ is ramda, lodash or any utility library with `curry`.
const logger = (label) => console.log.bind(console, label);

const volume = curry(function(l, w, h) {
  return l * w * h;
});

const stream = Stream.of().subscribe;

stream
  .curried(volume)
  .tap(logger('volume:'));

stream
  .push(10)
  .push(20)
  .push(10);

// 'volume: 2000'


#curry

The curry method is the same as curried except that it takes a regular function as its input. So if your function is not already curried and you want it to act like a curried function in the stream, pass it into this method and MicroStream JS will handle the currying for you. Save a step!

const observer = Stream.of().subscribe;

observer.curry((a, b, c) => {
  const value = a + b + c;
  console.log(`\
    Beep boop... ${a} + ${b} + ${c} = ${value}`);
  return value;
});

observer
  .push(100)
  .push(300)
  .push(10);
//  'Beep boop... 100 + 300 + 10 = 410'

observer.push(99).push(' das red ').push('balloon');
//  "Beep boop... 99 + ' das red ' + 'balloon' = 99 das red balloon"


#inspect

// TODO: Inspect isn't complete yet. In order for MicroStream JS to be successful and to grow in a meaningful way, we need meaningful ways to inspect the values of our streams. The plan at the moment is to implement an inspect method (currently there but useless), and to create an optional add-on that will ship with MicroStream JS and implement formatted console logging for intelligent inspections. What this means is that you will be able to console.log a stream and only get the important information without all the other stuff piled on top of it. Additionally there could be room for a state machine that allows fast-forward and rewind of state. Not sure, we'll see what the future brings!

 



Thanks, Enjoy!

I hope you enjoy the library, if you'd like to contribute please visit the Github MicroStream JS repo.

 

 

  Copyright 2016, Michael Rosata, License: MIT