Processing Wikipedia Dumps With Python 🐍🐍🐍
Would you benefit from GBs of organized text that can be categorized into hierarchies of domains and subject matter silos? Keep reading.
Feeding The Text Analysis Beast
Wikipedia is a great source of organized text [1], useful for language projects [2]. In addition to the exposition style used in many articles, there are many tables of details and facts, along with the valuable relationships between articles, categories, and emergent ontologies.
Training or augmenting large language models is the most obvious use, but whether you're developing and testing word embeddings, sentiment analysis, fact extraction, or solutions combining those facets [3], having large volumes of text to run through the system is a requirement.
Wikipedia conveniently provides regularly updated dumps of their corpus. In this case I'm focused on the English-language articles extract, which you can find at https://dumps.wikimedia.org/enwiki/ [4], along with the corresponding index file.
Wikipedia is imperfect and its many faults have been well documented, but it remains an extraordinary resource.
At the time of writing, the English articles extract and its corresponding index file sum to roughly 21GB in bzip2 form. There is no need to decompress them, though if you did they would expand to about 100GB.
Other extracts are available, such as the entire revision history of articles, media, and other languages; however, the focus here is the current (or as current as the extract allows) revision text of English-language articles.
For this exercise I'm going to work with the 20230520 (May 20th, 2023) variant, though the process I describe has remained the same for years. When the next dumps are available I'll re-run the process.
The two files I am working with now are enwiki-20230520-pages-articles-multistream.xml.bz2 and enwiki-20230520-pages-articles-multistream-index.txt.bz2. Hashes (sha1, md5) are linked to on the download page. Verify your files before wasting your time dealing with a potentially corrupt download [5].
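If you'd rather do that verification from Python than with the shell commands mentioned in the footnotes, a minimal sketch using hashlib (assuming the filenames used throughout this article) could look like this:

import hashlib

def sha1_of_file(path, chunk_size=1024 * 1024):
    # read in chunks so the multi-GB dump never has to fit in memory
    digest = hashlib.sha1()
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()

# compare the printed value against the sha1 published on the download page
print(sha1_of_file("./enwiki-20230520-pages-articles-multistream.xml.bz2"))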
This is an intro to acquiring, processing, and making sense of those extracts, then beginning the initial analysis, at which point the reader can specialize to their own particular needs. I'm using Python purely because it's the tool I was using in other parts of the analysis chain for this particular project (given its strong machine learning tooling), but it would be easy to do the same thing in any number of other stacks. This isn't language advocacy, and I make no claims of being a Python expert.
Notes on BZIP2
These extracts are compressed with bzip2. bzip2 is not a file archiver: it does not store more than one file, nor any metadata about the file. It is merely a compressed sequence of bytes, offering a better compression ratio than many competitors [6] at the cost of increased compression time. When more than one file, or file metadata, needs to be preserved, the norm is to create a tarball first and feed that output to bzip2 (e.g. .tar.bz2). For the wiki dumps, however, each download is a single file: a massive XML file for the main articles dump, plus a separate colon-delimited index file.
Bzip2 supports a feature called "streams": multiple bzip2-compressed files merged end to end, the decompression of which yields the concatenated sum. If you took two separate files, bzip2-compressed them, and then concatenated the results (e.g. copy /b file1.txt.bz2+file2.txt.bz2 file3.txt.bz2), that would be a multistream bzip2 file, and decompressing it would yield a single file that is the combination of the two inputs, end to end. While many formats would use a header or trailer metadata block describing each of these segments for easier processing and interpretation, bzip2 does not.
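As a small sketch of that property (not part of the dump processing itself), Python's bz2 module will happily decompress two independently compressed chunks that have simply been concatenated:

import bz2

part_one = bz2.compress(b"first file contents\n")
part_two = bz2.compress(b"second file contents\n")

# equivalent to the copy /b concatenation above: two streams, back to back
combined = part_one + part_two

# bz2.decompress handles multiple concatenated streams transparently
assert bz2.decompress(combined) == b"first file contents\nsecond file contents\n"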
Wikipedia makes use of those streams — note the "multistream" in the filenames — offering the benefits of parallelization to both the compression and decompression stages. It allows them to split the articles into sets of 100 and then compress each bundle independently and in parallel. When complete they can combine the bundles in order, documenting the offset of each in a separate index file. The benefit carries over to consumers of the file, who can surgically decompress selected parts of the stream, or parallelize processing of the whole file for performance, which is the approach discussed here.
The Index File
The index file tells us which bz2 "stream" of the larger bz2 file holds a particular article, by giving the byte offset to the beginning of that discrete bz2-compressed segment. In the uncompressed index file (though we stream-decompress it during processing, so you don't need to decompress it in advance) you'll find millions of lines of text in the form of
Offset:ArticleId:ArticleName
e.g.
608:10:AccessibleComputing
The offset is the number of bytes from the beginning of the compressed file at which the page, and up to 99 of its peers, can be found in an isolated bzip2 stream, the result being a partial XML document.
The dump begins with various bookkeeping and administrative metadata, which is why the first article starts several hundred bytes in.
You'll notice that a series of articles will share the same offset because articles were combined into the same bzip2 stream. If you wanted to surgically extract a single article you would need to decompress the stream from the offset (to the next offset or the end of the file) and then parse the resulting XML to find the specific article you seek among the 100 [7] in that set.
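To make that concrete, here's a minimal sketch of such a surgical extraction (the helper name and the use of ElementTree are my own choices for illustration, not part of the processing pipeline described below): it finds the offset for a title in the index, decompresses only that one stream, and scans the resulting fragment for the matching page element.

import bz2
import xml.etree.ElementTree as ET

index_source = "./enwiki-20230520-pages-articles-multistream-index.txt.bz2"
articles_source = "./enwiki-20230520-pages-articles-multistream.xml.bz2"

def extract_single_article(title):
    # find the stream offset for the requested title in the index
    offset = None
    with bz2.BZ2File(index_source) as index:
        for line in index:
            stream_offset, _, article_name = line.decode("utf-8").rstrip("\n").split(":", 2)
            if article_name == title:
                offset = int(stream_offset)
                break
    if offset is None:
        return None
    # decompress just the one stream containing this article
    with open(articles_source, "rb") as dump:
        dump.seek(offset)
        decompressor = bz2.BZ2Decompressor()
        chunks = []
        while not decompressor.eof:
            chunks.append(decompressor.decompress(dump.read(65536)))
    # wrap the partial XML document in a synthetic root so it parses
    fragment = b"<pages>" + b"".join(chunks) + b"</pages>"
    for page in ET.fromstring(fragment).iter("page"):
        if page.findtext("title") == title:
            return page
    return None

page = extract_single_article("AccessibleComputing")
if page is not None:
    print(page.findtext("title"))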
A Note On The Python Code
This isn't a python tutorial. I'm not trying to teach anyone about command line arguments, robust error handling or optimized python. My error approach for this is "if something small fails, fail entirely because human intervention is necessary". I wrote this entry purely because in trying to scratch a small itch (a need for a large volume of test data for a system with an ontological organization) I found multiple similar tutorials that were wrong (or wrong-headed) in various time-wasting ways.
I'm going to hardcode paths in the code because it simplifies the examples, but you shouldn't do this in your own code. I trust the reader has the capacity to adapt code to their own environment. I've also removed code comments [8].
Processing The Index File
The only use for the index file in this scheme is to find all the unique offsets, allowing for the parallel processing of the separate streams from the larger file. In the scenario being discussed I am working with the index file enwiki-20230520-pages-articles-multistream-index.txt.bz2.
I could eliminate the need for the index file by pre-scanning the larger file, finding the beginning and end of each stream, but bz2 is a non-trivial format. For instance the end of stream magic sequence (0x177245385090) isn't byte aligned. Normal lazy mechanisms fail without a much more involved interpretation of the file, which I have no interest in building.
Wikipedia provides the index file. Even though 98.85%+ of it is redundant if we want just the offsets, it's a robust, simple source.
Let's write some code to extract the unique offsets from the index file. In the case of the 20230520 file there are 229,002 offsets. Note that this code "caches" the results, drawing from the cache on subsequent runs. If you're like me you'll probably decide to change how you process the files and re-run multiple times, so the cache is beneficial.
A set is used for the initial extraction, eliminating duplicates [9]. It is then sorted into a list and returned to the caller.
import bz2
import os

def process_index_file(index_source, persist=True):
    index_offsets_persisted = index_source + ".offsets"
    if os.path.exists(index_offsets_persisted):
        # a previous run already persisted the unique offsets; reuse them
        index_filehandle = open(index_offsets_persisted, "r")
        try:
            offset_strings = index_filehandle.readlines()
            sorted_offsets = [int(offset) for offset in offset_strings]
            return sorted_offsets
        finally:
            index_filehandle.close()
    else:
        stream_offsets = set()
        index_filehandle = bz2.BZ2File(index_source)
        try:
            last_offset = -1
            for line in index_filehandle:
                offset = int(line.decode("utf-8").split(":")[0])
                if offset != last_offset:
                    stream_offsets.add(offset)
                    last_offset = offset
        finally:
            index_filehandle.close()
        sorted_stream_offsets = sorted(stream_offsets)
        if persist:
            # persist the offsets so subsequent runs skip the index scan
            offset_output_filehandle = open(index_offsets_persisted, "w")
            try:
                sorted_stream_offset_strings = [str(offset) for offset in sorted_stream_offsets]
                offset_output_filehandle.write('\n'.join(sorted_stream_offset_strings))
            finally:
                offset_output_filehandle.close()
        return sorted_stream_offsets
This code could be called from another module via a call like offsets = wikiindex.process_index_file("./enwiki-20230520-pages-articles-multistream-index.txt.bz2"). It's quick to process and, assuming you haven't passed False for persist, almost immediate on subsequent calls. The cache file is about 2.8MB in size.
Using The Offsets To Process The Article Dump
Now that we have the offsets, let's process them in parallel. For this we're going to leverage the multiprocessing package. If you attempt to do this using threads you're going to quickly hit a scalability ceiling given the limits of the GIL, which is fine given that the multiprocessing facilities come with little overhead for our purposes while being very easy to use. We will also be using the xml.sax package to extract data from the XML fragments.
This solution approach uses little memory while offering high degrees of parallelism and great efficiency.
In this example I've also leveraged the wikitextparser dependency (e.g. pip install wikitextparser) as it works well for my needs. It extracts the plain text in addition to all the internal and external links within the page (in the wikilinks and external_links list members, respectively), and I've found it a great kick-start to extracting value from the wiki dump.
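As a quick, hypothetical illustration (the sample wikitext below is made up, and plain_text() is the method recent wikitextparser versions provide for markup-stripped text):

import wikitextparser

sample = "'''Python''' is a [[programming language]] created by [[Guido van Rossum]]. See [https://www.python.org the official site]."

parsed = wikitextparser.parse(sample)

print(parsed.plain_text())                            # markup-stripped text
print([link.target for link in parsed.wikilinks])     # internal link targets
print([link.url for link in parsed.external_links])   # external link URLs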
To help with comprehension of the following code, here are the steps it effectively performs:
- Load the discrete set of offsets described in the prior section regarding the index file. This tells us where each distinct bz2 "file" is in the total archive
- Move those offsets to a multiprocessing queue to be shared by the worker processes. Note that in the workers we acquire a mutex lock before pulling from the queue, because the queue has a defect in most implementations where it can return Empty to callers under simultaneous access
- Start the worker processes, sharing the queue and lock object. Originally I used a queue per process, pre-assigning an equal number of offsets to each; however, in a world of asymmetrical cores (efficiency, performance), and given that each offset contains a set of articles of varying complexity, I moved back to a shared work queue
- Each worker process pulls offsets from the shared queue, decompressing the bz2 content found at each offset into an XML fragment which is recomposed into a valid XML document
- It uses a SAX parser to iterate over that XML document, pulling out each discrete page efficiently and calling a processing function on each successful extract
- The main process waits until the worker processes are all complete and then exits
In my case I use those extracts for a variety of analysis stages, along with populating the relationships and ontologies of all articles into a graph, however that code has been removed as it isn't necessary for this article.
import bz2
import queue
import wikitextparser
import multiprocessing
import wikiindex
import xml.sax
import io

articles_source = "./enwiki-20230520-pages-articles-multistream.xml.bz2"
index_source = "./enwiki-20230520-pages-articles-multistream-index.txt.bz2"

def process_worker(work_queue, work_queue_lock):
    offsets_processed = 0
    stream_filehandle = open(articles_source, "rb")
    try:
        while True:
            try:
                # serialize queue access; see the note above about spurious Empty results
                work_queue_lock.acquire()
                stream_offset = work_queue.get(block=False)
            finally:
                work_queue_lock.release()
            stream_filehandle.seek(stream_offset)
            decompressor = bz2.BZ2Decompressor()
            # wrap the stream's pages in a synthetic root element to form valid XML
            output = [b'<pages>']
            while not decompressor.eof:
                output.append(decompressor.decompress(stream_filehandle.read(65536)))
            output.append(b'</pages>')
            contents = b''.join(output)
            process_stream_contents(contents)
            offsets_processed += 1
    except queue.Empty:
        return
    finally:
        print("Worker process shutting down after processing {} offsets".format(offsets_processed))
        stream_filehandle.close()

# this is a placeholder. Presumably you would do something more useful
def your_function(page_id, page_ns, page_title, page_redirect, page_content):
    if page_redirect is None:
        page_parsed = wikitextparser.parse(page_content)
        print("Parsed page {}-{}".format(page_id, page_title))
    else:
        print("Redirect from {} to {}".format(page_title, page_redirect))

class XMLSAXParser(xml.sax.ContentHandler):
    def __init__(self):
        super().__init__()
        self.read_stack = []
        self.text_aggregate = []
        self.page_id = None
        self.page_title = None
        self.page_redirect = None
        self.page_ns = None
        self.page_content = None
        self.page_count = 0
        self.in_page = False

    def startElement(self, tag_name, attributes):
        self.text_aggregate = []
        if tag_name == "page":
            self.page_redirect = None
            self.page_title = None
            self.page_id = None
            self.page_ns = None
            self.page_content = None
            self.in_page = True
        else:
            if (tag_name == "redirect") and (self.read_stack[-1] == "page"):
                self.page_redirect = attributes["title"]
        self.read_stack.append(tag_name)

    def endElement(self, tag_name):
        if (len(self.read_stack) > 0) and (tag_name == self.read_stack[-1]):
            del self.read_stack[-1]
        else:
            raise Exception("Tag ({}) does not match open tag ({}).".format(tag_name, self.read_stack[-1]))
        element_string = ''.join(self.text_aggregate)
        if tag_name == "page":
            self.in_page = False
            # We have the whole page so do with it what you will
            your_function(self.page_id, self.page_ns, self.page_title, self.page_redirect, self.page_content)
        else:
            if self.in_page:
                if self.read_stack[-1] == "page":
                    if tag_name == "title":
                        self.page_title = element_string
                    elif tag_name == "id":
                        self.page_id = int(element_string)
                    elif tag_name == "ns":
                        self.page_ns = int(element_string)
                elif self.read_stack[-1] == "revision":
                    # the actual page contents exist as a revision
                    if tag_name == "text":
                        self.page_content = element_string
        self.text_aggregate = []

    def characters(self, content):
        if self.in_page:
            self.text_aggregate.append(content)

def process_stream_contents(many_pages):
    reader = XMLSAXParser()
    byte_stream = io.BytesIO(many_pages)
    try:
        xml.sax.parse(byte_stream, reader)
    finally:
        byte_stream.close()

def main():
    try:
        sorted_stream_offsets = wikiindex.process_index_file(index_source)
        if (sorted_stream_offsets is None) or (len(sorted_stream_offsets) < 1):
            raise Exception("Index file unsuccessful")
        process_count = multiprocessing.cpu_count()
        work_queue = multiprocessing.Queue()
        work_queue_lock = multiprocessing.Lock()
        for offset in sorted_stream_offsets:
            work_queue.put(offset)
        jobs = []
        for i in range(process_count):
            p = multiprocessing.Process(target=process_worker, args=(work_queue, work_queue_lock))
            p.start()
            jobs.append(p)
        for j in jobs:
            j.join()
    except Exception as e:
        print(e)

if __name__ == "__main__":
    multiprocessing.freeze_support()
    main()
This code has been significantly modified from my "production" source for illustrative purposes. Obviously you wouldn't hard code file paths like this, for instance. Additionally, the function your_function, called for each individual extracted and processed wiki article, is left as a lame placeholder.
process_worker pulls items from the work queue it has been passed, bz2-decompresses from each offset in the wiki dump until it hits a bz2 EOF, and then passes the XML fragment to process_stream_contents. process_stream_contents runs an XML SAX extraction on the fragment, with each unique page element having its relevant data points extracted and coalesced; on a full, successful extraction it passes the result to the user-defined function (implemented in the sample as a trivial, useless print [10] to demonstrate things functioning).
A full extraction of tens of millions of articles is still a fairly onerous task, and presumably you're going to be doing intensive processing on the extracts (for instance building a graph from the wikilinks associations between the pages), but this approach is efficient and can saturate even a beastly machine, yielding the full benefit of vertical scalability. With a small amount of work it can be adapted for horizontal scalability, as sketched below.
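For instance, one hypothetical way to shard the work across machines is to partition the offset list and give each machine only its own slice, since every stream is independently decompressible; each machine then runs the same worker code against its shard.

import wikiindex

def offsets_for_shard(shard_index, shard_count):
    offsets = wikiindex.process_index_file(
        "./enwiki-20230520-pages-articles-multistream-index.txt.bz2")
    # round-robin assignment spreads expensive and cheap streams evenly
    return offsets[shard_index::shard_count]

# e.g. the third of eight machines processes only these offsets
my_offsets = offsets_for_shard(2, 8)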
Footnotes
1. the entire Gutenberg library can be downloaded as another easy source ↩︎
2. assuming you abide by the copyright considerations ↩︎
3. the classic SOLR-style literal search is no longer competitive — search needs a contextual awareness and domain knowledge now ↩︎
4. there are unofficial torrents which transfer incredibly quickly but tend to lag the official dumps. At the time of writing they are over a month behind ↩︎
5. on Windows you can use certutil, such as certutil -hashfile enwiki-20230520-pages-articles-multistream.xml.bz2 sha1, while macOS has shasum and Linux has sha1sum ↩︎
6. though xz is much better at the cost of considerably worse compression times ↩︎
7. the current standard is 100 articles per stream, at least until the final set containing the sub-100 remainder; however, as good programmers we accommodate variations and assume it's n ↩︎
8. I'm a huge fan of code comments to the greatest extent possible, as the work I'm involved in often means I'm revisiting my own code some time later and comments are a fantastic way of getting back up to speed ↩︎
9. the index file is sequentially ordered, but failing to find any guarantee of that I'm going to treat it as hostile and assume the lines can come in any sequence. It's easy enough in a situation like this to code defensively ↩︎
10. worth noting that the print in the sample placeholder function will significantly slow down the program ↩︎