Question
Formatted question description: https://leetcode.ca/all/1242.html
1242 Web Crawler Multithreaded
Given a url startUrl and an interface HtmlParser, implement a Multi-threaded web crawler
to crawl all links that are under the same hostname as startUrl.
Return all urls obtained by your web crawler in any order.
Your crawler should:
Start from the page: startUrl
Call HtmlParser.getUrls(url) to get all urls from a webpage of given url.
Do not crawl the same link twice.
Explore only the links that are under the same hostname as startUrl.
In a URL such as http://example.org/test, the hostname is example.org.
For simplicity's sake, you may assume all urls use the http protocol without any port specified.
For example, the urls http://leetcode.com/problems and http://leetcode.com/contest are under the same hostname,
while urls http://example.org/test and http://example.com/abc are not under the same hostname.
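A minimal sketch of the hostname check, assuming well-formed http URLs as the problem guarantees. The class and method names (`HostnameCheck`, `hostOf`, `sameHost`) are illustrative and not part of the problem's API.

```java
import java.net.URI;

class HostnameCheck {
    // Returns the host part of a URL, e.g. "example.org" for "http://example.org/test".
    static String hostOf(String url) {
        return URI.create(url).getHost();
    }

    // True when both URLs have exactly the same hostname.
    static boolean sameHost(String a, String b) {
        String hostA = hostOf(a);
        return hostA != null && hostA.equals(hostOf(b));
    }

    public static void main(String[] args) {
        // The two pairs used in the problem statement.
        System.out.println(sameHost("http://leetcode.com/problems", "http://leetcode.com/contest"));
        System.out.println(sameHost("http://example.org/test", "http://example.com/abc"));
    }
}
```

Comparing parsed hosts rather than searching for a substring avoids accepting a URL that merely mentions the hostname somewhere in its path or query.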
The HtmlParser interface is defined as such:
interface HtmlParser {
// Return a list of all urls from a webpage of given url.
// This is a blocking call, that means it will do HTTP request and return when this request is finished.
public List<String> getUrls(String url);
}
Note that getUrls(String url) simulates performing a HTTP request.
You can treat it as a blocking function call which waits for a HTTP request to finish.
It is guaranteed that getUrls(String url) will return the urls within 15ms.
Single-threaded solutions will exceed the time limit, so can your multi-threaded web crawler do better?
Below are two examples explaining the functionality of the problem.
For custom testing purposes, you'll have three variables:
* urls,
* edges and
* startUrl
Notice that you will only have access to startUrl in your code,
while urls and edges are not directly accessible to you in code.
Example 1:
Input:
urls = [
"http://news.yahoo.com",
"http://news.yahoo.com/news",
"http://news.yahoo.com/news/topics/",
"http://news.google.com",
"http://news.yahoo.com/us"
]
edges = [[2,0],[2,1],[3,2],[3,1],[0,4]]
startUrl = "http://news.yahoo.com/news/topics/"
Output: [
"http://news.yahoo.com",
"http://news.yahoo.com/news",
"http://news.yahoo.com/news/topics/",
"http://news.yahoo.com/us"
]
Example 2:
Input:
urls = [
"http://news.yahoo.com",
"http://news.yahoo.com/news",
"http://news.yahoo.com/news/topics/",
"http://news.google.com"
]
edges = [[0,2],[2,1],[3,2],[3,1],[3,0]]
startUrl = "http://news.google.com"
Output: ["http://news.google.com"]
Explanation: The startUrl links to all other pages that do not share the same hostname.
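For local testing, the `urls` and `edges` inputs can be turned into a stand-in parser. The sketch below reads each edge `[from, to]` as "urls[from] links to urls[to]", which is the interpretation consistent with both examples. `MockCrawlTest`, `parserFrom`, and `referenceCrawl` are hypothetical helper names, and the single-threaded crawl is only a reference for checking expected output, not a submission.

```java
import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MockCrawlTest {
    // Same shape as the problem's interface, redeclared so the sketch compiles alone.
    interface HtmlParser {
        List<String> getUrls(String url);
    }

    // Builds a parser in which edge [from, to] means urls[from] links to urls[to].
    static HtmlParser parserFrom(String[] urls, int[][] edges) {
        Map<String, List<String>> links = new HashMap<>();
        for (int[] e : edges) {
            links.computeIfAbsent(urls[e[0]], k -> new ArrayList<>()).add(urls[e[1]]);
        }
        return url -> links.getOrDefault(url, new ArrayList<>());
    }

    // Single-threaded breadth-first crawl restricted to the start URL's hostname.
    static Set<String> referenceCrawl(String startUrl, HtmlParser parser) {
        String host = URI.create(startUrl).getHost();
        Set<String> seen = new LinkedHashSet<>();
        Deque<String> queue = new ArrayDeque<>();
        seen.add(startUrl);
        queue.add(startUrl);
        while (!queue.isEmpty()) {
            for (String next : parser.getUrls(queue.poll())) {
                if (host.equals(URI.create(next).getHost()) && seen.add(next)) {
                    queue.add(next);
                }
            }
        }
        return seen;
    }
}
```

Running `referenceCrawl` on the data of Example 1 and Example 2 gives a set to compare a multi-threaded solution against, since the problem accepts results in any order.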
Constraints:
1 <= urls.length <= 1000
1 <= urls[i].length <= 300
startUrl is one of the urls.
Hostname label must be from 1 to 63 characters long, including the dots, may contain only the ASCII letters from 'a' to 'z', digits from '0' to '9' and the hyphen-minus character ('-').
The hostname may not start or end with the hyphen-minus character ('-').
See: https://en.wikipedia.org/wiki/Hostname#Restrictions_on_valid_hostnames
You may assume there're no duplicates in url library.
Follow up:
Assume we have 10,000 nodes and 1 billion URLs to crawl.
We will deploy the same software onto each node. The software can know about all the nodes.
We have to minimize communication between machines and make sure each node does equal amount of work.
How would your web crawler design change?
What if one node fails or does not work?
How do you know when the crawler is done?
Algorithm
This problem is unusual: it asks us to write a simulated crawler that starts from a given page and crawls every page under the same hostname. LeetCode provides the parser interface.
The problem requires a multi-threaded crawler; a single-threaded one will time out.
- Use a set to store the pages that have already been crawled. This set must support concurrent modification from multiple threads; I use ConcurrentHashMap.
- The list that stores the results must also support concurrent access; I use Collections.synchronizedList.
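One detail worth illustrating is how a concurrent set prevents crawling the same link twice. The sketch below assumes a set obtained from `ConcurrentHashMap.newKeySet()` and uses the return value of `add` as an atomic "claim"; a separate `contains` followed by `add` would leave a window in which two threads both decide to crawl the same URL. `VisitedSetDemo` and `countClaims` are illustrative names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class VisitedSetDemo {
    // Runs `threads` workers that all try to claim the same URLs and
    // returns how many claims succeeded in total.
    static int countClaims(String[] urls, int threads) {
        Set<String> visited = ConcurrentHashMap.newKeySet();
        AtomicInteger claims = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (String url : urls) {
                    // add() is atomic: exactly one thread gets `true` for each URL.
                    if (visited.add(url)) {
                        claims.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return claims.get();
    }
}
```

However many threads compete, the number of successful claims equals the number of distinct URLs, which is the property the crawler relies on.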
Follow up:
1. How would your web crawler design change?
After a node crawls a page, it will get a list of URLs to crawl next.
The brute-force solution would be for this node to communicate with all other nodes to find out which links haven't been crawled yet.
A better solution is for each node to use Consistent Hashing to determine which node should handle a given URL, and to send the URL to that node. In this way each node becomes a task assigner, and the communication between nodes is minimized.
- When a new node is added to the system, a small fraction of links (crawled or not) that were or will be crawled in an old node will get migrated to the new node.
- When a node goes offline, the links assigned to it are redistributed across the other nodes.
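The routing idea above can be sketched with a sorted map used as a hash ring. This is a minimal illustration, not a production design: the class name `ConsistentHashRing`, the use of MD5 for placing keys on the ring, and the number of virtual nodes per machine are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    // Each physical node is placed on the ring at several points to even out load.
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i));
        }
    }

    // The owner of a URL is the first virtual node at or after the URL's hash,
    // wrapping around to the start of the ring when necessary.
    String nodeFor(String url) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(url));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // First 8 bytes of the MD5 digest, interpreted as a long.
    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xffL);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Every node can evaluate `nodeFor(url)` locally from the shared node list, so no coordination is needed to decide where a URL goes. When a node is removed, only the URLs that mapped to it change owner; all other assignments stay where they were.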
And more:
- Add a database to store all the crawled data.
- Add serialization and deserialization methods, so that code can be distributed to 10k nodes with less bandwidth consumption.
- Add a centralized monitoring/scheduling strategy to coordinate between hosts: if a crawler job fails on a stale host, restart it on a healthy host.
- Emit metrics, so that we can report how many pages were crawled, how much space is used to store them, etc.
- Kill long-running threads; something has probably gone wrong.
Things to consider:
- Politeness/crawl rate
- DNS query
- Distributed crawling
- Priority crawling
- How to generate content signature?
- Duplicate detection
- What to do with similar content?
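As one possible answer to the content-signature question, the sketch below hashes a whitespace-normalized page body with SHA-256 and remembers the signatures already seen. `ContentDeduper` and its methods are hypothetical names, and the normalization step is an assumption. This only catches exact duplicates after normalization; detecting merely similar content would need a different technique, such as a similarity-preserving hash.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ContentDeduper {
    private final Set<String> seenSignatures = ConcurrentHashMap.newKeySet();

    // Hex-encoded SHA-256 of the body after trimming and collapsing whitespace runs.
    static String signature(String body) {
        String normalized = body.trim().replaceAll("\\s+", " ");
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(normalized.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true the first time a given content signature is seen, false afterwards.
    boolean isNew(String body) {
        return seenSignatures.add(signature(body));
    }
}
```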
2. What if one node fails or does not work?
As above, we need a health ping or a node management component that moves tasks from a stale host to a healthy host.
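A health ping can be as simple as recording the last time each node reported in and treating nodes that have been silent for too long as stale. The sketch below is only an in-memory illustration: `HeartbeatMonitor` is a hypothetical name, the timeout is a parameter chosen by the caller, and the clock reading is passed in explicitly so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HeartbeatMonitor {
    private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a node reports in; the caller supplies the clock reading.
    void recordHeartbeat(String node, long nowMillis) {
        lastSeenMillis.put(node, nowMillis);
    }

    // Nodes whose last heartbeat is older than the timeout at time `nowMillis`.
    List<String> staleNodes(long nowMillis) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeenMillis.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                stale.add(e.getKey());
            }
        }
        return stale;
    }
}
```

The URLs owned by a stale node would then be handed to healthy nodes, for example by removing the stale node from the hash ring.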
3. How do you know when the crawler is done?
The crawl is done when every task on every node has finished, so again a job management component is needed to track this.
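Within a single process, the same idea reduces to counting outstanding tasks: increment the counter before a URL is handed to a worker, decrement it after the URL has been processed and its children submitted, and declare the crawl finished when the counter reaches zero. The sketch below uses a small task tree in place of real pages; `PendingTaskTracker` and `runDemo` are illustrative names, and a distributed version would need the job manager to aggregate such counts across nodes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PendingTaskTracker {
    private final AtomicInteger pending = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(1);

    // Call before a task is handed to a worker.
    void taskSubmitted() {
        pending.incrementAndGet();
    }

    // Call after a task has finished and all of its child tasks were submitted.
    void taskFinished() {
        if (pending.decrementAndGet() == 0) {
            done.countDown();
        }
    }

    void awaitCompletion() throws InterruptedException {
        done.await();
    }

    // Demo: every task at depth > 0 spawns two children; returns the number of tasks run.
    static int runDemo(int depth) {
        PendingTaskTracker tracker = new PendingTaskTracker();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        submit(pool, tracker, processed, depth);
        try {
            tracker.awaitCompletion();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return processed.get();
    }

    private static void submit(ExecutorService pool, PendingTaskTracker tracker,
                               AtomicInteger processed, int depth) {
        tracker.taskSubmitted(); // counted before the task can possibly finish
        pool.execute(() -> {
            processed.incrementAndGet();
            if (depth > 0) {
                submit(pool, tracker, processed, depth - 1);
                submit(pool, tracker, processed, depth - 1);
            }
            tracker.taskFinished(); // children are already counted at this point
        });
    }
}
```

Because children are counted before their parent is marked finished, the counter cannot reach zero while work is still outstanding.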
Code
Java
```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Web_Crawler_Multithreaded {

    class Solution_1_synchronizedList {
        private final Set<String> set = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        private final List<String> result = Collections.synchronizedList(new ArrayList<String>());
        private String HOSTNAME = null;

        public List<String> crawl(String startUrl, HtmlParser htmlParser) {
            initHostName(startUrl);
            set.add(startUrl);
            getUrlDfs(startUrl, htmlParser);
            return result;
        }

        private boolean judgeHostname(String url) {
            // Index 7 skips "http://", so this finds the first '/' after the hostname.
            int idx = url.indexOf('/', 7);
            String hostName = (idx != -1) ? url.substring(0, idx) : url;
            return hostName.equals(HOSTNAME);
        }

        private void initHostName(String url) {
            int idx = url.indexOf('/', 7);
            HOSTNAME = (idx != -1) ? url.substring(0, idx) : url;
        }

        private void getUrlDfs(String startUrl, HtmlParser htmlParser) {
            result.add(startUrl);
            List<String> res = htmlParser.getUrls(startUrl);
            List<Thread> threads = new ArrayList<>();
            for (String url : res) {
                // set.add() is atomic and returns false if the URL was already claimed;
                // a separate contains() followed by add() would race between threads.
                if (judgeHostname(url) && set.add(url)) {
                    threads.add(new Thread(() -> getUrlDfs(url, htmlParser)));
                }
            }
            for (Thread thread : threads) {
                thread.start();
            }
            try {
                for (Thread thread : threads) {
                    thread.join(); // wait for this child thread to finish
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }

    class Solution_2_ConcurrentSkipListSet {
        public List<String> crawl(String startUrl, HtmlParser htmlParser) {
            // ConcurrentSkipListSet keeps its elements in natural order and is safe
            // for use by multiple threads; it is the concurrent counterpart of TreeSet.
            // https://stackoverflow.com/questions/1904439/when-is-a-concurrentskiplistset-useful
            Set<String> visited = new ConcurrentSkipListSet<>();
            String hostname = getHostname(startUrl);
            visited.add(startUrl);
            return crawlDfs(startUrl, htmlParser, hostname, visited).collect(Collectors.toList());
        }

        private Stream<String> crawlDfs(String startUrl, HtmlParser htmlParser, String hostname, Set<String> visited) {
            // The stream is lazy and is consumed by the caller, so it must not be
            // closed here (the original wrapped it in try-with-resources).
            Stream<String> stream = htmlParser.getUrls(startUrl)
                    .parallelStream()
                    .filter(url -> isSameHostname(url, hostname))
                    .filter(visited::add) // add() returns false for already visited URLs
                    .flatMap(url -> crawlDfs(url, htmlParser, hostname, visited));
            return Stream.concat(Stream.of(startUrl), stream);
        }

        private String getHostname(String url) {
            int idx = url.indexOf('/', 7);
            return (idx != -1) ? url.substring(0, idx) : url;
        }

        private boolean isSameHostname(String url, String hostname) {
            return url.startsWith(hostname)
                    && (url.length() == hostname.length() || url.charAt(hostname.length()) == '/');
        }
    }

    interface HtmlParser {
        // Return a list of all urls from a webpage of given url.
        // This is a blocking call, that means it will do HTTP request and return when this request is finished.
        List<String> getUrls(String str);
    }

    class Solution_CrawlerClass {
        public List<String> crawl(String startUrl, HtmlParser htmlParser) {
            // Get the hostname of startUrl
            String host = URI.create(startUrl).getHost();
            // Reset the result set shared by all crawler threads
            Crawler.res = ConcurrentHashMap.newKeySet();
            // Create a thread that crawls all links reachable from startUrl
            Crawler crawler = new Crawler(startUrl, host, htmlParser);
            // Start the thread (equivalent to starting a DFS from the start page)
            crawler.start();
            // Wait for the thread to finish
            Crawler.joinThread(crawler);
            // Return the collected result
            return new ArrayList<>(Crawler.res);
        }
    }

    // Crawler thread (plays the role of the recursive DFS method)
    static class Crawler extends Thread {
        String startUrl;       // current url
        String hostname;       // target hostname
        HtmlParser htmlParser; // parser interface

        // Result set shared by every Crawler thread. It has to be static and
        // thread-safe; a per-instance list would never see the other threads' URLs.
        public static volatile Set<String> res = ConcurrentHashMap.newKeySet();

        public Crawler(String startUrl, String hostname, HtmlParser htmlParser) {
            this.startUrl = startUrl;
            this.hostname = hostname;
            this.htmlParser = htmlParser;
        }

        @Override
        public void run() {
            // Get the hostname of the current url
            String host = URI.create(startUrl).getHost();
            // Skip the url if it is outside the target host or has already been crawled.
            // res.add() both records the url and reports whether it was new.
            if (!hostname.equals(host) || !res.add(startUrl)) {
                return;
            }
            // Start one new thread per link on this page to continue the DFS
            List<Thread> threads = new ArrayList<>();
            for (String s : htmlParser.getUrls(startUrl)) {
                Crawler crawler = new Crawler(s, hostname, htmlParser);
                crawler.start();
                threads.add(crawler);
            }
            // Wait for every child thread before finishing the current thread
            for (Thread t : threads) {
                joinThread(t);
            }
        }

        public static void joinThread(Thread thread) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```
C++
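```cpp
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>
using namespace std;

/**
 * // This is the HtmlParser's API interface.
 * // You should not implement it, or speculate about its implementation
 * class HtmlParser {
 *   public:
 *     vector<string> getUrls(string url);
 * };
 */
class Solution {
 public:
  vector<string> crawl(string startUrl, HtmlParser htmlParser) {
    queue<string> q{{startUrl}};
    unordered_set<string> seen{{startUrl}};
    const string hostname = getHostname(startUrl);

    // Threading. hardware_concurrency() may return 0, so use at least one thread.
    const int nThreads = max(1u, std::thread::hardware_concurrency());
    vector<thread> threads;
    std::mutex mtx;
    std::condition_variable cv;

    auto worker = [&]() {
      while (true) {
        unique_lock<mutex> lock(mtx);
        // Wait up to 30 ms for work. getUrls() returns within 15 ms, so a queue
        // that is still empty after the timeout is treated as "crawl finished".
        cv.wait_for(lock, 30ms, [&]() { return !q.empty(); });
        if (q.empty()) return;

        const string cur = q.front();
        q.pop();
        lock.unlock();

        // Blocking call, made without holding the lock.
        const vector<string> urls = htmlParser.getUrls(cur);

        lock.lock();
        for (const string& url : urls) {
          if (seen.count(url)) continue;
          // Compare parsed hostnames; a substring search would also accept
          // URLs that only mention the hostname in their path.
          if (getHostname(url) == hostname) {
            q.push(url);
            seen.insert(url);
          }
        }
        lock.unlock();
        cv.notify_all();
      }
    };

    for (int i = 0; i < nThreads; ++i) threads.emplace_back(worker);
    for (std::thread& t : threads) t.join();

    return {begin(seen), end(seen)};
  }

 private:
  // Extracts "host" from "http://host/path"; assumes the URL starts with "http://".
  string getHostname(const string& url) {
    const size_t firstSlash = url.find('/');
    const size_t hostStart = firstSlash + 2;
    const size_t thirdSlash = url.find('/', hostStart);
    return url.substr(hostStart,
                      thirdSlash == string::npos ? string::npos : thirdSlash - hostStart);
  }
};
```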
Python
```python
# Reference: https://levelup.gitconnected.com/multi-threaded-python-web-crawler-for-https-pages-e103f0839b71
import queue
import ssl
import threading
from urllib.error import URLError
from urllib.parse import urljoin, urlparse
from urllib.request import Request, urlopen

from bs4 import BeautifulSoup


class Crawler(threading.Thread):
    def __init__(self, base_url, links_to_crawl, have_visited, error_links, url_lock):
        threading.Thread.__init__(self)
        print(f"Web Crawler worker {self.name} has started")
        self.base_url = base_url
        self.links_to_crawl = links_to_crawl
        self.have_visited = have_visited
        self.error_links = error_links
        self.url_lock = url_lock

    def run(self):
        # Create an SSL context so that the script can crawl https sites.
        my_ssl = ssl.create_default_context()
        # The goal is only to crawl pages, so hostname and certificate
        # verification are switched off and any certificate is accepted.
        my_ssl.check_hostname = False
        my_ssl.verify_mode = ssl.CERT_NONE

        base_netloc = urlparse(self.base_url).netloc

        while True:
            # queue.Queue is thread-safe, so no extra lock is needed around get().
            # Stop once the queue has stayed empty for a few seconds.
            try:
                link = self.links_to_crawl.get(timeout=5)
            except queue.Empty:
                break

            try:
                # urljoin builds an absolute URL by filling in the parts missing
                # from a relative link (scheme, network location, path).
                link = urljoin(self.base_url, link)

                # Claim the link under the lock so that no two threads crawl it.
                with self.url_lock:
                    if link in self.have_visited:
                        print(f"The link {link} is already visited")
                        continue
                    self.have_visited.add(link)

                # Some servers only answer requests that carry a browser-like
                # User-Agent header, so send one.
                req = Request(link, headers={'User-Agent': 'Mozilla/5.0'})
                response = urlopen(req, context=my_ssl)
                print(f"The URL {response.geturl()} crawled with status {response.getcode()}")

                # Parse the page and collect all links on it.
                soup = BeautifulSoup(response.read(), "html.parser")
                for a_tag in soup.find_all('a'):
                    href = a_tag.get("href")
                    if href is None:
                        continue
                    target = urljoin(link, href)
                    # Queue only unvisited links on the same site as base_url.
                    # This check is a cheap pre-filter; the claim above is authoritative.
                    if urlparse(target).netloc == base_netloc and target not in self.have_visited:
                        self.links_to_crawl.put(target)
                    else:
                        print(f"The link {href} is already visited or is not part of the website")
            except URLError as e:
                print(f"URL {link} threw this error {e.reason} while trying to parse")
                self.error_links.append(link)
            finally:
                self.links_to_crawl.task_done()


if __name__ == '__main__':
    print("The Crawler is started")
    base_url = input("Please Enter Website to Crawl > ")
    number_of_threads = input("Please Enter number of Threads > ")

    links_to_crawl = queue.Queue()
    url_lock = threading.Lock()
    links_to_crawl.put(base_url)

    have_visited = set()
    crawler_threads = []
    error_links = []

    for i in range(int(number_of_threads)):
        crawler = Crawler(base_url=base_url,
                          links_to_crawl=links_to_crawl,
                          have_visited=have_visited,
                          error_links=error_links,
                          url_lock=url_lock)
        crawler.start()
        crawler_threads.append(crawler)

    for crawler in crawler_threads:
        crawler.join()

    # have_visited holds every claimed link, including the ones that failed.
    print(f"Total number of pages visited: {len(have_visited) - len(error_links)}")
    print(f"Total number of erroneous links: {len(error_links)}")
```
```python
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
from queue import Queue, Empty
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup


# ThreadPoolExecutor-based crawler
class MultiThreadedCrawler:

    def __init__(self, seed_url):
        self.seed_url = seed_url
        self.root_url = '{}://{}'.format(urlparse(self.seed_url).scheme,
                                         urlparse(self.seed_url).netloc)
        self.pool = ThreadPoolExecutor(max_workers=5)
        self.scraped_pages = set()
        self.crawl_queue = Queue()
        self.crawl_queue.put(self.seed_url)

    def parse_links(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        anchor_tags = soup.find_all('a', href=True)
        for link in anchor_tags:
            url = link['href']
            # Follow only relative links and links under the root URL.
            if url.startswith('/') or url.startswith(self.root_url):
                url = urljoin(self.root_url, url)
                if url not in self.scraped_pages:
                    self.crawl_queue.put(url)

    def scrape_info(self, html):
        # "html5lib" requires the html5lib package to be installed.
        soup = BeautifulSoup(html, "html5lib")
        web_page_paragraph_contents = soup('p')
        text = ''
        for para in web_page_paragraph_contents:
            if 'https:' not in str(para.text):
                text = text + str(para.text).strip()
        print('\n <---Text Present in The WebPage is --->\n', text, '\n')

    def post_scrape_callback(self, res):
        result = res.result()
        if result and result.status_code == 200:
            self.parse_links(result.text)
            self.scrape_info(result.text)

    def scrape_page(self, url):
        try:
            # timeout=(connect timeout, read timeout) in seconds
            return requests.get(url, timeout=(3, 30))
        except requests.RequestException:
            return None

    def run_web_crawler(self):
        while True:
            try:
                print("\n Name of the current executing process: ",
                      multiprocessing.current_process().name, '\n')
                # Stop when no new URL arrives within 60 seconds.
                target_url = self.crawl_queue.get(timeout=60)
                if target_url not in self.scraped_pages:
                    print("Scraping URL: {}".format(target_url))
                    self.current_scraping_url = "{}".format(target_url)
                    self.scraped_pages.add(target_url)
                    job = self.pool.submit(self.scrape_page, target_url)
                    job.add_done_callback(self.post_scrape_callback)
            except Empty:
                return
            except Exception as e:
                print(e)
                continue

    def info(self):
        print('\n Seed URL is: ', self.seed_url, '\n')
        print('Scraped pages are: ', self.scraped_pages, '\n')


if __name__ == '__main__':
    cc = MultiThreadedCrawler("https://www.leetcode.ca")
    cc.run_web_crawler()
    cc.info()
```