#秋日生活打卡季#
看下面的代码,请问 db.command({ serverStatus: 1 }) 具体做了什么?是如何实现的?
假设你是一个mongodb小白,对于mongodb的操作完全不了解,那看到上面的代码,会好奇到底对应mongod执行了什么命令呢?
如果抛开node.js和mongodb模块的源码不管,最快的解决方案是直接搜索:
mongodb serverStatus
这几个关键词,能很快了解到它实际上对应了在mongosh里执行下面的命令:
db.serverStatus()
mongodb serverStatus函数参考文档:
https://www.mongodb.com/docs/manual/reference/method/db.serverStatus/
但是具体mongodb模块是怎么实现的呢?我们可以深入源码看看。
先创建一个空白项目:
mkdir mongodb-testcd mongodb-testnpm init -ynpm i mongodbtouch test.js
test.js文件的内容如下:
(async function test() { const mongoClient = require('mongodb').MongoClient; client = await mongoClient.connect(`mongodb://127.0.0.1:27017`); const db = client.db(); const result = await db.command({ serverStatus: 1 }); console.log(`command ok? ${result.ok === 1}`) // 退出node.js进程 process.exit(0);})()
执行:
node test.js
执行结果如下:
➜ mongodb-test node test.jscommand ok? true
源码仓库:
https://github.com/mongodb/node-mongodb-native
可以顺藤摸瓜找到具体实现在 src/db.ts 这个文件:
/** * Execute a command * * @remarks * This command does not inherit options from the MongoClient. * * The driver will ensure the following fields are attached to the command sent to the server: * - `lsid` - sourced from an implicit session or options.session * - `$readPreference` - defaults to primary or can be configured by options.readPreference * - `$db` - sourced from the name of this database * * If the client has a serverApi setting: * - `apiVersion` * - `apiStrict` * - `apiDeprecationErrors` * * When in a transaction: * - `readConcern` - sourced from readConcern set on the TransactionOptions * - `writeConcern` - sourced from writeConcern set on the TransactionOptions * * Attaching any of the above fields to the command will have no effect as the driver will overwrite the value. * * @param command - The command to run * @param options - Optional settings for the command */ async command(command: Document, options?: RunCommandOptions): Promise<Document> { // Intentionally, we do not inherit options from parent for this operation. return executeOperation( this.client, new RunCommandOperation(this, command, { ...resolveBSONOptions(options), session: options?.session, readPreference: options?.readPreference }) ); }
executeOperation的函数定义:
/** * Executes the given operation with provided arguments. * @internal * * @remarks * This method reduces large amounts of duplication in the entire codebase by providing * a single point for determining whether callbacks or promises should be used. Additionally * it allows for a single point of entry to provide features such as implicit sessions, which * are required by the Driver Sessions specification in the event that a ClientSession is * not provided * * @param topology - The topology to execute this operation on * @param operation - The operation to execute * @param callback - The command result callback */export function executeOperation< T extends AbstractOperation<TResult>, TResult = ResultTypeFromOperation<T>>(client: MongoClient, operation: T): Promise<TResult>;export function executeOperation< T extends AbstractOperation<TResult>, TResult = ResultTypeFromOperation<T>>(client: MongoClient, operation: T, callback: Callback<TResult>): void;export function executeOperation< T extends AbstractOperation<TResult>, TResult = ResultTypeFromOperation<T>>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void;export function executeOperation< T extends AbstractOperation<TResult>, TResult = ResultTypeFromOperation<T>>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void { return maybeCallback(() => executeOperationAsync(client, operation), callback);}
RunCommandOperation的定义:
import type { BSONSerializeOptions, Document } from '../bson';import { type Db } from '../db';import { type TODO_NODE_3286 } from '../mongo_types';import type { ReadPreferenceLike } from '../read_preference';import type { Server } from '../sdam/server';import type { ClientSession } from '../sessions';import { MongoDBNamespace } from '../utils';import { AbstractOperation } from './operation';/** @public */export type RunCommandOptions = { /** Specify ClientSession for this command */ session?: ClientSession; /** The read preference */ readPreference?: ReadPreferenceLike;} & BSONSerializeOptions;/** @internal */export class RunCommandOperation<T = Document> extends AbstractOperation<T> { constructor(parent: Db, public command: Document, public override options: RunCommandOptions) { super(options); this.ns = parent.s.namespace.withCollection('$cmd'); } override async execute(server: Server, session: ClientSession | undefined): Promise<T> { this.server = server; return server.commandAsync(this.ns, this.command, { ...this.options, readPreference: this.readPreference, session }) as TODO_NODE_3286; }}export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T> { constructor( public command: Document, public override options: RunCommandOptions & { noResponse?: boolean; bypassPinningCheck?: boolean; } ) { super(options); this.ns = new MongoDBNamespace('admin', '$cmd'); } override async execute(server: Server, session: ClientSession | undefined): Promise<T> { this.server = server; return server.commandAsync(this.ns, this.command, { ...this.options, readPreference: this.readPreference, session }) as TODO_NODE_3286; }}
可以追踪到实现在 server.commandAsync 里。
this.commandAsync = promisify( ( ns: MongoDBNamespace, cmd: Document, options: CommandOptions, // callback type defines Document result because result is never nullish when it succeeds, otherwise promise rejects callback: (error: Error | undefined, result: Document) => void ) => this.command(ns, cmd, options, callback as any) );
commandAsync是Server类的属性,签名如下:
commandAsync: (ns: MongoDBNamespace, cmd: Document, options: CommandOptions) => Promise<Document>;
又可以追溯到 this.command 函数:
继续追溯到 src/cmap/connection.ts:
write(this, message, commandOptions, callback);
对应核心代码:
conn[kMessageStream].writeCommand(command, operationDescription);
找到
src/cmap/message_stream.ts:
writeCommand( command: WriteProtocolMessageType, operationDescription: OperationDescription ): void { const agreedCompressor = operationDescription.agreedCompressor ?? 'none'; if (agreedCompressor === 'none' || !canCompress(command)) { const data = command.toBin(); this.push(Array.isArray(data) ? Buffer.concat(data) : data); return; } // otherwise, compress the message const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin()); const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); // Extract information needed for OP_COMPRESSED from the uncompressed message const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); const options = { agreedCompressor, zlibCompressionLevel: operationDescription.zlibCompressionLevel ?? 0 }; // Compress the message body compress(options, messageToBeCompressed).then( compressedMessage => { // Create the msgHeader of OP_COMPRESSED const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); msgHeader.writeInt32LE( MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, 0 ); // messageLength msgHeader.writeInt32LE(command.requestId, 4); // requestID msgHeader.writeInt32LE(0, 8); // responseTo (zero) msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode // Create the compression details of OP_COMPRESSED const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE); compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader compressionDetails.writeUInt8(Compressor[agreedCompressor], 8); // compressorID this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage])); }, error => { operationDescription.cb(error); } ); }
至此,我们可以了解到mongodb命令的具体实现了。
核心原理:基于 stream 的 Duplex 类,核心类实现:
export class MessageStream extends Duplex { /** @internal */ maxBsonMessageSize: number; /** @internal */ [kBuffer]: BufferPool; /** @internal */ isMonitoringConnection = false; constructor(options: MessageStreamOptions = {}) { super(options); this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize; this[kBuffer] = new BufferPool(); } get buffer(): BufferPool { return this[kBuffer]; } override _write(chunk: Buffer, _: unknown, callback: Callback<Buffer>): void { this[kBuffer].append(chunk); processIncomingData(this, callback); } override _read(/* size */): void { // NOTE: This implementation is empty because we explicitly push data to be read // when `writeMessage` is called. return; } writeCommand( command: WriteProtocolMessageType, operationDescription: OperationDescription ): void { const agreedCompressor = operationDescription.agreedCompressor ?? 'none'; if (agreedCompressor === 'none' || !canCompress(command)) { const data = command.toBin(); this.push(Array.isArray(data) ? Buffer.concat(data) : data); return; } // otherwise, compress the message const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin()); const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); // Extract information needed for OP_COMPRESSED from the uncompressed message const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); const options = { agreedCompressor, zlibCompressionLevel: operationDescription.zlibCompressionLevel ?? 0 }; // Compress the message body compress(options, messageToBeCompressed).then( compressedMessage => { // Create the msgHeader of OP_COMPRESSED const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); msgHeader.writeInt32LE( MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, 0 ); // messageLength msgHeader.writeInt32LE(command.requestId, 4); // requestID msgHeader.writeInt32LE(0, 8); // responseTo (zero) msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode // Create the compression details of OP_COMPRESSED const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE); compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader compressionDetails.writeUInt8(Compressor[agreedCompressor], 8); // compressorID this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage])); }, error => { operationDescription.cb(error); } ); }}