gg.dev

Indexing 1.4 Million Amazon Products into Elasticsearch in Under 90 Seconds — and Beyond

At 1.4 million documents, a single index works fine. At 1 billion+, everything changes.

Section 1: Scaling to 1 billion+ documents: what changes

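The first thing that breaks is your shard strategy. With default settings, Elasticsearch allows 1,000 shards per data node (the cluster.max_shards_per_node setting, enforced cluster-wide as that value times the number of data nodes). That sounds like a lot until you realize that every index creates several shards, replicas count too, and each ILM rollover creates a new index. Suddenly you hit the limit, and the cluster refuses to create new indices (so rollovers fail, and so does any write that needs a new index) without it being obvious why.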
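To see how quickly rollovers eat into that budget, here is a back-of-the-envelope sketch. The function names and the example cluster (3 data nodes, 120 rolled-over indices) are illustrative assumptions, not figures from this project; only the default of 1,000 comes from the text above.

```python
def total_shards(num_indices: int, primaries: int, replicas: int) -> int:
    """Every index contributes its primaries plus one full copy per replica."""
    return num_indices * primaries * (1 + replicas)


def shard_budget(data_nodes: int, max_shards_per_node: int = 1000) -> int:
    """The limit is cluster-wide: per-node limit times the number of data nodes."""
    return data_nodes * max_shards_per_node


# Hypothetical cluster: 3 data nodes, 120 indices with 5 primaries and 1 replica each.
used = total_shards(num_indices=120, primaries=5, replicas=1)
budget = shard_budget(data_nodes=3)
print(f"{used} of {budget} shards used")
```

With these assumed numbers, 120 indices already account for 1,200 of the 3,000 available shards, which is why the warm-phase shrink in the ILM policy later in this section matters.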

Shard sizing matters more than shard count. The rule of thumb: aim for 10–50 GB per shard. At 1 billion documents, that means planning your index architecture upfront:

json
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  }
}

With 1 billion documents spread over 5 primary shards, each shard holds 200M documents. At ~200 bytes per document, that is ≈ 40 GB per shard, which sits inside the 10–50 GB target.
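The same arithmetic as a small sketch, useful for trying other document sizes. The helper names are hypothetical, and the ~200 bytes per document is the rough on-disk estimate used above, not a measured value.

```python
import math

GB = 1_000_000_000  # decimal gigabytes, matching the rough estimate in the text


def shard_size_gb(total_docs: int, avg_doc_bytes: int, primaries: int) -> float:
    """Estimated size of one primary shard, assuming documents spread evenly."""
    return total_docs * avg_doc_bytes / primaries / GB


def primaries_for_target(total_docs: int, avg_doc_bytes: int, target_gb: float) -> int:
    """Smallest number of primaries that keeps each shard at or below target_gb."""
    return max(1, math.ceil(total_docs * avg_doc_bytes / (target_gb * GB)))


print(shard_size_gb(1_000_000_000, 200, 5))          # 40.0
print(primaries_for_target(1_000_000_000, 200, 40))  # 5
```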

ILM (Index Lifecycle Management) becomes non-negotiable. At scale, you can't manage indices manually. You need a policy that handles the full lifecycle automatically:

json
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_docs": 50000000,
            "max_size": "40gb"
          }
        }
      },
      "warm": {
        "min_age": "30d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "delete": {
        "min_age": "365d",
        "actions": { "delete": {} }
      }
    }
  }
}

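The rollover pattern replaces a single giant index with a chain of bounded indices (products-000001, products-000002, ...) behind an alias. Your application always writes to the alias and never needs to know which physical index is active. One caveat on the policy above: max_size measures the combined size of all primary shards in the index, not of each shard, so with 5 primaries a 40 GB rollover produces shards of about 8 GB. To target a per-shard size directly, recent Elasticsearch versions offer max_primary_shard_size as a rollover condition.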
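Starting such a chain takes two pieces: an index template that attaches the ILM policy to every index in the chain, and a first index that carries the write alias. The sketch below only builds the request bodies. The function name and the policy name products-policy are assumptions for illustration; the article does not say what the policy is called.

```python
def rollover_bootstrap(alias: str, policy_name: str) -> dict:
    """Build the request bodies needed to start a rollover chain behind an alias."""
    return {
        # Body for PUT _index_template/<name>: applied to every index in the chain.
        "index_template": {
            "index_patterns": [f"{alias}-*"],
            "template": {
                "settings": {
                    "index.lifecycle.name": policy_name,
                    "index.lifecycle.rollover_alias": alias,
                }
            },
        },
        # Only the first index is created by hand; rollover creates the rest.
        "first_index": f"{alias}-000001",
        "first_index_body": {"aliases": {alias: {"is_write_index": True}}},
    }


requests = rollover_bootstrap("products", "products-policy")
print(requests["first_index"])  # products-000001
```

The is_write_index flag is what lets the application keep writing to the alias while ILM moves it from one physical index to the next.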

Heap memory becomes your bottleneck. At 1B+ documents, fielddata and segment metadata consume significant JVM heap. Two rules: never allocate more than 50% of RAM to the heap (the rest is needed for the filesystem cache that Lucene relies on), and keep the heap below roughly 32 GB (above that threshold, the JVM loses compressed object pointers and performance drops sharply).
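Both rules fit in one line of arithmetic. In this sketch the function name is hypothetical, and the 31 GB cap is an assumed conservative stand-in for "below roughly 32 GB"; the exact cutoff depends on the JVM.

```python
def recommended_heap_gb(ram_gb: float, compressed_oops_cap_gb: float = 31.0) -> float:
    """Half of the machine's RAM, capped below the compressed-pointer threshold."""
    return min(ram_gb * 0.5, compressed_oops_cap_gb)


for ram in (16, 64, 128):
    heap = int(recommended_heap_gb(ram))
    # Minimum and maximum heap are set to the same value so the heap never resizes.
    print(f"{ram} GB RAM -> -Xms{heap}g -Xmx{heap}g")
```

Past 64 GB of RAM the recommendation stops growing, which is why very large machines are often better used for several nodes or for filesystem cache than for a bigger heap.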

Section 2: Mistakes I made along the way

Mistake #1: Starting with dynamic: true

By default, Elasticsearch maps new fields automatically, based on the first value it sees. That seems convenient until a CSV column that usually contains numbers shows up with a string value first, and ES infers the wrong type. You end up with a price field mapped as text instead of float, and numeric aggregations and range queries on it stop behaving as expected.

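Fix: always start with dynamic: strict and define your mapping explicitly. With strict, a document containing an unmapped field is rejected instead of silently extending the mapping. The extra 10 minutes upfront saves hours of debugging later.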
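A sketch of what such an explicit mapping could look like for this dataset. Only price as float is stated above; the other field types are assumptions inferred from the field names, and unmapped_fields is a hypothetical local helper that mimics the check strict performs, not an Elasticsearch API.

```python
def build_index_body() -> dict:
    """Index creation body with an explicit, strict mapping."""
    return {
        "settings": {"number_of_shards": 5, "number_of_replicas": 1},
        "mappings": {
            "dynamic": "strict",  # reject documents that contain unmapped fields
            "properties": {
                "asin": {"type": "keyword"},
                "title": {"type": "text"},
                "stars": {"type": "float"},
                "price": {"type": "float"},
                "isBestSeller": {"type": "boolean"},
                "ingested_at": {"type": "date"},
            },
        },
    }


def unmapped_fields(doc: dict, index_body: dict) -> list:
    """Local pre-flight check: top-level fields a strict mapping would refuse."""
    known = index_body["mappings"]["properties"]
    return sorted(field for field in doc if field not in known)


body = build_index_body()
print(unmapped_fields({"asin": "B000TEST01", "colour": "red"}, body))  # ['colour']
```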

Mistake #2: Forgetting to restore settings after ingestion

Setting refresh_interval: -1 and number_of_replicas: 0 before bulk indexing is correct. Forgetting to restore them afterwards is a production incident waiting to happen: new documents stay invisible to searches until something forces a refresh, and your cluster has no redundancy.

Always wrap your ingestion in a try/finally:

python
try:
    optimize_for_import(client, index)
    run_ingestion(client, index)
finally:
    restore_after_import(client, index)
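The two helpers are not shown above. A minimal sketch of what they might contain, assuming the 8.x elasticsearch Python client, where indices.put_settings accepts index and settings keyword arguments (older clients take a body argument instead). The restored values (1s, 1 replica) are assumptions and should match whatever the index used before the import.

```python
def optimize_for_import(client, index: str) -> None:
    """Disable refresh and replicas for the duration of the bulk load."""
    client.indices.put_settings(
        index=index,
        settings={"index": {"refresh_interval": "-1", "number_of_replicas": 0}},
    )


def restore_after_import(client, index: str) -> None:
    """Re-enable refresh and replicas, then make the new documents searchable."""
    client.indices.put_settings(
        index=index,
        settings={"index": {"refresh_interval": "1s", "number_of_replicas": 1}},
    )
    client.indices.refresh(index=index)
```

Because restore_after_import runs in the finally block, it should stay cheap and safe to call even when the ingestion failed halfway.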

Mistake #3: Skipping the force merge

After a large ingestion, the Lucene index can contain hundreds of segments. Search performance degrades noticeably. Force merge consolidates them, but it's I/O intensive, so run it once, after ingestion, never continuously. I learned this the hard way after wondering why searches were slow on a freshly indexed dataset.

Mistake #4: Using _id auto-generation for re-indexable data

If you let ES auto-generate _id values, every re-ingestion creates duplicate documents instead of overwriting existing ones. Using the business identifier (ASIN in this case) as _id makes your pipeline idempotent: you can re-run it safely at any time.
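The idea can be sketched without a cluster. Here to_bulk_actions is a hypothetical helper that yields actions in the dictionary shape the Python client's bulk helpers accept, and a plain dict keyed by _id stands in for the index; the sample ASINs are made up.

```python
from typing import Iterable, Iterator


def to_bulk_actions(rows: Iterable[dict], index: str) -> Iterator[dict]:
    """Turn rows into bulk 'index' actions keyed by the business identifier."""
    for row in rows:
        asin = row.get("asin")
        if not asin:
            continue  # no business key, so no stable _id: skip the row
        yield {"_op_type": "index", "_index": index, "_id": asin, "_source": row}


# A dict keyed by _id behaves like an index here: same _id means overwrite.
store = {}
rows = [{"asin": "B000TEST01", "price": 9.99}, {"asin": "B000TEST02", "price": 19.99}]
for _ in range(2):  # simulate running the ingestion twice
    for action in to_bulk_actions(rows, "amazon-products"):
        store[action["_id"]] = action["_source"]

print(len(store))  # 2
```

Two runs over the same rows still leave two documents, which is the behavior you want from a pipeline that may be retried.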

Section 3: Rewriting the pipeline in TypeScript

If your stack is Node.js rather than Python, here's a standalone script that replicates the same pipeline using the official @elastic/elasticsearch client:

typescript
import { Client } from '@elastic/elasticsearch';
import { createReadStream } from 'fs';
import { parse } from 'csv-parse';

const client = new Client({ node: 'http://localhost:9200' });
const INDEX = 'amazon-products';
const CHUNK_SIZE = 2000;

interface RawRow {
  asin: string;
  title: string;
  stars: string;
  price: string;
  isBestSeller: string;
}

interface Product {
  asin: string;
  title: string;
  stars: number | null;
  price: number | null;
  isBestSeller: boolean;
  ingested_at: string;
}

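// Convert a raw CSV row (all strings) into a typed document; rows without an ASIN are dropped.
function transform(row: RawRow): Product | null {
  if (!row.asin) return null;
  // parseFloat returns NaN for empty or non-numeric input; store null instead.
  const toNumber = (value: string): number | null => {
    const parsed = parseFloat(value);
    return Number.isNaN(parsed) ? null : parsed;
  };
  return {
    asin: row.asin,
    title: row.title || '',
    stars: toNumber(row.stars),
    price: toNumber(row.price),
    isBestSeller: row.isBestSeller === 'True',
    ingested_at: new Date().toISOString(),
  };
}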

async function optimizeForImport() {
  await client.indices.putSettings({
    index: INDEX,
    settings: { refresh_interval: '-1', number_of_replicas: 0 },
  });
}

async function restoreAfterImport() {
  await client.indices.putSettings({
    index: INDEX,
    settings: { refresh_interval: '1s', number_of_replicas: 1 },
  });
  await client.indices.refresh({ index: INDEX });
}

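// Send one batch through the bulk API, using the ASIN as _id so re-runs overwrite.
async function bulkIndex(docs: Product[]) {
  // The bulk body alternates an action line with the document it applies to.
  const operations = docs.flatMap(doc => [
    { index: { _index: INDEX, _id: doc.asin } },
    doc,
  ]);
  const { errors, items } = await client.bulk({ operations });
  if (errors) {
    // errors is true when at least one item failed; the other items were still indexed.
    const failed = items.filter(i => i.index?.error);
    console.error(`${failed.length} errors in batch, first: ${failed[0]?.index?.error?.reason}`);
  }
}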

async function run(csvPath: string) {
  await optimizeForImport();
  
  let batch: Product[] = [];
  let total = 0;

  try {
    const parser = createReadStream(csvPath).pipe(
      parse({ columns: true, skip_empty_lines: true })
    );

    for await (const row of parser) {
      const doc = transform(row as RawRow);
      if (!doc) continue;

      batch.push(doc);
      if (batch.length >= CHUNK_SIZE) {
        await bulkIndex(batch);
        total += batch.length;
        console.log(`Indexed ${total} documents...`);
        batch = [];
      }
    }

    if (batch.length > 0) {
      await bulkIndex(batch);
      total += batch.length;
    }

    console.log(`✓ Done — ${total} documents indexed`);
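  } finally {
    // Always restore settings, even if the ingestion failed midway.
    await restoreAfterImport();
    console.log('✓ Settings restored');
  }

  // Force merge only after a successful ingestion: it is I/O intensive and should run once.
  await client.indices.forcemerge({ index: INDEX, max_num_segments: 1 });
  console.log('✓ Force merge complete');
}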

run('./amazon-products.csv').catch(console.error);

Key differences from the Python version:

  • csv-parse with async iteration replaces pandas chunking. Same streaming principle, no full file load in memory.
  • flatMap for bulk operations: the Elasticsearch bulk API requires alternating action/document pairs. flatMap is the cleanest way to build that structure in TypeScript.
  • try/finally guarantees that settings are always restored, even if the ingestion throws an error midway.
  • Same idempotency: _id: doc.asin ensures re-runs overwrite rather than duplicate.

To run it (the amazon-products index must already exist, because the script starts by updating its settings):

sh
npm init -y
npm install @elastic/elasticsearch csv-parse tsx
npx tsx index.ts