Indexing 1.4 Million Amazon Products into Elasticsearch in Under 90 Seconds — and Beyond
At 1.4 million documents, a single index works fine. At 1 billion+, everything changes.
Section 1: Scaling to 1 billion+ documents: what changes
The first thing that breaks is your shard strategy. By default, Elasticsearch caps a cluster at 1,000 shards per data node (cluster.max_shards_per_node), counting both primaries and replicas. That sounds like a lot until you realize that every index creates shards and every ILM rollover creates a new index. Suddenly you hit the limit, and the cluster starts rejecting index creation and rollovers for no obvious reason.
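To make the shard limit concrete, here is a small back-of-the-envelope sketch. The function name and the example cluster (3 data nodes, 400 rolled-over indices) are illustrative assumptions, not figures from the pipeline above.

```python
def shard_budget(data_nodes: int, indices: int, primaries: int, replicas: int,
                 max_shards_per_node: int = 1000) -> dict:
    """Compare the shards an index layout needs with the cluster-wide limit."""
    # Every index holds its primaries plus one extra copy of each per replica.
    needed = indices * primaries * (1 + replicas)
    # The per-node setting is multiplied by the number of data nodes.
    limit = data_nodes * max_shards_per_node
    return {"needed": needed, "limit": limit, "fits": needed <= limit}


# Hypothetical cluster: 3 data nodes, 400 indices of 5 primaries + 1 replica.
budget = shard_budget(data_nodes=3, indices=400, primaries=5, replicas=1)
print(budget)  # {'needed': 4000, 'limit': 3000, 'fits': False}
```

With a layout like this, the next rollover would be refused even though no single index looks unreasonable on its own.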
Shard sizing matters more than shard count. The rule of thumb: aim for 10–50 GB per shard. At 1 billion documents, that means planning your index architecture upfront:
    {
      "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1
      }
    }
A 5-shard index with 200M documents per shard at ~200 bytes each ≈ 40 GB/shard. That's the sweet spot.
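The arithmetic behind that estimate can be sketched as follows. The helper names are hypothetical, and the calculation uses raw document size only; real on-disk size depends on mappings, compression, and replicas, so treat the result as a starting point.

```python
import math


def estimate_shard_size_gb(total_docs: int, avg_doc_bytes: int, primaries: int) -> float:
    """Approximate size of one primary shard in GB (1 GB = 1e9 bytes)."""
    docs_per_shard = total_docs / primaries
    return docs_per_shard * avg_doc_bytes / 1e9


def primaries_for_target(total_docs: int, avg_doc_bytes: int, target_gb: float = 40.0) -> int:
    """Smallest primary count that keeps each shard at or under target_gb."""
    total_gb = total_docs * avg_doc_bytes / 1e9
    return max(1, math.ceil(total_gb / target_gb))


print(estimate_shard_size_gb(1_000_000_000, 200, 5))  # 40.0
print(primaries_for_target(1_000_000_000, 200))       # 5
```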
ILM (Index Lifecycle Management) becomes non-negotiable. At scale, you can't manage indices manually. You need a policy that handles the full lifecycle automatically:
    {
      "policy": {
        "phases": {
          "hot": {
            "actions": {
              "rollover": {
                "max_docs": 50000000,
                "max_size": "40gb"
              }
            }
          },
          "warm": {
            "min_age": "30d",
            "actions": {
              "shrink": { "number_of_shards": 1 },
              "forcemerge": { "max_num_segments": 1 }
            }
          },
          "delete": {
            "min_age": "365d",
            "actions": { "delete": {} }
          }
        }
      }
    }
The rollover pattern replaces a single giant index with a chain of size-bounded indices (products-000001, products-000002, ...) behind an alias. Your application always writes to the alias and never needs to know which physical index is active.
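As a rough illustration of the alias mechanics, the sketch below builds the request body for the first index in a rollover series and mimics how the numeric suffix advances. The alias name `products` is inferred from the index names above, and the policy name `products-ilm` is a placeholder I made up; neither comes from the original pipeline.

```python
import re

ALIAS = "products"            # assumed alias name, inferred from products-000001
POLICY_NAME = "products-ilm"  # hypothetical policy name


def bootstrap_request(alias: str, policy: str) -> dict:
    """Body for creating the first index of a rollover series."""
    return {
        "settings": {
            "index.lifecycle.name": policy,
            "index.lifecycle.rollover_alias": alias,
        },
        # The write-index flag routes all writes through the alias to this
        # index until a rollover moves the flag to the next one.
        "aliases": {alias: {"is_write_index": True}},
    }


def next_rollover_name(current: str) -> str:
    """Mimic rollover naming: increment the suffix, zero-padded to 6 digits."""
    match = re.fullmatch(r"(.+)-(\d+)", current)
    if match is None:
        raise ValueError(f"{current!r} does not end with -<number>")
    prefix, counter = match.groups()
    return f"{prefix}-{int(counter) + 1:06d}"


print(next_rollover_name("products-000001"))  # products-000002
```

With the official Python client, the body would be passed when creating `products-000001`; after that, the application only ever references the alias.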
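Heap memory becomes your bottleneck. At 1B+ documents, fielddata and segment metadata consume significant JVM heap. Two rules: never allocate more than 50% of RAM to the heap, and keep the heap below roughly 32 GB. Above that threshold the JVM can no longer use compressed object pointers, so every object reference costs more memory and performance drops sharply. The exact cutoff varies by JVM, so staying a few gigabytes under it is the safer choice.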
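The two heap rules combine into a one-line calculation. This is a minimal sketch: the 31 GB cap is a conservative assumption on my part (the real compressed-pointer cutoff depends on the JVM), and the helper names are hypothetical.

```python
def recommended_heap_gb(ram_gb: float, compressed_oops_cap_gb: float = 31.0) -> float:
    """Half of RAM, capped below the compressed-object-pointer threshold."""
    return min(ram_gb / 2, compressed_oops_cap_gb)


def jvm_heap_options(ram_gb: float) -> list:
    """Render matching -Xms/-Xmx lines, rounded down to whole gigabytes."""
    heap = int(recommended_heap_gb(ram_gb))
    return [f"-Xms{heap}g", f"-Xmx{heap}g"]


print(jvm_heap_options(16))   # ['-Xms8g', '-Xmx8g']
print(jvm_heap_options(128))  # ['-Xms31g', '-Xmx31g']
```

Note that a 128 GB machine gets the same heap as a 64 GB one; the remaining RAM is not wasted, since the operating system uses it for the filesystem cache that Lucene relies on.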
Section 2: Mistakes I made along the way
Mistake #1: Starting with dynamic: true
The default Elasticsearch behavior maps fields automatically. Seems convenient, until a CSV column that usually contains numbers suddenly has a string value, and ES infers the wrong type. You end up with a price field mapped as text instead of float, and every aggregation breaks silently.
Fix: always start with dynamic: strict. Define your mapping explicitly. The extra 10 minutes upfront saves hours of debugging later.
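For illustration, a strict mapping for this dataset might look like the sketch below. The field names follow the product fields used in this article's pipeline, but the field types are my assumptions and should be checked against the real CSV. The small helper only imitates locally what Elasticsearch does server-side, where a strict mapping rejects any document containing an unmapped field.

```python
PRODUCT_MAPPING = {
    "dynamic": "strict",  # reject documents that contain unmapped fields
    "properties": {
        "asin": {"type": "keyword"},
        "title": {"type": "text"},
        "stars": {"type": "float"},
        "price": {"type": "float"},
        "isBestSeller": {"type": "boolean"},
        "ingested_at": {"type": "date"},
    },
}


def unmapped_fields(doc: dict, mapping: dict) -> list:
    """List top-level fields of doc that the mapping does not declare."""
    return sorted(set(doc) - set(mapping["properties"]))


doc = {"asin": "B0001", "price": 9.99, "colour": "red"}
print(unmapped_fields(doc, PRODUCT_MAPPING))  # ['colour']
```

Running a check like this on a sample of rows before ingestion surfaces schema drift early, instead of as bulk errors halfway through the import.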
Mistake #2: Forgetting to restore settings after ingestion
Setting refresh_interval: -1 and number_of_replicas: 0 before bulk indexing is correct. Forgetting to restore them after is a production incident waiting to happen. New documents become invisible to searches, and your cluster has no redundancy.
Always wrap your ingestion in a try/finally:
    try:
        optimize_for_import(client, index)
        run_ingestion(client, index)
    finally:
        restore_after_import(client, index)
Mistake #3: Skipping the force merge
After a large ingestion, the Lucene index can contain hundreds of segments. Search performance degrades noticeably. Force merge consolidates them, but it's I/O intensive, so run it once, after ingestion, never continuously. I learned this the hard way after wondering why searches were slow on a freshly indexed dataset.
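A one-off merge step could be sketched like this, assuming `client` is an instance of the official Python `Elasticsearch` client. The function name is hypothetical, and on a large index the call can run for a long time, so the client's request timeout may need raising.

```python
def force_merge_after_ingest(client, index: str, max_segments: int = 1) -> None:
    """Run a single force merge once ingestion has finished."""
    # Refresh first so documents still in the indexing buffer are written
    # to segments before merging.
    client.indices.refresh(index=index)
    # Consolidate the index down to max_segments segments per shard.
    client.indices.forcemerge(index=index, max_num_segments=max_segments)


# Usage, once the bulk import has completed:
#   from elasticsearch import Elasticsearch
#   client = Elasticsearch("http://localhost:9200")
#   force_merge_after_ingest(client, "amazon-products")
```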
Mistake #4: Using _id auto-generation for re-indexable data
If you let ES auto-generate _id values, every re-ingestion creates duplicate documents instead of overwriting existing ones. Using the business identifier (ASIN in this case) as _id makes your pipeline idempotent: you can re-run it safely at any time.
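Here is a minimal sketch of that idea, using the action format that `elasticsearch.helpers.bulk` accepts. The generator name and sample rows are made up, and the dictionary stands in for the index purely to show that a second run overwrites instead of duplicating.

```python
from typing import Iterable, Iterator


def to_bulk_actions(rows: Iterable[dict], index: str) -> Iterator[dict]:
    """Yield bulk actions keyed by ASIN so that re-runs overwrite documents."""
    for row in rows:
        asin = row.get("asin")
        if not asin:
            continue  # skip rows without a business identifier
        yield {"_op_type": "index", "_index": index, "_id": asin, "_source": row}


rows = [
    {"asin": "B0001", "title": "Kettle"},
    {"asin": "B0002", "title": "Toaster"},
    {"asin": "", "title": "Row without an ASIN"},
]

# Simulate two full ingestion runs against a store keyed by _id.
store = {}
for _ in range(2):
    for action in to_bulk_actions(rows, "amazon-products"):
        store[action["_id"]] = action["_source"]

print(len(store))  # 2
```

Against a real cluster, the same generator would be passed to `helpers.bulk(client, to_bulk_actions(rows, index))`.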
Section 3: Rewriting the pipeline in TypeScript
If your stack is Node.js rather than Python, here's a standalone script that replicates the same pipeline using the official @elastic/elasticsearch client:
    import { Client } from '@elastic/elasticsearch';
    import { createReadStream } from 'fs';
    import { parse } from 'csv-parse';

    const client = new Client({ node: 'http://localhost:9200' });
    const INDEX = 'amazon-products';
    const CHUNK_SIZE = 2000; // documents per bulk request

    // Row as read from the CSV: every column arrives as a string.
    interface RawRow {
      asin: string;
      title: string;
      stars: string;
      price: string;
      isBestSeller: string;
    }

    // Document as indexed, with typed fields.
    interface Product {
      asin: string;
      title: string;
      stars: number | null;
      price: number | null;
      isBestSeller: boolean;
      ingested_at: string;
    }

    // Convert a CSV row into a typed document; rows without an ASIN are dropped.
    function transform(row: RawRow): Product | null {
      if (!row.asin) return null;
      return {
        asin: row.asin,
        title: row.title || '',
        stars: row.stars ? parseFloat(row.stars) : null,
        price: row.price ? parseFloat(row.price) : null,
        isBestSeller: row.isBestSeller === 'True',
        ingested_at: new Date().toISOString(),
      };
    }

    // Disable refresh and replicas for the duration of the import.
    async function optimizeForImport() {
      await client.indices.putSettings({
        index: INDEX,
        settings: { refresh_interval: '-1', number_of_replicas: 0 },
      });
    }

    // Restore normal settings and make the new documents searchable.
    async function restoreAfterImport() {
      await client.indices.putSettings({
        index: INDEX,
        settings: { refresh_interval: '1s', number_of_replicas: 1 },
      });
      await client.indices.refresh({ index: INDEX });
    }

    // Send one batch; the bulk API expects alternating action/document entries.
    async function bulkIndex(docs: Product[]) {
      const operations = docs.flatMap(doc => [
        { index: { _index: INDEX, _id: doc.asin } },
        doc,
      ]);
      const { errors, items } = await client.bulk({ operations });
      if (errors) {
        const failed = items.filter(i => i.index?.error);
        console.error(`${failed.length} errors in batch`);
      }
    }

    async function run(csvPath: string) {
      await optimizeForImport();
      let batch: Product[] = [];
      let total = 0;
      try {
        // Stream the CSV so the whole file is never loaded into memory.
        const parser = createReadStream(csvPath).pipe(
          parse({ columns: true, skip_empty_lines: true })
        );
        for await (const row of parser) {
          const doc = transform(row as RawRow);
          if (!doc) continue;
          batch.push(doc);
          if (batch.length >= CHUNK_SIZE) {
            await bulkIndex(batch);
            total += batch.length;
            console.log(`Indexed ${total} documents...`);
            batch = [];
          }
        }
        // Flush the final partial batch.
        if (batch.length > 0) {
          await bulkIndex(batch);
          total += batch.length;
        }
        console.log(`✓ Done: ${total} documents indexed`);
      } finally {
        // Runs even if ingestion throws, so settings are always restored.
        await restoreAfterImport();
        await client.indices.forcemerge({ index: INDEX, max_num_segments: 1 });
        console.log('✓ Settings restored, force merge complete');
      }
    }

    run('./amazon-products.csv').catch(console.error);
Key differences from the Python version:
- csv-parse with async iteration replaces pandas chunking. Same streaming principle, no full file load in memory.
- flatMap for bulk operations: the Elasticsearch bulk API requires alternating action/document pairs. flatMap is the cleanest way to build that structure in TypeScript.
- try/finally guarantees that settings are always restored, even if the ingestion throws an error midway.
- Same idempotency: _id: doc.asin ensures re-runs overwrite rather than duplicate.
To run it:
    npm init -y
    npm install @elastic/elasticsearch csv-parse tsx
    npx tsx index.ts