I have an API that tags documents based on names from a predefined standard name list. Before tagging, I receive a list of file URLs (public Firebase URLs), typically around 40–80 files, which need to be processed using an LLM (currently OpenAI).
The issue I’m facing is with memory usage. When I download these documents for processing, the application sometimes crashes due to out-of-memory (OOM) errors. The instance I’m using has only 2GB RAM, and I’d prefer not to increase the instance size until I’ve fully optimized the code.
The problem seems to occur because multiple PDFs are being processed asynchronously, and at some point, many of them are held in memory simultaneously. I also perform additional operations like base64 encoding for images, which further increases memory usage. Since I need to return all document tags within about a minute, I’m using parallel processing.
Current approach:
- There are 10–15 document types that I send directly to OpenAI.
- For images (JPG, PNG, JPEG): I download them, base64 encode them, and send them to OpenAI.
- For PDFs: I download them, upload them via OpenAI’s file API, and then send the file ID for processing.
- All of this is done in parallel using semaphores:
OPENAI_SEMAPHORE = 30DOWNLOAD_SEMAPHORE = 15
Problem:
Even with semaphores, memory usage spikes because multiple large files are downloaded and processed at the same time. This leads to OOM crashes.
Questions:
- How can I reduce memory usage in this workflow?
- Is there a better architectural approach to handle this kind of workload?
- How can I avoid having too many documents in memory at once while still maintaining performance constraints?
​
async def _stream_download_file(url: str, ext: str) -> str:
"""
Stream-download a file to disk in 64KB chunks.
Never holds the full file in memory — writes directly to disk.
Returns the path to the temp file.
"""
async with DOWNLOAD_SEMAPHORE:
temp_path = None
try:
temp = tempfile.NamedTemporaryFile(delete=False, suffix=ext or ".tmp")
temp_path = temp.name
temp.close()
async with http_client.stream("GET", url, follow_redirects=True) as response:
response.raise_for_status()
with open(temp_path, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size=65536):
f.write(chunk)
return temp_path
except asyncio.TimeoutError:
_cleanup_local_file(temp_path, "failed download")
raise Exception(f"Download timed out after {DOWNLOAD_TIMEOUT}s")
except Exception as e:
_cleanup_local_file(temp_path, "failed download")
raise Exception(f"Download failed: {str(e)}")