Skip to content

Instantly share code, notes, and snippets.

@fabioyamate
Last active December 23, 2015 05:49
Show Gist options
  • Save fabioyamate/6589269 to your computer and use it in GitHub Desktop.
Save fabioyamate/6589269 to your computer and use it in GitHub Desktop.
// Sample stock quotes, two symbols interleaved, used as input for the stream below.
var quotes = [
  { symbol: 'MSFT', price: 27.01 },
  { symbol: 'INTC', price: 21.75 },
  { symbol: 'MSFT', price: 27.96 },
  { symbol: 'MSFT', price: 31.21 },
  { symbol: 'INTC', price: 22.54 },
  { symbol: 'INTC', price: 20.98 },
  { symbol: 'MSFT', price: 30.73 }
];
/**
 * Relative price change across a window of two quotes.
 *
 * @param {Array<{price: number}>} w - Window with the earlier quote at index 0
 *   and the later quote at index 1.
 * @returns {number} Later price divided by earlier price, minus one
 *   (so 0.1 means the price rose by 10%).
 */
function priceIncrease(w) {
  var later = w[1].price;
  var earlier = w[0].price;
  return later / earlier - 1;
}
// Turn the sample quotes into a Bacon event stream and pass it through delay(1000).
var stockStream = Bacon.fromArray(quotes).delay(1000);

// Curried predicate: quoteFrom(symbol) gives a function that accepts a quote
// and tells whether it belongs to that symbol.
var quoteFrom = _.curry(function (wantedSymbol, candidate) {
  return candidate.symbol === wantedSymbol;
});
/**
 * Builds the stream of "price jumped by more than 10%" events for one symbol.
 *
 * @param {Object} stockStream - Bacon stream of quotes ({ symbol, price }).
 * @param {string} key - Symbol whose quotes should be inspected.
 * @returns {Object} Stream of { symbol: key, increase: [previousQuote, currentQuote] }.
 */
function selectMany(stockStream, key) {
  var quotesForKey = stockStream.filter(quoteFrom(key));

  // slidingWindow(2, 1).skip(1) is meant to equal Buffer(2, 1), which drops the incomplete ones
  var pairs = quotesForKey.slidingWindow(2, 1).skip(1);

  var jumps = pairs.filter(function (pair) {
    return priceIncrease(pair) > 0.1;
  });

  var tagged = jumps.map(function (pair) {
    return { symbol: key, increase: pair };
  });

  // need to convert property to stream
  return tagged.changes();
}
// bacon.js does not support groupBy/split, so one stream is built per symbol by hand.
// selectMany already finishes with .changes() and therefore hands back a stream;
// its result is used directly instead of calling .changes() a second time.
var intc = selectMany(stockStream, 'INTC');
var msft = selectMany(stockStream, 'MSFT');

// Merge both per-symbol streams and log every event that comes through.
msft.merge(intc).log();
# Sample stock quotes, two symbols interleaved, used as input for the pipeline below.
quotes = [
  { symbol: 'MSFT', price: 27.01 },
  { symbol: 'INTC', price: 21.75 },
  { symbol: 'MSFT', price: 27.96 },
  { symbol: 'MSFT', price: 31.21 },
  { symbol: 'INTC', price: 22.54 },
  { symbol: 'INTC', price: 20.98 },
  { symbol: 'MSFT', price: 30.73 }
]
# Relative change between the two prices of a window.
#
# @param quotes [Array<Numeric>] window whose element 0 is the earlier price
#   and element 1 the later price
# @return [Float] later / earlier - 1 (e.g. 0.1 for a 10% rise)
def price_increase(quotes)
  # fdiv always performs floating-point division, so Integer prices are not
  # truncated the way Integer#/ would truncate them (120 / 100 => 1).
  quotes[1].fdiv(quotes[0]) - 1
end
# Enumerates fixed-size windows over one attribute of a collection of records,
# in the spirit of a Buffer(count, skip) stream operator.
#
# A window starts every `skip` records and holds `count` consecutive values.
# Windows that cannot be filled completely are not yielded.
class Buffer
  include Enumerable

  # @param collection [Enumerable] records that respond to #[]
  # @param key [Object] key handed to each record's #[] to extract the value
  # @param count [Integer] number of values in every window
  # @param skip [Integer, nil] distance between the starts of two windows;
  #   defaults to count (non-overlapping windows)
  # @raise [ArgumentError] if count or skip is not greater than zero
  def initialize(collection, key, count, skip = nil)
    @collection = collection
    @key = key
    @count = count
    @skip = skip || count
    raise ArgumentError, 'count and skip must be positive' unless @count > 0 && @skip > 0
  end

  # Yields each complete window as a fresh Array of extracted values.
  #
  # No state is kept between calls, so enumerating twice gives the same windows.
  #
  # @yieldparam window [Array] count consecutive values
  # @return [Enumerator] when no block is given
  # @return [self] when a block is given
  def each
    return enum_for(:each) unless block_given?

    values = @collection.map { |record| record[@key] }
    # each_cons produces every run of @count consecutive values; keeping only
    # the runs whose start index is a multiple of @skip applies the step.
    values.each_cons(@count).with_index do |window, index|
      yield window if (index % @skip).zero?
    end
    self
  end
end
# Aggregate: bucket the quotes by their symbol.
quotes_by_symbol = quotes.group_by { |quote| quote[:symbol] }

# Map and merge: for every symbol, slide a two-price window over its quotes
# and keep the windows whose price rose by more than 10%.
spikes = quotes_by_symbol.flat_map do |symbol, symbol_quotes|
  windows = Buffer.new(symbol_quotes, :price, 2, 1)
  rising = windows.select { |window| price_increase(window) > 0.1 }
  rising.map { |window| { symbol: symbol, increase: window } }
end

spikes.each { |spike| p spike }
# => [{ symbol: 'MSFT', increase: [27.96, 31.21] }]
// Sample stock quotes, two symbols interleaved, fed into the observable below.
var quotes = [
  { symbol: 'MSFT', price: 27.01 },
  { symbol: 'INTC', price: 21.75 },
  { symbol: 'MSFT', price: 27.96 },
  { symbol: 'MSFT', price: 31.21 },
  { symbol: 'INTC', price: 22.54 },
  { symbol: 'INTC', price: 20.98 },
  { symbol: 'MSFT', price: 30.73 }
];
/**
 * Relative price change across a buffer of two quotes.
 *
 * @param {Array<{price: number}>} w - Buffer with the earlier quote at index 0
 *   and the later quote at index 1.
 * @returns {number} Later price divided by earlier price, minus one
 *   (so 0.1 means the price rose by 10%).
 */
function priceIncrease(w) {
  var later = w[1].price;
  var earlier = w[0].price;
  return later / earlier - 1;
}
// Group the quotes per symbol, pair up consecutive quotes inside each group
// and log every pair whose price rose by more than 10%.
Rx.Observable.fromArray(quotes)
  .groupBy(function(t) { return t.symbol; })
  .selectMany(function(stockStream) {
    return stockStream.bufferWithCount(2, 1)
      .where(function(w) {
        // Only score complete two-quote buffers: priceIncrease reads w[1].price,
        // which would throw on a shorter buffer.
        // NOTE(review): bufferWithCount appears to flush a still-open, shorter
        // buffer when the group completes; confirm against the RxJS docs.
        return w.length === 2 && priceIncrease(w) > 0.1;
      })
      .select(function(increases) {
        return { symbol: stockStream.key, increase: increases };
      });
  })
  .subscribe(function(increase) {
    console.log(increase);
  });
// { symbol: 'MSFT', increase: [{ symbol: 'MSFT', price: 27.96 }, { symbol: 'MSFT', price: 31.21 }] }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment