Verified Commit 01bc9017 authored by Aral Balkan's avatar Aral Balkan
Browse files

Merge branch 'hyperdb'

parents e02a99f0 1040fa0c
Pipeline #763 canceled with stages
node_modules
......@@ -16,30 +16,30 @@ var mySwarm = discoverySwarm(swarmDefaults({
extensions: ['ephemeral']
})
// ...
return stream
return streams
}
}))
/**
* Step 2. Wire up each dat you create
*/
datEphemeralExtMsg.watchDat(archiveOrHypercore) // can give a hyperdrive or hypercore
// datEphemeralExtMsg.unwatchDat(archiveOrHypercore) when done
datEphemeralExtMsg.watchDat(database) // can pass a hypercore, hyperdb, or hyperdrive reference as the database
// datEphemeralExtMsg.unwatchDat(database) when done
/**
* Step 3. Listen to events
*/
datEphemeralExtMsg.on('message', (archiveOrHypercore, peer, {contentType, payload}) => {
// `peer` has sent `payload` of mimetype `contentType` for `archiveOrHypercore`
datEphemeralExtMsg.on('message', (database, peer, {contentType, payload}) => {
// `peer` has sent `payload` of mimetype `contentType` for `database`
})
datEphemeralExtMsg.on('received-bad-message', (err, archiveOrHypercore, peer, messageBuffer) => {
datEphemeralExtMsg.on('received-bad-message', (err, database, peer, messageBuffer) => {
// there was an error parsing a received message
})
/**
* Step 4. Use the API
*/
datEphemeralExtMsg.hasSupport(archiveOrHypercore, peerId)
datEphemeralExtMsg.broadcast(archiveOrHypercore, {contentType, payload})
datEphemeralExtMsg.send(archiveOrHypercore, peerId, {contentType, payload})
datEphemeralExtMsg.hasSupport(database, peerId)
datEphemeralExtMsg.broadcast(database, {contentType, payload})
datEphemeralExtMsg.send(database, peerId, {contentType, payload})
```
......@@ -108,9 +108,16 @@ class DatWatcher {
}
get hypercore () {
// if dat is a hyperdrive, use the metadata hypercore
// if dat is a hyperdrive, use the metadata hypercore,
// if it’s hyperdb, use the source hypercore,
// otherwise assume dat is a hypercore already
return this.dat.metadata ? this.dat.metadata : this.dat
if (this.dat.metadata) {
return this.dat.metadata
} else if (this.dat.source) {
return this.dat.source
} else {
return this.dat
}
}
getPeer (remoteId) {
......
This diff is collapsed.
{
"name": "@beaker/dat-ephemeral-ext-msg",
"version": "1.0.1",
"description": "Methods for implementing DEP-0000: Ephemeral Messages (Extension Message)",
"name": "@hypha/ephemeral-messaging-channel",
"version": "1.0.2",
"description": "Symmetrically-encrypted ephemeral messaging channel over hypercore protocol for hypercore, hyperdb, and hyperdrive.",
"main": "index.js",
"scripts": {
"test": "tape test.js",
"test": "tape test/*.js",
"protobuf": "protocol-buffers schema.proto -o encodings.js"
},
"repository": {
......@@ -27,7 +27,9 @@
"protocol-buffers-encodings": "^1.1.0"
},
"devDependencies": {
"hyperdrive": "^9.13.0",
"hypercore": "^6.24.0",
"hyperdb": "^3.5.0",
"hyperdrive": "^9.14.2",
"protocol-buffers": "^4.0.4",
"random-access-memory": "^3.0.0",
"tape": "^4.9.1"
......
var tape = require('tape')
var hyperdrive = require('hyperdrive')
var ram = require('random-access-memory')
var {DatEphemeralExtMsg} = require('./')
tape('exchange ephemeral messages', function (t) {
// must use 2 instances to represent 2 different nodes
var srcEphemeral = new DatEphemeralExtMsg()
var cloneEphemeral = new DatEphemeralExtMsg()
var src = hyperdrive(ram)
var clone
src.on('ready', function () {
// generate source archive
t.ok(src.writable)
t.ok(src.metadata.writable)
t.ok(src.content.writable)
src.writeFile('/first.txt', 'number 1', function (err) {
t.error(err, 'no error')
src.writeFile('/second.txt', 'number 2', function (err) {
t.error(err, 'no error')
src.writeFile('/third.txt', 'number 3', function (err) {
t.error(err, 'no error')
t.same(src.version, 3, 'version correct')
// generate clone instance
clone = hyperdrive(ram, src.key)
clone.on('ready', startReplication)
})
})
})
})
function startReplication () {
// wire up archives
srcEphemeral.watchDat(src)
cloneEphemeral.watchDat(clone)
// listen to events
var messageEventCount1 = 0
srcEphemeral.on('message', onMessage1)
cloneEphemeral.on('message', onMessage1)
function onMessage1 (archive, peer, msg) {
if (archive === src) {
// received clone's data
t.same(msg.contentType, 'application/json', 'received clone data')
t.same(msg.payload.toString('utf8'), '"bar"', 'received clone data')
}
if (archive === clone) {
// received src's data
t.same(msg.contentType, 'application/json', 'received src data')
t.same(msg.payload.toString('utf8'), '"foo"', 'received src data')
}
if (++messageEventCount1 === 2) {
hasReceivedEvents1()
}
}
// start replication
var stream1 = clone.replicate({
id: new Buffer('clone-stream'),
live: true,
extensions: ['ephemeral']
})
var stream2 = src.replicate({
id: new Buffer('src-stream'),
live: true,
extensions: ['ephemeral']
})
stream1.pipe(stream2).pipe(stream1)
// wait for handshakes
var handshakeCount = 0
stream1.on('handshake', gotHandshake)
stream2.on('handshake', gotHandshake)
function gotHandshake () {
if (++handshakeCount !== 2) return
// has support
t.ok(srcEphemeral.hasSupport(src, src.metadata.peers[0]), 'clone has support')
t.ok(cloneEphemeral.hasSupport(clone, clone.metadata.peers[0]), 'src has support')
// send values
srcEphemeral.send(src, src.metadata.peers[0], {contentType: 'application/json', payload: '"foo"'})
cloneEphemeral.send(clone, clone.metadata.peers[0], {contentType: 'application/json', payload: '"bar"'})
}
function hasReceivedEvents1 () {
srcEphemeral.removeListener('message', onMessage1)
cloneEphemeral.removeListener('message', onMessage1)
// listen to new events
var messageEventCount2 = 0
srcEphemeral.on('message', onMessageEvent2)
cloneEphemeral.on('message', onMessageEvent2)
function onMessageEvent2 (archive, peer, msg) {
if (archive === src) {
// received clone's data
t.same(msg.contentType, 'application/octet-stream', 'received clone data')
t.ok(msg.payload.equals(Buffer.from([4,3,2,1])), 'received clone data')
}
if (archive === clone) {
// received src's data
t.same(msg.contentType, '', 'received src data')
t.ok(msg.payload.equals(Buffer.from([1,2,3,4])), 'received src data')
}
if (++messageEventCount2 === 2) {
hasReceivedEvents2()
}
}
// broadcast new values
srcEphemeral.broadcast(src, {payload: Buffer.from([1,2,3,4])})
cloneEphemeral.broadcast(clone, {contentType: 'application/octet-stream', payload: Buffer.from([4,3,2,1])})
}
function hasReceivedEvents2 () {
// unwatch
srcEphemeral.unwatchDat(src)
cloneEphemeral.unwatchDat(clone)
t.end()
}
}
})
tape('no peers causes no issue', function (t) {
var ephemeral = new DatEphemeralExtMsg()
var src = hyperdrive(ram)
src.on('ready', function () {
ephemeral.watchDat(src)
ephemeral.broadcast(src, {contentType: 'application/json', payload: '"test"'})
t.pass('no error thrown')
t.end()
})
})
tape('fires received-bad-message', function (t) {
// must use 2 instances to represent 2 different nodes
var srcEphemeral = new DatEphemeralExtMsg()
var cloneEphemeral = new DatEphemeralExtMsg()
var src = hyperdrive(ram)
var clone
src.on('ready', function () {
// generate clone instance
clone = hyperdrive(ram, src.key)
clone.on('ready', startReplication)
})
function startReplication () {
// wire up archives
srcEphemeral.watchDat(src)
cloneEphemeral.watchDat(clone)
// listen to events
cloneEphemeral.on('received-bad-message', err => {
t.ok(err instanceof Error, 'error was emitted')
t.end()
})
// start replication
var stream1 = clone.replicate({
id: new Buffer('clone-stream'),
live: true,
extensions: ['ephemeral']
})
var stream2 = src.replicate({
id: new Buffer('src-stream'),
live: true,
extensions: ['ephemeral']
})
stream1.pipe(stream2).pipe(stream1)
// wait for handshakes
var handshakeCount = 0
stream1.on('handshake', gotHandshake)
stream2.on('handshake', gotHandshake)
function gotHandshake () {
if (++handshakeCount !== 2) return
// has support
t.ok(srcEphemeral.hasSupport(src, src.metadata.peers[0]), 'clone has support')
t.ok(cloneEphemeral.hasSupport(clone, clone.metadata.peers[0]), 'src has support')
// send bad message
src.metadata.peers[0].stream.extension('ephemeral', Buffer.from([0,1,2,3]))
}
}
})
// Common tests that are run once for
// hypercore, hyperdb, and hyperdrive.
var tape = require('tape')
var ram = require('random-access-memory')
var {DatEphemeralExtMsg} = require('../../')
module.exports = function (database) {
var isHypercore = database.name === 'Feed'
var isHyperDB = database.name === 'HyperDB'
var isHyperdrive = database.name === 'Hyperdrive'
var databaseNames = {Feed: 'hypercore', HyperDB: 'hyperdb', Hyperdrive: 'hyperdrive'}
var databaseName = databaseNames[database.name]
tape(`exchange ephemeral messages: ${databaseName}`, function (t) {
// must use 2 instances to represent 2 different nodes
var srcEphemeral = new DatEphemeralExtMsg()
var cloneEphemeral = new DatEphemeralExtMsg()
var src = database(ram)
var clone
var cloneFeed
var self = this
// Isomorphic interface to support hypercore, hyperdb, and hyperdrive.
// The three packages have slightly different APIs that makes this necessary.
// TODO: open issue for unifying the interfaces.
var srcFeed = src.source || src.metadata || src
var putFunction = isHyperdrive ? 'writeFile' : isHyperDB ? 'put' : 'append'
function firstCallback (err) {
t.error(err, 'no error')
src[putFunction].apply(src, secondArgs)
}
function secondCallback (err) {
t.error(err, 'no error')
src[putFunction].apply(src, thirdArgs)
}
function thirdCallback (err) {
t.error(err, 'no error')
if (isHyperdrive) {
t.same(src.version, 3, 'version correct')
}
// generate clone instance
clone = database(ram, src.key)
cloneFeed = clone.source || clone.metadata || clone
clone.on('ready', startReplication)
}
var firstArgs = (isHyperdrive || isHyperDB) ? ['/first.txt', 'number 1', firstCallback] : ['first', firstCallback]
var secondArgs = (isHyperdrive || isHyperDB) ? ['/second.txt', 'number 2', secondCallback] : ['second', secondCallback]
var thirdArgs = (isHyperdrive || isHyperDB) ? ['/third.txt', 'number 3', thirdCallback] : ['first', thirdCallback]
src.on('ready', function () {
// generate source archive
t.ok(srcFeed.writable)
src[putFunction].apply(src, firstArgs)
})
function startReplication () {
// wire up archives
srcEphemeral.watchDat(src)
cloneEphemeral.watchDat(clone)
// listen to events
var messageEventCount1 = 0
srcEphemeral.on('message', onMessage1)
cloneEphemeral.on('message', onMessage1)
function onMessage1 (archive, peer, msg) {
if (archive === src) {
// received clone's data
t.same(msg.contentType, 'application/json', 'received clone data')
t.same(msg.payload.toString('utf8'), '"bar"', 'received clone data')
}
if (archive === clone) {
// received src's data
t.same(msg.contentType, 'application/json', 'received src data')
t.same(msg.payload.toString('utf8'), '"foo"', 'received src data')
}
if (++messageEventCount1 === 2) {
hasReceivedEvents1()
}
}
// start replication
var stream1 = clone.replicate({
id: Buffer.from('clone-stream'),
live: true,
extensions: ['ephemeral']
})
var stream2 = src.replicate({
id: Buffer.from('src-stream'),
live: true,
extensions: ['ephemeral']
})
stream1.pipe(stream2).pipe(stream1)
// wait for handshakes
var handshakeCount = 0
stream1.on('handshake', gotHandshake)
stream2.on('handshake', gotHandshake)
function gotHandshake () {
if (++handshakeCount !== 2) return
// We need to do this on the next tick to give clone’s peers a chance to populate.
process.nextTick(() => {
// has support
t.ok(srcEphemeral.hasSupport(src, srcFeed.peers[0]), 'src has support')
t.ok(cloneEphemeral.hasSupport(clone, cloneFeed.peers[0]), 'clone has support')
// send values
srcEphemeral.send(src, srcFeed.peers[0], {contentType: 'application/json', payload: '"foo"'})
cloneEphemeral.send(clone, cloneFeed.peers[0], {contentType: 'application/json', payload: '"bar"'})
})
}
function hasReceivedEvents1 () {
srcEphemeral.removeListener('message', onMessage1)
cloneEphemeral.removeListener('message', onMessage1)
// listen to new events
var messageEventCount2 = 0
srcEphemeral.on('message', onMessageEvent2)
cloneEphemeral.on('message', onMessageEvent2)
function onMessageEvent2 (archive, peer, msg) {
if (archive === src) {
// received clone's data
t.same(msg.contentType, 'application/octet-stream', 'received clone data')
t.ok(msg.payload.equals(Buffer.from([4,3,2,1])), 'received clone data')
}
if (archive === clone) {
// received src's data
t.same(msg.contentType, '', 'received src data')
t.ok(msg.payload.equals(Buffer.from([1,2,3,4])), 'received src data')
}
if (++messageEventCount2 === 2) {
hasReceivedEvents2()
}
}
// broadcast new values
srcEphemeral.broadcast(src, {payload: Buffer.from([1,2,3,4])})
cloneEphemeral.broadcast(clone, {contentType: 'application/octet-stream', payload: Buffer.from([4,3,2,1])})
}
function hasReceivedEvents2 () {
// unwatch
srcEphemeral.unwatchDat(src)
cloneEphemeral.unwatchDat(clone)
t.end()
}
}
})
tape(`no peers causes no issue: ${databaseName}`, function (t) {
var ephemeral = new DatEphemeralExtMsg()
var src = database(ram)
src.on('ready', function () {
ephemeral.watchDat(src)
ephemeral.broadcast(src, {contentType: 'application/json', payload: '"test"'})
t.pass('no error thrown')
t.end()
})
})
tape(`fires received-bad-message: ${databaseName}`, function (t) {
// must use 2 instances to represent 2 different nodes
var srcEphemeral = new DatEphemeralExtMsg()
var cloneEphemeral = new DatEphemeralExtMsg()
var src = database(ram)
var srcFeed = src.source || src.metadata || src
var clone
var cloneFeed
src.on('ready', function () {
// generate clone instance
clone = database(ram, src.key)
cloneFeed = clone.source || clone.metadata || clone
clone.on('ready', startReplication)
})
function startReplication () {
// wire up archives
srcEphemeral.watchDat(src)
cloneEphemeral.watchDat(clone)
// listen to events
cloneEphemeral.on('received-bad-message', err => {
t.ok(err instanceof Error, 'error was emitted')
t.end()
})
// start replication
var stream1 = clone.replicate({
id: Buffer.from('clone-stream'),
live: true,
extensions: ['ephemeral']
})
var stream2 = src.replicate({
id: Buffer.from('src-stream'),
live: true,
extensions: ['ephemeral']
})
stream1.pipe(stream2).pipe(stream1)
// wait for handshakes
var handshakeCount = 0
stream1.on('handshake', gotHandshake)
stream2.on('handshake', gotHandshake)
function gotHandshake () {
if (++handshakeCount !== 2) return
// We need to do this on the next tick to give clone’s peers a chance to populate.
process.nextTick(() => {
// has support
t.ok(srcEphemeral.hasSupport(src, srcFeed.peers[0]), 'src has support')
t.ok(cloneEphemeral.hasSupport(clone, cloneFeed.peers[0]), 'clone has support')
// send bad message
srcFeed.peers[0].stream.extension('ephemeral', Buffer.from([0,1,2,3]))
})
}
}
})
}
var database = require('hypercore')
require('./common')(database)
var database = require('hyperdb')
require('./common')(database)
var database = require('hyperdrive')
require('./common')(database)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment