Sherlock

Building Sherlock, the Search Engine that Does

Written By:
Ken Michie
@McManCSU

Scouring the Internet for publicly available, social-media-based content is a complicated matter, but for a few of us here at FullContact it is a full-time job. The FullContact Person API leverages the data available from both the social media giants, such as Facebook and Twitter, and a handful of new kids on the block.

Most data collection is accomplished through RESTful APIs whose content fuels a recursive search for public profiles, all initiated from a single point of reference. More simply, our API ingests an email address, Twitter handle, or Facebook username, runs a search across the relevant social sites, and then aggregates the results into a public profile, which is then returned in the requested format. Granted, this oversimplifies things a bit, but it is the foundation for every search here at FullContact.

Search Framework

The search system diagrammed above, which we have lovingly personified as Sherlock, comprises these key components:

  • Front-end API servers for servicing all REST requests
  • Aggregated Profile persistence using Cassandra
  • Message queueing, both to help shield Sherlock from traffic bursts and to ensure reliability for our customers
  • The search engine itself, Sherlock, running in a Storm topology
  • Search Target results caching in Cassandra – a cache for faster lookups on data we’ve previously processed
  • The Search Targets themselves: our destination APIs and miscellaneous database lookups

… all of which operates in AWS’s backyard.
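To make the results-caching component concrete, here is a minimal cache-aside sketch. It assumes a search target can be modeled as a function from search criteria to a result, and a ConcurrentHashMap stands in for the Cassandra-backed cache; the class name CachedSearchTarget is hypothetical and not part of Sherlock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hypothetical cache-aside wrapper around one search target. The
 * ConcurrentHashMap stands in for the Cassandra results cache.
 */
class CachedSearchTarget {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> remoteLookup; // the slow API call
    final AtomicInteger remoteCalls = new AtomicInteger(); // counted for illustration

    CachedSearchTarget(Function<String, String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    /** Return a cached result if present; otherwise call the target and cache it. */
    String search(String criteria) {
        return cache.computeIfAbsent(criteria, key -> {
            remoteCalls.incrementAndGet();
            return remoteLookup.apply(key);
        });
    }
}
```

Calling search("McManCSU") twice on the same instance reaches the remote lookup only once. A production cache would also need expiry so stale profiles get refreshed, which this sketch leaves out.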

(You can read more about our migration from MongoDB to Cassandra here.)

Functionally speaking…

For every search initiated (via the Person API search parameter), every Search Target that can search on the given criteria is invoked.

The results of these searches are then analyzed for additional criteria to search on, each of which may produce further sub-searches.

For each leg of a sub-search, relevant profile data is extracted and internalized into well-typed objects.

The byproduct of this process is a recursively generated tree of searches. At the end of the search, the results are aggregated and saved away for a rainy day in the Profiles collection.
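The recursive flow above can be sketched as a small, single-threaded Java model. It rests on assumptions: all Search Targets are collapsed into one hypothetical function that maps a criterion (an email address, a handle) to the new criteria found in its results, criteria already searched are skipped, and a depth limit keeps the tree finite. SearchNode and RecursiveSearch are made-up names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** One node of the search tree: the criteria searched and the sub-searches it spawned. */
class SearchNode {
    final String criteria;
    final List<SearchNode> children = new ArrayList<>();

    SearchNode(String criteria) {
        this.criteria = criteria;
    }
}

/** Hypothetical, single-threaded model of the recursive search. */
class RecursiveSearch {
    // Stand-in for every Search Target: criteria -> new criteria found in the results
    private final Function<String, List<String>> targets;
    private final int maxDepth;
    private final Set<String> seen = new HashSet<>();

    RecursiveSearch(Function<String, List<String>> targets, int maxDepth) {
        this.targets = targets;
        this.maxDepth = maxDepth;
    }

    SearchNode search(String criteria) {
        seen.add(criteria);
        return search(criteria, 0);
    }

    private SearchNode search(String criteria, int depth) {
        SearchNode node = new SearchNode(criteria);
        if (depth >= maxDepth) {
            return node;
        }
        // Claim every new criterion before descending, so no criterion is
        // searched twice even when several siblings discover it
        List<String> fresh = new ArrayList<>();
        for (String next : targets.apply(criteria)) {
            if (seen.add(next)) {
                fresh.add(next);
            }
        }
        for (String next : fresh) {
            node.children.add(search(next, depth + 1));
        }
        return node;
    }

    /** Aggregate every criterion reached anywhere in the tree into one profile. */
    static Set<String> aggregate(SearchNode root) {
        Set<String> all = new LinkedHashSet<>();
        Deque<SearchNode> stack = new ArrayDeque<>();
        stack.push(root);
        while (!stack.isEmpty()) {
            SearchNode node = stack.pop();
            all.add(node.criteria);
            node.children.forEach(stack::push);
        }
        return all;
    }
}
```

Sherlock itself runs these legs asynchronously inside a Storm topology; the synchronous recursion here only illustrates the shape of the tree and the final aggregation step.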

The FullContact Person API handles loads ranging from 50 queries per second (qps) to 300 qps, many of which make their way into the search system. If the provided search parameter is relevant to one or more social sites, the query fires off roughly 10 sub-searches, each destined for its respective Search Target.

A bit of complicated math translates this into somewhere between 500 and 3,000 concurrently outstanding search requests at any given second.

Do this for an entire day and some more complicated math yields between roughly 40M and 260M sub-searches a day.
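The back-of-the-envelope numbers can be checked directly. This sketch assumes, as the figures above do, that every query fans out into 10 sub-searches; LoadMath is an illustrative name.

```java
/** Back-of-the-envelope load math, using the figures from the text. */
class LoadMath {
    static final long SUB_SEARCHES_PER_QUERY = 10;      // "roughly 10" per query
    static final long SECONDS_PER_DAY = 24L * 60 * 60;  // 86,400

    static long subSearchesPerSecond(long qps) {
        return qps * SUB_SEARCHES_PER_QUERY;
    }

    static long subSearchesPerDay(long qps) {
        return subSearchesPerSecond(qps) * SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        for (long qps : new long[] {50, 300}) {
            System.out.println(qps + " qps -> " + subSearchesPerSecond(qps)
                    + " sub-searches/s, " + subSearchesPerDay(qps) + " sub-searches/day");
        }
    }
}
```

At 50 qps this gives 500 sub-searches per second and 43,200,000 per day; at 300 qps, 3,000 per second and 259,200,000 per day, which round to the 40M and 260M quoted above.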

As if this wasn’t enough, we wanted to achieve all this in under a second or two – effectively creating a near real-time search system.

Achieving the throughput and scale described above in an AWS environment proved a tall order. Not only did we have to coordinate the searches properly while dealing with transient APIs and rate limits, but we had to do so in a resource-friendly manner.

Having thousands of blocked threads at any given second would have been a great recipe for disaster.

The Tools

Without these libraries and tools, we would have been up shit creek:

  • Ning’s AsyncHttpClient, running on Netty – resource-friendly HTTP connections. Give me asynchronous callbacks; I want to do something useful with my CPU!
  • Jackson (the FasterXML variant) – parsing the HTTP responses
  • Netflix’s Hystrix – protecting our internal services and databases from being overrun. Fail fast and have a backup plan in place.
  • Storm (0.9.0.1) – providing a reliable, distributed computing platform. We only needed 3 physical servers, but ran several workers to avoid contending over shared JVM resources.
  • Graphite and StatsD with a StatsDClient – a candy shop of metrics so we know what is really going on at all times:
    Timing, request flavors, qps, rate limits, connection timeouts, response codes, search targets
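Hystrix’s “fail fast and have a backup plan” idea can be illustrated without Hystrix itself. The following is a plain-Java sketch, not Hystrix’s API: a Semaphore caps concurrent calls to a protected resource, and when no permit is free or the primary call throws, a fallback answers instead. FailFastGuard is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/**
 * Plain-Java illustration of fail-fast-with-fallback (in the spirit of
 * Hystrix's semaphore isolation). This is NOT Hystrix's API.
 */
class FailFastGuard {
    private final Semaphore permits;

    FailFastGuard(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    <T> T call(Supplier<T> primary, Supplier<T> fallback) {
        // tryAcquire() never waits: if the resource is saturated, fail fast
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        try {
            return primary.get();
        } catch (RuntimeException e) {
            return fallback.get(); // the primary failed: use the backup plan
        } finally {
            permits.release();
        }
    }
}
```

Because tryAcquire() never blocks, a saturated dependency turns into an immediate fallback rather than a pile of waiting threads. Hystrix layers timeouts, circuit breaking, and metrics on top of this basic pattern.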

Netty proved a little convoluted when it came to structuring the asynchronous callbacks, but by leveraging its NIO-driven asynchronous callbacks, we could easily chew through our production load without blocking a single thread. Another nice feature was the ability to easily write our own OAuth code (using AsyncHttpClient’s SignatureCalculator), further increasing the reusability of the core client code.
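The payoff of callback-driven I/O is that waiting for a response costs no thread. The sketch below simulates this with JDK classes only: each fake request is a CompletableFuture completed by a two-thread ScheduledExecutorService after 50 ms, standing in for AsyncHttpClient’s NIO threads. The class name and the 50 ms latency are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Simulated non-blocking fan-out: many requests in flight, two threads. */
class NonBlockingFanOut {
    /** Fake HTTP call: completes after 50 ms without any thread waiting on it. */
    static CompletableFuture<String> fakeRequest(ScheduledExecutorService io, int id) {
        CompletableFuture<String> response = new CompletableFuture<>();
        io.schedule(() -> { response.complete("response-" + id); }, 50, TimeUnit.MILLISECONDS);
        return response;
    }

    static int run(int requests) throws Exception {
        ScheduledExecutorService io = Executors.newScheduledThreadPool(2);
        AtomicInteger handled = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> callbacks = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                // Register a callback instead of blocking on get()
                callbacks.add(fakeRequest(io, i).thenAccept(body -> handled.incrementAndGet()));
            }
            // Only the demo's calling thread waits, once, for the whole batch
            CompletableFuture.allOf(callbacks.toArray(new CompletableFuture<?>[0]))
                    .get(10, TimeUnit.SECONDS);
        } finally {
            io.shutdown();
        }
        return handled.get();
    }
}
```

run(3000) mirrors the upper end of the load described earlier: all 3,000 requests are outstanding at once, their callbacks fire on the two scheduler threads, and no thread sits blocked on an individual response.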

We surgically implanted this framework into a Storm topology to quickly achieve a fairly robust, distributed system. Storm not only provides the means to quickly stand up a distributed workflow platform, but is also inherently fault tolerant. Granted, Storm is geared toward a single-threaded model (one thread per bolt instance), but using asynchronous callbacks to acknowledge the tuples on their respective threads worked out well (synchronize your outputCollector!).
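Acking from callback threads means several threads can touch the same collector at once. The sketch below illustrates the “synchronize your outputCollector” advice without Storm on the classpath: UnsafeCollector is a hypothetical stand-in for Storm’s OutputCollector, assumed (as the advice implies) not to be safe for concurrent use, and every callback thread goes through a wrapper that locks the delegate before acking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for Storm's OutputCollector; assumed not thread-safe. */
class UnsafeCollector {
    final List<String> acked = new ArrayList<>(); // plain ArrayList: unsafe under concurrency

    void ack(String tupleId) {
        acked.add(tupleId);
    }
}

/** Serializes acks from any number of callback threads onto one collector. */
class SynchronizedCollector {
    private final UnsafeCollector delegate;

    SynchronizedCollector(UnsafeCollector delegate) {
        this.delegate = delegate;
    }

    void ack(String tupleId) {
        synchronized (delegate) {
            delegate.ack(tupleId);
        }
    }
}

class CallbackAckDemo {
    /** Acks each tuple from a pool of "callback" threads; returns how many acks landed. */
    static int run(int tuples) throws InterruptedException {
        UnsafeCollector raw = new UnsafeCollector();
        SynchronizedCollector collector = new SynchronizedCollector(raw);
        ExecutorService callbackThreads = Executors.newFixedThreadPool(8);
        for (int i = 0; i < tuples; i++) {
            String tupleId = "tuple-" + i;
            callbackThreads.execute(() -> collector.ack(tupleId));
        }
        callbackThreads.shutdown();
        callbackThreads.awaitTermination(10, TimeUnit.SECONDS);
        synchronized (raw) {
            return raw.acked.size();
        }
    }
}
```

Without the synchronized block, concurrent ack() calls would race on the unsynchronized ArrayList and could silently lose acknowledgements or throw.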

Example SimpleSearcher

The following provides a dumbed-down example of how to construct a public API search system. The Coordinator simply handles the coordination of a single search, but could easily be extended to produce additional sub-searches.

package com.fullcontact.sherlock;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.ListenableFuture;
import com.ning.http.client.Response;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Super primitive recursive search system.  This example illustrates how a
 * recursive search system might be structured using the Netty
 * AsyncHttpClient to do the heavy lifting.
 *
 * @author michie @McManCSU
 */
public class SimpleSearcher implements FutureCallback<Response> {
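    // Anonymous username lookups like this reflect the Graph API at the time
    // of writing; newer Graph API versions require an access token.
    public static final String FACEBOOK_API = "http://graph.facebook.com/";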
    private AsyncHttpClient nettyClient = new AsyncHttpClient();
    private ObjectMapper mapper = new ObjectMapper();
    protected Coordinator coordinator;

    public void search(String url) throws IOException {
        // You can add OAuth headers, etc. with this request object
        AsyncHttpClient.BoundRequestBuilder getRequest =
                nettyClient.prepareGet(url);
        ListenableFuture<Response> responseFuture =
                nettyClient.executeRequest(getRequest.build());
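        // Bridge Ning's future to Guava's and register this object as the
        // callback; directExecutor() runs it on the I/O thread that completes
        // the future (newer Guava versions require an explicit executor).
        Futures.addCallback(new ListenableFutureAdapter<Response>(
                responseFuture), this,
                com.google.common.util.concurrent.MoreExecutors.directExecutor());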
    }

    @Override
    public void onSuccess(Response result) {
        PublicProfile profile = null;
        String url = null;
        try {
            url = result.getUri().toString();
            if (result.getStatusCode() == 200) {
                // Tailored for Facebook's API
                Map<String, String> facebookResult =
                        mapper.readValue(result.getResponseBody(), Map.class);
                profile = new PublicProfile(
                        facebookResult.get("id"),
                        facebookResult.get("username"),
                        facebookResult.get("name"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            coordinator.handleResults(profile, url);
        }
    }

    @Override
    public void onFailure(Throwable t) {
        t.printStackTrace();
        coordinator.handleResults(null, "");
    }

    public static void main(String[] args) throws Exception {
        String facebookSearchCriteria = args.length > 0 ?
                args[0] : "McManCSU"; // Default value
        long startTime = System.currentTimeMillis();
        Coordinator coordinator = new Coordinator();
        SimpleSearcher searcher = new SimpleSearcher();
        searcher.coordinator = coordinator;
        searcher.search(FACEBOOK_API + facebookSearchCriteria);
        // Block on waiting for the callback - normally you'd return to doing
        // something useful, and handle the callback elsewhere
        coordinator.latch.await();
        System.out.println("Successfully got our callback!  Search took " +
                (System.currentTimeMillis() - startTime) + " ms");
        // Release the client's I/O threads so the JVM can exit cleanly
        searcher.nettyClient.close();
    }
}

/**
 * The POJO representing a PublicProfile - ID, UserName and Name
 */
class PublicProfile {
    protected String id, userName, name;

    public PublicProfile(String id, String userName, String name) {
        this.id = id;
        this.userName = userName;
        this.name = name;
    }

    @Override
    public String toString() {
        return "id=" + id + ", userName=" + userName + ", " + "name=" + name;
    }
}

/**
 * Coordinate the search in a primitive way - expand the coordinator to
 * allow you to recursively search.
 */
class Coordinator {
    // Constrain this example to 1 search
    protected CountDownLatch latch = new CountDownLatch(1);

    public void handleResults(PublicProfile profile, String searchedURI) {
        if (profile == null)
            System.out.println("No dice for " + searchedURI);
        else
            System.out.println("The results of the search: " +
                    searchedURI + " -> " + profile);
        latch.countDown();
    }
}

/**
 * Aids in converting the Ning ListenableFuture into Google Guava's
 * ListenableFuture.  Simple delegation...
 */
class ListenableFutureAdapter<T> implements
        com.google.common.util.concurrent.ListenableFuture<T> {
    private final com.ning.http.client.ListenableFuture<T> ningFuture;

    public ListenableFutureAdapter(
            com.ning.http.client.ListenableFuture<T> future) {
        this.ningFuture = future;
    }

    public void addListener(Runnable runnable, Executor executor) {
        ningFuture.addListener(runnable, executor);
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return ningFuture.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return ningFuture.isCancelled();
    }

    public boolean isDone() {
        return ningFuture.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return ningFuture.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        return ningFuture.get(timeout, unit);
    }
}
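One way to extend the Coordinator to recursive sub-searches, sketched under assumptions: search targets are again collapsed into a hypothetical function from criteria to newly discovered criteria, the work runs on a plain ExecutorService rather than AsyncHttpClient callbacks, and RecursiveCoordinator is a made-up name. An AtomicInteger counts outstanding searches; each leg registers its children before counting itself finished, so the count only reaches zero once the whole tree is done.

```java
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical recursive extension of the Coordinator. */
class RecursiveCoordinator {
    final CountDownLatch done = new CountDownLatch(1);
    private final AtomicInteger outstanding = new AtomicInteger();
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    private final Queue<String> searched = new ConcurrentLinkedQueue<>();
    private final ExecutorService pool;
    // Stand-in for the Search Targets: criteria -> newly discovered criteria
    private final Function<String, List<String>> searchTargets;

    RecursiveCoordinator(ExecutorService pool, Function<String, List<String>> searchTargets) {
        this.pool = pool;
        this.searchTargets = searchTargets;
    }

    void start(String criteria) {
        submit(criteria);
    }

    private void submit(String criteria) {
        if (!seen.add(criteria)) {
            return; // already searched or in flight
        }
        outstanding.incrementAndGet();
        pool.execute(() -> {
            List<String> discovered;
            try {
                discovered = searchTargets.apply(criteria);
            } catch (RuntimeException e) {
                discovered = Collections.emptyList(); // a failed leg still finishes
            }
            handleResults(criteria, discovered);
        });
    }

    /** Mirrors Coordinator.handleResults: record, fan out, then count this leg done. */
    private void handleResults(String criteria, List<String> discovered) {
        searched.add(criteria);
        for (String next : discovered) {
            submit(next); // children are counted before this leg is uncounted
        }
        if (outstanding.decrementAndGet() == 0) {
            done.countDown();
        }
    }

    Set<String> aggregated() {
        return new TreeSet<>(searched);
    }
}
```

Because a failed leg is treated as finished with no new criteria, one misbehaving search target cannot leave the latch waiting forever.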

Privacy

FullContact takes your privacy seriously! Even though all the data we expose is already public information on the web, we offer the ability to claim your profile and expose as much or as little of it as you deem necessary.

Image Credits:

Writers-Network

shining.darkness via Flickr
