The code imports necessary modules, defines constants for timeouts and connections, and implements two key functions: `deQueue`, for recursively dequeuing tasks from an input queue, and `multiCrawl`, for parallel crawling using Selenium connections. The `multiCrawl` function uses `deQueue` to crawl through an input list and returns a promise with the crawl results.
npm run import -- "multi crawl"
// Project-local loader from ../Core; this file uses its import() and runAllPromises() helpers.
var importer = require('../Core');
// Called below as runSeleniumCell(false, false); its result is chained with .then() to obtain a
// session context (`ctx`). NOTE(review): presumably opens a Selenium session — confirm in the cell.
var runSeleniumCell = importer.import("selenium cell");
// NOTE(review): TIMEOUT is not referenced by any code visible in this file; unit presumably ms — confirm.
var TIMEOUT = 1000;
// Upper bound on how many sessions multiCrawl requests (it uses min(inputList.length, CONNECTIONS)).
var CONNECTIONS = 3;
/**
 * Recursively drain tasks from a shared queue using one session context.
 *
 * Each call removes the first item of `inputQueue` (the array is mutated),
 * waits 100 ms, invokes the callback with `(item, ctx)` and then recurses
 * until the queue is empty.
 *
 * Error handling:
 * - If the callback fails and the stringified error contains "Already" or
 *   "session", the item is pushed back onto the queue (so another session
 *   can pick it up) and this session stops; the promise resolves with the
 *   results gathered so far.
 * - Any other callback failure is logged and recorded as `undefined` in the
 *   results, and processing continues with the next item.
 *
 * @param {Array} inputQueue Shared queue of pending items; mutated in place.
 * @param {Function|string} searchCallback Function called as
 *   `searchCallback(item, ctx)`, or a name resolved through `importer.import`.
 * @param {Object} ctx Session context; must expose
 *   `ctx.client.requestHandler.sessionID`.
 * @returns {Promise<Array>} Resolves with the callback results, in the order
 *   this session processed them. Never rejects for callback failures.
 */
function deQueue(inputQueue, searchCallback, ctx) {
  const results = [];
  console.log(ctx.client.requestHandler.sessionID);

  // Always hand back a promise, even when there is nothing left to process.
  if (inputQueue.length === 0) {
    return Promise.resolve(results);
  }

  // NOTE(review): the previous call passed its arguments as quoted string
  // literals with an unbalanced parenthesis; reconstructed here as real
  // expressions — confirm against importer.import's expected signature.
  const callback = typeof searchCallback === 'function'
    ? searchCallback
    : importer.import(searchCallback, Object.assign({useCache: false}, ctx));

  const item = inputQueue.shift();

  return new Promise(resolve => setTimeout(resolve, 100))
    .then(() => callback(item, ctx))
    .catch(e => {
      const message = String(e);
      console.log(message);
      if (message.includes('Already') || message.includes('session')) {
        // Give the item back so a healthy session can retry it.
        inputQueue.push(item);
        throw new Error('Abandoning session :(', { cause: e });
      }
      // Other failures fall through: `undefined` is recorded for this item.
    })
    .then(r => results.push(r))
    .then(() => deQueue(inputQueue, searchCallback, ctx))
    .then(r => results.concat(r))
    .catch(e => {
      // Session abandoned: report it and keep whatever this level collected,
      // instead of leaking `undefined` into the aggregated results.
      console.log(e);
      return results;
    });
}
/**
 * Open several sessions and process `inputList` across them in parallel.
 *
 * Up to `CONNECTIONS` sessions are requested via `runSeleniumCell(false, false)`
 * (never more than there are items). Sessions that fail to open are logged and
 * skipped. Every session that opened then drains a shared copy of `inputList`
 * through `deQueue`, and the per-session results are flattened one level.
 *
 * @param {Array} inputList Items to crawl; not mutated (a copy is queued).
 * @param {Function|string} searchCallback Forwarded unchanged to `deQueue`.
 * @returns {Promise<Array>} Resolves with the combined results. Resolves with
 *   `[]` for an empty `inputList`; rejects if no session could be opened.
 */
function multiCrawl(inputList, searchCallback) {
  // Nothing to crawl: do not open sessions or index into an empty connection list.
  if (inputList.length === 0) {
    return Promise.resolve([]);
  }

  const sessionCount = Math.min(inputList.length, CONNECTIONS);
  const connections = [];

  // Each entry is a function taking `resolve`.
  // NOTE(review): presumably importer.runAllPromises invokes each one with a
  // resolve callback — confirm against ../Core.
  // Window setup (onlyOneWindow / resizeWindow) used to be chained here and is disabled.
  const openers = Array.from({ length: sessionCount }, () => resolve => {
    return runSeleniumCell(false, false)
      .then(ctx => {
        connections.push(ctx);
        resolve(ctx);
      })
      .catch(e => {
        // A failed session is tolerated; the remaining ones share the work.
        console.log(e);
        resolve(null);
      });
  });

  // Work on a copy so the caller's list is left intact while deQueue shifts items.
  const queue = [].concat(inputList);

  return importer.runAllPromises(openers)
    .then(() => {
      console.log(connections.map(c => c.client.requestHandler.sessionID));
      if (connections.length === 0) {
        // Fail with a clear message rather than a TypeError on connections[0].
        throw new Error('No Selenium sessions could be established');
      }
    })
    .then(() => {
      // NOTE(review): scanning()/getAllSessionUrls() are defined outside this
      // file; the call sequence is kept exactly as it was.
      return connections[0].client
        .scanning(true)
        .then(() => connections[0].getAllSessionUrls())
        .scanning(false);
    })
    .then(() => console.log('done loading sessions'))
    .then(() => Promise.all(connections.map(ctx => deQueue(queue, searchCallback, ctx))))
    .then(r => [].concat([], ...r));
}
module.exports = multiCrawl;
// Project-local loader from ../Core; dequeue() uses importer.import() to resolve a callback by name.
const importer = require('../Core');
// Called below as runSeleniumCell(false, false); multiCrawl chains .then() on the result to get a
// session context. NOTE(review): presumably opens a Selenium session — confirm in the cell.
const runSeleniumCell = importer.import('selenium cell');
// NOTE(review): TIMEOUT is not referenced by any code visible in this file; unit presumably ms — confirm.
const TIMEOUT = 1000;
// Upper bound on how many sessions multiCrawl requests at once.
const MAX_CONCURRENT_CONNECTIONS = 3;
const SLEEP_DURATION = 100; // delay passed to setTimeout in dequeue() before each callback invocation
/**
 * Recursively drain tasks from a shared queue using one session context.
 *
 * Each call removes the first item of `inputQueue` (the array is mutated),
 * waits `SLEEP_DURATION` ms, invokes the callback with `(item, ctx)` and then
 * recurses until the queue is empty.
 *
 * Error handling:
 * - If the callback fails and the stringified error contains "Already" or
 *   "session", the item is pushed back onto the queue (so another session can
 *   pick it up) and this session stops; the promise resolves with the results
 *   gathered so far rather than rejecting the whole crawl.
 * - Any other callback failure is logged and recorded as `undefined` in the
 *   results, and processing continues with the next item.
 *
 * @param {Array} inputQueue The shared queue of tasks; mutated in place.
 * @param {Function|string} searchCallback Function called as
 *   `searchCallback(item, ctx)`, or a name resolved through `importer.import`.
 * @param {Object} ctx The session context; must expose
 *   `ctx.client.requestHandler.sessionID`.
 * @returns {Promise<Array>} A promise that resolves with the results, in the
 *   order this session processed them.
 */
function dequeue(inputQueue, searchCallback, ctx) {
  const results = [];

  // Log which session is picking up work.
  const sessionId = ctx.client.requestHandler.sessionID;
  console.log(`Session ID: ${sessionId}`);

  // Base case: nothing left to process.
  if (inputQueue.length === 0) {
    return Promise.resolve(results);
  }

  const callback = typeof searchCallback === 'function'
    ? searchCallback
    : importer.import(searchCallback, { useCache: false });

  const item = inputQueue.shift();

  return new Promise((resolve) => setTimeout(resolve, SLEEP_DURATION))
    .then(() => callback(item, ctx))
    .catch((error) => {
      // Stringify so non-Error rejections (strings, undefined) are handled too.
      const message = String(error);
      console.log(`Error dequeuing item: ${message}`);
      if (message.includes('Already') || message.includes('session')) {
        // Give the item back so a healthy session can retry it.
        inputQueue.push(item);
        throw new Error('Abandoning session :(', { cause: error });
      }
      // Other failures fall through: `undefined` is recorded for this item.
    })
    .then((result) => {
      results.push(result);
      // `rest` must not shadow `results`, or this item's result would be lost.
      return dequeue(inputQueue, searchCallback, ctx).then((rest) => results.concat(rest));
    })
    .catch((error) => {
      // Session abandoned: report it and keep what this level collected.
      console.log(error);
      return results;
    });
}
/**
 * Open several sessions and run a search callback over every item of the
 * input list, sharing the work between the sessions.
 *
 * Up to `MAX_CONCURRENT_CONNECTIONS` sessions are requested via
 * `runSeleniumCell(false, false)` (never more than there are items). Sessions
 * that fail to open are logged and skipped. Each remaining session drains a
 * shared copy of `inputList` through `dequeue`.
 *
 * @param {Array} inputList The list of items to search; not mutated.
 * @param {Function|string} searchCallback Forwarded unchanged to `dequeue`.
 * @returns {Promise<Array>} A promise that resolves with the per-session
 *   results flattened one level. Resolves with `[]` if no session opened.
 */
function multiCrawl(inputList, searchCallback) {
  // Never open more sessions than there are items to process.
  const maxConnections = Math.min(inputList.length, MAX_CONCURRENT_CONNECTIONS);

  // dequeue() shifts items off its queue, so work on a copy and leave the
  // caller's array intact.
  const queue = [...inputList];

  // Map a failed connection to null instead of rejecting, so one bad session
  // does not abort the others.
  const connections = Array.from({ length: maxConnections }, () =>
    runSeleniumCell(false, false).then(
      (ctx) => ctx,
      (error) => {
        console.log(`Error establishing connection: ${error}`);
        return null;
      },
    ));

  return Promise.all(connections)
    .then((contexts) => {
      // Keep only the sessions that actually opened.
      const liveContexts = contexts.filter((ctx) => ctx != null);
      // All live sessions pull from the same queue concurrently.
      return Promise.all(liveContexts.map((ctx) => dequeue(queue, searchCallback, ctx)));
    })
    .then((perSession) => perSession.flat());
}
module.exports = multiCrawl;
var importer = require('../Core');
var runSeleniumCell = importer.import('selenium cell');
The code imports `importer` from `../Core` and `runSeleniumCell` from the `selenium cell` module imported by `importer`.
var TIMEOUT = 1000;
var CONNECTIONS = 3;
Two constants are defined: `TIMEOUT` with a value of 1000 (1 second) and `CONNECTIONS` with a value of 3.
function deQueue(inputQueue, searchCallback, ctx) {
//...
}
The `deQueue` function takes three arguments: `inputQueue`, `searchCallback`, and `ctx`.
It recursively dequeues tasks from the `inputQueue` using the `searchCallback` function.
If a session-related error occurs in the `searchCallback` function, the task is pushed back to the input queue.
function multiCrawl(inputList, searchCallback) {
//...
}
The `multiCrawl` function takes two arguments: `inputList` and `searchCallback`.
It creates a number of Selenium connections using the `runSeleniumCell` function.
It uses `Promise.all` to wait for the connections to be established.
It then calls the `deQueue` function with the queue and the connections.