# Natural Language Queries on Postgres using CQRS (Elasticsearch) & Spring AI

# Overview

Interacting with data through natural language is quickly becoming a mainstream expectation in modern applications. Whether it's asking for "all orders over $50" or "products added last week", letting users query systems in natural language can dramatically improve accessibility and user productivity.

---

# Motivation

## Why Elasticsearch?

Relational databases like **PostgreSQL** are great for storing structured, transactional data. However, when it comes to flexible, full-text search over large datasets, they can be limiting and complicated to work with. To address this, we can apply the CQRS pattern.

### CQRS

**CQRS (Command Query Responsibility Segregation)** is a design pattern that separates **read operations (queries)** from **write operations (commands)**. Instead of using the same model to update and read data, you use two distinct models:

* **Commands** modify data (like "PlaceOrder" or "UpdateOrder")
    
* **Queries** fetch data (like "GetOrderDetails" or "ListOrders")
    

This separation makes systems more **scalable** and **maintainable**, and lets you optimize each side independently. You can implement it in various ways, e.g. a single database with separate **Command** and **Query** modules on the application side, or, as in this example, a relational database (PostgreSQL) for writes and a fast NoSQL store (Elasticsearch) for reads. For more information, see [Martin Fowler’s essay](https://martinfowler.com/bliki/CQRS.html) on this topic.
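
To make the split concrete, here is a minimal, in-memory sketch of CQRS in plain Java. It is illustrative only: `OrderCommandService`, `OrderQueryService`, and the `OrderView` read model are hypothetical names, and the synchronous `publisher` callback stands in for the asynchronous Debezium/Kafka pipeline used later in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Write model: the source of truth (PostgreSQL in this post).
record Order(UUID id, String productName, int productQty, double productPrice) {}

// Read model: a query-optimized view (Elasticsearch in this post).
record OrderView(UUID id, String productName, int productQty, double totalPrice) {}

// Command side: validates and persists changes, then publishes them.
final class OrderCommandService {
    private final Map<UUID, Order> writeStore = new ConcurrentHashMap<>();
    private final Consumer<Order> publisher; // stands in for the Debezium/Kafka pipeline

    OrderCommandService(Consumer<Order> publisher) {
        this.publisher = publisher;
    }

    UUID placeOrder(String productName, int qty, double price) {
        if (qty <= 0) {
            throw new IllegalArgumentException("quantity must be positive");
        }
        Order order = new Order(UUID.randomUUID(), productName, qty, price);
        writeStore.put(order.id(), order);
        publisher.accept(order); // propagate the change to the read side
        return order.id();
    }
}

// Query side: serves reads from its own model and never touches the write store.
final class OrderQueryService {
    private final Map<UUID, OrderView> readStore = new ConcurrentHashMap<>();

    // Projection: turns a write-side change into a read-side document.
    void project(Order order) {
        double total = order.productQty() * order.productPrice();
        readStore.put(order.id(), new OrderView(order.id(), order.productName(), order.productQty(), total));
    }

    List<OrderView> ordersWithQtyGreaterThan(int minQty) {
        List<OrderView> result = new ArrayList<>();
        for (OrderView view : readStore.values()) {
            if (view.productQty() > minQty) {
                result.add(view);
            }
        }
        return result;
    }
}
```

Wiring the sides together with `new OrderCommandService(queries::project)` makes every `placeOrder` call update the read model immediately. In the real pipeline that hop is asynchronous, so the read side is eventually consistent rather than instantly up to date.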

### How does Elasticsearch fit the bill?

**Elasticsearch** complements PostgreSQL by enabling powerful, fast, and fuzzy search over the same data. By syncing PostgreSQL to Elasticsearch (using Kafka connectors), you get the **best of both worlds**: reliable data storage and advanced search. This is useful for applications such as semantic search, RAG, and many other search-related use cases.

## Complexity of querying

Traditionally, search in applications is implemented using **static queries**: hardcoded SQL or Query DSL with strict filters or a limited set of parameters, whose complexity grows as users ask to query more fields. But users rarely think in ‘columns’; they want answers to questions like:

* “Show all orders over $50 from last month”
    
* “Find customers who bought greater than 7 items”
    

Supporting these kinds of **dynamic, user-generated** queries is tough with static scripts or code alone. It leads either to overly complex query builders or to brittle pattern-matching logic with limited flexibility. Using **Spring AI** with **Elasticsearch**, powered by an **LLM (OpenAI)**, we can interpret these queries and translate them into **dynamically generated DSL**, making the system much easier to query and far more user-friendly and accessible.

In this post, we will:

* Build an infrastructure pipeline with PostgreSQL, Kafka, and Elasticsearch enabling near real-time searchability
    
* Use Spring AI to handle natural language query interpretation
    
* Run and test the system with real examples
    

---

# Setting Up and Running the Project

## Prerequisites

Before getting started, make sure the following tools are installed:

* [**Java 21 using SDKMAN**](https://sdkman.io/install/)
    
* [**Docker & Docker Compose**](https://docs.docker.com/get-docker/)
    
* **OpenAI API Key**
    
* Your favorite text editor (I’m using **IntelliJ** for Java & **VS Code** for **React**)
    
* **Optional:** If running the UI helper application, you need **Node** (v22.12.0+ preferred)
    

**Using WSL?**  
If you're using WSL like I am, the same steps work for you as well. Just open your favorite terminal (I prefer PowerShell) and log in to WSL.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1746501095883/cc4df870-970d-4f98-8c80-963bd1f0a6f8.png?auto=compress,format&format=webp align="left")

Now you can follow along with the commands described in the post.

**Note:** If you have trouble with any of the steps, check the application README.md on [GitHub](https://github.com/arunbalachandran/QueryElasticUsingSpringAI/blob/main/README.md) for troubleshooting steps.

### 1\. Clone the Repository & Start Infrastructure

You can find the code here: https://github.com/arunbalachandran/QueryElasticUsingSpringAI

```bash
git clone https://github.com/arunbalachandran/QueryElasticUsingSpringAI
cd QueryElasticUsingSpringAI
docker compose up -d

[+] Building 0.0s (0/0)
[+] Running 9/9
 ✔ Network queryelasticusingspringai_dockernet             Create...                                               0.1s
 ✔ Container queryelasticusingspringai-postgres-1          Sta...                                                  0.9s
 ✔ Container queryelasticusingspringai-zookeeper-1         St...                                                   0.8s
 ✔ Container queryelasticusingspringai-elasticsearch-1     Started                                                 0.7s
 ✔ Container queryelasticusingspringai-kibana-1            Start...                                                1.0s
 ✔ Container queryelasticusingspringai-kafka-1             Starte...                                               1.2s
 ✔ Container queryelasticusingspringai-akhq-1              Started                                                 1.6s
 ✔ Container queryelasticusingspringai-debezium-connect-1  Started                                                 1.7s
 ✔ Container queryelasticusingspringai-kafka-connect-1     Started                                                 1.7s
```

### 2\. Open querybackend

Open the **querybackend** project in your favorite text editor. I’ve used **IntelliJ**:

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1747977182119/c0bf10d1-94c4-4808-a2e2-d364e4f9c809.png align="center")

### 3\. Set Your OpenAI API Key

```bash
export OPENAI_API_KEY=your-api-key-here
```

Or, if you're using IntelliJ, you can set it in your run configuration.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1747977229774/568ceb1e-3627-48a6-87a5-4db79f54e473.png align="center")

### 4\. Initialize Elasticsearch Mapping

**Note:** If you're using Postman, import the collection included in the repository and follow along using the endpoints mentioned here.

Create the `order` index:

```bash
curl -X PUT "http://localhost:9200/order" \
-H "Content-Type: application/json" \
-d '{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "product_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "product_qty": {
        "type": "integer"
      },
      "product_price": {
        "type": "double"
      },
      "product_description": {
        "type": "text"
      },
      "created_time": {
        "type": "date"
      },
      "updated_time": {
        "type": "date"
      }
    }
  }
}'
```
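
If you prefer to create the index from Java, the same request can be issued with the JDK's built-in `java.net.http.HttpClient`. This is a sketch, not code from the repository: `OrderIndexSetup` is a hypothetical class name, and it assumes Elasticsearch is reachable on `localhost:9200` without authentication, as the curl command above does.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class OrderIndexSetup {
    // Same mapping as the curl command above.
    static final String MAPPING = """
            {
              "mappings": {
                "properties": {
                  "id": { "type": "keyword" },
                  "product_name": {
                    "type": "text",
                    "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
                  },
                  "product_qty": { "type": "integer" },
                  "product_price": { "type": "double" },
                  "product_description": { "type": "text" },
                  "created_time": { "type": "date" },
                  "updated_time": { "type": "date" }
                }
              }
            }
            """;

    // Built separately so the request can be inspected without a running cluster.
    static HttpRequest createIndexRequest(String baseUrl, String index) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(MAPPING))
                .build();
    }

    // Returns the HTTP status: 200 when the index is created,
    // 400 (resource_already_exists_exception) if it already exists.
    static int createIndex(String baseUrl, String index) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(createIndexRequest(baseUrl, index), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

With the Docker Compose stack running, `OrderIndexSetup.createIndex("http://localhost:9200", "order")` should behave like the curl call.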

### 5\. Start the Application

```bash
./gradlew bootRun
```

Or, if using IntelliJ, run the **bootRun** Gradle task:

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1747977767914/21e6c2b5-ba26-4204-b8fb-b79f04a07789.png align="center")

---

## Data Flow: PostgreSQL ➝ Kafka ➝ Elasticsearch

To get data flowing from your relational DB to Elasticsearch:

#### 1\. Set Up Kafka Connectors

```bash
# PostgreSQL to Kafka (Debezium)
curl -X POST http://localhost:8084/connectors -H "Content-Type: application/json" -d '{
    "name": "postgres-to-kafka-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname" : "querybackend",
        "topic.prefix": "connector",
        "tasks.max": "1",
        "schemas.enable": "false",
        "schema.include.list": "public",
        "table.include.list": "public.orders",
        "signal.data.collection": "public.debezium_signal",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "auto.register.schemas": true,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.partitions": 1,
        "transforms": "extractlatest",
        "transforms.extractlatest.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.extractlatest.field": "after",
        "time.precision.mode": "connect",
        "decimal.handling.mode": "double",
        "heartbeat.interval.ms": "1800000",
        "snapshot.mode": "initial",
        "plugin.name": "pgoutput",
        "slot.name" : "query_slot_orders_01"
    }
}'

# Kafka to Elasticsearch (Confluent Sink)
curl -X POST http://localhost:8084/connectors -H "Content-Type: application/json" -d '{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "connector.public.orders",
    "schemas.enable": false,
    "schema.ignore": true,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,    
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "type.name": "_doc",
    "key.ignore": false,
    "index": "orders",
    "connection.url": "http://elasticsearch:9200",
    "transforms": "InsertKey,ExtractId",
    "transforms.InsertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.InsertKey.fields": "id",
    "transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractId.field": "id",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.drop.deletes": "false",
    "behavior.on.null.values": "delete"
  }
}'
```
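
To see what reaches Elasticsearch, it helps to trace one change event through the transforms configured above. The sketch below mimics, in plain Java over `Map`s, what those Single Message Transforms do for an insert or update: the source connector's `ExtractField$Value` keeps only Debezium's `after` state, then the sink connector's `ValueToKey` and `ExtractField$Key` turn the row's `id` into the record key, which the sink uses as the document `_id` because `key.ignore` is `false`. It is a model for understanding only: `TransformTrace` and the sample event are made up, and delete events (where `after` is null) are deliberately not modeled.

```java
import java.util.HashMap;
import java.util.Map;

final class TransformTrace {
    // Key and value of the record as the Elasticsearch sink sees it.
    record TracedRecord(Object key, Map<String, Object> value) {}

    @SuppressWarnings("unchecked")
    static TracedRecord apply(Map<String, Object> debeziumEnvelope) {
        // Source connector, "extractlatest" (ExtractField$Value, field=after):
        // keep only the row state after the change, dropping "before", "source", "op", etc.
        Map<String, Object> after = (Map<String, Object>) debeziumEnvelope.get("after");
        if (after == null) {
            throw new IllegalArgumentException("delete events are not modeled in this sketch");
        }
        // Sink connector, "InsertKey" (ValueToKey, fields=id): the key becomes {"id": <id>}.
        Map<String, Object> keyStruct = new HashMap<>();
        keyStruct.put("id", after.get("id"));
        // Sink connector, "ExtractId" (ExtractField$Key, field=id): the key is unwrapped to the
        // bare id, which the sink uses as the document _id because key.ignore=false.
        Object key = keyStruct.get("id");
        return new TracedRecord(key, after);
    }
}
```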

---

# Querying with Natural Language

Once everything is up and running, it's time to test the actual use case: converting plain English queries into Elasticsearch queries using Spring AI.

## Testing the Backend API

### Add Test Data

```bash
curl -X POST http://localhost:8080/api/v1/orders -H "Content-Type: application/json" -d '{
  "productName": "Peaches",
  "productQty": 8,
  "productPrice": 12,
  "productDescription": "Peaches"
}'
```

### Try a Natural Language Query

The moment of truth! We will harness the power of OpenAI’s API to parse our queries & fetch the matching data from Elasticsearch.

**Note:** I’ve already added a few sample records in the database.

```bash
curl -X POST http://localhost:8080/api/v1/elastic/query \
-H "Content-Type: application/json" \
-d '{
    "query": "Get me all the orders with quantity greater than 5"
}'

# Response
[
    {
        "id": "bfacba9b-66bb-480f-b63c-eeb1f0381a21",
        "productName": "Orange",
        "productQty": 6,
        "productPrice": 10.0,
        "productDescription": "California Oranges",
        "createdTime": "2025-05-20T07:52:59.54",
        "updatedTime": "2025-05-20T07:52:59.541"
    },
    {
        "id": "4c3e65ec-8684-480a-a9b0-50925209146d",
        "productName": "Mangoes",
        "productQty": 12,
        "productPrice": 15.0,
        "productDescription": "Alphonso Mangoes",
        "createdTime": "2025-05-20T07:53:30.606",
        "updatedTime": "2025-05-20T07:53:30.606"
    },
    {
        "id": "7f1838cf-577a-42bd-b4a2-d58d3a70cedd",
        "productName": "Apple",
        "productQty": 8,
        "productPrice": 10.0,
        "productDescription": "Apples",
        "createdTime": "2025-05-20T07:52:46.101",
        "updatedTime": "2025-05-20T07:52:46.101"
    },
    {
        "id": "6fa9a0f8-b2c8-4c1c-86a5-b5344d2f2a58",
        "productName": "Peaches",
        "productQty": 7,
        "productPrice": 12.0,
        "productDescription": "Peaches",
        "createdTime": "2025-05-20T07:54:01.823",
        "updatedTime": "2025-05-20T07:54:01.823"
    }
]
```
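
The same call can be made from Java with the JDK's `HttpClient`. This is a hypothetical client, not part of the repository: it assumes the backend runs on `localhost:8080`, treats any non-2xx status as an error, and returns the raw JSON body as a string, leaving deserialization into an order type to the JSON library of your choice.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class NaturalLanguageQueryClient {
    private final HttpClient http = HttpClient.newHttpClient();
    private final URI endpoint;

    NaturalLanguageQueryClient(String baseUrl) {
        this.endpoint = URI.create(baseUrl + "/api/v1/elastic/query");
    }

    // Builds the same request as the curl example: POST {"query": "..."} as JSON.
    HttpRequest buildRequest(String naturalLanguageQuery) {
        String body = "{\"query\": " + jsonString(naturalLanguageQuery) + "}";
        return HttpRequest.newBuilder(endpoint)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // Sends the query and returns the raw JSON array of matching orders.
    String query(String naturalLanguageQuery) throws IOException, InterruptedException {
        HttpResponse<String> response =
                http.send(buildRequest(naturalLanguageQuery), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() / 100 != 2) {
            throw new IOException("Unexpected status " + response.statusCode() + ": " + response.body());
        }
        return response.body();
    }

    // Encodes a Java string as a JSON string literal (quotes, backslashes, control characters).
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.append('"').toString();
    }
}
```

For example, `new NaturalLanguageQueryClient("http://localhost:8080").query("Get me all the orders with quantity greater than 5")` sends the same request as the curl command.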

Behind the scenes, Spring AI calls the OpenAI API to interpret the query and translate it into Elasticsearch Query DSL. The system is designed to be extensible, so you can add more context, prompt templates, or user data as needed.

---

# Seeing it in action

I’ve included a sample React-based UI app that shows the API in action in the context of a real-world application.

### Run UI Code

From the repository root, run the following commands to start the **queryui** app:

```bash
cd queryui
# install dependencies if you haven't already
npm install
# run the application
npm run dev
# Application starts on port 5173
```

Open http://localhost:5173 in your browser and you should see a UI that looks like this:

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1748820183748/a2521803-5855-4d84-a7cf-741ed365e9be.png align="center")

Run a query and you should see the results update on the screen:

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1748820025361/6b72415d-52ca-4e11-830c-26c5e6f06617.png align="center")

---

# Under the Hood

## The connector

* There are two connectors at play here: the Debezium connector and the Confluent connector.
    
* The Debezium connector relays data from Postgres to Kafka.
    
* The Confluent connector relays data from Kafka to Elasticsearch.
    
* 💡Note: It may be possible to use Debezium as both the source and the sink connector, but it is a bit more cumbersome to set up. This is left as an exercise for the reader.
    

## Spring AI integration

Let’s take a closer look at the `PromptService` class. This is where the magic happens: it turns user-written natural language queries into structured Elasticsearch queries with the help of Spring AI, which provides a wrapper around the OpenAI Large Language Model (LLM).

### Purpose of the Service

The `PromptService` serves as the **bridge between user intent and machine-readable queries**. When a user enters a question like *“Show me all orders above $50 from last month”*, this service constructs a prompt and uses an LLM to generate a precise Elasticsearch query based on the structure of the index.

---

### Key Components

#### ChatModel

* The LLM backend provided by Spring AI and injected into the service, here configured to use OpenAI (`gpt-4o`).
    
* This is the model responsible for converting natural language into JSON-based Elasticsearch queries.
    

#### ElasticSearchService

* A helper service to interact with the Elasticsearch index, get mappings, and perform searches.
    

---

### Prompt Initialization (`@PostConstruct`)

```java
@PostConstruct
public void init() {
    String mapping = elasticsearchService.getOrderMapping();
    this.basePrompt = """
        I need you to convert natural language user queries into elasticsearch queries...
        ...
        """.formatted(mapping);
}
```

* On application startup, we retrieve the **Elasticsearch mapping** for the `order` index.
    
* This mapping is embedded into the `basePrompt`. It gives the LLM context about which fields exist (e.g., `product_name`, `product_price`, `created_time`).
    
* The prompt instructs the LLM **how to behave**, e.g., “don't use markdown”, “output the mapping without formatting,” and “understand the semantics of what’s being asked.”
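
The actual prompt text is elided in the snippet above, but the general technique (embed the index mapping in a fixed instruction block once, then append each user question) can be sketched as follows. The instruction wording and the `PromptBuilder` name are assumptions for illustration, not the repository's prompt. Note that `String.formatted` only substitutes the mapping if the text block contains a `%s` placeholder.

```java
final class PromptBuilder {
    // Hypothetical instructions; the repository's actual prompt text is not shown in this post.
    private static final String TEMPLATE = """
            You convert natural language questions into Elasticsearch Query DSL.
            Respond with a single raw JSON object for the _search endpoint.
            Do not use markdown or code fences, and do not add explanations.
            Only use fields that exist in this index mapping:
            %s
            """;

    private final String basePrompt;

    PromptBuilder(String indexMapping) {
        // Built once (e.g. at startup), mirroring the @PostConstruct init() above.
        this.basePrompt = TEMPLATE.formatted(indexMapping);
    }

    // Mirrors processPrompt: the user query is appended after the base instructions.
    String fullPrompt(String userQuery) {
        return basePrompt + "\nUser query: " + userQuery;
    }
}
```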
    

### Processing the User Query

```java
public List<OrderDTO> processPrompt(String userQuery) {
    String fullPrompt = basePrompt + "\nUser query: " + userQuery;
    ...
}
```

1. The `userQuery` (e.g., “Get me all the orders with quantity greater than 5”) is appended to the `basePrompt`.
    
2. This becomes a **single, complete prompt** that’s fed to the LLM.
    

```java
public List<OrderDTO> processPrompt(String userQuery) {
    String fullPrompt = basePrompt + "\nUser query: " + userQuery;
    log.info("Prompt being used: {}", fullPrompt);
    ChatResponse chatResponse = chatModel.call(
            new Prompt(
                    fullPrompt,
                    OpenAiChatOptions.builder()
                            .model("gpt-4o")
                            .temperature(1.0)
                            .build()
            )
    );
    String elasticQuery = chatResponse.getResult().getOutput().getText();
    log.info("Elastic query: {}", elasticQuery);
    Map<String, Object> response = elasticsearchService.search(elasticQuery);
    return ElasticMapper.mapToOrderDTO(response);
}
```

3. The prompt is sent to the LLM (`chatModel.call()`), and we expect a raw JSON Elasticsearch query in return.
    

```java
String elasticQuery = chatResponse.getResult().getOutput().getText();
```

4. The LLM's output is extracted as a string. This is the dynamically generated **Elasticsearch DSL query**.
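
Even when told not to use markdown, models occasionally wrap JSON in a code fence, which would make the search request fail. A cheap defensive step before calling `elasticsearchService.search` is sketched below. `LlmOutputCleaner` is a hypothetical helper, not part of the repository; it strips at most one surrounding fence and only checks that the result looks like a JSON object, without fully validating it.

```java
final class LlmOutputCleaner {
    // Three backticks, built at runtime so this snippet renders cleanly inside Markdown.
    private static final String FENCE = "`".repeat(3);

    // Strips one surrounding Markdown code fence (with or without a language tag)
    // and rejects output that does not look like a JSON object.
    static String stripCodeFence(String raw) {
        String text = raw.strip();
        if (text.startsWith(FENCE)) {
            int firstNewline = text.indexOf('\n');
            int closingFence = text.lastIndexOf(FENCE);
            if (firstNewline >= 0 && closingFence > firstNewline) {
                text = text.substring(firstNewline + 1, closingFence).strip();
            }
        }
        if (!text.startsWith("{") || !text.endsWith("}")) {
            throw new IllegalArgumentException("LLM output is not a JSON object: " + raw);
        }
        return text;
    }
}
```

It would slot in as `String elasticQuery = LlmOutputCleaner.stripCodeFence(chatResponse.getResult().getOutput().getText());`.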
    

```java
Map<String, Object> response = elasticsearchService.search(elasticQuery);
return ElasticMapper.mapToOrderDTO(response);
```

5. The generated query is executed via `elasticsearchService.search()`.
    
6. The raw search results are then mapped to `OrderDTO` objects for use in the API.
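
`ElasticMapper` itself is not shown in the post. Assuming `search` returns the standard `_search` response body deserialized into nested `Map`s and `List`s, a mapper only needs to walk `hits.hits[*]._source` and convert the snake_case fields from the index mapping into DTO fields. The `OrderSummary` record and `ElasticHitsMapper` class below are a hypothetical sketch along those lines, covering a subset of fields, not the repository's implementation.

```java
import java.util.List;
import java.util.Map;

// Hypothetical DTO with a subset of the fields returned by the API.
record OrderSummary(String id, String productName, int productQty, double productPrice) {}

final class ElasticHitsMapper {
    @SuppressWarnings("unchecked")
    static List<OrderSummary> map(Map<String, Object> searchResponse) {
        // Standard _search shape: {"hits": {"hits": [{"_id": ..., "_source": {...}}, ...]}}
        Map<String, Object> outer = (Map<String, Object>) searchResponse.getOrDefault("hits", Map.of());
        List<Map<String, Object>> hits =
                (List<Map<String, Object>>) outer.getOrDefault("hits", List.of());
        return hits.stream()
                .map(hit -> (Map<String, Object>) hit.get("_source"))
                // Convert snake_case index fields into the camelCase DTO.
                .map(source -> new OrderSummary(
                        (String) source.get("id"),
                        (String) source.get("product_name"),
                        ((Number) source.get("product_qty")).intValue(),
                        ((Number) source.get("product_price")).doubleValue()))
                .toList();
    }
}
```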
    

---

### Why This Matters

This pattern allows your application to **understand and interpret human-friendly input** without rigid UI constraints or static filters. It unlocks much more intuitive experiences, which are especially valuable for user-facing search interfaces and dashboards.

It also **abstracts the complexity of search syntax** away from the user while still allowing them to perform advanced, flexible queries based on their intent.

## Sequence Diagram

To better illustrate the flow, here’s a sequence diagram showing the order of events in the application:

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1748837305442/e9bf0fe2-d221-4565-933f-27cd99f34fe7.png align="center")

---

# Wrapping it up

In this post, I’ve demonstrated how we can:

* Set up a full-stack data ingestion pipeline & search stack using PostgreSQL, Kafka, and Elasticsearch
    
* Use Spring AI and OpenAI's LLMs to handle natural language queries
    
* Automatically translate plain English into structured Elasticsearch queries
    

### Future Use Cases

Here are a few ideas for extending this:

* Add user roles and access filters to contextualize results
    
* Enable conversational memory with a chat-style interface, allowing users to ask follow-up questions
    
* Use RAG (Retrieval-Augmented Generation) for more dynamic query understanding
    
* Integrate Kibana dashboards that go along with the generated results
    

---

This foundation gives you a practical starting point for building intelligent, search-driven interfaces. If you're exploring ways to make data more accessible, this is a compelling approach worth trying!

Got ideas or questions? Drop them in the comments! 💬
