published in 'JuptrTech von Juptr' on Juptr.io

Full Stack Engineer, Software enthusiarchitect, Open Source programmer, CTO@Juptr.io


Rüdiger Möller

Move Code Not Data: 'Sandboxing' using Remote Lambda Execution

Leveraging remote lambda execution to avoid being IO bound in a data-intensive microservice system.

 

Magnification of a network packet drawn randomly out of a database network connection 

The issue with data storage as it's handled today ..


Data storage solutions available today enable access through an API or some kind of query language. If an API is provided, client applications need to pull data out of the storage (data is transmitted over the network), operate on the data, and finally write back modifications by transmitting data over the network again.

For advanced query languages, things look better. However, most query languages are interpreted and come with their own data types, syntax and programming paradigm. Frequently those languages have limited ability to express complex distributed computations.

Consequently, most apps connecting to a data store or database out there are IO bound.

Sandboxing is the new API

An ideal approach to processing and analyzing large amounts of (distributed) data would be to send snippets of executable code to the data-hosting machines, let them execute 'near' the data and send back only final (or intermediate) results to the client application. This way the amount of data transferred over the network is drastically reduced. In addition, processing load on the application side is also reduced, as queries execute remotely.

To implement such a pattern, a just-in-time-compiled and portable execution platform is required (e.g. the JVM or the V8 JavaScript engine).

 

This can be seen as an implementation of a "sandbox pattern". The data-hosting application provides and defines the outer processing environment. A remote client application can then "inject" a lambda (or anonymous class).
Simple sandbox example: A loop throwing each record into a lambda provided by a remote client app.
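The loop described above can be sketched in a few lines of plain Java. This is a minimal, in-process illustration only: `SandboxNode`, `forEachRecord` and the `age` field are hypothetical names, and the network hop between client and data node is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a data node that owns a table of records.
class SandboxNode {
    private final Map<String, Map<String, Object>> records = new LinkedHashMap<>();

    void put(String key, Map<String, Object> record) {
        records.put(key, record);
    }

    // The "sandbox": the node owns the loop, the client only supplies the loop body.
    void forEachRecord(BiConsumer<String, Map<String, Object>> clientCode) {
        for (Map.Entry<String, Map<String, Object>> e : records.entrySet()) {
            clientCode.accept(e.getKey(), e.getValue());
        }
    }
}

class SandboxDemo {
    // Small helper to build a one-field record.
    static Map<String, Object> rec(String field, Object value) {
        Map<String, Object> r = new HashMap<>();
        r.put(field, value);
        return r;
    }

    // Client-side code: collects the keys of all records with age >= minAge.
    static List<String> keysWithMinAge(SandboxNode node, int minAge) {
        List<String> hits = new ArrayList<>();
        node.forEachRecord((key, record) -> {
            Object age = record.get("age");
            if (age instanceof Integer && (Integer) age >= minAge) {
                hits.add(key);
            }
        });
        return hits;
    }

    public static void main(String[] args) {
        SandboxNode node = new SandboxNode();
        node.put("alice", rec("age", 31));
        node.put("bob", rec("age", 17));
        System.out.println(keysWithMinAge(node, 30));
    }
}
```

In a distributed setup only the lambda and the (small) list of hits would cross the network; the records themselves stay on the node.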

How does this work out in a real app?

Long story short: really good ;).
At juptr.io, we use memory-mapped files to back persistent, iterable HashMaps of ordinary (fast-serialized) Java objects, which can be write-through cached in memory. Iterable HashMaps support both column-oriented data organization and queries as well as document-store, key/value-style access patterns.
In order to scale horizontally, those data HashMaps are sharded among multiple nodes making up a cluster.

Data modification and querying are achieved by sending (fast-)serialized plain Java lambda functions over the wire. The receiving node executes the lambda function inside a kind of 'sandbox' (outer code calling the lambda provided by the sender), either by passing in a reference to its local data map/table or by iterating and feeding the lambda record by record.
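To make "sending a lambda over the wire" concrete, here is a minimal sketch of a lambda round-tripping through a byte array. It uses standard JDK serialization as a stand-in for the fast-serialization mentioned above, and `LambdaWire`, `RemoteFunction`, `buildQuery` and `runOnNode` are hypothetical names. Note that serializing a lambda ships its captured values plus a reference to the implementing method, not byte code, so this sketch assumes the receiving side has the defining class on its classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

class LambdaWire {
    // A function type that is also Serializable, so lambdas targeting it can be serialized.
    interface RemoteFunction<T, R> extends Function<T, R>, Serializable {}

    // "Client" side: create a query lambda capturing a local parameter and serialize it.
    static byte[] buildQuery(int minLength) {
        RemoteFunction<String, Boolean> query = record -> record.length() >= minLength;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(query);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // "Data node" side: deserialize the lambda and run it against a local record.
    @SuppressWarnings("unchecked")
    static boolean runOnNode(byte[] wire, String record) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(wire))) {
            RemoteFunction<String, Boolean> query = (RemoteFunction<String, Boolean>) in.readObject();
            return query.apply(record);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("lambda class not available on this node", e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = buildQuery(5);
        System.out.println(runOnNode(wire, "schnitzel")); // true: 9 characters >= 5
        System.out.println(runOnNode(wire, "abc"));       // false: 3 characters < 5
    }
}
```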

Locality
We rarely need to transfer data over the network just in order to operate on it. Instead we send a lambda which operates "nearby" the data and sends back operation results.
Move code not data.

Performance: Leveraging sophisticated Just-in-time-compilers
The byte code of a remoted lambda enjoys full HotSpot close-to-the-metal-compilation and optimization.
Because an ordinary lambda implementation naturally separates parameters from actual code, compilation of a lambda query happens once, not with each query.
Less work - better results.
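A small sketch of what "separating parameters from code" means for a plain Java lambda. `LambdaReuse` and `largerThan` are hypothetical names. The class identity shown here is how HotSpot/OpenJDK behaves in practice for a single lambda expression; the Java language specification does not promise it.

```java
import java.util.function.IntPredicate;

class LambdaReuse {
    // One lambda expression means one piece of code; 'threshold' is just a captured parameter.
    static IntPredicate largerThan(int threshold) {
        return value -> value > threshold;
    }

    public static void main(String[] args) {
        IntPredicate small = largerThan(10);
        IntPredicate large = largerThan(1000);
        // Different parameters, different results ...
        System.out.println(small.test(50) + " " + large.test(50));
        // ... but the same generated class, so the JIT can optimize the shared code once.
        System.out.println(small.getClass() == large.getClass());
    }
}
```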


Flexibility
We are not limited by some proprietary query language. A lambda query can perform any operation expressible in Java, just like an ordinary forEach iteration on a local collection.
When using anonymous classes instead of lambdas, it's possible to use arbitrary data structures in order to accumulate and capture intermediate results during query record iteration. An additional aggregation step at the sender side is then required, as N intermediate results from N data nodes are received in a scaled setup (map-reduce'ish).
Get around work instead of working around.
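The map-reduce'ish aggregation step can be sketched as follows, using "average comment length" as the query. Everything here is an in-process illustration with hypothetical names (`ShardAggregation`, `Partial`, `scanShard`): each shard produces one small intermediate result, and the sender merges the N intermediate results into the final one.

```java
import java.util.Arrays;
import java.util.List;

class ShardAggregation {
    // Intermediate result of one shard; small enough to ship back over the network.
    static final class Partial {
        long sum;
        long count;

        void accept(String comment) { sum += comment.length(); count++; }
        void merge(Partial other)   { sum += other.sum; count += other.count; }
        double average()            { return count == 0 ? 0.0 : (double) sum / count; }
    }

    // Would run on a data node: iterate the local shard and accumulate.
    static Partial scanShard(List<String> shard) {
        Partial partial = new Partial();
        for (String comment : shard) {
            partial.accept(comment);
        }
        return partial;
    }

    // Would run at the sender: merge one intermediate result per shard.
    static double averageLength(List<List<String>> shards) {
        Partial total = new Partial();
        for (List<String> shard : shards) {
            total.merge(scanShard(shard));
        }
        return total.average();
    }

    public static void main(String[] args) {
        List<List<String>> shards = Arrays.asList(
            Arrays.asList("ab", "abcd"),
            Arrays.asList("abcdef"));
        System.out.println(averageLength(shards)); // prints 4.0
    }
}
```

Shipping sum and count rather than a per-shard average matters: averaging the two shard averages above (3.0 and 6.0) would give 4.5 instead of the correct 4.0.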

Simplicity
As there is no impedance mismatch, working with big data feels as simple as looping over a collection on the heap.

Conclusion

Reduced network load, better data locality and the leverage of the HotSpot JIT for high-performance query execution make this implementation pattern worth a try. It's probably better than schnitzel and very close to spaghetti.

This is part three of a series about our journey building juptr.io.
Previous parts here:


Code ..

Example:
Manipulating a largish persistent object without transmitting it

A user record holds a lot of data, among other things a list of updated comments. Instead of pulling the whole user blob over the network in order to add an item, we just send a piece of code to the appropriate data node and do the processing there:

    // commentTreeKey, parentCommentId, newComment, ukey and recordKey
    // are defined in the local context. Do a transaction:
    userTable.atomicQuery( recordKey, rec -> {
        // executes remotely, inside the data node!
        if ( rec != null ) {
            // helper to operate on the record in a typesafe manner
            User user = new User(rec);
            user.addNewReply(commentTreeKey, parentCommentId, newComment.getId(), ukey );
            return user.getNumComments();
        }
        return null;
    }).then( result -> { // async ftw
        // result (number of comments) is received and processed here;
        // this part runs locally, e.g. inside a connecting webserver
        if ( result == null ) {
            // .. record not found handling ..
        } else {
            // .. success processing ..
        }
    });

"userTable" proxies a set of sharded remote data nodes each storing a shard.
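How such a proxy might route a query to the right shard can be sketched as follows. This is an assumption-laden, in-process illustration and not the juptr.io implementation: `ShardedTable` and its hash-based routing are hypothetical, records are reduced to a plain list of comment ids, shards are local maps instead of remote nodes, and `CompletableFuture` stands in for the promise type used above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical proxy in front of several shards; each shard holds a slice of the keys.
class ShardedTable {
    private final List<Map<String, List<String>>> shards = new ArrayList<>();

    ShardedTable(int shardCount) {
        for (int i = 0; i < shardCount; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Route a key to its shard; floorMod keeps the index non-negative.
    int shardIndex(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    void put(String key, List<String> record) {
        Map<String, List<String>> shard = shards.get(shardIndex(key));
        synchronized (shard) {
            shard.put(key, record);
        }
    }

    // Runs 'fn' against the record (or null if absent) while holding the shard lock.
    // In a real cluster this would serialize 'fn' and send it to the owning node.
    <R> CompletableFuture<R> atomicQuery(String key, Function<List<String>, R> fn) {
        Map<String, List<String>> shard = shards.get(shardIndex(key));
        synchronized (shard) {
            return CompletableFuture.completedFuture(fn.apply(shard.get(key)));
        }
    }

    public static void main(String[] args) {
        ShardedTable userTable = new ShardedTable(4);
        userTable.put("user-1", new ArrayList<>(List.of("c1")));
        userTable.<Integer>atomicQuery("user-1", rec -> {
            if (rec == null) return null;
            rec.add("c2");      // modify the record where it lives
            return rec.size();  // only the small result travels back
        }).thenAccept(count -> System.out.println("comments: " + count));
    }
}
```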

Example:
Mass analytics transmitting final results only

By 'mass analytics' I mean typical processing such as 'get top 100 records of a sorted set' or 'average size of all user comments'. One needs to match a specific set of records, perform some computation on those and return the results. The classic database solution would be a 'Stored Procedure'.

We can solve this by sending anonymous classes implementing a query (instead of lambdas) to the data nodes. Each class instance then computes intermediate results per shard and sends them back to the querying client process. The querying process finally aggregates the intermediate results.

Concrete example: in juptr.io, the distribution of articles over time for a specific topic query is needed in order to paginate a channel's stream efficiently (e.g. http://www.juptr.io/@NerdNews). This means we have to apply the query to each of our (5 million and growing ..) articles. In case of a match, the key and publishing time of the matching article need to be returned.

    // Body of the anonymous query class sent to the data nodes
    // (the enclosing query call is omitted here).

    int did; // denotes stream id for querying

    // holds intermediate results from a shard, allocated lazily on the data node
    List<ResultEntry> result;

    // collects the final result (aggregation of all data node results)
    List<ResultEntry> articleEntries = new ArrayList<>();

    // method executed REMOTE, called for each record of a shard
    public void remote(Record input) {
        if ( ! Article.isMemberStatic(did,input) ) { // test if article matches
            return;
        }
        if ( result == null ) {
            // allocate intermediate result list (safe as we are single threaded per node)
            result = new ArrayList<>();
        }
        result.add(
            new ResultEntry(input.getKey(),Article.getShareTimeStatic(did,input))
        );
    }

    // method executed LOCAL,
    // intermediate results from each node are streamed to here
    public void forEachIntermediate( List<ResultEntry> list, Object err ) {
        if ( list != null ) {
            articleEntries.addAll(list);
        }
    }

    // method executed LOCAL, all data nodes have replied
    public void onFinish() {
        // .. done. further processing (articleEntries) ..
    }

