gRPC 是个高性能、静态类型、支持多语言、跨平台的远程调用库。

定义服务

首先通过 myapi.proto 文件来声明几个方法。

syntax = "proto3"; // proto 语法版本

package myapi; // 包名

service MyApi {
  // 客户端发送两个数字,服务端计算并响应总和
  rpc add (AddRequest) returns (Number) {}
  
  // 客户端发送数字流,服务端计算并响应总和
  rpc sum (stream Number) returns (Number) {}
  
  // 客户端发送一个数字,服务端响应一个每秒减一的数字流
  rpc countdown (Number) returns (stream Number) {}
  
  // 客户端发送一个文件块流,服务端实时响应进度流
  rpc uploadFile (stream FileChunk) returns (stream Progress) {}
}

message AddRequest {
  double numberA = 1;
  double numberB = 2;
}

message Number {
  double number = 1;
}

message FileChunk {
  bytes chunk = 1;
}

message Progress {
  double percent = 1;
}

创建服务端

import grpc from 'grpc'
import path from 'path'
import { add, sum, countdown, uploadFile } from './myApi.js'

const myapiProto = grpc.load(path.join(__dirname, '/../../myapi.proto'))

const server = new grpc.Server()
server.addService(myapiProto.myapi.MyApi.service, {
  add, sum, countdown, uploadFile
})

server.bind('0.0.0.0:5000', grpc.ServerCredentials.createInsecure())
server.start()

console.log('grpc server listening on port 5000')

创建客户端

import grpc from 'grpc'
import path from 'path'

const myapiProto = grpc.load(path.join(__dirname, '/../../myapi.proto'))
const client = new myapiProto.myapi.MyApi(
  '0.0.0.0:5000',
  grpc.credentials.createInsecure()
)

add()

客户端发送一个请求,服务端响应一个回复。(Unary RPC)

服务端

// myApi.js
function add (call, callback) {
  const { numberA, numberB } = call.request
  callback(null, numberA + numberB)
}

客户端

client.add({ numberA: 3, numberB: 4 }, (err, res) => {
  console.log('add', err, number)
})

客户端输出

add null { number: 7 }

sum()

客户端发送一个流请求,服务端响应一个回复。(Client streaming RPC)

服务端实现

// myApi.js
function sum (call, callback) {
  let sum = 0
  call.on('data', ({ number }) => { sum += number })
  call.on('end', () => callback(null, { number: sum }))
}

客户端

const sumCall = client.sum((err, number) => {
  console.log('sum', err, number)
})
sumCall.write(1)
sumCall.write(2)
sumCall.write(3)
sumCall.write(0.66)
sumCall.end()

客户端输出

sum null { number: 6.66 }

countdown()

客户端发送一个请求,服务端响应一个流回复。(Server streaming RPC)

服务端

// myApi.js
function countdown (call) {
  let { number } = call.request
  const timer = setInterval(() => {
    call.write({ number })
    if (--number < 0) {
      call.end()
      clearInterval(timer)
    }
  }, 1000)

  call.on('cancelled', () => clearInterval(timer)) // 断开连接时触发
}

客户端

const countdownCall = client.countdown({ number: 3 })
countdownCall.on('data', (number) => {
  console.log('countdown', Date.now(), number)
})

客户端输出

countdown 1515588588805 { number: 3 }
countdown 1515588589803 { number: 2 }
countdown 1515588590809 { number: 1 }
countdown 1515588591813 { number: 0 }

uploadFile()

客户端发送一个流请求,服务端响应一个流回复。(Bidirectional streaming RPC)

服务端

服务端通过 through2 创建一个 objectMode 流,将 gRPC 收到的包含 chunk 对象流,解包成字节流,交给 fs。

function uploadFile (call, callback) {
  const file = JSON.parse(call.metadata.getMap().file)

  let completedSize = 0
  call.pipe(through2.obj(({ chunk }, enc, next) => {
    completedSize += chunk.byteLength
    call.write({ percent: completedSize / file.size })
    next(null, chunk)
  }))
    .pipe(fs.createWriteStream(file.name))
    .on('finish', () => call.end())
}

客户端

客户端首先构造欲发送文件的元数据(metadata),包含文件名、大小。
随后再通过 through2 创建一个 objectMode 的流,将 fs 读入的字节流,打包成对象流,交给gRPC。

const filename = 'lovely.jpg'
const file = {
  name: filename,
  size: fs.statSync(filename).size
}
const metadata = new grpc.Metadata()
metadata.set('file', JSON.stringify(file))

const uploadFileCall = client.uploadFile(metadata)

fs.createReadStream(file.name)
  .pipe(through2.obj((chunk, enc, next) => next(null, { chunk })))
  .pipe(uploadFileCall)

uploadFileCall.on('data', ({ percent }) => {
  console.log('uploadFile', file.name, percent)
})
uploadFileCall.on('end', () => console.log('uploadFile', 'finish', file.name))

客户端输出

uploadFile lovely.jpg 0.3692543469196876
uploadFile lovely.jpg 0.7385086938393752
uploadFile lovely.jpg 1
uploadFile finish lovely.jpg

其他常用方法

取消调用

const call = client.add({ numberA: 3, numberB: 4 }, (err, number) => {
  console.log('add', err, number)
})
call.cancel()

超时

设置调用五秒超时:

const options = { deadline: Date.now() + 5000 }
const call = client.add({ numberA: 3, numberB: 4 }, options, (err, number) => {
  console.log('add', err, number)
})

元数据

客户端

const metadata = new grpc.Metadata()
metadata.set('token', '123456')
const options = { deadline: Date.now() + 5000 }
const call = client.add({ numberA: 3, numberB: 4 }, metadata, options, (err, number) => {
  console.log('add', err, number)
})
call.on('metadata', (metadata) => {
  console.log('add', 'metadata', metadata.getMap())
})

服务端

function add (call, callback) {
  const { numberA, numberB } = call.request
  const { token } = call.metadata.getMap()
  if (token !== 'abcdef') {
    const metadata = new grpc.Metadata()
    metadata.set('message', 'Invalid token')
    call.sendMetadata(metadata)
    callback()
    return
  }
  callback(null, numberA + numberB)
}

相关资料

Node.js gRPC Library
What is gRPC?