- 在【p2p、分布式,区块链笔记 Torrent】WebTorrent的上传和下载界面的示例中,主要通过
WebTorrent
类的add和seed函数实现相关功能。这两个函数都返回一个Torrent
类对象的实例。
seed函数
import createTorrent, { parseInput } from 'create-torrent' // "create-torrent":"^6.0.18"
/**
* Start seeding a new file/folder.
* @param {string|File|FileList|Buffer|Array.<string|File|Buffer>} input
* @param {Object=} opts
* @param {function=} onseed called when torrent is seeding
*/
seed (input, opts, onseed){if(this.destroyed) throw new Error('client is destroyed')if(typeof opts ==='function')[opts, onseed]=[{}, opts]
this._debug('seed')
opts = opts ? Object.assign({}, opts):{}
// no need to verify the hashes we create
opts.skipVerify =true
const isFilePath = typeof input ==='string'
// When seeding from fs path, initialize store from that path to avoid a copy
if(isFilePath) opts.path = path.dirname(input)if(!opts.createdBy) opts.createdBy =`WebTorrent/${VERSION_STR}`
const onTorrent = torrent =>{
const tasks =[
cb =>{
// when a filesystem path is specified or the store is preloaded, files are already in the FS store
if(isFilePath || opts.preloadedStore)return cb()
torrent.load(streams, cb)}]if(this.dht){
tasks.push(cb =>{
torrent.once('dhtAnnounce', cb)})}
parallel(tasks, err =>{if(this.destroyed)returnif(err)return torrent._destroy(err)
_onseed(torrent)})}
const _onseed = torrent =>{
this._debug('on seed')if(typeof onseed ==='function') onseed(torrent)
torrent.emit('seed')
this.emit('seed', torrent)}
const torrent = this.add(null, opts, onTorrent)let streams
if(isFileList(input)) input = Array.from(input)elseif(!Array.isArray(input)) input =[input]
parallel(input.map(item => async cb =>{if(!opts.preloadedStore && isReadable(item)){
const chunks =[]
try {for await (const chunk of item){
chunks.push(chunk)}} catch (err){return cb(err)}
const buf = concat(chunks)
buf.name = item.name
cb(null, buf)}else{
cb(null, item)}}), (err, input)=>{if(this.destroyed)returnif(err)return torrent._destroy(err)
parseInput(input, opts, (err, files)=>{if(this.destroyed)returnif(err)return torrent._destroy(err)
streams = files.map(file => file.getStream)
createTorrent(input, opts, async (err, torrentBuf)=>{if(this.destroyed)returnif(err)return torrent._destroy(err)
const existingTorrent = await this.get(torrentBuf)if(existingTorrent){
console.warn('A torrent with the same id is already being seeded')
torrent._destroy()if(typeof onseed ==='function') onseed(existingTorrent)}else{
torrent._onTorrentId(torrentBuf)}})})})return torrent
}
- 代码中的关键一句是:
const torrent =this.add(null, opts, onTorrent)
- 函数最终返回生成的 torrent 对象,其由add函数创建(第一个参数为null),代表正在进行 seeding 的种子。
- 这句代码实际上启动了种子(torrent)的创建和处理过程。它调用了
this.add()
方法,将opts
和onTorrent
回调传递进去,这些回调负责在种子完成处理时执行进一步的操作。 - 另一个关键的一句是
createTorrent(input, opts, async (err, torrentBuf)=>{……})
- 调用
createTorrent
生成种子文件,并通过 onTorrent 处理后续操作(如 DHT 发布广播等) - DHT 广播的过程发生在
onTorrent
回调中的这段代码部分:
if(this.dht){// DHT 是否启用
tasks.push(cb=>{// 将一个任务推送到任务队列tasks 中
torrent.once('dhtAnnounce', cb)// 监听torrent对象的dhtAnnounce事件。once意味着该事件处理器仅会触发一次,当dhtAnnounce事件发生时,执行回调 `cb`。})}
- 当种子(
torrent
)开始上传并与其他节点建立连接时,WebTorrent 会尝试将种子信息通过 DHT 广播出去,允许其他客户端在没有 Tracker 的情况下发现这个种子。 torrent.once('dhtAnnounce', cb)
监听的是这个广播完成后的通知,当种子成功通过 DHT 被宣布时,cb
被执行,表示广播成功。
parallel(tasks,err=>{if(this.destroyed)returnif(err)return torrent._destroy(err)_onseed(torrent)})
parallel(tasks, cb)
用来并行执行所有任务,确保所有操作(如文件加载、DHT 广播等)都执行完毕后再继续。- 如果没有错误发生,
_onseed(torrent)
会被调用,表示种子已经开始上传,并且相关事件会被触发。
add函数
/**
* Start downloading a new torrent. Aliased as `client.download`.
* @param {string|Buffer|Object} torrentId
* @param {Object} opts torrent-specific options
* @param {function=} ontorrent called when the torrent is ready (has metadata)
*/
add(torrentId, opts ={}, ontorrent =()=>{}){if(this.destroyed) throw new Error('client is destroyed')if(typeof opts ==='function')[opts, ontorrent]=[{}, opts]
const onInfoHash =()=>{if(this.destroyed)returnfor(const t of this.torrents){if(t.infoHash === torrent.infoHash && t !== torrent){
torrent._destroy(new Error(`Cannot add duplicate torrent ${torrent.infoHash}`))
ontorrent(t)return}}}
const onReady =()=>{if(this.destroyed)return
ontorrent(torrent)
this.emit('torrent', torrent)}functiononClose(){
torrent.removeListener('_infoHash', onInfoHash)
torrent.removeListener('ready', onReady)
torrent.removeListener('close', onClose)}
this._debug('add')
opts = opts ? Object.assign({}, opts):{}
const torrent = new Torrent(torrentId, this, opts) // Create a new Torrent instance using the provided torrentId, current client (`this`), and options (`opts`).
this.torrents.push(torrent) //Add the new torrent to the list of active torrents.
torrent.once('_infoHash', onInfoHash) //监听 _infoHash 事件来检查是否存在重复的种子。
torrent.once('ready', onReady) // 监听 ready 事件,当种子准备好时执行回调。
torrent.once('close', onClose) // 监听 close 事件,清理资源和事件监听器。
this.emit('add', torrent)return torrent
}
- 关键一句是:
const torrent =newTorrent(torrentId,this, opts)
- 这行代码负责创建一个新的
Torrent
实例,并将其添加到当前 WebTorrent 客户端中,开始处理指定的种子。Torrent
类定义在lib\torrent.js
中。相关接口与功能可见docs\api.md
的Torrent部分 - 以下是对
torrent
方法的简单总结,使用表格呈现:
方法名功能描述参数****返回值
torrent.destroy([opts], [callback])
删除种子,销毁与对等体的所有连接,并删除所有保存的文件元数据。
可以选择销毁存储的文件。
当完全销毁后调用
callback
。
opts
(可选): 配置销毁选项,例如是否销毁存储。
callback
(可选): 完成销毁后的回调。无
torrent.addPeer(peer)
向种子加入一个对等体。通常不需要手动调用,WebTorrent 会自动发现对等体。
手动添加时需确保
infoHash
事件已触发。
peer
: 对等体地址(如 “12.34.56.78:4444”)或
simple-peer
实例(WebRTC对等体)。
true
(添加成功)或
false
(被阻止,可能因黑名单)。
torrent.addWebSeed(urlOrConn)
向种子加入一个 Web Seed。
Web Seed 是通过 HTTP(S) 提供种子数据的源。
支持 URL 或自定义连接对象。
urlOrConn
: Web Seed URL 或自定义连接对象(实现 BitTorrent 协议的 Duplex 流,必须具有
connId
)。无
torrent.removePeer(peer)
从种子中移除一个对等体。WebTorrent 会自动移除慢速或没有所需数据的对等体。
手动移除时指定对等体标识。
peer
: 对等体地址(如 “ip:port”)、peer id(hex 字符串)或
simple-peer
实例。无
torrent.select(start, end, [priority], [notify])
选择一范围的数据块,优先下载该范围内的数据。
可以指定优先级和回调。
start
,
end
: 数据块范围(包含)。
priority
(可选): 优先级。
notify
(可选): 更新时触发的回调。无
torrent.deselect(start, end)
取消选择一范围的数据块,从而降低优先级。
start
,
end
: 数据块范围(包含)。无
torrent.critical(start, end)
将一范围的数据块标记为关键优先级,要求尽快下载。
start
,
end
: 数据块范围(包含)。无
torrent.pause()
暂停连接新的对等体。
此操作不影响现有连接或流。无无
torrent.resume()
恢复与新对等体的连接。无恢复连接新的对等体,开始与更多对等体交换数据。
torrent.rescanFiles([callback])
扫描文件并验证存储中的每个数据块的哈希值,更新已下载的有效数据块的位图。通常用于外部进程添加文件时,确保 WebTorrent 识别这些文件。
完成后会调用回调。
callback
(可选): 扫描完成时的回调函数,
callback(err)
。无
torrent.on('infoHash', function () {})
当种子的
infoHash
确定时触发该事件。无无
torrent.on('metadata', function () {})
当种子的元数据(包括
.torrent
文件的内容)确定时触发该事件。无无
torrent.on('ready', function () {})
当种子准备好使用时触发该事件,表示元数据已就绪且存储准备好。无无
torrent.on('warning', function (err) {})
当种子遇到警告时触发该事件。此事件仅用于调试,不一定需要监听。
err
: 警告信息。无
torrent.on('error', function (err) {})
当种子遇到致命错误时触发该事件,种子会被自动销毁并从客户端移除。
err
: 错误信息。无
torrent.on('done', function () {})
当所有种子文件下载完成时触发该事件。无当所有文件下载完毕时触发,通常用于通知用户下载完成。
torrent.on('download', function (bytes) {})
当数据被下载时触发该事件,用于报告当前种子的下载进度。
bytes
: 本次下载的字节数。用于监控下载进度,返回已下载的字节数,并可查询总下载量、下载速度等。
torrent.on('upload', function (bytes) {})
当数据被上传时触发该事件,用于报告当前种子的上传进度。
bytes
: 本次上传的字节数。用于监控上传进度,返回已上传的字节数。
torrent.on('wire', function (wire) {})
每当一个新的对等体连接时触发该事件,
wire
是一个
bittorrent-protocol
实现的 Duplex 流。可以使用它来扩展 BitTorrent 协议或进行其他自定义操作。
wire
: 与对等体连接的流对象。用于处理与对等体的通信,可扩展 BitTorrent 协议或处理自定义协议。
torrent.on('noPeers', function (announceType) {})
每隔几秒当没有找到对等体时触发。
announceType
指明了导致该事件触发的公告类型,可能是
'tracker'
、
'dht'
、
'lsd'
或
'ut_pex'
。如果尝试通过多个方式发现对等体,如跟踪器、DHT、LSD 或 PEX,将会为每种公告分别触发此事件。
announceType
: 字符串,指示公告类型,可以是
'tracker'
、
'dht'
、
'lsd'
、或
'ut_pex'
。当没有对等体可用且公告方式不同(例如 tracker、DHT、LSD 或 PEX)时触发此事件。
torrent.on('verified', function (index) {})
每当一个数据块被验证时触发。事件的参数是被验证数据块的索引。
index
: 整数,表示被验证的块的索引。每次数据块的哈希值被验证时触发此事件。
torrent & wire
- torrent.on(‘wire’, function (wire) {}) 是在 WebTorrent 客户端中,当一个新的对等体(peer)连接时,触发的事件。当这个事件被触发时,wire 会作为参数传递给回调函数。wire 是一个 bittorrent-protocol 的实例,这是一个用于实现 BitTorrent 协议的 duplex 流(双向流),允许客户端与远程对等体(peer)进行数据交换。
- 以下是一个示例:
import MyExtension from'./my-extension'
torrent1.on('wire',(wire, addr)=>{
console.log('connected to peer with address '+ addr)
wire.use(MyExtension)})
- 实际上
torrent
中管理者相关的链接对象,如以下代码所示:
client.add(opts.magnetURI,torrent =>{
const wire = torrent.wires[0];
wire.use(MyExtension());
wire.MyExtension.on('hack_message', func);})
client.seed(buf, torrent =>{
torrent.on('wire', (wire, addr)=>{
wire.use(MyExtension());
wire.MyExtension.on('hack_message', func);}})
wires
是一个数组,包含与该种子相关的所有 wire 对象。每个 wire 代表与一个对等体(peer)之间的连接。torrent.wires[0]
代表与第一个对等体(peer)建立的连接(wire)。
版权归原作者 FakeOccupational 所有, 如有侵权,请联系我们删除。