Verified Commit 0ffbe2c6 authored by Aral Balkan's avatar Aral Balkan
Browse files

Add support for unprivileged relay nodes

parent 050b2020
......@@ -47,6 +47,12 @@ class SecureEphemeralMessagingChannel extends EventEmitter {
// send a message to a peer
send (database, remoteId, message) {
// Check that this node can send messages.
if (this.secretKey === undefined) {
throw new Error('Unprivileged nodes cannot send messages.')
}
var {watcher} = this.getWatcher(database)
if (watcher) {
return watcher.send(remoteId, message)
......@@ -55,6 +61,12 @@ class SecureEphemeralMessagingChannel extends EventEmitter {
// send a message to all peers
broadcast (database, message) {
// Check that this node can broadcast messages.
if (this.secretKey === undefined) {
throw new Error('Unprivileged nodes cannot broadcast messages.')
}
var {watcher} = this.getWatcher(database)
if (watcher) {
return watcher.broadcast(message)
......@@ -63,10 +75,9 @@ class SecureEphemeralMessagingChannel extends EventEmitter {
}
exports.SecureEphemeralMessagingChannel = SecureEphemeralMessagingChannel
// internal
// =
// Private.
// helper class to track individual dats
// Helper class to track individual databases.
class DatabaseWatcher {
constructor (database, emitter, secretKey) {
this.database = database
......@@ -77,23 +88,23 @@ class DatabaseWatcher {
this.onPeerRemove = this.onPeerRemove.bind(this)
}
send (remoteId, message = {}) {
// get peer and assure support exists
send (remoteId, message = {}, serialise = true) {
// Get peer and assure support exists for the protocol extension.
var peer = this.getPeer(remoteId)
if (!remoteSupports(peer, 'secure-ephemeral')) {
return
}
// send
message = serialize(message, this.secretKey)
message = serialise ? _serialise(message, this.secretKey) : message
getPeerFeedStream(peer).extension('secure-ephemeral', message)
}
broadcast (message) {
// send to all peers
broadcast (message, serialise = true) {
// Send to all peers.
var peers = this.hypercore.peers
for (let i = 0; i < peers.length; i++) {
this.send(peers[i], message)
this.send(peers[i], message, serialise)
}
}
......@@ -129,6 +140,20 @@ class DatabaseWatcher {
// handle ephemeral messages only
if (type !== 'secure-ephemeral') return
if (this.secretKey === undefined) {
// This is an unprivileged node. It cannot decrypt received messages
// as it does not have the secret key. Instead, it should act as a
// relay and re-broadcast received messages to other nodes.
// // Decode the message using protocol buffers and emit a relay message.
// // Useful for debugging. We can disable this in production to reduce load.
// const decodedMessage = encodings.SecureEphemeralMessage.decode(codedMessage)
// this.emitter.emit('relay', decodedMessage)
this.broadcast(codedMessage, /* serialise = */ false)
return
}
try {
// Decode the message using protocol buffers.
const decodedMessage = encodings.SecureEphemeralMessage.decode(codedMessage)
......@@ -163,10 +188,10 @@ class DatabaseWatcher {
// emit
this.emitter.emit('message', this.database, peer, parsedMessage)
} catch (e) {
} catch (error) {
// TODO: Improve this: we should return different errors based on specifics
// e.g., decryption failed, etc.
this.emitter.emit('received-bad-message', e, this.database, peer/*, message*/)
this.emitter.emit('received-bad-message', error, this.database, peer/*, message*/)
}
})
}
......@@ -176,7 +201,7 @@ class DatabaseWatcher {
}
}
function serialize (message, secretKey) {
function _serialise (message, secretKey) {
// Message should be an object
if (typeof message !== 'object') {
throw new Error('Message must be an object.')
......
......@@ -232,4 +232,89 @@ module.exports = function (database) {
}
}
})
tape(`unprivileged nodes acts as relay: ${databaseName}`, function (t) {
var srcEphemeral = new SecureEphemeralMessagingChannel(secretKey)
// The clone ephemeral is an unprivileged node.
var cloneEphemeral = new SecureEphemeralMessagingChannel()
var src = database(ram)
var srcFeed = src.source || src.metadata || src
var clone
var cloneFeed
src.on('ready', function () {
// generate clone instance
clone = database(ram, src.key)
cloneFeed = clone.source || clone.metadata || clone
clone.on('ready', startReplication)
})
function startReplication () {
// wire up archives
srcEphemeral.addDatabase(src)
cloneEphemeral.addDatabase(clone)
// The source should get its own message relayed back from
// the unprivileged node.
srcEphemeral.on ('message', (archive, peer, message) => {
t.ok(archive === src, 'message is relayed from the unprivileged node')
t.same(JSON.stringify(message), '{"message":"regular-send"}', 'message relayed correctly')
t.end()
})
// // The clone node is unprivileged. It should get a relay message.
// cloneEphemeral.on ('relay', (decodedMessage) => {
// console.log('Relay handler', decodedMessage)
// })
// start replication
var stream1 = clone.replicate({
id: Buffer.from('clone-stream'),
live: true,
extensions: ['secure-ephemeral']
})
var stream2 = src.replicate({
id: Buffer.from('src-stream'),
live: true,
extensions: ['secure-ephemeral']
})
stream1.pipe(stream2).pipe(stream1)
// wait for handshakes
var handshakeCount = 0
stream1.on('handshake', gotHandshake)
stream2.on('handshake', gotHandshake)
function gotHandshake () {
if (++handshakeCount !== 2) return
// We need to do this on the next tick to give clone’s peers a chance to populate.
process.nextTick(() => {
// Check that the streams have support for the protocol extension.
t.ok(srcEphemeral.hasSupport(src, srcFeed.peers[0]), 'src has support')
t.ok(cloneEphemeral.hasSupport(clone, cloneFeed.peers[0]), 'clone has support')
// The source feed is privileged and can send messages.
srcEphemeral.send(src, srcFeed.peers[0], {message:'regular-send'})
try {
cloneEphemeral.send(clone, cloneFeed.peers[0], {message: 'unprivileged-send-attempt'})
} catch (error) {
t.ok(error instanceof Error, 'send attempt from unprivileged node resulted in error')
}
try {
cloneEphemeral.broadcast({message: 'unprivileged-broadcast-attempt'})
} catch (error) {
t.ok(error instanceof Error, 'broadcast attempt from unprivileged node resulted in error')
}
})
}
}
})
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment