Registration is open - Live, Instructor-led Online Classes - Elasticsearch in March - Solr in April - OpenSearch in May. See all classes


Solr Streaming Expressions for Collection auto-updating

One of the things that were extensively changed in Solr 6.0 is the Streaming Expressions and what we can do with them (hint: amazing stuff!). We already described Solr SQL support. Today, we’ll dig into the functionality that makes Solr SQL support possible – the Streaming Expressions. Using Streaming Expressions we will put together a mechanism that lets us re-index data in a given Solr Collection – all within Solr, without any external moving parts or dependencies.

Setting up data source

For the purpose of this blog post we will use MySQL as the source of data. For extra fun, we’ll make that MySQL run in Docker. To start MySQL in a Docker container just use the following command:

docker run --name exampleMySQL -p 3306:3306 -e MYSQL_ROOT_PASSWORD=solr -d mysql

Of course, you don’t need to use Docker, but this is the simplest way of getting an environment independent installation for the purpose of this post. We used the MYSQL_ROOT_PASSWORD property to set the root user password, which we will need to connect to MySQL.

Once we have that we can create a database and a simple table from which we will fetch data. We do that by running the following commands:

CREATE database users;

CREATE TABLE users (id INT, name VARCHAR(255), INDEX USING BTREE (id));

Once we have the database and the table we can populate it with some sample data:

INSERT INTO users (id, name) VALUES (1, 'First user');

INSERT INTO users (id, name) VALUES (2, 'Second user');

Setting up Solr

Of course, we will be using Solr 6 and its cloud mode – SolrCloud. However, before we start the instance we need to put MySQL JDBC driver in the lib directory of our Solr server because we will need to connect to MySQL from Solr. You can download the driver from https://dev.mysql.com/downloads/connector/j/.

Once that is done we can start our Solr instance:

bin/solr start -Dsolr.autoSoftCommit.maxTime=1000 -c

Apart from starting in the cloud mode (that’s what the -c switch is for) we’ve set the solr.autoSoftCommit.maxTime to 1000, which means that the Solr soft autocommit will be sent once every second, just for near real-time visibility of search results. We also use the embedded ZooKeeper instance, which is enough for our test (available on the 9983 port by default).

Once Solr is started we can push an example configuration to ZooKeeper:

bin/solr zk -upconfig -d server/solr/configsets/data_driven_schema_configs/conf/ -n testConfig -z localhost:9983

To keep things simple, we will use one of the configuration sets provided with Solr – the “data driven” example, which we can also call schema-less. It will add fields we need, so we don’t have to worry about the index structure and concentrate on other things.

After that, we can create our Collection, the one to which we will index data. We will call it the same as the table – users. Next, let’s create that Collection:

curl -XPOST 'http://localhost:8983/solr/admin/collections?action=CREATE&name=users&numShards=1&replicationFactor=1&collection.configName=testConfig'

We are ready.

JDBC stream source

We now need to create a Streaming Expression that will fetch data from MySQL data source. We can do that by using one of Solr 6 stream sources – the jdbc one. This looks as follows:

jdbc(
  connection="jdbc:mysql://localhost/users?user=root&password=solr", 
  sql="SELECT id, name FROM users", 
  sort="id asc", 
  driver="com.mysql.jdbc.Driver"
)

We need to provide the connection string, the SQL query, the sorting expression and the driver we want to use. We can test it by using the /stream request handler and running a request as follows:

curl --data-urlencode 'expr=jdbc(connection="jdbc:mysql://localhost/users?user=root&password=solr", sql="SELECT id, name FROM users", sort="id asc", driver="com.mysql.jdbc.Driver")' http://localhost:8983/solr/users/stream

The results should look as follows:

{"result-set":{"docs":[
{"name":"First user","id":1},
{"name":"Second user","id":2},
{"EOF":true,"RESPONSE_TIME":62}]}}

As you can see our Solr was able to connect to MySQL, retrieve the data and return it.

Indexing the data

We were able to retrieve data from MySQL data source so now let’s index it. We can do that by using the update streaming expression. We will use the above configured jdbc expression, provide the name of the target Collection, and the batch size:

update(users, 
       batchSize=10, 
       jdbc(connection="jdbc:mysql://localhost/users?user=root&password=solr", sql="SELECT id, name FROM users", sort="id asc", driver="com.mysql.jdbc.Driver")
)

To test this we can just run:

curl --data-urlencode 'expr=update(users, batchSize=10, jdbc(connection="jdbc:mysql://localhost/users?user=root&password=solr", sql="SELECT id, name FROM users", sort="id asc", driver="com.mysql.jdbc.Driver"))' http://localhost:8983/solr/users/stream

The response from the command should be similar to the following one:

{"result-set":{"docs":[
{"batchIndexed":2,"totalIndexed":2,"worker":"users_shard1_replica1","batchNumber":1},
{"EOF":true,"RESPONSE_TIME":141}]}}

As we can see, we’ve indexed two documents. We can make sure they are visible like this:

curl 'localhost:8983/solr/users/select?q=*:*&indent=true'

The results should be our two documents:

<?xml version="1.0" encoding="UTF-8"?>
<response>

<lst name="responseHeader">
  <bool name="zkConnected">true</bool>
  <int name="status">0</int>
  <int name="QTime">4</int>
  <lst name="params">
    <str name="q">*:*</str>
    <str name="indent">true</str>
  </lst>
</lst>
<result name="response" numFound="2" start="0">
  <doc>
    <arr name="name">
      <str>First user</str>
    </arr>
    <str name="id">1</str>
    <long name="_version_">1533572347251916800</long></doc>
  <doc>
    <arr name="name">
      <str>Second user</str>
    </arr>
    <str name="id">2</str>
    <long name="_version_">1533572347294908416</long></doc>
</result>
</response>

Everything works as it should, but what if we were to add two more documents, just like this, to our MySQL database:

INSERT INTO users (id, name) VALUES (3, 'Third user');

INSERT INTO users (id, name) VALUES (4, 'Fourth user');

These documents wouldn’t be indexed. That’s because we’ve run the update streaming expression only once and we need something running periodically and doesn’t force us to do that manually.

Making that automatic

We will use just another Streaming Expressions – one called daemon. It lets us set the identifier, interval and a Streaming Expression that will be run at the defined intervals. So let’s wrap the above update Streaming Expression and run it using daemon:

daemon(id="12345", 
 runInterval="60000",
 update(users, 
        batchSize=10, 
        jdbc(connection="jdbc:mysql://localhost/users?user=root&password=solr", sql="SELECT id, name FROM users", sort="id asc", driver="com.mysql.jdbc.Driver")
 )
)

We defined the identifier of the daemon (using the id property), which we can use to control this functionality. We also specified the runInterval and the Streaming Expression that will be run every runInterval milliseconds.

To send it to Solr we use the following command:

curl --data-urlencode 'expr=daemon(id="12345", runInterval="10000", update(users, batchSize=10, jdbc(connection="jdbc:mysql://localhost/users?user=root&password=solr", sql="SELECT id, name FROM users", sort="id asc", driver="com.mysql.jdbc.Driver")))' http://localhost:8983/solr/users/stream

The response from Solr should be similar to the following one telling us that everything is in order:

{"result-set":{"docs":[
{"DaemonOp":"Deamon:12345 started on users_shard1_replica1"},
{"EOF":true}]}}

Now, our additional documents will be indexed and what’s more the indexation will be repeated every one minute (according to the defined interval). We can test it by running the same query we used previously:

curl 'localhost:8983/solr/users/select?q=*:*&indent=true'

The results returned by Solr:

<?xml version="1.0" encoding="UTF-8"?>
<response>

<lst name="responseHeader">
  <bool name="zkConnected">true</bool>
  <int name="status">0</int>
  <int name="QTime">0</int>
  <lst name="params">
    <str name="q">*:*</str>
    <str name="indent">true</str>
  </lst>
</lst>
<result name="response" numFound="4" start="0">
  <doc>
    <arr name="name">
      <str>First user</str>
    </arr>
    <str name="id">1</str>
    <long name="_version_">1533572993491402752</long></doc>
  <doc>
    <arr name="name">
      <str>Second user</str>
    </arr>
    <str name="id">2</str>
    <long name="_version_">1533572993492451328</long></doc>
  <doc>
    <arr name="name">
      <str>Third user</str>
    </arr>
    <str name="id">3</str>
    <long name="_version_">1533572993492451329</long></doc>
  <doc>
    <arr name="name">
      <str>Fourth user</str>
    </arr>
    <str name="id">4</str>
    <long name="_version_">1533572993493499904</long></doc>
</result>
</response>

As you can see, the two new documents were indexed and the _version_ of the two previous ones changed, which means that the data was fully re-indexed.

Controlling Daemon Streaming Expression

The nifty thing is that we can control and check the execution of the daemon we’ve defined by using a set of API calls and the specified identifier.

For example, to list all running daemon Streaming Expressions we would run:

curl -XPOST 'http://localhost:8983/solr/users/stream?action=list'

With the response from Solr similar to this:

{"result-set":{"docs":[
{"startTime":1462524558905,"stopTime":0,"id":"12345","state":"TIMED_WAITING","iterations":34},
{"EOF":true}]}}

As you can see, we have a single daemon instance running inside Solr.

We can stop and start the daemon Streaming Expressions by running:

curl -XPOST 'http://localhost:8983/solr/users/stream?action=start&id=123456'

curl -XPOST 'http://localhost:8983/solr/users/stream?action=stop&id=12345'

Finally, we can remove the daemon Streaming Expression instance from Solr:

curl -XPOST 'http://localhost:8983/solr/users/stream?action=kill&id=123456'

It’s as simple as that.

Interested in Solr Streaming Expressions? Subscribe to this blog or follow @sematext – we have more Streaming Expressions blog posts in the queue. If you need any help with Solr / SolrCloud – don’t forget @sematext does Solr Consulting, Production Support, as well as Solr Training!

Start Free Trial