探索 Node.js 中的 gRPC
gRPC 是个高性能、静态类型、支持多语言、跨平台的远程调用库。
定义服务
首先通过 myapi.proto
文件来声明几个方法。
syntax = "proto3"; // proto 语法版本
package myapi; // 包名
service MyApi {
// 客户端发送两个数字,服务端计算并响应总和
rpc add (AddRequest) returns (Number) {}
// 客户端发送数字流,服务端计算并响应总和
rpc sum (stream Number) returns (Number) {}
// 客户端发送一个数字,服务端响应一个每秒减一的数字流
rpc countdown (Number) returns (stream Number) {}
// 客户端发送一个文件块流,服务端实时响应进度流
rpc uploadFile (stream FileChunk) returns (stream Progress) {}
}
message AddRequest {
double numberA = 1;
double numberB = 2;
}
message Number {
double number = 1;
}
message FileChunk {
bytes chunk = 1;
}
message Progress {
double percent = 1;
}
创建服务端
import grpc from 'grpc'
import path from 'path'
import { add, sum, countdown, uploadFile } from './myApi.js'
const myapiProto = grpc.load(path.join(__dirname, '/../../myapi.proto'))
const server = new grpc.Server()
server.addService(myapiProto.myapi.MyApi.service, {
add, sum, countdown, uploadFile
})
server.bind('0.0.0.0:5000', grpc.ServerCredentials.createInsecure())
server.start()
console.log('grpc server listening on port 5000')
创建客户端
import grpc from 'grpc'
import path from 'path'
const myapiProto = grpc.load(path.join(__dirname, '/../../myapi.proto'))
const client = new myapiProto.myapi.MyApi(
'0.0.0.0:5000',
grpc.credentials.createInsecure()
)
add()
客户端发送一个请求,服务端响应一个回复。(Unary RPC)
服务端
// myApi.js
function add (call, callback) {
const { numberA, numberB } = call.request
callback(null, numberA + numberB)
}
客户端
client.add({ numberA: 3, numberB: 4 }, (err, res) => {
console.log('add', err, number)
})
客户端输出
add null { number: 7 }
sum()
客户端发送一个流请求,服务端响应一个回复。(Client streaming RPC)
服务端实现
// myApi.js
function sum (call, callback) {
let sum = 0
call.on('data', ({ number }) => { sum += number })
call.on('end', () => callback(null, { number: sum }))
}
客户端
const sumCall = client.sum((err, number) => {
console.log('sum', err, number)
})
sumCall.write(1)
sumCall.write(2)
sumCall.write(3)
sumCall.write(0.66)
sumCall.end()
客户端输出
sum null { number: 6.66 }
countdown()
客户端发送一个请求,服务端响应一个流回复。(Server streaming RPC)
服务端
// myApi.js
function countdown (call) {
let { number } = call.request
const timer = setInterval(() => {
call.write({ number })
if (--number < 0) {
call.end()
clearInterval(timer)
}
}, 1000)
call.on('cancelled', () => clearInterval(timer)) // 断开连接时触发
}
客户端
const countdownCall = client.countdown({ number: 3 })
countdownCall.on('data', (number) => {
console.log('countdown', Date.now(), number)
})
客户端输出
countdown 1515588588805 { number: 3 }
countdown 1515588589803 { number: 2 }
countdown 1515588590809 { number: 1 }
countdown 1515588591813 { number: 0 }
uploadFile()
客户端发送一个流请求,服务端响应一个流回复。(Bidirectional streaming RPC)
服务端
服务端通过 through2 创建一个 objectMode 流,将 gRPC 收到的包含 chunk 对象流,解包成字节流,交给 fs。
function uploadFile (call, callback) {
const file = JSON.parse(call.metadata.getMap().file)
let completedSize = 0
call.pipe(through2.obj(({ chunk }, enc, next) => {
completedSize += chunk.byteLength
call.write({ percent: completedSize / file.size })
next(null, chunk)
}))
.pipe(fs.createWriteStream(file.name))
.on('finish', () => call.end())
}
客户端
客户端首先构造欲发送文件的元数据(metadata),包含文件名、大小。
随后再通过 through2 创建一个 objectMode 的流,将 fs 读入的字节流,打包成对象流,交给gRPC。
const filename = 'lovely.jpg'
const file = {
name: filename,
size: fs.statSync(filename).size
}
const metadata = new grpc.Metadata()
metadata.set('file', JSON.stringify(file))
const uploadFileCall = client.uploadFile(metadata)
fs.createReadStream(file.name)
.pipe(through2.obj((chunk, enc, next) => next(null, { chunk })))
.pipe(uploadFileCall)
uploadFileCall.on('data', ({ percent }) => {
console.log('uploadFile', file.name, percent)
})
uploadFileCall.on('end', () => console.log('uploadFile', 'finish', file.name))
客户端输出
uploadFile lovely.jpg 0.3692543469196876
uploadFile lovely.jpg 0.7385086938393752
uploadFile lovely.jpg 1
uploadFile finish lovely.jpg
其他常用方法
取消调用
const call = client.add({ numberA: 3, numberB: 4 }, (err, number) => {
console.log('add', err, number)
})
call.cancel()
超时
设置调用五秒超时:
const options = { deadline: Date.now() + 5000 }
const call = client.add({ numberA: 3, numberB: 4 }, options, (err, number) => {
console.log('add', err, number)
})
元数据
客户端
const metadata = new grpc.Metadata()
metadata.set('token', '123456')
const options = { deadline: Date.now() + 5000 }
const call = client.add({ numberA: 3, numberB: 4 }, metadata, options, (err, number) => {
console.log('add', err, number)
})
call.on('metadata', (metadata) => {
console.log('add', 'metadata', metadata.getMap())
})
服务端
function add (call, callback) {
const { numberA, numberB } = call.request
const { token } = call.metadata.getMap()
if (token !== 'abcdef') {
const metadata = new grpc.Metadata()
metadata.set('message', 'Invalid token')
call.sendMetadata(metadata)
callback()
return
}
callback(null, numberA + numberB)
}