Monday, August 15, 2011

Fixing Java Unix Domain Sockets (JUDS): Too many file descriptors

A while ago, we started the development of a distributed system to collect usage statistics about a search engine (based on Apache Solr) and a web portal (typical Java web application) from scratch in Java, running on a Linux platform. We searched for the best way of implementing such a system, and found really interesting and valuable tips in the architectural notes of LinkedIn's Kafka (a great article describing its main features can be found in http://incubator.apache.org/kafka/design.html). Our system is split into two parts: a local component running in the same physical machine that each instance of search engine/web portal and a remote component, that would poll each local component for statistics.
One of the first problems that we faced was: how to connect each instance of search engine/web portal to the associated local component? The first thought was: "OK, if each local component will run in the same machine that each search engine or web portal, and Linux will be the underlaying operating system, then we should use a Unix Domain Socket". The advantages of Unix Domain Sockets over TCP/IP sockets, even on the loopback interface, are very clear (you can see http://lists.freebsd.org/pipermail/freebsd-performance/2005-February/001143.html for more information). But, if we will develop using Java, how can we use Unix domain sockets, given that there is no standard way of using them from the Java.
Well, the answer was JUDS (http://github.com/mcfunley/juds). JUDS provides classes to address the need in Java for accessing Unix domain sockets. The source is provided for a shared library containing the compiled native C code, which is called into by the Java UnixDomainSocket classes via JNI (Java Native Interface) to open, close, unlink (delete), read, and write to Unix domain sockets. We adopted it for our project, and it worked great: the local component may or may not be working, and the performance of the search engines and web portals would not be affected. They open a connection to the local component and send a statistics message without knowing if it was working or not (if it is not working, the connection exception is simply ignored), therefore a failure in the statistics system cannot affect the normal functioning of the engines and portals.
The distributed statistics system was put into production for the first time by our customer, and it failed :). So, it was just turned off, and as expected the engines and portals continued working. After a few hours of the failure, the search engines and portals stopped responding. We analyzed the logs and found the following Java exception:

java.io.IOException: Too many open files.

over and over.
After a lot of headaches, we found the problem. JUDS provides a native open() method in order to open the socket and connect to the server (in stream mode). The code of the native open() method was this (some code cut off):


[...]
s = socket(PF_UNIX, SOCK_TYPE(jSocketType), 0);
ASSERTNOERR(s == -1, "nativeOpen: socket");
ASSERTNOERR(connect(s, (struct sockaddr *)&sa, salen) == -1,"nativeOpen: connect");
[...]
return s;

There was a clear error here. The socket is created using the socket() call, followed by the connect() call. But if the connect() fails, the socket is NOT closed. So, basically, like the connect() call was always failing because the local component was not working, the operating system ran out of file descriptors.
The fix is as follows:


[...]
if (connect(s, (struct sockaddr *)&sa, salen) == -1) {

perror("nativeOpen: connect");
int close = close(s);
ASSERTNOERR(close == -1, "nativeOpen: close connect error socket");
return -1;
}

[...]
return s;

It worked like a charm! Finally, we contributed this fix to the maintainer of the project in GitHub, Dan McKinley, and it has been merged to JUDS master branch.

Implementing a custom Lucene query to obtain flexible score customization


In our R&D department, we have been successfully using Lucene (and Solr too) since its version 2.2 to implement information retrieval solutions in internal (own products) and external (customer's products) projects. Recently, we have been developing on top of Solr a search engine for one of our customers. This customer owns a web portal where users can search for different types of companies (from ice-cream saloons to car sellers). Those companies that want their ads displayed in this web site must paid a certain amount of money. Each ad is made of: name, additional information and a series of keywords that the company can buy in order to appear in response to certain queries. For example, a car seller that sells Ford cars can buy the keyword “Ford” in order to appear as a result if a user searches “Ford”.
The challenge here was the ranking formula that the customer wanted us to implement: the formula must compute the product of the length norm (tokens that matches the query divided by the total number of tokens in the field) of the field that best matches with the query with the investment of each company. To better understand the proposed formula, let's take a look to the following example, where the Lucene/Solr documents are represented as a single entry in the index for each company
Company 1
Name: John Doe Car Seller
Info: The best cars in the middlewest
Keyword: Cars And Bikes
Investment: 2000
Company 2
Name: Uncle Sam Car Seller
Info: The best cars in the east
Keyword: Cars
Investment: 1500
For this example, these should be the scoring calculations, if the query is “cars”.
For company 1: (1/2) * 2000 = 1000. The factor (½) is because the matching field with shortest length is the Keyword field, and it has two terms: “cars” and “bikes” (let's assume “and” is a stopword).
For company 2: (1/1) * 1500 = 1500. The factor (1/1) is because the matching field with shortest length is the Keyword field, and it has one term: “cars” which matches the query exactly.
So, using the proposed formula, company 2 should appear first and company 1 should appear second in the result list. The formula has three different parts:
  1. How to include the investment as part of the scoring formula.

  2. How to obtain the field that is the best match for the query.

  3. How to multiply 1) and 2) in order to obtain the score.

We started analyzing 1), and our first idea was to use Lucene's document boosting (1) to represent the investment of each company. At first glance, it looked like a good idea because using it we can solve the problem of using investment and multiply it for something else almost automatically, but shortly after using this solution, we started having some problems. We did not account for the fact that in Lucene, document boosting is encoded in 1 byte, along with fields boostings and lengthNorms. As a result, in scoring calculation, when retrieving the single byte, the values lost resolution, because there are only 256 fixed values for represent the boostings. This did not appear as a problem for big differences in investments (for example, 1000 vs. 10000), but it did for small differences (like 1000 vs. 1050) that must be reflected in the ranking.
We found instead the class CustomScoreQuery (2)(3) which is a special type of query that sets document score as a programmatic function of several (sub) scores. This was extremely useful because we could use it to multiply the scoring of one “regular” Lucene query (the one with the length norm of the best matching field, which still we had to solve) by a ValueSourceQuery associated with the “investment” field of each company document.
As we said previously, we still had to resolve the problem of build a query that return as its score the length norm of the best matching field. After a lot of searching we did not find any “standard” solution for this issue, so we decided to build our own custom Lucene query. We designed and implemented a custom Query subclass named MaxQuery. Its class constructor expects two parameters: a boolean query, where boolean query clauses are split and applied individually to find the one with the highest length norm, and another query, typically a FieldScoreQuery, that is associated with the investment field. The boolean query is used because it fits with the original query (and legacy code) that was used to retrieve search results, but it could also be a collection of term queries for example. We also had to implement the related Weight and Scorer subclasses, that in a moment of incredible inspiration, we called MaxWeight and MaxScorer. The latter is the “heart” of this implementation, and we will explain its code.
Before starting, the implemented classes are fully compatible with the latest version of Lucene at the time (3.3.0). We have used deprecated code, like the Searcher class, but as of version 3.3.0 the deprecated code is still in use in Solr core classes, so that is why we used it. Our classes should be modified when Lucene 4.0 becomes the stable version.
Now, let's take a look to the code of the MaxScorer class, and then we will explain each part (some code has been cut off for clarity).




class MaxScorer {
/**
* The scorers of the queries of the clauses of the boolean query.
* These scorers will be used to get the maximum score of these queries,
* that will be used in the final score.
*/
protected Scorer[] maxScorers;
/**
* The scorer of the query related with the investment field
*/
protected Scorer investmentScorer;
/**
* Array of next document numbers that match the {@link #maxScorers}.
* The indexes
of this array matches the indexes of {@link #maxScorers}
This array it is initialized in
*/
protected int[] maxScorersNextDocs;

/**
* Document number which corresponds to the minimum document number * in {@link #maxScorersNextDocs}.
* This document number is a candidate to match the query
*/
protected int maxScorersMinOfNextDocs = -1;

/**
* Array that contains for each position:
* <ul>
* <li><code>true</code> if the scorer in
* {@link #maxScorers} in that position matches {@link #doc}
* <li><code>false</code> otherwise</li>
* </ul>
* This array is used in {@link #score()} method in order to
* determine which max scorers should be used for scoring {@link
* #doc}
* and get the maximum scoring.
* This array it's initialized in FALSE
*/
protected boolean[] maxScorersForCurrDoc;

protected void advanceMaxScorers() throws IOException {
for (int idx = 0; idx < this.maxScorersNextDocs.length; idx++) {

if (this.maxScorersNextDocs[idx] != NO_MORE_DOCS && this.maxScorersNextDocs[idx] <= this.maxScorersMinOfNextDocs) {
this.maxScorersNextDocs[idx] = this.maxScorers[idx].nextDoc();
}
}
int minDoc = NO_MORE_DOCS;
for (int idx = 0; idx < this.maxScorersNextDocs.length; idx++) {
if (this.maxScorersNextDocs[idx] < minDoc) {
minDoc = this.maxScorersNextDocs[idx];
}
}
this.maxScorersMinOfNextDocs = minDoc;
}

@Override
public float score() throws IOException {
/* * Calculate the scoring, as the maximum of the max scorers score * multiplied by the investment */
float max = Float.MIN_VALUE;
for (int idx = 0; idx < this.maxScorersForCurrDoc.length; idx++) {
if (this.maxScorersForCurrDoc[idx]) {
float score = this.maxScorers[idx].score();
if (score > max) {
max = score;
}
}
}
float priority = this.priorityScorer.score();
return this.qWeight * max * priority;
}

@Override
public int nextDoc() throws IOException {
//Advance the max scorers
this.advanceMaxScorers();
if (this.maxScorersMinOfNextDocs == NO_MORE_DOCS) {
this.doc = NO_MORE_DOCS;
} else {
/* Assign the current document that matches this query and scorer */
this.doc = maxScorersMinOfNextDocs;
/* Advance the investment scorer to the current matching document (all documents have investment) */
this.priorityScorer.advance(this.doc);
/* Determine which of the max scorers match the current matching document, in order to use them for the score calculation */
for (int idx = 0; idx < this.maxScorersNextDocs.length; idx++) { this.maxScorersForCurrDoc[idx] = this.maxScorersNextDocs[idx] == this.doc ? true : false;

}
}
/* Return the current matching document */
return this.doc;
}


The nextDoc() method must always return a doc id that matches at least one of the clauses of the boolean query. For this reason the overloaded nextDoc() method calls the advanceMaxScorers() method. This method does the following: it calls the nextDoc() method of the scorer for each clause and stores the doc id in an array of length N (maxScorersNextDocs), where N is the number of clauses, and stores the minimum doc id in the variable maxScorersMinOfNextDocs. In subsequent calls to this method, the nextDoc() method of each clause's scorer will be called only if the document number in maxScorersNextDocs for each scorer is less or equal than maxScorersMinOfNextDocs. The aim of the method is to advance each scorer in a way that can be determined if one or more of the scorers match a single document number. For example:
MaxScorerADocs = [3,4,18,20]
MaxScorerBDocs = [18,20]
In this example, the aim is to advance only the MaxScorerA and not the MaxScorerB, so in a certain moment we will have the document number 18 as the current document in maxScorersNextDocs for both scorers.
The score() method reads maxScorersForCurrDoc in order to determine which of the scorers match the current document setted by nextDoc(), and then gets the maximum score from the maxScorers that match and multiplies it for the score of the investmentScorer.
This way, we can customize the scoring of each document for implement the proposed formula. The MaxQuery class could be generalized for other similar problems that need a customized formula, and even be included (a generalized version of this class) in a future relese of Lucene.
There are a certain number of tickets related with this:
References: