kernels | , decode encode ipython zmq protocol | Cell 26 | Search

The setupSockets function establishes multiple sockets based on a provided configuration and returns a promise resolving to an object with established sockets. A related setupSocket function creates a single socket connection, also returning a promise that resolves to the established socket object.

Run example

npm run import -- "bind wire sockets"

bind wire sockets


/**
 * Binds one socket per channel (control, shell, stdin, iopub, heartbeat)
 * using the ports found on `config`.
 *
 * @param {Object} config - Connection settings. Reads `control_port`,
 *     `shell_port`, `stdin_port`, `iopub_port` and `hb_port`; the whole
 *     object is also forwarded to `setupSocket` as its second argument.
 * @returns {Promise<Object>} Resolves to an object whose keys are the channel
 *     names and whose values are whatever `setupSocket` resolved with.
 */
function setupSockets(config) {
    const definitions = {
        control: { port: config.control_port, type: 'xrep' },
        shell: { port: config.shell_port, type: 'xrep' },
        stdin: { port: config.stdin_port, type: 'router' },
        iopub: { port: config.iopub_port, type: 'pub' },
        heartbeat: { port: config.hb_port, type: 'rep' },
    };
    const names = Object.keys(definitions);

    // Start every bind up front so they proceed concurrently.
    const pending = names.map((name) => setupSocket(definitions[name], config));

    return Promise.all(pending).then((bound) => {
        // Promise.all preserves input order, so bound[i] belongs to names[i].
        const byName = {};
        names.forEach((name, index) => {
            byName[name] = bound[index];
        });
        return byName;
    });
}

/**
 * Creates a socket of type `config.type` and binds it to
 * `<general.transport>://<general.ip>:<config.port>`.
 *
 * @param {Object} config - Per-socket settings; reads `type` and `port`.
 * @param {Object} general - Shared settings; reads `transport` and `ip`.
 * @returns {Promise<Object>} Resolves to the bound socket. Rejects with the
 *     bind error, after the unusable socket has been closed.
 */
function setupSocket(config, general) {
    const sock = zmq.socket(config.type);
    const addr = `${general.transport}://${general.ip}:${config.port}`;
    return util.promisify(sock.bind.bind(sock))(addr)
        .then(() => sock, (err) => {
            // A socket that failed to bind is never handed to the caller,
            // so release it here instead of leaking it.
            sock.close();
            throw err;
        });
}

// Public API: only setupSockets is exported; setupSocket is not.
module.exports = {
    setupSockets
}

What the code could have been:

// zmqSocketConfig.js
const zmq = require('zmq');
const { promisify } = require('util');

// Socket type names that setupSockets accepts when it validates the
// per-channel configuration before creating any socket.
const socketTypes = ['xrep', 'router', 'pub','rep'];

/**
 * Derives the per-channel `{ port, type }` settings from a connection
 * configuration object, filling in fallbacks for ports that are missing.
 */
class ZMQSocketConfig {
    /**
     * @param {Object} config - Connection settings. `control_port` is
     *     required; `shell_port`, `stdin_port`, `iopub_port` and `hb_port`
     *     are optional.
     * @throws {Error} If `config` is missing or has no truthy `control_port`.
     */
    constructor(config) {
        if (!config || !config.control_port) {
            throw new Error('Missing required configuration');
        }

        this.control = {
            port: config.control_port,
            type: 'xrep',
        };
        // NOTE(review): shell and iopub fall back to the control port, so a
        // config that omits them yields several channels on the same address;
        // confirm this is intended before relying on the fallbacks.
        this.shell = {
            port: config.shell_port || config.control_port,
            type: 'xrep',
        };
        this.stdin = {
            port: config.stdin_port || 5555, // Default stdin port
            type: 'router',
        };
        this.iopub = {
            port: config.iopub_port || config.control_port,
            type: 'pub',
        };
        this.heartbeat = {
            // Coerce before adding: with a string port, '5000' + 1 would
            // concatenate to '50001' instead of producing 5001.
            port: config.hb_port || (Number(config.control_port) + 1),
            type: 'rep',
        };
    }

    /**
     * @returns {Object} The channel settings keyed by channel name
     *     (control, shell, stdin, iopub, heartbeat). The values are the
     *     instance's own objects, not copies.
     */
    getSocketConfig() {
        return {
            control: this.control,
            shell: this.shell,
            stdin: this.stdin,
            iopub: this.iopub,
            heartbeat: this.heartbeat,
        };
    }
}

/**
 * Builds the per-channel settings with ZMQSocketConfig, checks every channel
 * type against `socketTypes`, then binds all channels concurrently.
 *
 * @param {Object} config - Connection settings; passed to ZMQSocketConfig and
 *     forwarded unchanged to `setupSocket` as its second argument.
 * @returns {Promise<Object>} Resolves to an object mapping each channel name
 *     to the value `setupSocket` resolved with.
 * @throws {Error} Any failure is logged with console.error and rethrown.
 */
async function setupSockets(config) {
    try {
        const channelSpecs = new ZMQSocketConfig(config).getSocketConfig();

        // Refuse to create anything if a channel names an unknown type.
        for (const spec of Object.values(channelSpecs)) {
            if (!socketTypes.includes(spec.type)) {
                throw new Error('Invalid socket type');
            }
        }

        // Kick off every bind before waiting on any of them.
        const channelNames = Object.keys(channelSpecs);
        const pending = channelNames.map(async (name) => {
            const bound = await setupSocket(channelSpecs[name], config);
            return bound;
        });
        const boundSockets = await Promise.all(pending);

        // Promise.all keeps input order, so pair names and sockets by index.
        const byName = {};
        channelNames.forEach((name, index) => {
            byName[name] = boundSockets[index];
        });
        return byName;
    } catch (error) {
        console.error('Failed to set up sockets:', error);
        throw error;
    }
}

/**
 * Creates a socket of type `config.type` and binds it to
 * `<general.transport>://<general.ip>:<config.port>`.
 *
 * @param {Object} config - Per-socket settings; reads `type` and `port`.
 * @param {Object} general - Shared settings; reads `transport` and `ip`.
 * @returns {Promise<Object>} Resolves to the bound socket.
 * @throws {Error} Creation or bind failures are logged with console.error
 *     and rethrown; a socket that was created but could not be bound is
 *     closed first.
 */
async function setupSocket(config, general) {
    let sock;
    try {
        sock = zmq.socket(config.type);
        const addr = `${general.transport}://${general.ip}:${config.port}`;
        // Await inside the try block: returning the promise un-awaited would
        // let bind rejections bypass the catch below entirely.
        await promisify(sock.bind.bind(sock))(addr);
        return sock;
    } catch (error) {
        // sock is undefined when zmq.socket itself threw.
        if (sock) {
            sock.close();
        }
        console.error('Failed to set up socket:', error);
        throw error;
    }
}

// Public API: both the multi-socket entry point and the single-socket helper
// are exported.
module.exports = {
    setupSockets,
    setupSocket,
};

Code Breakdown

setupSockets Function

The setupSockets function is an asynchronous operation that sets up multiple sockets based on the provided configuration.

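Parameters: config, an object supplying control_port, shell_port, stdin_port, iopub_port and hb_port, plus the transport and ip used to build each bind address.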

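Return Value: a promise that resolves to an object mapping each socket name (control, shell, stdin, iopub, heartbeat) to its bound socket.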

Description:

  1. Creates an object sockets with properties for different types of sockets (control, shell, stdin, iopub, and heartbeat).
  2. Each socket configuration is created with a specified port and type.
  3. Binds all sockets concurrently and waits, via Promise.all, for an array of the bound socket objects.
  4. Reduces that array into a single object keyed by socket name; this object is the value the returned promise resolves to.

setupSocket Function

The setupSocket function creates a single socket and binds it to an address.

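Parameters: config, an object with the socket's type and port; general, an object with the transport and ip shared by all sockets.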

Return Value: a promise that resolves to the bound socket object.

Description:

  1. Creates a socket of the specified type using the zmq library.
  2. Binds the socket to the specified address using the bind method.
  3. Returns a promise that resolves to the established socket object.

Module Exports

The setupSockets function is exported from the module.