Published in 'JuptrTech' by Juptr on Juptr.io


Full Stack Engineer, Software enthusiarchitect, Open Source programmer, CTO@Juptr.io


Rüdiger Möller

Make distributed programming great again: Microservices with Distributed Actor Systems

Part one of a series about our journey building juptr.io:

How a fundamental abstraction streamlines, simplifies and speeds up developing a web scale system

Part two can be found here:

 

goes higher, faster and avoids dirty feet :)


Juptr.io is a content personalization, sharing and authoring platform. We crawl tens of thousands of blogs and media sites (German and English), then classify the documents to enable personalized content curation, consumption and discussion.

This requires

  • crawling, storing, querying, analyzing and classifying millions of documents
  • horizontal scalability
  • a failure-resistant distributed (micro)services architecture
  • Java/JavaScript interoperability

Compared to typical enterprise projects, a startup faces much tighter time and budget constraints (especially a startup in Europe), so reducing development effort was a major priority.

We decided on the following foundation stack (I'll skip the 'why' here to avoid a wall of text):

  • Java 8 for the back end, analytics and NLP
  • JavaScript + Polymer.js in the front end
  • a horizontally scalable NoSQL data grid for persistence and big data analytics/NLP

 

the back end of juptr.io

Even simple application logic becomes complicated when implemented as a remote service

Building a distributed system requires a fair amount of boilerplate work, such as defining and continuously updating communication messages and data structures.

 


Service implementations spend a significant amount of code preprocessing data and copying it from communication-level messages into internal data structures.
The communication transport frequently gets hardwired (HTTP, TCP/IP, WebSockets), which limits the choice of operating platforms.
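To make that boilerplate concrete, here is a deliberately hand-rolled version of a single "register service" call. The message class, the `;`-separated wire format and all names are hypothetical and only serve as illustration: every field has to be encoded, decoded and copied into an internal structure by hand.

```java
// Hypothetical hand-written messaging for ONE remote call: "register service".
public class ManualMessaging {

    // communication-level message (a DTO that mirrors the wire format)
    static final class RegisterServiceMsg {
        String name;
        String host;
        int port;

        // hand-written encoder: fields joined with ';' (no escaping, illustration only)
        String encode() {
            return name + ";" + host + ";" + port;
        }

        // hand-written decoder: must mirror encode() exactly
        static RegisterServiceMsg decode(String wire) {
            String[] parts = wire.split(";", -1);
            if (parts.length != 3) {
                throw new IllegalArgumentException("malformed message: " + wire);
            }
            RegisterServiceMsg msg = new RegisterServiceMsg();
            msg.name = parts[0];
            msg.host = parts[1];
            msg.port = Integer.parseInt(parts[2]);
            return msg;
        }
    }

    // internal data structure the service logic actually works with
    static final class ServiceEntry {
        final String name;
        final String address;

        ServiceEntry(String name, String address) {
            this.name = name;
            this.address = address;
        }
    }

    // hand-written copy step: message -> internal structure
    static ServiceEntry toEntry(RegisterServiceMsg msg) {
        return new ServiceEntry(msg.name, msg.host + ":" + msg.port);
    }

    public static void main(String[] args) {
        RegisterServiceMsg out = new RegisterServiceMsg();
        out.name = "crawler";
        out.host = "10.0.0.5";
        out.port = 7777;
        String wire = out.encode();                                     // sender side
        ServiceEntry entry = toEntry(RegisterServiceMsg.decode(wire));  // receiver side
        System.out.println(entry.name + " @ " + entry.address);        // crawler @ 10.0.0.5:7777
    }
}
```

Adding one field means touching the encoder, the decoder and the copy step, on both sides of the wire, and none of it is checked by the compiler.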

Distributed Actors


Long story short: to make distributed programming as productive as developing a monolithic application, we need a foundation framework that
  • abstracts away network transport boundaries
  • fully automates encoding/decoding of network messages via source code (or runtime) inspection
  • catches incompatible changes to the communication protocol at compile time
  • transparently streams results over the wire
  • executes lambdas remotely. Move code, not data.
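A rough idea of how runtime introspection can remove that boilerplate: a JDK dynamic proxy (`java.lang.reflect.Proxy`) turns each interface call into a generic message, a pluggable transport carries it, and reflection dispatches it on the receiving side. This is a minimal sketch under simplifying assumptions (in-memory loopback transport, no serialization, no asynchronous results, overloads matched by parameter count only); `CallMessage`, `Transport`, `remoteRef` and `dispatch` are hypothetical names, not the API of the actor framework used at juptr.io.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Function;

public class ProxySketch {

    // the "network message": method name + arguments, derived automatically from the call
    static final class CallMessage {
        final String method;
        final Object[] args;

        CallMessage(String method, Object[] args) {
            this.method = method;
            this.args = args == null ? new Object[0] : args;
        }
    }

    // transport abstraction: a TCP, WebSocket or HTTP transport could implement this
    interface Transport extends Function<CallMessage, Object> {}

    // receiving side: find the matching public method via reflection and invoke it
    static Object dispatch(Object target, CallMessage msg) {
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().equals(msg.method) && m.getParameterCount() == msg.args.length) {
                try {
                    return m.invoke(target, msg.args);
                } catch (ReflectiveOperationException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        throw new IllegalArgumentException("no such method: " + msg.method);
    }

    // sending side: every call on the returned proxy becomes a CallMessage
    @SuppressWarnings("unchecked")
    static <T> T remoteRef(Class<T> iface, Transport transport) {
        InvocationHandler handler = (proxy, method, args) ->
                transport.apply(new CallMessage(method.getName(), args));
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public interface Greeter {
        String greet(String name);
    }

    public static class GreeterImpl implements Greeter {
        public String greet(String name) {
            return "hello " + name;
        }
    }

    public static void main(String[] args) {
        GreeterImpl impl = new GreeterImpl();
        // loopback transport; a real one would serialize the message and send it over the network
        Transport loopback = msg -> dispatch(impl, msg);
        Greeter remote = remoteRef(Greeter.class, loopback);
        System.out.println(remote.greet("world")); // hello world
    }
}
```

The calling code only sees the `Greeter` interface, so swapping the transport changes nothing at the call site, and since messages are derived from the interface itself, an incompatible signature change shows up as a compile error in the caller.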

 

Two "Actors" communicating

Benefits:

  • simple things stay simple. Our ServiceRegistry service has fewer than 100 lines of code, including service health monitoring, availability state change broadcasts and central configuration management (see code example below)
  • seamless integration of Java and JavaScript (just plug in a different message encoding/decoding)
  • messages can be temporarily persisted with little effort and replayed for testing, failover and replication
  • sharding can be implemented with some simple message routing logic
  • remote lambda execution enables easy, high-performance distributed processing of large data volumes
  • high development productivity
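Sharding via message routing can be as simple as hashing a key and forwarding the message to one of N shard handlers. A minimal sketch, assuming the shards are represented by plain `Consumer`s (in an actor system they would be actor references) and that the shard count is fixed; `ShardRouter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ShardRouter<M> {
    private final List<Consumer<M>> shards;   // one handler (e.g. actor reference) per shard
    private final Function<M, String> keyOf;  // extracts the sharding key from a message

    public ShardRouter(List<Consumer<M>> shards, Function<M, String> keyOf) {
        if (shards.isEmpty()) {
            throw new IllegalArgumentException("need at least one shard");
        }
        this.shards = shards;
        this.keyOf = keyOf;
    }

    // same key -> same shard; floorMod keeps the index non-negative for negative hash codes
    public int shardIndex(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    // deliver the message to the shard responsible for its key
    public void route(M msg) {
        shards.get(shardIndex(keyOf.apply(msg))).accept(msg);
    }

    public static void main(String[] args) {
        List<List<String>> received = new ArrayList<>();
        List<Consumer<String>> shards = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            List<String> inbox = new ArrayList<>();
            received.add(inbox);
            shards.add(inbox::add);
        }
        // messages are plain document URLs here, and the URL itself is the sharding key
        ShardRouter<String> router = new ShardRouter<>(shards, url -> url);
        for (String url : Arrays.asList("https://a.example/1", "https://b.example/2", "https://a.example/1")) {
            router.route(url);
        }
        for (int i = 0; i < received.size(); i++) {
            System.out.println("shard " + i + ": " + received.get(i));
        }
    }
}
```

A plain modulo scheme remaps most keys when the number of shards changes; consistent hashing is the usual remedy if shards are added at runtime.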

Lessons learned architecting and building (micro)service-based systems

  • Data locality is key. Move code, not data.
  • Handle failure at the lowest possible layer. E.g. rather than bothering your application-level code with messaging failures, persist outgoing actor messages to an absent service and resend them behind the scenes.
  • Avoid direct communication between services. Prefer distributed (persisted) queues.
  • "Partial software update" is a very expensive feature (to develop and to test). Avoid it if possible.
  • Don't go overboard with failover; get a stable system in the first place.
    Failover should be a scenario for exceptional circumstances, not daily routine. Failover timings and scenarios grow exponentially with the number of cluster nodes, so it's usually much cheaper to fix your application.
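Handling failure at the lowest layer can look like this: outgoing messages are queued and only dropped once delivery has succeeded, so application code calls `send` and never sees that the target was temporarily absent. A minimal sketch; `Outbox` is a hypothetical class, the in-memory queue stands in for persistent storage (a real version would persist pending messages to survive restarts), and it assumes single-threaded use, e.g. from within one actor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

public class Outbox<M> {
    private final Deque<M> pending = new ArrayDeque<>(); // stand-in for a persisted queue
    private final Predicate<M> transport;                // returns false if delivery failed

    public Outbox(Predicate<M> transport) {
        this.transport = transport;
    }

    // application-level send: enqueue, then try to deliver; never reports delivery failure
    public void send(M msg) {
        pending.addLast(msg);
        flush();
    }

    // deliver queued messages in order; stop at the first failure and keep the rest queued
    public void flush() {
        while (!pending.isEmpty()) {
            if (!transport.test(pending.peekFirst())) {
                return; // target still absent: retry on the next flush
            }
            pending.removeFirst();
        }
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        boolean[] online = {false};
        List<String> delivered = new ArrayList<>();
        Outbox<String> outbox = new Outbox<>(msg -> {
            if (!online[0]) {
                return false;
            }
            delivered.add(msg);
            return true;
        });
        outbox.send("a");
        outbox.send("b");
        System.out.println("pending while offline: " + outbox.pendingCount()); // 2
        online[0] = true; // target is back, e.g. signalled by an availability broadcast
        outbox.flush();
        System.out.println("delivered: " + delivered); // [a, b]
    }
}
```

In a cluster like the one below, `flush()` would naturally be triggered by the registry's "service available" event.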

...

Showcase: A Service Registry in 100 lines of code

  • Remoted via TCP
  • Service health monitoring via heartbeats, broadcasts availability state changes
  • provides access to the global configuration file, so a connecting service only needs to know Gravity's host and port to join the cluster; services can therefore run on arbitrary machines.

Though this looks like ordinary local code, all public methods are exposed to remote processes (using runtime introspection and bytecode magic). 

Click here for more information on the actor framework used.

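import java.util.*;
// Actor, Actors, Callback, IPromise, @Local, Log, Pair, TCPNIOPublisher:
// imports from the actor library omitted for brevity

public class Gravity extends Actor<Gravity> {
    // event tags streamed to subscribers (assumed values; not shown in the original)
    public static final String AVAILABLE = "available";
    public static final String TIMEOUT = "timeout";

    // service name -> registered instances; the first instance is the active one
    HashMap<String, List<ServiceDescription>> services = new HashMap<>();
    // subscribers receiving (AVAILABLE|TIMEOUT, description) events
    List<Callback> listeners = new ArrayList<>();
    JuptrCfg config;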

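    // @Local: callable from within the process only, not exposed to remote clients
    @Local public void init() {
        config = JuptrCfg.read();
        checkTimeout(); // start the periodic timeout check
    }
    // called by a joining service; the first instance registered under a name
    // becomes the active one, later ones serve as replacements
    public void registerService( ServiceDescription desc ) {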
        List<ServiceDescription> serviceList = getServiceList(desc.getName());
        serviceList.add(desc);
        desc.receiveHeartbeat();
        if (serviceList.size()==1)
            broadcastAvailable(desc);
    }
    // returns a promise resolving to the active (first) instance per service name
    public IPromise<Map<String,ServiceDescription>> getServiceMap() {
        HashMap<String,ServiceDescription> servMap = new HashMap<>();
        services.forEach((name, list) -> {
            if (list.size() > 0)
                servMap.put(name, list.get(0));
        });
        return resolve(servMap);
    }
    // registers a (possibly remote) callback; events are streamed to it as they occur
    public void subscribe( Callback<Pair<String,ServiceDescription>> cb ) {
        listeners.add(cb);
    }
    protected void broadcastAvailable(ServiceDescription desc) {
        Pair msg = new Pair(AVAILABLE,desc);
        listeners.forEach( cb -> {
            try {
                cb.stream(msg);
            } catch (Throwable th) {
                Log.Info(this, th);
            }
        });
    }
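    // like broadcastAvailable, but also drops subscribers whose callback fails
    // (e.g. a disconnected subscriber)
    protected void broadCastTimeOut(ServiceDescription desc) {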
        Pair msg = new Pair(TIMEOUT,desc);
        for (int i = 0; i < listeners.size(); i++) {
            Callback cb = listeners.get(i);
            try {
                cb.stream(msg);
            } catch (Throwable th) {
                Log.Info(this, th);
                listeners.remove(i);
                i--;
            }
        }
    }
    public IPromise<JuptrCfg> getConfig() {
        return resolve(config);
    }
    // called periodically by each registered service instance to signal liveness
    public void receiveHeartbeat( String serviceName, String uniqueKey ) {
        getServiceList(serviceName).forEach(sdesc -> {
            if (sdesc.getUniqueKey().equals(uniqueKey)) {
                sdesc.receiveHeartbeat();
            }
        });
    }
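    // runs every second on the actor's thread: removes timed-out instances
    // and promotes the next instance of a service if there is one
    @Local public void checkTimeout() {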
        services.values().forEach( list -> {
            int prevsiz = list.size();
            for (int i = 0; i < list.size(); i++) {
                ServiceDescription serviceDescription = list.get(i);
                if ( serviceDescription.hasTimedOut() ) {
                    list.remove(i);
                    i--;
                    broadCastTimeOut(serviceDescription);
                }
            }
            // if a service timed out, but there is a replacement,
            // broadcast availability
            if ( prevsiz != list.size() && list.size() > 0 ) {
                broadcastAvailable(list.get(0));
            }
        });
        if ( ! isStopped() ) {
            delayed(1000, () -> checkTimeout());
        }
    }
    protected List<ServiceDescription> getServiceList(String serviceName) {
        List<ServiceDescription> slist = services.get(serviceName);
        if ( slist == null ) {
            slist = new ArrayList<>();
            services.put(serviceName, slist);
        }
        return slist;
    }
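    public static void main(String[] args) {
        // create the actor and start its timeout cycle
        Gravity gravity = Actors.AsActor(Gravity.class);
        gravity.init();
        // publish via TCP; 'options' (not shown) holds the parsed command-line options
        new TCPNIOPublisher(gravity,options.getGravityPort()).publish(actor -> {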
            Log.Info(null, actor + " has disconnected");
        });
        // log service activity
        gravity.subscribe((pair, err) -> {
            Log.Info(gravity.getClass(), pair.car() + " " + pair.cdr());
        });
    }
}

Neat and easy, isn't it? ;)
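The listing relies on a `ServiceDescription` class that isn't shown. A hypothetical version providing exactly the methods `Gravity` calls (`getName`, `getUniqueKey`, `receiveHeartbeat`, `hasTimedOut`) might look like this; the random UUID key and the per-service timeout are assumptions, not the original implementation.

```java
import java.io.Serializable;
import java.util.UUID;

// Hypothetical sketch of the ServiceDescription used by Gravity.
public class ServiceDescription implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;          // logical service name, e.g. "crawler"
    private final String uniqueKey;     // identifies one running instance of the service
    private final long timeoutMillis;   // max. silence before the instance counts as dead
    // local receive time, set by the registry; meaningless across machines, hence transient
    private transient long lastHeartbeat;

    public ServiceDescription(String name, long timeoutMillis) {
        this.name = name;
        this.uniqueKey = UUID.randomUUID().toString();
        this.timeoutMillis = timeoutMillis;
    }

    public String getName() {
        return name;
    }

    public String getUniqueKey() {
        return uniqueKey;
    }

    // called by the registry whenever a heartbeat for this instance arrives
    public void receiveHeartbeat() {
        lastHeartbeat = System.currentTimeMillis();
    }

    public boolean hasTimedOut() {
        return hasTimedOut(System.currentTimeMillis());
    }

    // overload with an explicit clock value, handy for tests
    boolean hasTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeat > timeoutMillis;
    }

    public static void main(String[] args) {
        ServiceDescription desc = new ServiceDescription("crawler", 3000);
        desc.receiveHeartbeat();
        System.out.println(desc.getName() + " timed out: " + desc.hasTimedOut()); // crawler timed out: false
    }
}
```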
For completeness, here is how a remote service connects and subscribes to our registry:

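ConnectableActor gravityConnectable = new TCPConnectable(Gravity.class, host, port);
// connect and block until the remote reference is available;
// the callback fires when the connection to gravity is lost
Gravity gravity = (Gravity) gravityConnectable.connect((conn, err) -> {
    gravityDisconnected(conn,err);
}).await();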
// make this service available to all cluster members
gravity.registerService(getServiceDescription());
// listen to events emitted by gravity
gravity.subscribe( (pair, err) -> serviceEvent(pair.car(), pair.cdr(), err) );
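The registry only keeps a service alive while it keeps calling `receiveHeartbeat(serviceName, uniqueKey)`, which the client snippet above doesn't show. A minimal sketch of a heartbeat loop using a JDK `ScheduledExecutorService`; the `Registry` interface is a hypothetical stand-in for the remote `Gravity` reference, and the interval has to stay well below whatever timeout `hasTimedOut()` applies. Inside an actor one would rather reschedule with `delayed(...)`, as `checkTimeout()` does.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatSender {

    // hypothetical stand-in for the remote Gravity reference
    public interface Registry {
        void receiveHeartbeat(String serviceName, String uniqueKey);
    }

    // sends a heartbeat immediately and then every intervalMillis
    public static ScheduledExecutorService start(Registry registry, String serviceName,
                                                 String uniqueKey, long intervalMillis) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        exec.scheduleAtFixedRate(() -> {
            try {
                registry.receiveHeartbeat(serviceName, uniqueKey);
            } catch (RuntimeException e) {
                // catch everything: an exception escaping the task would cancel all future runs
                System.err.println("heartbeat failed: " + e);
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return exec;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger beats = new AtomicInteger();
        Registry fakeRegistry = (name, key) -> beats.incrementAndGet();
        ScheduledExecutorService exec = start(fakeRegistry, "crawler", "key-1", 100);
        Thread.sleep(350);
        exec.shutdownNow(); // stop heartbeats, e.g. on service shutdown
        System.out.println("heartbeats sent: " + beats.get());
    }
}
```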

