diff --git a/lib/vscode/src/vs/base/parts/ipc/common/ipc.net.ts b/lib/vscode/src/vs/base/parts/ipc/common/ipc.net.ts index 777d4379f..dcce7c0ef 100644 --- a/lib/vscode/src/vs/base/parts/ipc/common/ipc.net.ts +++ b/lib/vscode/src/vs/base/parts/ipc/common/ipc.net.ts @@ -743,6 +743,11 @@ export class PersistentProtocol implements IMessagePassingProtocol { }, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5); } + // NOTE@coder: Set the socket without initiating a reconnect. + public setSocket(socket: ISocket): void { + this._socket = socket; + } + public getSocket(): ISocket { return this._socket; } diff --git a/lib/vscode/src/vs/server/entry.ts b/lib/vscode/src/vs/server/entry.ts index 901bcda63..2f458f403 100644 --- a/lib/vscode/src/vs/server/entry.ts +++ b/lib/vscode/src/vs/server/entry.ts @@ -6,7 +6,9 @@ import { logger } from 'vs/server/node/logger'; import { enableCustomMarketplace } from 'vs/server/node/marketplace'; import { Vscode } from 'vs/server/node/server'; -setUnexpectedErrorHandler((error) => logger.warn(error instanceof Error ? error.message : error)); +setUnexpectedErrorHandler((error) => { + logger.warn('Uncaught error', field('error', error instanceof Error ? error.message : error)); +}); enableCustomMarketplace(); proxyAgent.monkeyPatch(true); diff --git a/lib/vscode/src/vs/server/node/connection.ts b/lib/vscode/src/vs/server/node/connection.ts index 3b4d54ab9..ef5cecdaa 100644 --- a/lib/vscode/src/vs/server/node/connection.ts +++ b/lib/vscode/src/vs/server/node/connection.ts @@ -3,31 +3,49 @@ import * as cp from 'child_process'; import { VSBuffer } from 'vs/base/common/buffer'; import { Emitter } from 'vs/base/common/event'; import { FileAccess } from 'vs/base/common/network'; -import { ISocket } from 'vs/base/parts/ipc/common/ipc.net'; -import { WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net'; import { INativeEnvironmentService } from 'vs/platform/environment/common/environment'; +import { IRemoteExtensionHostStartParams } from 'vs/platform/remote/common/remoteAgentConnection'; import { getNlsConfiguration } from 'vs/server/node/nls'; import { Protocol } from 'vs/server/node/protocol'; import { IExtHostReadyMessage } from 'vs/workbench/services/extensions/common/extensionHostProtocol'; export abstract class Connection { private readonly _onClose = new Emitter(); + /** + * Fire when the connection is closed (not just disconnected). This should + * only happen when the connection is offline and old or has an error. + */ public readonly onClose = this._onClose.event; private disposed = false; private _offline: number | undefined; - public constructor(protected protocol: Protocol, public readonly token: string) {} + protected readonly logger: Logger; + + public constructor( + protected readonly protocol: Protocol, + public readonly name: string, + ) { + this.logger = logger.named( + this.name, + field('token', this.protocol.options.reconnectionToken), + ); + + this.logger.debug('Connecting...'); + this.onClose(() => this.logger.debug('Closed')); + } public get offline(): number | undefined { return this._offline; } - public reconnect(socket: ISocket, buffer: VSBuffer): void { + public reconnect(protocol: Protocol): void { + this.logger.debug('Reconnecting...'); this._offline = undefined; - this.doReconnect(socket, buffer); + this.doReconnect(protocol); } - public dispose(): void { + public dispose(reason?: string): void { + this.logger.debug('Disposing...', field('reason', reason)); if (!this.disposed) { this.disposed = true; this.doDispose(); @@ -36,6 +54,7 @@ export abstract class Connection { } protected setOffline(): void { + this.logger.debug('Disconnected'); if (!this._offline) { this._offline = Date.now(); } @@ -44,7 +63,11 @@ export abstract class Connection { /** * Set up the connection on a new socket. */ - protected abstract doReconnect(socket: ISocket, buffer: VSBuffer): void; + protected abstract doReconnect(protcol: Protocol): void; + + /** + * Dispose/destroy everything permanently. + */ protected abstract doDispose(): void; } @@ -52,21 +75,22 @@ export abstract class Connection { * Used for all the IPC channels. */ export class ManagementConnection extends Connection { - public constructor(protected protocol: Protocol, token: string) { - super(protocol, token); + public constructor(protocol: Protocol) { + super(protocol, 'management'); protocol.onDidDispose(() => this.dispose()); // Explicit close. protocol.onSocketClose(() => this.setOffline()); // Might reconnect. + protocol.sendMessage({ type: 'ok' }); } protected doDispose(): void { - this.protocol.sendDisconnect(); - this.protocol.dispose(); - this.protocol.getUnderlyingSocket().destroy(); + this.protocol.destroy(); } - protected doReconnect(socket: ISocket, buffer: VSBuffer): void { - this.protocol.beginAcceptReconnection(socket, buffer); + protected doReconnect(protocol: Protocol): void { + protocol.sendMessage({ type: 'ok' }); + this.protocol.beginAcceptReconnection(protocol.getSocket(), protocol.readEntireBuffer()); this.protocol.endAcceptReconnection(); + protocol.dispose(); } } @@ -85,55 +109,62 @@ type ExtHostMessage = DisconnectedMessage | ConsoleMessage | IExtHostReadyMessag export class ExtensionHostConnection extends Connection { private process?: cp.ChildProcess; - private readonly logger: Logger; public constructor( - locale: string, protocol: Protocol, buffer: VSBuffer, token: string, + protocol: Protocol, + private readonly params: IRemoteExtensionHostStartParams, private readonly environment: INativeEnvironmentService, ) { - super(protocol, token); - this.logger = logger.named('exthost', field('token', token)); - this.protocol.dispose(); - this.spawn(locale, buffer).then((p) => this.process = p); - this.protocol.getUnderlyingSocket().pause(); + super(protocol, 'exthost'); + + protocol.sendMessage({ debugPort: this.params.port }); + const buffer = protocol.readEntireBuffer(); + const inflateBytes = protocol.inflateBytes; + protocol.dispose(); + protocol.getUnderlyingSocket().pause(); + + this.spawn(buffer, inflateBytes).then((p) => this.process = p); } protected doDispose(): void { + this.protocol.destroy(); if (this.process) { this.process.kill(); } - this.protocol.getUnderlyingSocket().destroy(); } - protected doReconnect(socket: ISocket, buffer: VSBuffer): void { - // This is just to set the new socket. - this.protocol.beginAcceptReconnection(socket, null); - this.protocol.dispose(); - this.sendInitMessage(buffer); + protected doReconnect(protocol: Protocol): void { + protocol.sendMessage({ debugPort: this.params.port }); + const buffer = protocol.readEntireBuffer(); + const inflateBytes = protocol.inflateBytes; + protocol.dispose(); + protocol.getUnderlyingSocket().pause(); + this.protocol.setSocket(protocol.getSocket()); + + this.sendInitMessage(buffer, inflateBytes); } - private sendInitMessage(buffer: VSBuffer): void { - const socket = this.protocol.getUnderlyingSocket(); - socket.pause(); + private sendInitMessage(buffer: VSBuffer, inflateBytes: Uint8Array | undefined): void { + if (!this.process) { + throw new Error('Tried to initialize VS Code before spawning'); + } - const wrapperSocket = this.protocol.getSocket(); + this.logger.debug('Sending socket'); - this.logger.trace('Sending socket'); - this.process!.send({ // Process must be set at this point. + // TODO: Do something with the debug port. + this.process.send({ type: 'VSCODE_EXTHOST_IPC_SOCKET', initialDataChunk: Buffer.from(buffer.buffer).toString('base64'), - skipWebSocketFrames: !(wrapperSocket instanceof WebSocketNodeSocket), + skipWebSocketFrames: this.protocol.options.skipWebSocketFrames, permessageDeflate: this.protocol.options.permessageDeflate, - inflateBytes: wrapperSocket instanceof WebSocketNodeSocket - ? Buffer.from(wrapperSocket.recordedInflateBytes.buffer).toString('base64') - : undefined, - }, socket); + inflateBytes: inflateBytes ? Buffer.from(inflateBytes).toString('base64') : undefined, + }, this.protocol.getUnderlyingSocket()); } - private async spawn(locale: string, buffer: VSBuffer): Promise { - this.logger.trace('Getting NLS configuration...'); - const config = await getNlsConfiguration(locale, this.environment.userDataPath); - this.logger.trace('Spawning extension host...'); + private async spawn(buffer: VSBuffer, inflateBytes: Uint8Array | undefined): Promise { + this.logger.debug('Getting NLS configuration...'); + const config = await getNlsConfiguration(this.params.language, this.environment.userDataPath); + this.logger.debug('Spawning extension host...'); const proc = cp.fork( FileAccess.asFileUri('bootstrap-fork', require).fsPath, // While not technically necessary, makes it easier to tell which process @@ -162,7 +193,7 @@ export class ExtensionHostConnection extends Connection { this.dispose(); }); proc.on('exit', (code) => { - this.logger.trace('Exited', field('code', code)); + this.logger.debug('Exited', field('code', code)); this.dispose(); }); if (proc.stdout && proc.stderr) { @@ -181,12 +212,12 @@ export class ExtensionHostConnection extends Connection { } break; case 'VSCODE_EXTHOST_DISCONNECTED': - this.logger.trace('Going offline'); + this.logger.debug('Got disconnected message'); this.setOffline(); break; case 'VSCODE_EXTHOST_IPC_READY': - this.logger.trace('Got ready message'); - this.sendInitMessage(buffer); + this.logger.debug('Handshake completed'); + this.sendInitMessage(buffer, inflateBytes); break; default: this.logger.error('Unexpected message', field('event', event)); @@ -194,7 +225,7 @@ export class ExtensionHostConnection extends Connection { } }); - this.logger.trace('Waiting for handshake...'); + this.logger.debug('Waiting for handshake...'); return proc; } } diff --git a/lib/vscode/src/vs/server/node/protocol.ts b/lib/vscode/src/vs/server/node/protocol.ts index 15fa414e4..fd61d4c95 100644 --- a/lib/vscode/src/vs/server/node/protocol.ts +++ b/lib/vscode/src/vs/server/node/protocol.ts @@ -1,21 +1,31 @@ -import { field } from '@coder/logger'; +import { field, logger, Logger } from '@coder/logger'; import * as net from 'net'; import { VSBuffer } from 'vs/base/common/buffer'; import { PersistentProtocol } from 'vs/base/parts/ipc/common/ipc.net'; import { NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net'; import { AuthRequest, ConnectionTypeRequest, HandshakeMessage } from 'vs/platform/remote/common/remoteAgentConnection'; -import { logger } from 'vs/server/node/logger'; export interface SocketOptions { + /** The token is how we identify and connect to existing sessions. */ readonly reconnectionToken: string; + /** Specifies that the client is trying to reconnect. */ readonly reconnection: boolean; + /** If true assume this is not a web socket (always false for code-server). */ readonly skipWebSocketFrames: boolean; + /** Whether to support compression (web socket only). */ readonly permessageDeflate?: boolean; + /** + * Seed zlib with these bytes (web socket only). If parts of inflating was + * done in a different zlib instance we need to pass all those bytes into zlib + * otherwise the inflate might hit an inflated portion referencing a distance + * too far back. + */ readonly inflateBytes?: VSBuffer; - readonly recordInflateBytes?: boolean; } export class Protocol extends PersistentProtocol { + private readonly logger: Logger; + public constructor(socket: net.Socket, public readonly options: SocketOptions) { super( options.skipWebSocketFrames @@ -24,9 +34,12 @@ export class Protocol extends PersistentProtocol { new NodeSocket(socket), options.permessageDeflate || false, options.inflateBytes || null, - options.recordInflateBytes || false, + // Always record inflate bytes if using permessage-deflate. + options.permessageDeflate || false, ), ); + + this.logger = logger.named('protocol', field('token', this.options.reconnectionToken)); } public getUnderlyingSocket(): net.Socket { @@ -40,31 +53,44 @@ export class Protocol extends PersistentProtocol { * Perform a handshake to get a connection request. */ public handshake(): Promise { - logger.trace('Protocol handshake', field('token', this.options.reconnectionToken)); + this.logger.debug('Initiating handshake...'); + return new Promise((resolve, reject) => { + const cleanup = () => { + handler.dispose(); + onClose.dispose(); + clearTimeout(timeout); + }; + + const onClose = this.onSocketClose(() => { + cleanup(); + this.logger.debug('Handshake failed'); + reject(new Error('Protocol socket closed unexpectedly')); + }); + const timeout = setTimeout(() => { - logger.error('Handshake timed out', field('token', this.options.reconnectionToken)); - reject(new Error('timed out')); + cleanup(); + this.logger.debug('Handshake timed out'); + reject(new Error('Protocol handshake timed out')); }, 10000); // Matches the client timeout. const handler = this.onControlMessage((rawMessage) => { try { const raw = rawMessage.toString(); - logger.trace('Protocol message', field('token', this.options.reconnectionToken), field('message', raw)); + this.logger.trace('Got message', field('message', raw)); const message = JSON.parse(raw); switch (message.type) { case 'auth': return this.authenticate(message); case 'connectionType': - handler.dispose(); - clearTimeout(timeout); + cleanup(); + this.logger.debug('Handshake completed'); return resolve(message); default: throw new Error('Unrecognized message type'); } } catch (error) { - handler.dispose(); - clearTimeout(timeout); + cleanup(); reject(error); } }); @@ -90,10 +116,38 @@ export class Protocol extends PersistentProtocol { } /** - * Send a handshake message. In the case of the extension host, it just sends - * back a debug port. + * Send a handshake message. In the case of the extension host it should just + * send a debug port. */ - public sendMessage(message: HandshakeMessage | { debugPort?: number } ): void { + public sendMessage(message: HandshakeMessage | { debugPort?: number | null } ): void { this.sendControl(VSBuffer.fromString(JSON.stringify(message))); } + + /** + * Disconnect and dispose everything including the underlying socket. + */ + public destroy(reason?: string): void { + try { + if (reason) { + this.sendMessage({ type: 'error', reason }); + } + // If still connected try notifying the client. + this.sendDisconnect(); + } catch (error) { + // I think the write might fail if already disconnected. + this.logger.warn(error.message || error); + } + this.dispose(); // This disposes timers and socket event handlers. + this.getSocket().dispose(); // This will destroy() the socket. + } + + /** + * Get inflateBytes from the current socket. + */ + public get inflateBytes(): Uint8Array | undefined { + const socket = this.getSocket(); + return socket instanceof WebSocketNodeSocket + ? socket.recordedInflateBytes.buffer + : undefined; + } } diff --git a/lib/vscode/src/vs/server/node/server.ts b/lib/vscode/src/vs/server/node/server.ts index a44cbd0bb..cf7faf370 100644 --- a/lib/vscode/src/vs/server/node/server.ts +++ b/lib/vscode/src/vs/server/node/server.ts @@ -1,4 +1,3 @@ -import { field } from '@coder/logger'; import * as fs from 'fs'; import * as net from 'net'; import { release } from 'os'; @@ -123,14 +122,11 @@ export class Vscode { reconnection: query.reconnection === 'true', skipWebSocketFrames: query.skipWebSocketFrames === 'true', permessageDeflate, - recordInflateBytes: permessageDeflate, }); try { await this.connect(await protocol.handshake(), protocol); } catch (error) { - protocol.sendMessage({ type: 'error', reason: error.message }); - protocol.dispose(); - protocol.getSocket().dispose(); + protocol.destroy(error.message); } return true; } @@ -143,56 +139,61 @@ export class Vscode { switch (message.desiredConnectionType) { case ConnectionType.ExtensionHost: case ConnectionType.Management: + // Initialize connection map for this type of connection. if (!this.connections.has(message.desiredConnectionType)) { this.connections.set(message.desiredConnectionType, new Map()); } const connections = this.connections.get(message.desiredConnectionType)!; - const ok = async () => { - return message.desiredConnectionType === ConnectionType.ExtensionHost - ? { debugPort: await this.getDebugPort() } - : { type: 'ok' }; - }; - const token = protocol.options.reconnectionToken; - if (protocol.options.reconnection && connections.has(token)) { - protocol.sendMessage(await ok()); - const buffer = protocol.readEntireBuffer(); - protocol.dispose(); - return connections.get(token)!.reconnect(protocol.getSocket(), buffer); - } else if (protocol.options.reconnection || connections.has(token)) { - throw new Error(protocol.options.reconnection - ? 'Unrecognized reconnection token' - : 'Duplicate reconnection token' - ); + let connection = connections.get(token); + if (protocol.options.reconnection && connection) { + return connection.reconnect(protocol); } - logger.debug('New connection', field('token', token)); - protocol.sendMessage(await ok()); + // This probably means the process restarted so the session was lost + // while the browser remained open. + if (protocol.options.reconnection) { + throw new Error(`Unable to reconnect; session no longer exists (${token})`); + } - let connection: Connection; + // This will probably never happen outside a chance collision. + if (connection) { + throw new Error('Unable to connect; token is already in use'); + } + + // Now that the initial exchange has completed we can create the actual + // connection on top of the protocol then send it to whatever uses it. if (message.desiredConnectionType === ConnectionType.Management) { - connection = new ManagementConnection(protocol, token); + // The management connection is used by firing onDidClientConnect + // which makes the IPC server become aware of the connection. + connection = new ManagementConnection(protocol); this._onDidClientConnect.fire({ - protocol, onDidClientDisconnect: connection.onClose, + protocol, + onDidClientDisconnect: connection.onClose, }); } else { - const buffer = protocol.readEntireBuffer(); + // The extension host connection is used by spawning an extension host + // and passing the socket into it. connection = new ExtensionHostConnection( - message.args ? message.args.language : 'en', - protocol, buffer, token, + protocol, + { + language: 'en', + ...message.args, + }, this.services.get(IEnvironmentService) as INativeEnvironmentService, ); } connections.set(token, connection); - connection.onClose(() => { - logger.debug('Connection closed', field('token', token)); - connections.delete(token); - }); + connection.onClose(() => connections.delete(token)); + this.disposeOldOfflineConnections(connections); + logger.debug(`${connections.size} active ${connection.name} connection(s)`); break; - case ConnectionType.Tunnel: return protocol.tunnel(); - default: throw new Error('Unrecognized connection type'); + case ConnectionType.Tunnel: + return protocol.tunnel(); + default: + throw new Error(`Unrecognized connection type ${message.desiredConnectionType}`); } } @@ -200,8 +201,7 @@ export class Vscode { const offline = Array.from(connections.values()) .filter((connection) => typeof connection.offline !== 'undefined'); for (let i = 0, max = offline.length - this.maxExtraOfflineConnections; i < max; ++i) { - logger.debug('Disposing offline connection', field('token', offline[i].token)); - offline[i].dispose(); + offline[i].dispose('old'); } } @@ -295,11 +295,4 @@ export class Vscode { }); }); } - - /** - * TODO: implement. - */ - private async getDebugPort(): Promise { - return undefined; - } } diff --git a/src/node/routes/index.ts b/src/node/routes/index.ts index a4348952b..5bc9f10bb 100644 --- a/src/node/routes/index.ts +++ b/src/node/routes/index.ts @@ -41,7 +41,7 @@ export const register = async ( if (error) { return reject(error) } - logger.trace(plural(count, `${count} active connection`)) + logger.debug(plural(count, `${count} active connection`)) resolve(count > 0) }) })