Commit 0bf85b25 authored by Aral Balkan's avatar Aral Balkan
Browse files

Added pulse event handling and stream weaver. Closes #22. Updated readme with...

Added pulse event handling and stream weaver. Closes #22. Updated readme with note on NFS and warning about not using npm install … --save.
parent b67729a8
......@@ -3,4 +3,5 @@ node_modules
data
views/me
npm-debug.log
config.json
\ No newline at end of file
config.json
db
\ No newline at end of file
################################################################################
#
# PulseEventHandler
#
# Terribly copy/pasted/hacked from the Pulse Event Pipe in Heartbeat.
# TODO: Refactor once it is working to remove any redundancies that may remain.
#
# This is independent technology. See ind.ie/manifesto
#
# Copyright © Aral Balkan. Copyright © Ind.ie
# Released under the GNU AGPLv3 license.
#
################################################################################
request = require 'superagent-as-promised'
StreamWeaver = require './StreamWeaver'
class PulseEventHandler

  # ID of the latest Pulse event processed so far. Used as the `since`
  # parameter when polling so that only new events are returned.
  lastPulseEventID: 0
  streamWeaver: null
  pollIntervalID: null

  constructor: ->
    @streamWeaver = new StreamWeaver
    @start()

  # Stop polling for Pulse events.
  stop: =>
    clearInterval @pollIntervalID

  # Poll once immediately, then every five seconds.
  start: =>
    console.log 'Info: Node: Starting the Pulse event pipe…'
    @poll()
    @pollIntervalID = setInterval @poll, 5000

  #
  # Polls Pulse for new events and pipes the relevant ones
  # to the Cocoa client.
  #
  poll: =>
    request.get("http://localhost:8080/rest/events?since=#{@lastPulseEventID}")
      .then (response) =>
        events = JSON.parse response.text
        numEvents = events.length

        firstEvent = events[0]
        if !firstEvent
          # No new Pulse events since the last poll.
          return

        firstEventID = firstEvent.id
        if firstEventID > @lastPulseEventID + 1
          console.log "⚠️ We are missing Pulse events (current ID: #{firstEventID}, last ID received previous time: #{@lastPulseEventID})"
          # TODO: Load the missing events.

        lastEventID = events[numEvents-1].id
        @lastPulseEventID = lastEventID

        #
        # Filter the events and only send the relevant ones to the Cocoa client.
        #
        # Fix: the original used `break` at the end of each matched case, which
        # exited the loop after the FIRST matching event and silently dropped
        # the rest of the batch. `continue` moves on to the next event instead.
        #
        eventsToSendToCocoa = []
        for event in events
          # Local index updated.
          if event.type == "LocalIndexUpdated"
            eventsToSendToCocoa.push event
            continue
          # Remote index updated event (only relevant if it contains items).
          if event.type == "RemoteIndexUpdated" and event.data.items > 0
            eventsToSendToCocoa.push event
            continue
          # State changed to or from syncing.
          # Fix: the to/from fields live in event.data in the Pulse/Syncthing
          # events API — the original read them off the event object itself,
          # so this condition could never match.
          if event.type == "StateChanged"
            if event.data.to == "syncing" or event.data.from == "syncing"
              eventsToSendToCocoa.push event
            continue
          # Other event types the Cocoa client is interested in.
          if event.type == "DeviceDiscovered" or event.type == "DeviceConnected" or event.type == "DeviceDisconnected" or event.type == "ItemStarted" or event.type == "ItemFinished" or event.type == "DownloadProgress"
            eventsToSendToCocoa.push event
            continue
          #
          # Events we care about but may not want to propagate to the Cocoa client.
          #
          if event.type == "Starting"
            console.log "Info: Pulse event: Pulse starting."
            continue
          if event.type == "StartupComplete"
            # TODO: Should we send this to Cocoa to use as the notification for Pulse readiness instead of console output?
            console.log "Info: Pulse event: Pulse start-up complete."
            continue
          if event.type == "FolderRejected"
            console.log "Warning: Pulse event: Folder rejected: Device: #{event.data.device} Folder: #{event.data.folder}."
            continue
          if event.type == "DeviceRejected"
            console.log "Warning: Pulse event: Device rejected: Address: #{event.data.address} Device: #{event.data.device}."
            continue
          if event.type == "ConfigSaved"
            console.log 'Info: Pulse event: Config saved.'
            continue

        numRelevantEvents = eventsToSendToCocoa.length

        #
        # Send the relevant events back to the Cocoa client.
        #
        if numRelevantEvents > 0
          for event in eventsToSendToCocoa
            event.channel = "pulse"
          console.log "Sending #{numRelevantEvents} relevant Pulse events back to Cocoa…"
          eventsToSendToCocoaString = JSON.stringify eventsToSendToCocoa
          # NOTE(review): the web-socket send is disabled in the original —
          # re-enable once @ws is wired up.
          #@ws.send(eventsToSendToCocoaString, {binary:true})
      .catch ((error) =>
        console.log 'Error getting response from Pulse events API:'
        console.log error
      )

module.exports = PulseEventHandler
\ No newline at end of file
################################################################################
#
# Ind.ie Heartbeat Node
#
# StreamWeaver — weaves timeline streams using LevelDB
#
# This is a singleton but that’s an implementation detail. Just use with
# new Streamweaver as usual and it’ll handle it for you.
#
# This is independent technology. See ind.ie/manifesto
#
# Copyright © Aral Balkan. Copyright © Ind.ie
# Released under the GNU AGPLv3 license.
#
################################################################################
LevelUp = require 'levelup'
Sublevel = require 'level-sublevel'
WriteStream = require 'write-stream'
liveStream = require 'level-live-stream'
class StreamWeaver

  # Module-scoped singleton instance, shared by every `new StreamWeaver`.
  instance = null

  # Instance state. Initialised in the constructor (not on the prototype):
  # prototype-level `{}` literals are shared mutable state across all
  # instances, a classic CoffeeScript pitfall even for a de-facto singleton.
  timeline: null
  timelines: null
  privateTimeline: null
  publicTimeline: null
  db: null

  constructor: ->
    # Singleton access: hand back the existing instance if there is one.
    if instance
      return instance

    #
    # Initialise the singleton instance
    #
    console.log 'Initialising StreamWeaver singleton.'

    @timeline = {}
    @timelines = {}

    # Create database ('db' directory, wrapped for sublevel namespaces).
    @db = Sublevel (LevelUp 'db')

    @createTimelines()
    # @createStreams()
    # @debugStreams()

    # Save reference to the singleton instance.
    instance = @
    return instance

  # Create the private and public timeline sublevels and index them by name.
  createTimelines: =>
    @timeline.private = "private"
    @timeline.public = "public"
    @privateTimeline = @db.sublevel @timeline.private
    @publicTimeline = @db.sublevel @timeline.public
    @timelines[@timeline.private] = @privateTimeline
    @timelines[@timeline.public] = @publicTimeline

  # Create live streams over the two timelines (currently unused — see the
  # commented-out call in the constructor).
  createStreams: =>
    options = { min: '\x00', max: '\uffff', limit: 100}
    @privateStream = liveStream @privateTimeline, options
    @publicStream = liveStream @publicTimeline, options

  # Debug — live stream the timelines to the console.
  debugStreams: =>
    @privateStream.pipe WriteStream (chunk) ->
      console.log "Private timeline: #{chunk.type}, #{chunk.key.toString()}, #{chunk.value && chunk.value.toString()}"
    @publicStream.pipe WriteStream (chunk) ->
      console.log "Public timeline: #{chunk.type}, #{chunk.key.toString()}, #{chunk.value && chunk.value.toString()}"

  #
  # Add a message to a timeline (a folder in LevelDB)
  #
  weaveMessage: (messageID, message, timeline) =>
    console.log "Weaving #{messageID}: #{message} into #{timeline}"
    @timelines[timeline].put messageID, message

  #
  # Get timeline
  #
  # Returns a promise to return the requested timeline.
  # (We need to stream the timeline’s contents from the database.)
  #
  getTimeline: (timeline) =>
    return new Promise ((fulfill, reject) =>
      # Create a new stream for the requested timeline
      options = { gt: '\x00', lt: '\uffff', limit: 100}
      # The private and public timelines are displayed in reverse order
      # (latest post first). However, since we implement that when displaying
      # the posts (so that new posts are added correctly) — we are doing the
      # opposite here. Confusing, I know.
      #
      # Also known as document/blog order. All other
      # timelines are displayed in conversation order (latest post last).
      options.reverse = !(timeline == @timeline.public || timeline == @timeline.private)

      timelineStream = @timelines[timeline].createReadStream options
      timelineStream.on 'error', (err) ->
        console.error('timelineStream.on error ' + err.message)

      # Collect the streamed posts into an array, then fulfill the promise
      # on the next tick so the stream machinery finishes cleanly first.
      toArray = WriteStream.toArray (arrayOfMessages) =>
        process.nextTick(=>
          fulfill arrayOfMessages
        )

      # Pipe the timeline stream to the array using WriteStream.
      timelineStream.pipe toArray
    )

module.exports = StreamWeaver
#!/bin/bash
#
# This script will mount /Users in the boot2docker VM using NFS (instead of the
# default vboxsf). It's probably not a good idea to run it while there are
# Docker containers running in boot2docker.
#
# Usage: sudo ./boot2docker-use-nfs.sh
#

if [ "$USER" != "root" ]
then
  echo "This script must be run with sudo: sudo ${0}"
  exit 1
fi

# Run command as non root http://stackoverflow.com/a/10220200/96855
# Fix: the original redirected stdout AND stderr to /dev/null inside the
# command substitution (&>), so B2D_IP was always empty. Discard stderr only.
B2D_IP=$(sudo -u ${SUDO_USER} boot2docker ip 2> /dev/null)
if [ "$?" != "0" ]
then
  sudo -u ${SUDO_USER} boot2docker up
  $(sudo -u ${SUDO_USER} boot2docker shellinit)
  B2D_IP=$(sudo -u ${SUDO_USER} boot2docker ip 2> /dev/null)
  #echo "You need to start boot2docker first: boot2docker up && \$(boot2docker shellinit) "
  #exit 1
fi

OSX_IP=$(ifconfig en0 | grep --word-regexp inet | awk '{print $2}')
MAP_USER=${SUDO_USER}
MAP_GROUP=$(sudo -u ${SUDO_USER} id -n -g)

# Backup exports file (cp -n will not overwrite an existing backup)
cp -n /etc/exports /etc/exports.bak && \
  echo "Backed up /etc/exports to /etc/exports.bak"

# Delete previously generated line if it exists.
# Fix: `grep … /etc/exports > /etc/exports` truncates the file before grep
# reads it, wiping /etc/exports. Filter into a temporary file instead, then
# copy the content back so the file's ownership/permissions are preserved.
EXPORTS_TMP=$(mktemp /tmp/exports.XXXXXX)
grep -v '^/Users ' /etc/exports > "${EXPORTS_TMP}"
cat "${EXPORTS_TMP}" > /etc/exports
rm -f "${EXPORTS_TMP}"

# We are using the OS X IP because the b2d VM is behind NAT
echo "/Users -mapall=${MAP_USER}:${MAP_GROUP} ${OSX_IP}" \
  >> /etc/exports

nfsd restart

sudo -u ${SUDO_USER} boot2docker ssh << EOF
echo "Unmounting /Users"
sudo umount /Users 2> /dev/null
echo "Restarting nfs-client"
sudo /usr/local/etc/init.d/nfs-client restart 2> /dev/null
echo "Waiting 10s for nfsd and nfs-client to restart."
sleep 10
echo "Mounting /Users"
sudo mount $OSX_IP:/Users /Users -o rw,async,noatime,rsize=32768,wsize=32768,proto=tcp,nfsvers=3
echo "Mounted /Users:"
ls -al /Users
exit
EOF
......@@ -54,7 +54,8 @@ PulseAPI = require 'indie-pulse-api'
PulseProcess = require 'indie-pulse-process'
PulseConfig = require 'indie-pulse-config'
Gaze = require('gaze').Gaze
# TODO: Re-enable
PulseEventHandler = require './PulseEventHandler'
homeDirectory = path.homedir()
......@@ -252,9 +253,8 @@ gracefulShutdown = () ->
console.log 'Exiting… cleaning up and shutting down Pulse.'
pulseProcess.stop()
#
# Gaze
# Handle Pulse events
#
pulseEventHandler = new PulseEventHandler
# globalWatcher = new Gaze chatPath + '/**'
......@@ -15,12 +15,18 @@
"fs-extra": "0.12.0",
"gaze": "0.5.x",
"indie-set": "0.0.7",
"level-live-stream": "1.4.9",
"level-sublevel": "6.4.6",
"leveldown": "0.10.4",
"levelup": "0.19.0",
"moment-timezone": "0.3.0",
"path": "0.4.9",
"path-extra": "^0.3.0",
"slug": "0.8.0",
"superagent": "0.18.2",
"superagent-as-promised": "2.0.0",
"thrush": "0.0.5",
"write-stream": "^0.4.3",
"ws": "0.4.32"
},
"devDependencies": {
......
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -79,6 +79,26 @@ Then re-run ```./dev``` and access the Web UI from:
192.168.59.103:8080
# On Vbox shared folders (boot2docker) and LevelDB / NFS
Vbox shared folders do not work with LevelDB at this time ([vboxvfs has issues with mmapped access](https://www.virtualbox.org/ticket/819#comment:64)). So we need to use NFS.
[This script will activate NFS](http://syskall.com/using-boot2docker-using-nfs-instead-of-vboxsf/) but you’ll get the following error if you run it:
mount: RPC: Authentication error; why = Client credential too weak
[To fix that](https://gist.github.com/sirkkalap/40261ed82386ad8a6409), we need to add the following to ```/etc/nfs.conf```:
nfs.server.mount.require_resv_port = 0
And restart the NFS daemon:
sudo nfsd restart
# Development notes
If you want to add a new Node module to Waystone, do **not** ```npm install <module> --save```. You should not have a node_modules folder in your source. The install script will create the node_modules folder in the Docker container. This is important because native modules need to be compiled for the correct OS (the one in the container, not your development environment). If you install native modules locally, these will take precedence when Waystone runs and you will get fun errors like ‘invalid ELF header’ (which, in this context, basically means you’re running a binary built for your development platform on the container platform).
# Testing the API
Currently, I’m using [Paw](https://luckymarmot.com/paw) to test.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment