From Bootstrap to Airflow DAG (11 Part Series)
1 Web Scraping Sprott U Fund with BS4 in 10 Lines of Code
2 The Web Scraping Continuum
3 Bootstrapped to Functional
4 Quick Detour on Unit Testing with PyTest
5 From Functional to Class: a look at SOLID coding
6 From Class to Abstract Classes
7 Abstract Configurations
8 Scraper Function to Airflow DAG
9 Class to Airflow Custom Operator
10 Custom Airflow Hooks
11 Final: How to Structure the DAG
Quickly reviewing the previous post: we took an object that was responsible for the full ETL process, split it into separate objects for extracting, transforming, and loading, and created a pipeline object called FundScraper to run through the whole process. However, FundScraper isn’t very abstract yet. You can point it at any URL, but the transformations are quite limited; each run handles only a single URL, which is a bit of a pain; and while you could swap the JSONWriter out for a different file format, we would still need to modify the object to write to a database. Let’s refactor it a bit so that it can handle configurations and be slightly more extensible.
# How it currently is
class FundScraper:
    def __init__(self, requester: WebRequester, parser: HTMLParser,
                 transformer: DataTransformer, writer: DataWriter):
        self.requester = requester
        self.parser = parser
        self.transformer = transformer
        self.writer = writer

    def run(self, url, indexes, class_name, filename='data.json'):
        response = self.requester.get(url)
        if response.status_code != 200:
            raise ValueError(f"Error retrieving {url}: {response.status_code}")
        soup = self.parser.parse(response.content)
        data = self.transformer.transform(
            soup.find_all('div', class_=class_name), indexes)
        self.writer.write(data, filename)
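For reference, running this version means instantiating the pipeline and calling run once per page, with everything hard-coded at the call site. A rough sketch of that usage (the argument values are borrowed from the config shown later in this post, and the exact call is my assumption):

# Sketch of how the current pipeline gets invoked: URL, class name,
# and indexes are all hard-coded at the call site.
scraper = FundScraper(RequestsWebRequester(), BeautifulSoupHTMLParser(),
                      FundValueTransformer(), JSONDataWriter())
scraper.run(
    url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/',
    indexes=[4, 6],
    class_name='fundHeader_value',
    filename='sprott.json')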
In all likelihood, we’ll want to run this across multiple webpages, potentially across multiple divs, and for multiple values. We could add a for-loop to collect data from several places on a single webpage like this:
data = {}
for index, index_number in enumerate(indexes):
    data = self.transformer.transform(
        soup.find_all('div', class_=class_name),
        data, index_number, value_names[index])
Modifying the transform method to this:
class FundValueTransformer(DataTransformer):
    def transform(self, values, dictionary: dict, index: int, value_name: str):
        dictionary[value_name] = str(values[index].contents[0]).strip().replace(
            '$US', '').replace(',', '')
        return dictionary
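Since run now loops over indexes and passes value_names through to the transformer, its signature needs a value_names parameter as well, which is how it gets called in the __main__ block further down. The updated method isn’t shown in full here, but a minimal sketch could look like this (the exact signature is my assumption):

    # Sketch of run after folding in the loop; value_names and its position
    # in the signature are assumptions, not the post's exact code.
    def run(self, url, indexes, class_name, value_names, filename='data.json'):
        response = self.requester.get(url)
        if response.status_code != 200:
            raise ValueError(f"Error retrieving {url}: {response.status_code}")
        soup = self.parser.parse(response.content)
        data = {}
        # Collect one named value per (index, value_name) pair from the matched divs
        for index, index_number in enumerate(indexes):
            data = self.transformer.transform(
                soup.find_all('div', class_=class_name),
                data, index_number, value_names[index])
        self.writer.write(data, filename)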
So now it’s able to request a website, pull the contents with BeautifulSoup, extract the values (i.e., transform the contents into something useful), and write them to JSON.
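To make the “something useful” part concrete: the transformer just strips whitespace and drops the ‘$US’ prefix and thousands separators from whatever text sits in the matched div. With a made-up raw value:

# Hypothetical raw cell text; the cleanup mirrors FundValueTransformer
raw = ' $US18.50 '
clean = str(raw).strip().replace('$US', '').replace(',', '')
print(clean)  # 18.50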
How do we give it instructions? We could parse a JSON file, pass plain Python parameters, or use YAML (“YAML Ain’t Markup Language”, originally “Yet Another Markup Language”), a popular configuration format found in plenty of other applications.
A config we could use for our example would be like this:
sprott:
  url: https://sprott.com/investment-strategies/physical-commodity-funds/uranium/
  class_name:
    - fundHeader_value: {4: shareprice, 6: u3o8_stock}
Really simply: keys followed by colons show up as dictionaries when parsed in Python, and dashes show up as list items. You can also give it an inline dictionary, as I do for the index-to-value_name mapping. You can see how easily we could add more values, more HTML tags, and more URLs to this config.
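If it helps to see that mapping, loading the config above with PyYAML produces an ordinary nested dict/list structure, roughly like this (the config.yml file name is assumed, matching the script below):

from yaml import safe_load

with open('config.yml') as f:
    config = safe_load(f)

# config is now a plain nested structure:
# {'sprott': {'url': 'https://sprott.com/investment-strategies/physical-commodity-funds/uranium/',
#             'class_name': [{'fundHeader_value': {4: 'shareprice', 6: 'u3o8_stock'}}]}}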
Last, but not least, we have to rework the basic if __name__ == '__main__' block at the bottom of the script to parse the YAML. That could be done with the following:
from yaml import safe_load  # at the top of the script

if __name__ == '__main__':
    with open('config.yml', 'r') as f:
        config = safe_load(f)
    scraper = FundScraper(RequestsWebRequester(), BeautifulSoupHTMLParser(),
                          FundValueTransformer(), JSONDataWriter())
    for key, value in config.items():
        for class_name in value['class_name']:
            for tag, indexes in class_name.items():
                scraper.run(url=value['url'],
                            class_name=tag,
                            indexes=[i for i in indexes.keys()],
                            value_names=[v for v in indexes.values()],
                            filename=f"{key}.json")
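Because the __main__ block simply loops over whatever is in config.yml, adding another page becomes a pure configuration change. As an illustration only (the fund name, URL, and index mapping below are all made up), a second entry would just sit alongside the first:

another_fund:                                # hypothetical second entry
  url: https://example.com/some-other-fund/  # made-up URL
  class_name:
    - fundHeader_value: {2: shareprice}      # made-up index/value mapping

The nested loops would then produce another_fund.json without any change to the Python code.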
It’s a bit more complex than the last round, where we just instantiated the scraper and ran it, because now we’re running it multiple times over different URLs with a separate configuration for each URL.
All of this is being done for a reason that will become clearer in a couple of weeks, once I turn it all into an Airflow DAG. Before then, though, we need to dive into Airflow itself: first at a high level as a workflow orchestrator, and second into its components.
As always, the code can be found here.