深入理解Node.js全栈基础:MongoDB模块详解

发表时间: 2023-09-19 20:29

#秋日生活打卡季#

问题

看下面这行代码:db.command({ serverStatus: 1 }),请问它具体做了什么?是如何实现的?

假设你是一个mongodb小白,对于mongodb的操作完全不了解,那看到上面的代码,会好奇到底对应mongod执行了什么命令呢?

如果抛开node.js和mongodb模块的源码不管,最快的解决方案是直接搜索:

mongodb serverStatus

这几个关键词,能很快了解到它实际上对应了在mongosh里执行下面的命令:

db.serverStatus()

mongodb serverStatus函数参考文档:
https://www.mongodb.com/docs/manual/reference/method/db.serverStatus/

实践

但是具体mongodb模块是怎么实现的呢?我们可以深入源码看看。

先创建一个空白项目:

mkdir mongodb-test
cd mongodb-test
npm init -y
npm i mongodb
touch test.js

test.js文件的内容如下:

// Connects to a local mongod, runs the `serverStatus` command through
// db.command(), and prints whether the server reported `ok === 1`.
(async function test() {
  const { MongoClient } = require('mongodb');
  // Declared with `const`: the bare assignment used previously created an implicit global.
  const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
  try {
    const db = client.db();
    const result = await db.command({ serverStatus: 1 });
    console.log(`command ok? ${result.ok === 1}`);
  } finally {
    // Close the connection so the Node.js process can exit on its own,
    // instead of forcing termination with process.exit(0).
    await client.close();
  }
})().catch((err) => {
  // Report connection/command failures and signal them through the exit code.
  console.error(err);
  process.exitCode = 1;
});

执行:

node test.js

执行结果如下:

➜  mongodb-test node test.js
command ok? true

深入源码

源码仓库:
https://github.com/mongodb/node-mongodb-native

可以顺藤摸瓜找到具体实现在 src/db.ts 这个文件:

  /**
   * Execute a command
   *
   * @remarks
   * This command does not inherit options from the MongoClient.
   *
   * The driver will ensure the following fields are attached to the command sent to the server:
   * - `lsid` - sourced from an implicit session or options.session
   * - `$readPreference` - defaults to primary or can be configured by options.readPreference
   * - `$db` - sourced from the name of this database
   *
   * If the client has a serverApi setting:
   * - `apiVersion`
   * - `apiStrict`
   * - `apiDeprecationErrors`
   *
   * When in a transaction:
   * - `readConcern` - sourced from readConcern set on the TransactionOptions
   * - `writeConcern` - sourced from writeConcern set on the TransactionOptions
   *
   * Attaching any of the above fields to the command will have no effect as the driver will overwrite the value.
   *
   * @param command - The command to run
   * @param options - Optional settings for the command
   * @returns A promise for the Document produced by executing a RunCommandOperation for `command`
   */
  async command(command: Document, options?: RunCommandOptions): Promise<Document> {
    // Intentionally, we do not inherit options from parent for this operation.
    // Only the BSON options, the session and the read preference are forwarded.
    return executeOperation(
      this.client,
      new RunCommandOperation(this, command, {
        ...resolveBSONOptions(options),
        session: options?.session,
        readPreference: options?.readPreference
      })
    );
  }

executeOperation的函数定义

/**
 * Runs `operation` against `client`, supporting both promise and callback callers.
 * @internal
 *
 * @remarks
 * The actual work is delegated to `executeOperationAsync`; `maybeCallback` is the
 * single place that decides whether the result is delivered through the supplied
 * callback or through the returned promise. Per the original upstream notes, this
 * entry point is also where features such as implicit sessions (required by the
 * Driver Sessions specification when no ClientSession is provided) are applied.
 *
 * @param client - The MongoClient the operation is executed with
 * @param operation - The operation to execute
 * @param callback - Optional result callback; when omitted a promise is returned
 */
export function executeOperation<
  T extends AbstractOperation<TResult>,
  TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult>;
export function executeOperation<
  T extends AbstractOperation<TResult>,
  TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback: Callback<TResult>): void;
export function executeOperation<
  T extends AbstractOperation<TResult>,
  TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void;
export function executeOperation<
  T extends AbstractOperation<TResult>,
  TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void {
  // Deferred so that maybeCallback controls when the async execution starts.
  const runOperation = () => executeOperationAsync(client, operation);
  return maybeCallback(runOperation, callback);
}

RunCommandOperation的定义:

import type { BSONSerializeOptions, Document } from '../bson';
import { type Db } from '../db';
import { type TODO_NODE_3286 } from '../mongo_types';
import type { ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { MongoDBNamespace } from '../utils';
import { AbstractOperation } from './operation';

/** @public */
export type RunCommandOptions = {
  /** Specify ClientSession for this command */
  session?: ClientSession;
  /** The read preference */
  readPreference?: ReadPreferenceLike;
} & BSONSerializeOptions;

/**
 * Operation that sends an arbitrary command document to the `$cmd` collection
 * of the parent database's namespace.
 * @internal
 */
export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
  constructor(parent: Db, public command: Document, public override options: RunCommandOptions) {
    super(options);
    this.ns = parent.s.namespace.withCollection('$cmd');
  }

  /**
   * Records the selected server and forwards the command to it.
   *
   * @param server - The server the command is sent to
   * @param session - The session to attach to the command, if any
   * @returns The result of `server.commandAsync`
   */
  override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
    this.server = server;
    const { ns, command } = this;
    // The operation's own readPreference and the session passed in take
    // precedence over same-named keys in `this.options`.
    const commandOptions = {
      ...this.options,
      readPreference: this.readPreference,
      session
    };
    return server.commandAsync(ns, command, commandOptions) as TODO_NODE_3286;
  }
}

/**
 * Operation that sends an arbitrary command document to the `$cmd` collection
 * of the `admin` database.
 */
export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T> {
  constructor(
    public command: Document,
    public override options: RunCommandOptions & {
      noResponse?: boolean;
      bypassPinningCheck?: boolean;
    }
  ) {
    super(options);
    this.ns = new MongoDBNamespace('admin', '$cmd');
  }

  /**
   * Records the selected server and forwards the command to it.
   *
   * @param server - The server the command is sent to
   * @param session - The session to attach to the command, if any
   * @returns The result of `server.commandAsync`
   */
  override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
    this.server = server;
    const { ns, command } = this;
    // The operation's own readPreference and the session passed in take
    // precedence over same-named keys in `this.options`.
    const commandOptions = {
      ...this.options,
      readPreference: this.readPreference,
      session
    };
    return server.commandAsync(ns, command, commandOptions) as TODO_NODE_3286;
  }
}

可以追踪到实现在 server.commandAsync 里。

    // Promise-returning wrapper around the callback-style `this.command`:
    // the arguments are passed through unchanged and `promisify` supplies the callback.
    this.commandAsync = promisify(
      (
        ns: MongoDBNamespace,
        cmd: Document,
        options: CommandOptions,
        // callback type defines Document result because result is never nullish when it succeeds, otherwise promise rejects
        callback: (error: Error | undefined, result: Document) => void
      ) => this.command(ns, cmd, options, callback as any)
    );

commandAsync是Server类的属性,签名如下:

  commandAsync: (ns: MongoDBNamespace, cmd: Document, options: CommandOptions) => Promise<Document>;

又可以追溯到 Server 类(src/sdam/server.ts)的 this.command 方法,再继续追溯到 src/cmap/connection.ts:

      write(this, message, commandOptions, callback);

对应核心代码:

    conn[kMessageStream].writeCommand(command, operationDescription);

找到 src/cmap/message_stream.ts 中的 writeCommand 方法:

  /**
   * Serializes `command` and pushes the resulting bytes onto this stream.
   *
   * When no compressor was agreed on, or `canCompress` rejects the command, the
   * output of `command.toBin()` is pushed as-is. Otherwise the message body
   * (everything after the message header) is compressed and wrapped in an
   * OP_COMPRESSED message before being pushed.
   *
   * @param command - The protocol message to write
   * @param operationDescription - Supplies the agreed compressor, the zlib
   *   compression level and the `cb` callback that receives compression errors
   */
  writeCommand(
    command: WriteProtocolMessageType,
    operationDescription: OperationDescription
  ): void {
    const agreedCompressor = operationDescription.agreedCompressor ?? 'none';
    if (agreedCompressor === 'none' || !canCompress(command)) {
      const data = command.toBin();
      this.push(Array.isArray(data) ? Buffer.concat(data) : data);
      return;
    }
    // otherwise, compress the message
    const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
    const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);

    // Extract information needed for OP_COMPRESSED from the uncompressed message
    const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);

    const options = {
      agreedCompressor,
      zlibCompressionLevel: operationDescription.zlibCompressionLevel ?? 0
    };
    // Compress the message body
    compress(options, messageToBeCompressed).then(
      compressedMessage => {
        // Create the msgHeader of OP_COMPRESSED
        const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
        msgHeader.writeInt32LE(
          MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
          0
        ); // messageLength
        msgHeader.writeInt32LE(command.requestId, 4); // requestID
        msgHeader.writeInt32LE(0, 8); // responseTo (zero)
        msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode

        // Create the compression details of OP_COMPRESSED
        const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
        compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
        compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
        compressionDetails.writeUInt8(Compressor[agreedCompressor], 8); // compressorID
        this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
      },
      error => {
        // Compression failed: hand the error to the operation's callback.
        operationDescription.cb(error);
      }
    );
  }

至此,我们可以了解到mongodb命令的具体实现了。

核心原理:MessageStream 继承自 Node.js stream 模块的 Duplex 类,其核心实现如下:

/**
 * Duplex stream used for exchanging protocol messages: chunks written to it are
 * accumulated in a BufferPool and handed to `processIncomingData`, while
 * outgoing commands are serialized by `writeCommand` and pushed to its readable side.
 */
export class MessageStream extends Duplex {
  /** @internal */
  maxBsonMessageSize: number;
  /** @internal */
  [kBuffer]: BufferPool;
  /** @internal */
  isMonitoringConnection = false;

  /**
   * @param options - Stream options; `maxBsonMessageSize` falls back to
   *   `kDefaultMaxBsonMessageSize` when not provided
   */
  constructor(options: MessageStreamOptions = {}) {
    super(options);
    this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
    this[kBuffer] = new BufferPool();
  }

  /** The pool holding the bytes written to this stream. */
  get buffer(): BufferPool {
    return this[kBuffer];
  }

  /** Appends the written chunk to the buffer pool, then processes the buffered data. */
  override _write(chunk: Buffer, _: unknown, callback: Callback<Buffer>): void {
    this[kBuffer].append(chunk);
    processIncomingData(this, callback);
  }

  override _read(/* size */): void {
    // NOTE: This implementation is empty because we explicitly push data to be read
    //       when `writeCommand` is called.
    return;
  }

  /**
   * Serializes `command` and pushes the resulting bytes onto this stream.
   *
   * When no compressor was agreed on, or `canCompress` rejects the command, the
   * output of `command.toBin()` is pushed as-is. Otherwise the message body
   * (everything after the message header) is compressed and wrapped in an
   * OP_COMPRESSED message before being pushed.
   *
   * @param command - The protocol message to write
   * @param operationDescription - Supplies the agreed compressor, the zlib
   *   compression level and the `cb` callback that receives compression errors
   */
  writeCommand(
    command: WriteProtocolMessageType,
    operationDescription: OperationDescription
  ): void {
    const agreedCompressor = operationDescription.agreedCompressor ?? 'none';
    if (agreedCompressor === 'none' || !canCompress(command)) {
      const data = command.toBin();
      this.push(Array.isArray(data) ? Buffer.concat(data) : data);
      return;
    }
    // otherwise, compress the message
    const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
    const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);

    // Extract information needed for OP_COMPRESSED from the uncompressed message
    const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);

    const options = {
      agreedCompressor,
      zlibCompressionLevel: operationDescription.zlibCompressionLevel ?? 0
    };
    // Compress the message body
    compress(options, messageToBeCompressed).then(
      compressedMessage => {
        // Create the msgHeader of OP_COMPRESSED
        const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
        msgHeader.writeInt32LE(
          MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
          0
        ); // messageLength
        msgHeader.writeInt32LE(command.requestId, 4); // requestID
        msgHeader.writeInt32LE(0, 8); // responseTo (zero)
        msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode

        // Create the compression details of OP_COMPRESSED
        const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
        compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
        compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
        compressionDetails.writeUInt8(Compressor[agreedCompressor], 8); // compressorID
        this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
      },
      error => {
        // Compression failed: hand the error to the operation's callback.
        operationDescription.cb(error);
      }
    );
  }
}