Wednesday, August 14, 2013

Connecting to multiple clusters from libcouchbase

The other day I talked to a user who wanted to utilize multiple Couchbase clusters from the same application using libcouchbase. That is pretty simple to do, but it also gave me an idea for a blog post describing a couple of options you have.

If you've used libcouchbase you probably know that you need to supply an lcb_t to all of the functions in libcouchbase. The reason for that is that we don't use global variables in libcouchbase, so the absolute easiest way to communicate with two different clusters is as simple as:

lcb_t cluster1, cluster2;
struct lcb_create_st create_options;

/* Create instance to the first cluster */
memset(&create_options, 0, sizeof(create_options));
create_options.v.v0.host = "cluster1";
lcb_create(&cluster1, &create_options);

/* Create instance to the second cluster */
memset(&create_options, 0, sizeof(create_options));
create_options.v.v0.host = "cluster2";
lcb_create(&cluster2, &create_options);

So far so good, but what if I want to access the two clusters concurrently? Using multiple threads seems like the "easiest" solution to this problem, and that is fine as long as you don't use the same lcb_t from multiple threads at the same time. libcouchbase is built for scalability, so we don't impose any limitations inside the library on problems that are better solved outside of it. With this in mind the previous example could just as easily be rewritten as (using pthreads):

static void *my_cluster_worker(void *arg) {
    lcb_t instance;
    struct lcb_create_st create_options;
    memset(&create_options, 0, sizeof(create_options));
    create_options.v.v0.host = arg;
    lcb_create(&instance, &create_options);

...

/* Spin up the different threads */
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, my_cluster_worker, "cluster1");
pthread_create(&tid2, NULL, my_cluster_worker, "cluster2");

You could of course just protect each lcb_t with a lock and ensure that you're using them exclusively:

pthread_mutex_lock(&cluster1_mutex);
lcb_get(cluster1, ...);
lcb_wait(cluster1);
pthread_mutex_unlock(&cluster1_mutex);

Given that libcouchbase is asynchronous, we can also utilize multiple clusters from the same thread by using the same IO instance. This isn't "true multitasking", but in most cases we'd be waiting for commands in flight over the network anyway:

lcb_io_opt_t iops;
lcb_t cluster1, cluster2;
struct lcb_create_st create_options;

/* Create io instance */
lcb_create_io_ops(&iops, NULL);

/* Create instance to the first cluster */
memset(&create_options, 0, sizeof(create_options));
create_options.v.v0.host = "cluster1";
create_options.v.v0.io = iops;
lcb_create(&cluster1, &create_options);

/* Create instance to the second cluster */
memset(&create_options, 0, sizeof(create_options));
create_options.v.v0.host = "cluster2";
create_options.v.v0.io = iops;
lcb_create(&cluster2, &create_options);

All operations you try to execute will use the same event loop, so if you call lcb_wait it will drive the event loop until all operations scheduled for that lcb_t instance have completed (but this will also execute commands scheduled for all other lcb_t instances using the same io instance).
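To make that concrete, here is a minimal sketch (reusing the iops, cluster1 and cluster2 set up above, and skipping error handling and response callbacks):

lcb_get_cmd_t cmd;
const lcb_get_cmd_t *commands[1];

commands[0] = &cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.v.v0.key = "foo";
cmd.v.v0.nkey = 3;

/* Schedule the same get on both clusters */
lcb_get(cluster1, NULL, 1, commands);
lcb_get(cluster2, NULL, 1, commands);

/* Waiting on one instance drives the shared event loop, so the
 * command scheduled on cluster2 makes progress here as well */
lcb_wait(cluster1);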

Let's drag the example a bit further and imagine that we're using multiple Couchbase clusters for a high availability cache. I'm not going to look into cache consistency here (trying to limit the scope of the post). Whenever we want to store an item we try to store it on all of the clusters, and whenever we want to retrieve an object we'll just use the fastest response (we could of course return the value agreed upon by a quorum of the clusters etc, but you'll figure out how to tweak the code to do so).

You'll find the entire source code in the example directory for libcouchbase, so I'll just comment inline in the code here (making it harder for people doing copy'n'paste).

So let's go ahead and make a MultiClusterClient with the following API:

class MultiClusterClient {
public:
    MultiClusterClient(std::list<std::string> clusters);
    lcb_error_t store(const std::string &key, const std::string &value);
    lcb_error_t get(const std::string &key, std::string &value);
};

The user can then use the client like:

std::list<std::string> clusters;
clusters.push_back("cluster1");
clusters.push_back("cluster2");
clusters.push_back("cluster3");
clusters.push_back("cluster4");

MultiClusterClient mcc(clusters);
mcc.store("foo", "bar");
...

The way the client works is that instead of using lcb_wait to wait for the completion of operations, it starts and stops the event machine as needed through the io operations interface.

So let's show the entire definition of the MultiClusterClient class:

class MultiClusterClient {
public:
    MultiClusterClient(std::list<std::string> clusters);
    lcb_error_t store(const std::string &key, const std::string &value);
    lcb_error_t get(const std::string &key, std::string &value);

private:
    void wait(void) {
        switch (iops->version) {
        case 0:
            iops->v.v0.run_event_loop(iops);
            break;
        case 1:
            iops->v.v1.run_event_loop(iops);
            break;
        default:
            std::cerr << "Unknown io version " << iops->version << std::endl;
            exit(EXIT_FAILURE);
        }
    }

    void resume(void) {
        switch (iops->version) {
        case 0:
            iops->v.v0.stop_event_loop(iops);
            break;
        case 1:
            iops->v.v1.stop_event_loop(iops);
            break;
        default:
            std::cerr << "Unknown io version " << iops->version << std::endl;
            exit(EXIT_FAILURE);
        }
    }

    lcb_io_opt_t iops;
    std::list<lcb_t> instances;
};

Now that we've got an idea of what the class looks like, let's go ahead and write the constructor. In the constructor I'm going to create all of the instances used to connect to the various clusters, and I'm going to simplify the error handling to terminate the program instead of obfuscating the code with a ton of recovery code/logic.

MultiClusterClient(std::list<std::string> clusters) {
    lcb_error_t err;
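    // 'backend' is a struct lcb_create_io_ops_st set up in the full
    // example source; passing NULL here instead would select the
    // default io backend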
    if ((err = lcb_create_io_ops(&iops, &backend)) != LCB_SUCCESS) {
        std::cerr <<"Failed to create io ops: "
                  << lcb_strerror(NULL, err)
                  << std::endl;
        exit(1);
    }

    // Create an lcb_t instance to all of the clusters
    for (std::list<std::string>::iterator iter = clusters.begin();
         iter != clusters.end();
         ++iter) {
        lcb_create_st options(iter->c_str(), NULL, NULL, NULL, iops);
        lcb_t instance;
        if ((err = lcb_create(&instance, &options)) != LCB_SUCCESS) {
            std::cerr <<"Failed to create instance: "
                      << lcb_strerror(NULL, err)
                      << std::endl;
            exit(1);
        }

        lcb_set_error_callback(instance, error_callback);
        lcb_set_get_callback(instance, get_callback);
        lcb_set_store_callback(instance, storage_callback);

        lcb_connect(instance);
        lcb_wait(instance);
        instances.push_back(instance);
    }
}

To summarize the effect of the code above: we've now got a list of lcb_t instances connected to all of the requested clusters, where all of them are bound to the same event base.

With the list of instances all set up, I guess it's time to implement the store method and start discussing it:

lcb_error_t store(const std::string &key, const std::string &value) {
    const lcb_store_cmd_t *commands[1];
    lcb_store_cmd_t cmd;
    commands[0] = &cmd;
    memset(&cmd, 0, sizeof(cmd));
    cmd.v.v0.key = key.c_str();
    cmd.v.v0.nkey = key.length();
    cmd.v.v0.bytes = value.c_str();
    cmd.v.v0.nbytes = value.length();
    cmd.v.v0.operation = LCB_SET;

    lcb_error_t error;
    Operation *oper = new Operation(this);

    // Send the operation to all of the clusters
    for (std::list<lcb_t>::iterator iter = instances.begin();
         iter != instances.end();
         ++iter) {

        if ((error = lcb_store(*iter, oper, 1, commands)) != LCB_SUCCESS) {
            oper->response(error, "");
        }
    }

    wait();
    lcb_error_t ret = oper->getErrorCode();
    oper->release();
    return ret;
}

lcb_error_t get(const std::string &key, std::string &value) {
    lcb_get_cmd_t cmd;
    const lcb_get_cmd_t *commands[1];

    commands[0] = &cmd;
    memset(&cmd, 0, sizeof(cmd));
    cmd.v.v0.key = key.c_str();
    cmd.v.v0.nkey = key.length();

    Operation *oper = new Operation(this);
    lcb_error_t error;
    for (std::list<lcb_t>::iterator iter = instances.begin();
         iter != instances.end();
         ++iter) {

        if ((error = lcb_get(*iter, oper, 1, commands)) != LCB_SUCCESS) {
            oper->response(error, "");
        }
    }

    wait();
    value = oper->getValue();
    lcb_error_t ret = oper->getErrorCode();
    oper->release();
    return ret;
}

This looks pretty much like what you would have done with just a single cluster, except for the Operation class and the fact that we're calling wait() instead of lcb_wait(). So what is the Operation class and what is its purpose? As I said earlier, we're not going to wait for a response from all of the clusters before returning. This means that the next time I wait for a response I may first get a response for a previous request I sent out (which should be "silently" ignored). I'm aware that I don't really need a separate class for this (I could have used a counter and assigned a sequence number to each command, but this was just as easy). Given that I don't know the lifetime of each request, I use reference counting on the object to figure out when to destroy it.

So let's take a look at the Operation class:

class Operation {
public:
    Operation(MultiClusterClient *r) :
        root(r),
        error(LCB_SUCCESS),
        numReferences(r->instances.size() + 1),
        numResponses(0)
    {
    }

    void response(lcb_error_t err, const std::string &value) {
        if (err == LCB_SUCCESS) {
            values.push_back(value);
        } else {
            error = err;
        }

        // @todo Currently we resume the waiting caller after the
        // first response; you might want to wait for more ;-)
        if (++numResponses == 1) {
            root->resume();
        }

        maybeNukeMe();
    }

    lcb_error_t getErrorCode(void) {
        // You might want to do this in a quorum fashion over all
        // the responses
        return error;
    }

    std::string getValue(void) {
        // You might want to do this in a quorum fashion over all
        // the responses
        return values[0];
    }

    void release(void) {
        maybeNukeMe();
    }

private:
    void maybeNukeMe(void) {
        if (--numReferences == 0) {
            delete this;
        }
    }

    MultiClusterClient *root;
    lcb_error_t error;
    int numReferences;
    int numResponses;
    std::vector<std::string> values;
};

As you see, the code takes a few shortcuts. For one, I let a single error mark the entire operation as failed (if the first cluster doesn't have the key but the second one does, you'll still be told that the key wasn't found), and the error handling should really do retries etc. Anyway, you'll figure out how it works.
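If you'd rather have the quorum behaviour hinted at in the comments above, the change is small. Here is a rough sketch (my own variation, not part of the shipped example) of a response() method that resumes the waiting caller once a majority of the clusters have answered:

    void response(lcb_error_t err, const std::string &value) {
        if (err == LCB_SUCCESS) {
            values.push_back(value);
        } else {
            error = err;
        }

        // Resume the waiting caller once a majority of the clusters
        // have answered, instead of after the first response
        int quorum = (int)root->instances.size() / 2 + 1;
        if (++numResponses == quorum) {
            root->resume();
        }

        maybeNukeMe();
    }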

The last "missing pieces" is the callbacks called from libcouchbase:

static void storage_callback(lcb_t, const void *cookie,
                             lcb_storage_t, lcb_error_t error,
                             const lcb_store_resp_t *)
{
    MultiClusterClient::Operation *o;
    o = (MultiClusterClient::Operation *)cookie;
    o->response(error, "");
}

static void get_callback(lcb_t, const void *cookie, lcb_error_t error,
                         const lcb_get_resp_t *resp)
{
    MultiClusterClient::Operation *o;
    o = (MultiClusterClient::Operation *)cookie;
    if (error == LCB_SUCCESS) {
        std::string value((char*)resp->v.v0.bytes, resp->v.v0.nbytes);
        o->response(error, value);
    } else {
        o->response(error, "");
    }
}

static void error_callback(lcb_t instance,
                           lcb_error_t error,
                           const char *errinfo)
{
    std::cerr << "An error occurred: " << lcb_strerror(instance, error);
    if (errinfo) {
        std::cerr << " (" << errinfo << ")";
    }
    std::cerr << std::endl;
    exit(EXIT_FAILURE);
}
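To tie it all together, a minimal hypothetical driver (assuming the class and the callbacks above live in the same translation unit, and skipping error handling as before) could look like:

int main(void) {
    std::list<std::string> clusters;
    clusters.push_back("cluster1");
    clusters.push_back("cluster2");

    MultiClusterClient mcc(clusters);
    mcc.store("foo", "bar");

    std::string value;
    if (mcc.get("foo", value) == LCB_SUCCESS) {
        std::cout << "foo = " << value << std::endl;
    }

    return 0;
}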

Happy hacking

Thursday, August 8, 2013

Running Couchbase under SMF on SmartOS

In my previous post I showed how to build the Couchbase server on SmartOS, but you'll probably want to keep it running as a service controlled by SMF. I am no expert on SMF, so it may offer a lot of features I could have utilized, but the following SMF manifest worked like a charm for me. Save the following as /opt/couchbase/smf/couchbase.xml:

<?xml version="1.0"?>
<!DOCTYPE service_bundle SYSTEM "/usr/share/lib/xml/dtd/service_bundle.dtd.1">
<service_bundle type="manifest" name="couchbase">
    <service name="application/database/couchbase" type="service" version="1">
        <single_instance/>

        <dependency name="multi-user-server" grouping="require_all" restart_on="none" type="service">
            <service_fmri value="svc:/milestone/multi-user-server"/>
        </dependency>

        <property_group name="general" type="framework">
            <propval name="action_authorization" type="astring"
                     value="solaris.smf.manage.couchbase"/>
            <propval name="value_authorization" type="astring"
                     value="solaris.smf.value.couchbase"/>
        </property_group>

        <property_group name="couchbase" type="application">
            <propval name="corepattern" type="astring"
                     value="/opt/couchbase/var/crash/core.%f.%p"/>
        </property_group>

        <instance name="couchbase" enabled="false">
            <exec_method type="method" name="start" exec="/opt/couchbase/smf/couchbase start" timeout_seconds="30">
                <method_context>
                    <method_credential user="couchbase" group="couchbase"/>
                </method_context>
            </exec_method>
            <exec_method type="method" name="stop" exec="/opt/couchbase/smf/couchbase stop  %{restarter/contract}"
                         timeout_seconds="60">
                <method_context>
                    <method_credential user="couchbase" group="couchbase"/>
                </method_context>
            </exec_method>
        </instance>

        <stability value="Unstable"/>
        <template>
            <common_name>
                <loctext xml:lang="C">Couchbase database server</loctext>
            </common_name>
        </template>
    </service>
</service_bundle>


The source bundle we built contains a script to start and stop the server, but we need to wrap it in order to make it work under SMF. Let's go ahead and create /opt/couchbase/smf/couchbase with the following content:

#!/sbin/sh
. /lib/svc/share/smf_include.sh

PATH=${BIN_ROOT}/bin:$PATH:/opt/local/bin:/opt/local/gnu/bin
export PATH

case "$1" in
   'start')
      coreadm -p "`svcprop -p couchbase/corepattern $SMF_FMRI`" $$
      /opt/couchbase/bin/couchbase-server -- -noinput -detached &
   ;;

   'stop')
      /opt/couchbase/bin/couchbase-server -k &
   ;;

   *)
      echo "Usage: $0 {start|stop}"
      exit 1
;;
esac

exit $SMF_EXIT_OK


The manifest above runs the Couchbase server as couchbase:couchbase, so the first thing we need to do is to create the user and group:

[root@cbbuilder ~] groupadd couchbase
[root@cbbuilder ~] roleadd -g couchbase -d /opt/couchbase/var couchbase
[root@cbbuilder ~] mkdir -p /opt/couchbase/var/crash
[root@cbbuilder ~] chown -R couchbase:couchbase /opt/couchbase

Now import the service manifest and start the service with:


[root@cbbuilder /opt/couchbase/smf] svccfg import couchbase.xml
[root@cbbuilder /opt/couchbase/smf] svcadm enable couchbase


And let's look at the service:

[root@cbbuilder /opt/couchbase/smf]# svcs -xv couchbase
svc:/application/database/couchbase:couchbase (Couchbase database server)
 State: online since 11:19:20 UTC  8. august 2013
   See: /var/svc/log/application-database-couchbase:couchbase.log
Impact: None.


Happy hacking :-)

Monday, August 5, 2013

Running Couchbase 2.1.1 on SmartOS

I assume most of my readers know about my love for the Solaris operating system and its descendants such as SmartOS. I've been on vacation for a couple of weeks now, and during my vacation I noticed some comments on previous blog posts from people who had tried to build Couchbase on their SmartOS systems without success, so I figured I should create a new blog post walking through the steps needed.

To make the steps reproducible for people who are interested, I've made sure the blog post includes everything needed (including creating the environment). The first thing we need to do is to log into our server and update the list of available datasets. I started off this morning by downloading smartos-20130725T202435Z-USB.img.bz2, creating a bootable USB stick, and booting my server.

With my SmartOS server running the (as of today) latest bits I imported the dataset I was going to use for my build with the following command:

[root@smartos ~]# imgadm import 9eac5c0c-a941-11e2-a7dc-57a6b041988f

And created the vm with the following setup:

[root@smartos ~]# cat | vmadm create
{
  "alias" : "couchbase",
  "autoboot": true,
  "brand": "joyent",
  "dns_domain" : "norbye.org",
  "resolvers" : [ "10.0.0.1" ],
  "image_uuid" : "9eac5c0c-a941-11e2-a7dc-57a6b041988f",
  "hostname" : "cbbuilder",
  "max_physical_memory": 4096,
  "nics": [
    {
      "nic_tag": "admin",
      "ip": "10.0.0.150",
      "netmask": "255.255.255.0",
      "gateway": "10.0.0.1"
    }
  ]
}
^D

Listing all of my VMs shows:

[root@smartos ~]# vmadm list
UUID                                  TYPE  RAM      STATE             ALIAS
1200e3a9-a9cc-49e5-b9f0-bed2ec3b005d  OS    4096     running           couchbase

The first thing I did was to log in and set the password for the root user and create my own user to use during the build process:

[root@smartos ~]# zlogin 1200e3a9-a9cc-49e5-b9f0-bed2ec3b005d
[Connected to zone '1200e3a9-a9cc-49e5-b9f0-bed2ec3b005d' pts/5]
Last login: Mon Aug  5 08:22:26 on pts/3
   __        .                   .
 _|  |_      | .-. .  . .-. :--. |-
|_    _|     ;|   ||  |(.-' |  | |
  |__|   `--'  `-' `;-| `-' '  ' `-'
                   /  ; SmartMachine (base64 13.1.0)
                   `-'  http://wiki.joyent.com/jpc2/SmartMachine+Base

[root@cbbuilder ~]# passwd root
[root@cbbuilder ~]# useradd -g 10 -s /usr/bin/bash \
-d /home/trond -m trond
[root@cbbuilder ~]# passwd trond
^D

Now that I've got my own user, I logged in as that user over ssh, became root, and installed all of the packages I needed to build Couchbase:

[trond@cbbuilder ~]$ pfexec su -
[root@cbbuilder ~]# pkgin -y in libtool-base autoconf \
                                automake scmgit-base gcc47 \
                                gnupg gmake libevent icu \
                                py27-expat snappy erlang \
                                subversion-base
[root@cbbuilder ~]# wget --no-check-certificate \
                         -O/opt/local/bin/repo \
        https://git-repo.googlecode.com/files/repo-1.19
[root@cbbuilder ~]# chmod a+x /opt/local/bin/repo

I'll be installing Couchbase to /opt/couchbase, so let's go ahead and create that:

[root@cbbuilder ~]# mkdir /opt/couchbase
[root@cbbuilder ~]# chown trond /opt/couchbase
^D

There are a few dependencies Couchbase uses that don't exist in the pkgin repository. Let's go ahead and build them and install them into /opt/couchbase.

[trond@cbbuilder ~]$ wget --no-check-certificate \
https://gperftools.googlecode.com/files/gperftools-2.1.tar.gz

[trond@cbbuilder ~]$ gtar xfz gperftools-2.1.tar.gz
[trond@cbbuilder ~]$ cd gperftools-2.1

[trond@cbbuilder ~/gperftools-2.1]$ ./configure --enable-minimal \
--enable-shared \
--disable-static \
--prefix=/opt/couchbase
[trond@cbbuilder ~/gperftools-2.1]$ gmake install
[trond@cbbuilder ~/gperftools-2.1]$ cd ..
[trond@cbbuilder ~]$ wget --no-check-certificate -Ov8.tar.gz \
https://github.com/v8/v8/archive/3.19.0.tar.gz
[trond@cbbuilder ~]$ gtar xfz v8.tar.gz
[trond@cbbuilder ~]$ cd v8-3.19.0
[trond@cbbuilder ~/v8-3.19.0]$ gmake dependencies
[trond@cbbuilder ~/v8-3.19.0]$ gmake x64 library=shared -j 4
[trond@cbbuilder ~/v8-3.19.0]$ cp out/x64.release/lib.target/libv8.so \
/opt/couchbase/lib
[trond@cbbuilder ~/v8-3.19.0]$ cp include/* /opt/couchbase/include/

To avoid passing too many arguments when we're invoking make we can add them into ~/.couchbase/build/Makefile.extra:

[trond@cbbuilder ~]$ mkdir -p ~/.couchbase/build
[trond@cbbuilder ~]$ cat > ~/.couchbase/build/Makefile.extra
OPTIONS += CPPFLAGS="-I$(PREFIX)/include"
OPTIONS += LDFLAGS="-R/opt/local/lib -L$(PREFIX)/lib -R$(PREFIX)/lib"
OPTIONS += CXX="g++ -L/opt/local/lib -I/opt/local/include" 
OPTIONS += CC="gcc -I/opt/local/include -L/opt/local/lib"
memcached_EXTRA_OPTIONS += --enable-tcmalloc-minimal
^D

We need to "configure" git before we can start use it to download the source code:

[trond@cbbuilder ~]$ git config --global user.email "trond.norbye@localhost"
[trond@cbbuilder ~]$ git config --global user.name "Trond Norbye"
[trond@cbbuilder ~]$ mkdir compile && cd compile
[trond@cbbuilder ~/compile]$ repo init -u git://github.com/membase/manifest.git -m released/2.1.1.xml
[trond@cbbuilder ~/compile]$ repo sync

Unfortunately there is a problem with one of the exceptions being thrown in Couchbase that causes a crash on SmartOS, so we need to "patch" one file. It's not hard, just add the following 3 lines of code:

[trond@cbbuilder ~/compile/ep-engine]$ git diff
diff --git a/src/couch-kvstore/couch-kvstore.cc b/src/couch-kvstore/couch-kvstore.cc
index 931fb30..a48f271 100644
--- a/src/couch-kvstore/couch-kvstore.cc
+++ b/src/couch-kvstore/couch-kvstore.cc
@@ -515,6 +515,9 @@ void CouchKVStore::getPersistedStats(std::map<std::string, std::string> &stats)
 {
     char *buffer = NULL;
     std::string fname = dbname + "/stats.json";
+    if (access(fname.c_str(), F_OK) == -1) {
+        return;
+    }
     std::ifstream session_stats;
     session_stats.exceptions (session_stats.failbit | session_stats.badbit);
     try {

With that in place we can build Couchbase with the following command:

[trond@cbbuilder ~/compile]$ gmake PREFIX=/opt/couchbase

When make completes, /opt/couchbase should contain a successful build of Couchbase 2.1.1, and at this point you should probably go ahead and create your startup scripts etc. We can emulate a cluster by starting 2 nodes on the same machine with the following commands:

[trond@cbbuilder ~/compile]$ cd ns_server
[trond@cbbuilder ~/compile/ns_server]$ ./cluster_run -n 2

And in another terminal we can build the cluster by executing:

[trond@cbbuilder ~/compile/ns_server]$ ./cluster_connect -n 2

Now point your browser at the IP address of your server on port 9000 and enjoy your cluster.

Happy hacking!

Trond