Commit 562ddca4 authored by Paul Frazee's avatar Paul Frazee
Browse files

Initial commit

parents
# dat-ephemeral-ext-msg
Methods for implementing [DEP-0000: Ephemeral Messages (Extension Message)](https://github.com/pfrazee/DEPs/blob/630a69141f80ea218ff20aa6353b7da67ba4d849/proposals/0000-ephemeral-message.md).
```js
const {DatEphemeralExtMsg} = require('@beaker/dat-ephemeral-ext-msg')
var datEphemeralExtMsg = new DatEphemeralExtMsg()
/**
* Step 1. Register the 'ephemeral' extension in the protocol streams
*/
var mySwarm = discoverySwarm(swarmDefaults({
stream (info) {
// add to the the protocol stream
var stream = hypercoreProtocol({
extensions: ['ephemeral']
})
// ...
return stream
}
}))
/**
* Step 2. Wire up each dat you create
*/
datEphemeralExtMsg.watchDat(archiveOrHypercore) // can give a hyperdrive or hypercore
// datEphemeralExtMsg.unwatchDat(archiveOrHypercore) when done
/**
* Step 3. Listen to events
*/
datEphemeralExtMsg.on('message', (archiveOrHypercore, peer, {contentType, payload}) => {
// `peer` has sent `payload` of mimetype `contentType` for `archiveOrHypercore`
})
datEphemeralExtMsg.on('received-bad-message', (err, archiveOrHypercore, peer, messageBuffer) => {
// there was an error parsing a received message
})
/**
* Step 4. Use the API
*/
datEphemeralExtMsg.hasSupport(archiveOrHypercore, peerId)
datEphemeralExtMsg.broadcast(archiveOrHypercore, {contentType, payload})
datEphemeralExtMsg.send(archiveOrHypercore, peerId, {contentType, payload})
```
// This file is auto generated by the protocol-buffers cli tool
/* eslint-disable quotes */
/* eslint-disable indent */
/* eslint-disable no-redeclare */
/* eslint-disable camelcase */
// Remember to `npm install --save protocol-buffers-encodings`
var encodings = require('protocol-buffers-encodings')
var varint = encodings.varint
var skip = encodings.skip
var EphemeralMessage = exports.EphemeralMessage = {
buffer: true,
encodingLength: null,
encode: null,
decode: null
}
defineEphemeralMessage()
function defineEphemeralMessage () {
var enc = [
encodings.string,
encodings.bytes
]
EphemeralMessage.encodingLength = encodingLength
EphemeralMessage.encode = encode
EphemeralMessage.decode = decode
function encodingLength (obj) {
var length = 0
if (defined(obj.contentType)) {
var len = enc[0].encodingLength(obj.contentType)
length += 1 + len
}
if (!defined(obj.payload)) throw new Error("payload is required")
var len = enc[1].encodingLength(obj.payload)
length += 1 + len
return length
}
function encode (obj, buf, offset) {
if (!offset) offset = 0
if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj))
var oldOffset = offset
if (defined(obj.contentType)) {
buf[offset++] = 10
enc[0].encode(obj.contentType, buf, offset)
offset += enc[0].encode.bytes
}
if (!defined(obj.payload)) throw new Error("payload is required")
buf[offset++] = 18
enc[1].encode(obj.payload, buf, offset)
offset += enc[1].encode.bytes
encode.bytes = offset - oldOffset
return buf
}
function decode (buf, offset, end) {
if (!offset) offset = 0
if (!end) end = buf.length
if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid")
var oldOffset = offset
var obj = {
contentType: "",
payload: null
}
var found1 = false
while (true) {
if (end <= offset) {
if (!found1) throw new Error("Decoded message is not valid")
decode.bytes = offset - oldOffset
return obj
}
var prefix = varint.decode(buf, offset)
offset += varint.decode.bytes
var tag = prefix >> 3
switch (tag) {
case 1:
obj.contentType = enc[0].decode(buf, offset)
offset += enc[0].decode.bytes
break
case 2:
obj.payload = enc[1].decode(buf, offset)
offset += enc[1].decode.bytes
found1 = true
break
default:
offset = skip(prefix & 7, buf, offset)
}
}
}
}
function defined (val) {
return val !== null && val !== undefined && (typeof val !== 'number' || !isNaN(val))
}
const EventEmitter = require('events')
const encodings = require('./encodings')
// exported api
// =
class DatEphemeralExtMsg extends EventEmitter {
constructor () {
super()
this.datWatchers = {}
}
getWatcher (dat) {
var key = toStr(dat.key)
return {key, watcher: this.datWatchers[key]}
}
watchDat (dat) {
var {key, watcher} = this.getWatcher(dat)
if (!watcher) {
watcher = this.datWatchers[key] = new DatWatcher(dat, this)
watcher.listen()
}
}
unwatchDat (dat) {
var {key, watcher} = this.getWatcher(dat)
if (watcher) {
watcher.unlisten()
delete this.datWatchers[key]
}
}
// does the given peer have protocol support?
hasSupport (dat, remoteId) {
var {watcher} = this.getWatcher(dat)
if (watcher) {
var peer = watcher.getPeer(remoteId)
if (peer) {
return getPeerProtocolStream(peer).remoteSupports('ephemeral')
}
}
return false
}
// send a message to a peer
send (dat, remoteId, message) {
var {watcher} = this.getWatcher(dat)
if (watcher) {
return watcher.send(remoteId, message)
}
}
// send a message to all peers
broadcast (dat, message) {
var {watcher} = this.getWatcher(dat)
if (watcher) {
return watcher.broadcast(message)
}
}
}
exports.DatEphemeralExtMsg = DatEphemeralExtMsg
// internal
// =
// helper class to track individual dats
class DatWatcher {
constructor (dat, emitter) {
this.dat = dat
this.emitter = emitter
this.onPeerAdd = this.onPeerAdd.bind(this)
this.onPeerRemove = this.onPeerRemove.bind(this)
}
send (remoteId, message = {}) {
// get peer and assure support exists
var peer = this.getPeer(remoteId)
if (!getPeerProtocolStream(peer).remoteSupports('ephemeral')) {
return
}
// send
message = serialize(message)
getPeerFeedStream(peer).extension('ephemeral', message)
}
broadcast (message) {
// serialize message
message = serialize(message)
// send to all peers
var peers = this.hypercore.peers
for (let i = 0; i < peers.length; i++) {
this.send(peers[i], message)
}
}
listen () {
this.hypercore.on('peer-add', this.onPeerAdd)
this.hypercore.on('peer-remove', this.onPeerRemove)
}
unlisten () {
this.hypercore.removeListener('peer-add', this.onPeerAdd)
this.hypercore.removeListener('peer-remove', this.onPeerRemove)
}
get hypercore () {
// if dat is a hyperdrive, use the metadata hypercore
// otherwise assume dat is a hypercore already
return this.dat.metadata ? this.dat.metadata : this.dat
}
getPeer (remoteId) {
remoteId = toRemoteId(remoteId)
return this.hypercore.peers.find(p => isSameId(remoteId, toRemoteId(p)))
}
onPeerAdd (peer) {
getPeerFeedStream(peer).on('extension', (type, message) => {
// handle ephemeral messages only
if (type !== 'ephemeral') return
try {
// decode
message = encodings.EphemeralMessage.decode(message)
// emit
this.emitter.emit('message', this.dat, peer, message)
} catch (e) {
this.emitter.emit('received-bad-message', e, this.dat, peer, message)
}
})
}
onPeerRemove (peer) {
// unstore session data
delete this.sessionDatas[toStr(toRemoteId(peer))]
}
}
function serialize (message) {
if (Buffer.isBuffer(message)) {
return message // already encoded
}
// massage values
if (!message.payload) {
message.payload = Buffer.from([])
} else if (typeof message.payload === 'string') {
message.payload = Buffer.from(message.payload, 'utf8')
}
if (typeof message.contentType !== 'string') {
message.contentType = undefined
}
// serialize
return encodings.EphemeralMessage.encode(message)
}
function getPeerFeedStream (peer) {
if (!peer) return null
return peer.stream
}
function getPeerProtocolStream (peer) {
var feedStream = getPeerFeedStream(peer)
if (!feedStream) return null
return feedStream.stream
}
function getPeerRemoteId (peer) {
var protocolStream = getPeerProtocolStream(peer)
if (!protocolStream) return null
return protocolStream.remoteId
}
function toRemoteId (peer) {
if (peer && typeof peer === 'object') {
return getPeerRemoteId(peer)
}
return peer
}
function toStr (buf) {
if (!buf) return buf
if (Buffer.isBuffer(buf)) return buf.toString('hex')
return buf
}
function isSameId (a, b) {
if (!a || !b) return false
if (Buffer.isBuffer(a) && Buffer.isBuffer(b)) {
return a.equals(b)
}
return toStr(a) === toStr(b)
}
{
"name": "@beaker/dat-ephemeral-ext-msg",
"version": "0.0.1",
"description": "Methods for implementing DEP-0000: Ephemeral Messages (Extension Message)",
"main": "index.js",
"scripts": {
"test": "tape test.js",
"protobuf": "protocol-buffers schema.proto -o encodings.js"
},
"repository": {
"type": "git",
"url": "git+https://github.com/beakerbrowser/dat-ephemeral-ext-msg.git"
},
"keywords": [
"dat",
"extension",
"message",
"ephemeral"
],
"author": "Paul Frazee <pfrazee@gmail.com>",
"license": "MIT",
"bugs": {
"url": "https://github.com/beakerbrowser/dat-ephemeral-ext-msg/issues"
},
"homepage": "https://github.com/beakerbrowser/dat-ephemeral-ext-msg#readme",
"dependencies": {
"protocol-buffers-encodings": "^1.1.0"
},
"devDependencies": {
"hyperdrive": "^9.13.0",
"protocol-buffers": "^4.0.4",
"random-access-memory": "^3.0.0",
"tape": "^4.9.1"
}
}
message EphemeralMessage {
optional string contentType = 1;
required bytes payload = 2;
}
\ No newline at end of file
var tape = require('tape')
var hyperdrive = require('hyperdrive')
var ram = require('random-access-memory')
var {DatEphemeralExtMsg} = require('./')
tape('exchange ephemeral messages', function (t) {
// must use 2 instances to represent 2 different nodes
var srcEphemeral = new DatEphemeralExtMsg()
var cloneEphemeral = new DatEphemeralExtMsg()
var src = hyperdrive(ram)
var clone
src.on('ready', function () {
// generate source archive
t.ok(src.writable)
t.ok(src.metadata.writable)
t.ok(src.content.writable)
src.writeFile('/first.txt', 'number 1', function (err) {
t.error(err, 'no error')
src.writeFile('/second.txt', 'number 2', function (err) {
t.error(err, 'no error')
src.writeFile('/third.txt', 'number 3', function (err) {
t.error(err, 'no error')
t.same(src.version, 3, 'version correct')
// generate clone instance
clone = hyperdrive(ram, src.key)
clone.on('ready', startReplication)
})
})
})
})
function startReplication () {
// wire up archives
srcEphemeral.watchDat(src)
cloneEphemeral.watchDat(clone)
// listen to events
var messageEventCount1 = 0
srcEphemeral.on('message', onMessage1)
cloneEphemeral.on('message', onMessage1)
function onMessage1 (archive, peer, msg) {
if (archive === src) {
// received clone's data
t.same(msg.contentType, 'application/json', 'received clone data')
t.same(msg.payload.toString('utf8'), '"bar"', 'received clone data')
}
if (archive === clone) {
// received src's data
t.same(msg.contentType, 'application/json', 'received src data')
t.same(msg.payload.toString('utf8'), '"foo"', 'received src data')
}
if (++messageEventCount1 === 2) {
hasReceivedEvents1()
}
}
// start replication
var stream1 = clone.replicate({
id: new Buffer('clone-stream'),
live: true,
extensions: ['ephemeral']
})
var stream2 = src.replicate({
id: new Buffer('src-stream'),
live: true,
extensions: ['ephemeral']
})
stream1.pipe(stream2).pipe(stream1)
// wait for handshakes
var handshakeCount = 0
stream1.on('handshake', gotHandshake)
stream2.on('handshake', gotHandshake)
function gotHandshake () {
if (++handshakeCount !== 2) return
// has support
t.ok(srcEphemeral.hasSupport(src, src.metadata.peers[0]), 'clone has support')
t.ok(cloneEphemeral.hasSupport(clone, clone.metadata.peers[0]), 'src has support')
// send values
srcEphemeral.send(src, src.metadata.peers[0], {contentType: 'application/json', payload: '"foo"'})
cloneEphemeral.send(clone, clone.metadata.peers[0], {contentType: 'application/json', payload: '"bar"'})
}
function hasReceivedEvents1 () {
srcEphemeral.removeListener('message', onMessage1)
cloneEphemeral.removeListener('message', onMessage1)
// listen to new events
var messageEventCount2 = 0
srcEphemeral.on('message', onMessageEvent2)
cloneEphemeral.on('message', onMessageEvent2)
function onMessageEvent2 (archive, peer, msg) {
if (archive === src) {
// received clone's data
t.same(msg.contentType, 'application/octet-stream', 'received clone data')
t.ok(msg.payload.equals(Buffer.from([4,3,2,1])), 'received clone data')
}
if (archive === clone) {
// received src's data
t.same(msg.contentType, '', 'received src data')
t.ok(msg.payload.equals(Buffer.from([1,2,3,4])), 'received src data')
}
if (++messageEventCount2 === 2) {
hasReceivedEvents2()
}
}
// broadcast new values
srcEphemeral.broadcast(src, {payload: Buffer.from([1,2,3,4])})
cloneEphemeral.broadcast(clone, {contentType: 'application/octet-stream', payload: Buffer.from([4,3,2,1])})
}
function hasReceivedEvents2 () {
// unwatch
srcEphemeral.unwatchDat(src)
cloneEphemeral.unwatchDat(clone)
t.end()
}
}
})
tape('no peers causes no issue', function (t) {
var ephemeral = new DatEphemeralExtMsg()
var src = hyperdrive(ram)
src.on('ready', function () {
ephemeral.watchDat(src)
ephemeral.broadcast(src, {contentType: 'application/json', payload: '"test"'})
t.pass('no error thrown')
t.end()
})
})
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment