data collection | tell joke | crawl domain | Search

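The code imports the project's `importer` and the "selenium cell" module, defines two constants (`TIMEOUT`, which none of the code shown here reads, and `CONNECTIONS`, the maximum number of Selenium sessions to open), and implements two functions. `deQueue` takes tasks off an input queue one at a time and runs the search callback on each with a given Selenium session. `multiCrawl` opens up to `CONNECTIONS` Selenium sessions, runs one `deQueue` worker per session over a shared copy of the input list so the sessions crawl in parallel, and returns a promise that resolves with the combined crawl results.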

Run example

npm run import -- "multi crawl"

multi crawl

var importer = require('../Core');
var runSeleniumCell = importer.import("selenium cell");

// Not read by any code visible in this listing.
// NOTE(review): confirm whether it is still needed.
var TIMEOUT = 1000;
// Upper bound on the number of Selenium sessions multiCrawl opens.
var CONNECTIONS = 3;

/**
 * Work through a task queue on a single Selenium session.
 *
 * Items are removed from the front of `inputQueue` one at a time. The queue
 * may be shared with other deQueue workers, so each worker simply takes
 * whatever item is next until the queue is empty.
 *
 * @param {Array} inputQueue Tasks to process; mutated in place (shift/push).
 * @param {Function|*} searchCallback Called as `searchCallback(item, ctx)`.
 *     Any non-function value is handed to `importer.import` to obtain the
 *     callback (presumably a cell name; verify against the importer).
 * @param {Object} ctx Session context; must expose
 *     `ctx.client.requestHandler.sessionID`.
 * @returns {Promise<Array>} The callback results in processing order. An item
 *     whose callback failed with a non-session error contributes `undefined`.
 */
async function deQueue(inputQueue, searchCallback, ctx) {
    const results = [];
    console.log(ctx.client.requestHandler.sessionID);
    const callback = typeof searchCallback === 'function'
        ? searchCallback
        : importer.import(searchCallback, Object.assign({useCache: false}, ctx));

    while (inputQueue.length > 0) {
        const item = inputQueue.shift();
        // Short pause between tasks on the same session.
        await new Promise(resolve => setTimeout(resolve, 100));
        try {
            results.push(await callback(item, ctx));
        } catch (e) {
            const message = String(e);
            console.log(message);
            if (message.includes('Already') || message.includes('session')) {
                // Presumably the session itself is unusable: give the item
                // back so another worker can pick it up, and stop this worker
                // while keeping everything it has collected so far.
                inputQueue.push(item);
                console.log(new Error('Abandoning session :(', { cause: e }));
                return results;
            }
            // Any other failure is logged and skipped; keep a placeholder so
            // the number of results still matches the number of items handled.
            results.push(undefined);
        }
    }
    return results;
}

/**
 * Open several Selenium sessions and spread the input tasks across them.
 *
 * At most CONNECTIONS sessions are started (never more than there are tasks).
 * A session that fails to start is logged and skipped. Every started session
 * then runs a deQueue worker over one shared copy of the input list.
 *
 * @param {Array} inputList Tasks to crawl; not modified.
 * @param {Function|*} searchCallback Passed through to deQueue for each task.
 * @returns {Promise<Array>} Resolves with all workers' results concatenated.
 *     Resolves with `[]` for an empty input list; rejects if no session could
 *     be started.
 */
function multiCrawl(inputList, searchCallback) {
    // deQueue consumes its queue, so work on a copy of the caller's list.
    const queue = [].concat(inputList);
    if (queue.length === 0) {
        // Nothing to crawl: do not open any browser session.
        return Promise.resolve([]);
    }

    const connections = [];
    const sessionCount = Math.min(queue.length, CONNECTIONS);
    // importer.runAllPromises receives functions that take a `resolve`
    // callback; each one starts a session and always resolves (with null on
    // failure) so one bad session does not stop the others.
    const starters = Array.from({ length: sessionCount }, () => resolve => {
        return runSeleniumCell(false, false)
            .then(ctx => {
                connections.push(ctx);
                resolve(ctx);
            })
            .catch(e => {
                console.log(e);
                resolve(null);
            });
    });

    return importer.runAllPromises(starters)
        .then(() => {
            if (connections.length === 0) {
                throw new Error('multiCrawl: no Selenium session could be started');
            }
            console.log(connections.map(c => c.client.requestHandler.sessionID));
            // NOTE(review): presumably refreshes the known session URLs using
            // the first session before crawling starts; confirm against the
            // selenium cell.
            return connections[0].client
                .scanning(true)
                .then(() => connections[0].getAllSessionUrls())
                .scanning(false);
        })
        .then(() => console.log('done loading sessions'))
        .then(() => Promise.all(connections.map(ctx => deQueue(queue, searchCallback, ctx))))
        .then(r => [].concat(...r));
}
module.exports = multiCrawl;

What the code could have been:

const importer = require('../Core');
const runSeleniumCell = importer.import('selenium cell');

// Not read by any code visible in this listing.
// NOTE(review): confirm whether it is still needed.
const TIMEOUT = 1000;
// Upper bound on the number of Selenium sessions multiCrawl opens.
const MAX_CONCURRENT_CONNECTIONS = 3;
const SLEEP_DURATION = 100; // delay (ms, passed to setTimeout) before each callback invocation in dequeue

/**
 * Work through a task queue on a single Selenium session.
 *
 * Items are removed from the front of `inputQueue` one at a time until it is
 * empty. The queue may be shared with other dequeue workers.
 *
 * @param {Array} inputQueue The queue of tasks; mutated in place (shift/push)
 * @param {Function|*} searchCallback Called as `searchCallback(item, ctx)`;
 *     a non-function value is passed to `importer.import` to obtain the callback
 * @param {Object} ctx Session context; must expose
 *     `ctx.client.requestHandler.sessionID`
 * @returns {Promise<Array>} Resolves with the callback results in processing
 *     order; an item that failed with a non-session error contributes `undefined`
 */
async function dequeue(inputQueue, searchCallback, ctx) {
    const results = [];

    // Get the current session ID for logging purposes
    const sessionId = ctx.client.requestHandler.sessionID;
    console.log(`Session ID: ${sessionId}`);

    const callback = typeof searchCallback === 'function'
        ? searchCallback
        : importer.import(searchCallback, { useCache: false });

    while (inputQueue.length > 0) {
        const item = inputQueue.shift();

        // Pause before each callback invocation
        await new Promise((resolve) => setTimeout(resolve, SLEEP_DURATION));

        try {
            results.push(await callback(item, ctx));
        } catch (error) {
            console.log(`Error dequeuing item: ${error}`);

            // Stringify instead of reading error.message so thrown strings and
            // other non-Error values cannot crash the handler itself.
            const text = String(error);
            if (text.includes('Already') || text.includes('session')) {
                // Presumably the session is unusable: hand the item back for
                // another worker and stop, keeping what was collected so far
                // instead of rejecting and discarding every worker's results.
                inputQueue.push(item);
                console.log(new Error('Abandoning session :(', { cause: error }));
                return results;
            }

            // Other failures are logged and skipped; keep a placeholder result.
            results.push(undefined);
        }
    }

    return results;
}

/**
 * Create multiple Selenium sessions and run the search callback on every item
 * of the input list, spreading the items across the sessions.
 *
 * @param {Array} inputList The list of items to search; not modified
 * @param {Function|*} searchCallback Passed through to dequeue for each item
 * @returns {Promise<Array>} Resolves with all workers' results flattened into
 *     one array (empty if no session could be established)
 */
function multiCrawl(inputList, searchCallback) {
    // dequeue() consumes its queue with shift(); work on a copy so the
    // caller's list is left intact.
    const queue = [...inputList];

    // Never open more sessions than there are items, nor more than the cap
    const maxConnections = Math.min(queue.length, MAX_CONCURRENT_CONNECTIONS);

    // Start all sessions concurrently; a failed start is logged and becomes null
    const connections = Array.from({ length: maxConnections }, () =>
        runSeleniumCell(false, false).then(
            (ctx) => ctx,
            (error) => {
                console.log(`Error establishing connection: ${error}`);
                return null;
            }
        )
    );

    return Promise.all(connections)
        .then((contexts) => {
            // Drop the sessions that failed to start
            const liveContexts = contexts.filter((ctx) => ctx !== null);

            // One dequeue worker per live session, all pulling from the same queue
            return Promise.all(liveContexts.map((ctx) => dequeue(queue, searchCallback, ctx)));
        })
        .then((results) => results.flat());
}

module.exports = multiCrawl;

Code Breakdown

Import Statements

var importer = require('../Core');
var runSeleniumCell = importer.import('selenium cell');

Constants

// Not read by any code visible on this page.
// NOTE(review): confirm whether it is still needed.
var TIMEOUT = 1000;
// Upper bound on the number of Selenium sessions multiCrawl opens.
var CONNECTIONS = 3;

deQueue Function

// Signature only: the body is elided in this breakdown. The complete
// definition appears in the first listing on this page.
function deQueue(inputQueue, searchCallback, ctx) {
    //...
}

multiCrawl Function

// Signature only: the body is elided in this breakdown. The complete
// definition appears in the first listing on this page.
function multiCrawl(inputList, searchCallback) {
    //...
}