Ind.ie is now Small Technology Foundation.
Commit 6fb7fc9b authored by Aral Balkan's avatar Aral Balkan

Replicating

parent 040a1dae
......@@ -7,6 +7,12 @@ const { Buffer } = require('buffer')
const ram = require('random-access-memory')
const hypercore = require('hypercore')
// Web socket / replication
const webSocketStream = require('websocket-stream')
const pump = require('pump')
const nextId = require('monotonic-timestamp-base36')
// From libsodium.
function to_hex(input) {
// Disable input checking for this simple spike.
......@@ -88,8 +94,6 @@ function logError(error) {
}
function generatePassphrase () {
console.log('--- generatePassphrase() ---')
resetForm()
showProgressIndicator()
......@@ -120,9 +124,6 @@ function clearOutputFields() {
}
function generateKeys() {
console.log('--- generateKeys() ---')
const passphrase = setupForm.elements.passphrase.value
const domain = setupForm.elements.domain.value
......@@ -171,7 +172,10 @@ function generateKeys() {
feed.on('ready', () => {
console.log(`Feed: [Ready] ${to_hex(feed.key)}`)
const feedKey = feed.key
const feedKeyInHex = to_hex(feedKey)
console.log(`Feed: [Ready] ${feedKeyInHex}`)
blinkSignal('ready')
generatedTextField.value = 'Yes'
......@@ -181,6 +185,30 @@ function generateKeys() {
return
}
// Hypercore feed is ready: connect to web socket and start replicating.
const remoteStream = webSocketStream(`wss://localhost/hypha/${feedKeyInHex}`)
const localStream = feed.replicate({
encrypt: false,
live: true
})
// Create a duplex stream.
//
// What’s actually happening:
//
// remoteStream.write -> localStream.read
// localStream.write -> remoteStream.read
pump(
remoteStream,
localStream,
remoteStream,
(error) => {
console.log(`Pipe closed for ${feedKeyInHex}`, error && error.message)
logError(error.message)
}
)
//
// Note: the order of execution for an append appears to be:
//
......@@ -210,7 +238,6 @@ function generateKeys() {
let counter = 0
const intervalToUpdateInMS = 500
Date.prototype.getUnixTime = function() { return this.getTime()/1000|0 }
updateInterval = setInterval(() => {
counter++
if (counter === NUMBER_TO_APPEND) {
......@@ -219,7 +246,7 @@ function generateKeys() {
updateInterval = null
}
const key = (new Date()).getUnixTime()
const key = nextId()
const value = Math.random()*1000000000000000000 // simple random number
let obj = {}
obj[key] = value
......
......@@ -2,11 +2,21 @@ const fs = require('fs')
const https = require('https')
const { pipeline } = require('stream')
const express = require('express')
const expressWebSocket = require('express-ws')
const websocketStream = require('websocket-stream/stream')
const ram = require('random-access-memory')
const hypercore = require('hypercore')
const budo = require('budo')
const babelify = require('babelify')
const router = express.Router()
const hypercores = {}
const server = budo('client/index.js', {
live: true,
live: false,
port: 443,
ssl: true,
dir: 'client/static/', // Static content directory
......@@ -16,10 +26,73 @@ const server = budo('client/index.js', {
stream: process.stdout, // Log to console
browserify: {
transform: babelify
}
},
middleware: [
router
]
})
server.on('connect', (event) => {
// Setup our web socket server (in addition to Budo’s, which is
// used for live reload).
expressWebSocket(router, event.server, {
perMessageDeflate: false
})
// Add web socket routes.
router.ws('/hypha/:readKey', (webSocket, request) => {
const readKey = request.params.readKey
console.log('Got web socket request for ', readKey)
if (hypercores[readKey] !== undefined) {
console.log(`Hypercore with read key ${readKey} already exists. Ignoring.`)
return
}
// Create a new hypercore with the passed read key and replicate.
const newCore = hypercore((filename) => ram(filename), readKey, {
createIfMissing: false,
overwrite: false,
valueEncoding: 'json',
onwrite: (index, data, peer, next) => {
// console.log(`Feed: [onWrite] index = ${index}, peer = ${peer}, data:`)
// console.log(data)
next()
}
})
newCore.on('ready', () => {
console.log(`Hypercore ready (${readKey})`)
const remoteStream = websocketStream(webSocket)
const localReadStream = newCore.createReadStream({live: true})
localReadStream.on('data', (data) => {
console.log('[Replicate]', data)
})
//
// Replicate :)
//
const localReplicationStream = newCore.replicate({
encrypt: false,
live: true
})
pipeline(
remoteStream,
localReplicationStream,
remoteStream,
(error) => {
console.log(`Pipe closed for ${readKey}`, error && error.message)
}
)
})
})
// TODO: Join swarm, add cancellation, etc.
// Display connection info.
const horizontalRule = new Array(60).fill('').join('')
console.log('\nHypha Spike: DAT 1')
console.log(horizontalRule)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment