Reflections on Designing Distributed Systems

Just finished reading “Designing Distributed Systems: Patterns and Paradigms for Scalable, Reliable Services” by Brendan Burns. I enjoyed reading the book. I think it is going to add value for people who have recently gotten their hands on Docker and orchestration tools like Kubernetes. It is going to empower those developers/devops/admins by helping them learn more about containerization design patterns. The book is only 150+ pages and it discusses a wide range of tools and techniques with hands-on examples, but I would recommend it for the theory and discussion rather than the practical side of it.

Containers establish boundaries that help provide separation of concerns around specific resources. Even when scaling is not a requirement and the application runs on only a single node, containerising is still the better choice, because it not only lets us scale later but also helps us organise and maintain the application better.

Sometimes an application has tasks that run in the background and other tasks that serve end users. Often the tasks that serve end users are more important, and we can prioritise them by allocating resources to and scaling their containers on a node appropriately.

When an engineering organisation starts to scale, a team of developers usually comprises six to eight people for better engineering performance. As more engineers join, the number of teams grows. Better engineering performance, maintainability, accountability and ownership can be achieved by assigning responsibility for a containerised application to a single team. Additionally, some of the components, if factored properly, are reusable modules that can be used by many teams.

Single node patterns:

  • Sidecar Pattern: Sidecar containers are a group of containers that are scheduled together and stay together on the same node, sharing resources such as the network namespace (ports) and, optionally, volumes. The pattern can be very useful when dealing with legacy services or third-party, closed-source applications about which we have very little knowledge. In the sidecar pattern, unencrypted traffic is only sent via the local loopback adapter inside the container group, which keeps the data and communication between them safe.
  • Ambassador Pattern: In the ambassador pattern, two containers are tightly linked in a symbiotic pairing that is scheduled to a single machine. The ambassador brokers the application container’s interactions with the outside world: it can act as a proxy server, it can route requests to the right shard in a sharded system, and it can act as a service broker as well.
  • Adapters: Although every application is different, they still have some things in common: they need to be logged, monitored, maintained, etc. The adapter pattern takes advantage of this and is implemented as a sidecar: the common concern is modularised and run as a sidecar container. Popular use cases are accessing and collecting the desired logs, or monitoring performance by periodically executing a command, via a container that runs as a sidecar.

Serving Patterns:

Replicated Load-Balanced Services: In replicated load-balanced services, the same service is replicated across multiple servers and a load balancer sits in front of the replicas to distribute requests among them.

Sharded Services: When the data becomes too big for a single machine, it is split and distributed across multiple machines. A router is placed in front of the service to identify which shard to route each request to in order to fetch the data. Usually a carefully chosen hashing key is used to identify where the data is located and which server to route to. For better performance and reliability, replication and sharding are used together.
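As a rough illustrative sketch (the shard addresses and the choice of hash function below are my own assumptions, not from the book), a router can map a key to a shard like this:

# A minimal sketch of shard routing: a stable hash of the key picks the shard.
import hashlib

SHARDS = ["shard-0.db.local", "shard-1.db.local", "shard-2.db.local"]  # assumed addresses

def shard_for(key: str) -> str:
    # Use a stable hash (not Python's built-in hash(), which is randomized
    # per process) so every router instance agrees on the same mapping.
    digest = int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)
    return SHARDS[digest % len(SHARDS)]

print(shard_for("user:42"))  # the same key always routes to the same shard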

Scatter/Gather: Scatter/gather is quite useful when there is a large amount of mostly independent processing that is needed to handle a particular request. Scatter/gather can be seen as sharding the computation necessary to service the request, rather than sharding the data.
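A minimal sketch of the idea, assuming a hypothetical query_leaf() call that sends the request to one leaf node and returns a partial result:

# Scatter/gather sketch: fan the request out to all leaves, then merge.
from concurrent.futures import ThreadPoolExecutor

LEAVES = ["leaf-0:8080", "leaf-1:8080", "leaf-2:8080"]  # assumed leaf addresses

def query_leaf(leaf, request):
    # In a real system this would be an HTTP/gRPC call to the leaf node.
    return {"leaf": leaf, "hits": []}

def scatter_gather(request):
    # Scatter: send the same request to every leaf in parallel.
    with ThreadPoolExecutor(max_workers=len(LEAVES)) as pool:
        partials = list(pool.map(lambda leaf: query_leaf(leaf, request), LEAVES))
    # Gather: the root merges the partial responses into one response.
    return [hit for partial in partials for hit in partial["hits"]]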

When there is more data than a single machine can hold or process in time, it becomes necessary to introduce sharding by splitting the data across multiple machines. When it comes to querying, the root node sends the query to all of the leaf nodes, the partial responses from the leaves return to the root node, and the root node merges them together to form a comprehensive response for the user. The limitation of this approach is that the response time depends on the slowest node, so increased parallelism doesn’t always speed things up because of per-node overhead and stragglers. Mathematically speaking, if each leaf node has a 99th percentile latency of 2 seconds, then with 5 leaves a 2-second response becomes roughly the 95th percentile for the complete scatter/gather system. And it only gets worse from there: if we scatter out to 100 leaves, we are more or less guaranteeing that most requests will take at least 2 seconds overall.
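A quick back-of-the-envelope check of that math (my own illustration of the numbers above):

# If each leaf answers within 2 seconds 99% of the time, the whole
# scatter/gather request is only as fast as its slowest leaf.
print(0.99 ** 5)    # ~0.95 -> 2 s is roughly the 95th percentile with 5 leaves
print(0.99 ** 100)  # ~0.37 -> with 100 leaves, most requests hit a slow leaf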

A single replica per shard means that if it fails, all scatter/gather requests will fail for the duration that the shard is unavailable, because every request must be processed by every leaf node in the scatter/gather pattern. This risk is addressed by keeping multiple replicas of each node.

Functions and Event-Driven Processing 

Function-level decoupling: In the microservice approach we split our application into small parts, each managed by a team. FaaS takes this to the next level: it forces us to strongly decouple each function of the service.

There is a common saying that 80% of the daily users are going to use only 20% of the features. The remaining 80% of features that are not used extensively can be ideal candidates for FaaS, which charges based on the number of requests: we only pay for the time when our service is actively serving requests, rather than always paying for processor cycles that are largely sitting around waiting for a user request.

Because a function in FaaS is dynamically spun up in response to a user request while the user is waiting, the need to load a lot of state on each invocation may significantly impact the latency that the user perceives while interacting with your service, whereas a long-running service could amortize this loading cost across a large number of requests.

This approach also has limitations: the functions communicate with each other over the network, and each function instance cannot keep much local memory or state. So it may be a good fit for responding to temporal events, but it is still not sufficient infrastructure for generic background processing, and any required state needs to be stored in a storage service, which adds more complexity to the code.

Because each function is so highly decoupled from the other functions, there is no real representation of the dependencies or interactions between different functions, which makes it difficult to find bugs. Also, the cost of a bug that causes an infinite loop of requests is high.

Batch Computational Patterns: The patterns above are for reliable, long-running server applications. This section describes patterns for batch processing. In contrast to long-running applications, batch processes are expected to run for only a short period of time.

Work Queue Systems: In the containerised work queue, there are two interfaces: the source container interface, which provides a stream of work items that need processing, and the worker container interface, which knows how to actually process a work item. The work processor can be implemented as a container that processes items from a shared work queue as a batch operation, or it can be implemented with the multi-worker pattern, which transforms a collection of different worker containers into a single unified container that implements the worker interface yet delegates the actual work to a collection of different, reusable containers.

Depending on the workload, shared work queue containers need to be scaled accordingly. Keeping one or more workers running when work items arrive very infrequently can be overkill. Depending on the workload for a task, we can also consider implementing the work processor with the multi-worker pattern using Kubernetes Jobs: we can easily write a dynamic job creator that spawns a new Job whenever there is a new item in the queue, as sketched below.
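A hedged sketch of such a dynamic job creator using the official Kubernetes Python client; the worker image, namespace and the source of item IDs are assumptions for illustration:

# Spawn one Kubernetes Job per work item (a sketch, not production code).
from kubernetes import client, config

def make_job(item_id):
    container = client.V1Container(
        name="worker",
        image="example/worker:latest",            # hypothetical worker image
        args=["--item-id", str(item_id)],
    )
    template = client.V1PodTemplateSpec(
        spec=client.V1PodSpec(restart_policy="Never", containers=[container])
    )
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(name=f"work-item-{item_id}"),
        spec=client.V1JobSpec(template=template, backoff_limit=2),
    )

def spawn_jobs(item_ids):
    config.load_incluster_config()                 # assumes we run inside the cluster
    batch = client.BatchV1Api()
    for item_id in item_ids:                       # item_ids: new items from the queue
        batch.create_namespaced_job(namespace="default", body=make_job(item_id))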

Event-Driven Batch Processing: Work queues are great for enabling individual transformations of one input to one output. However, there are a number of batch applications where you want to perform more than a single action, or where you need to generate multiple different outputs from a single data input. In these cases, you start to link work queues together so that the output of one work queue becomes the input to one or more other work queues, and so on. This forms a series of processing steps that respond to events, where each event is the completion of the step that came before it.

In a complex event-driven system, an event often needs to go through many steps where data is divided into a few queues, and sometimes queues need to be merged to produce an output.

Copier: The job of a copier is to take a single stream of work items and duplicate it out into two or more identical streams. 

Filter: The role of a filter is to reduce a stream of work items to a smaller stream of work items by filtering out work items that don’t meet particular criteria. 

Splitter: the role of a splitter is to divide a single stream of work items into two separate work queues without dropping any of them.

Sharder: the role of a sharder is to divide up a single queue into an evenly distributed collection of work items based upon some sort of sharding function.

Merger: the job of a merger is to take two different work queues and turn them into a single work queue. 
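To make these building blocks concrete, here is a minimal illustrative sketch of them as plain Python generators over an in-memory stream; in a real system each stage would read from and write to separate work queues:

import itertools

def copier(items, n=2):
    # Duplicate one stream into n identical streams.
    return itertools.tee(items, n)

def filter_stage(items, predicate):
    # Keep only the work items that meet the criteria.
    return (item for item in items if predicate(item))

def splitter(items, predicate):
    # Divide one stream into two streams without dropping anything.
    a, b = itertools.tee(items, 2)
    return filter_stage(a, predicate), filter_stage(b, lambda i: not predicate(i))

def sharder(items, num_shards):
    # Distribute items across shards using a sharding function.
    shards = [[] for _ in range(num_shards)]
    for item in items:
        shards[hash(item) % num_shards].append(item)
    return shards

def merger(*streams):
    # Turn several work queues into a single one.
    return itertools.chain(*streams)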

Coordinated Batch processing:

To achieve a more complex event-driven architecture, we are going to need more coordinated batch-processing techniques.

Join (or Barrier Synchronization): A merge does not ensure that a complete dataset is present prior to the beginning of processing, whereas a join does not complete until all of the work items have been processed. This reduces the parallelism that is possible in the batch workflow, and thus increases the overall latency of running the workflow.

Reduce: Rather than waiting until all data has been processed, a reduce optimistically merges the parallel data items into a single comprehensive representation of the full set. In order to produce a complete output, this process is repeated until all of the data is processed, but the ability to begin early means that the batch computation executes more quickly overall.
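As a small illustration of the difference, a reduce can fold partial results (here, hypothetical word counts coming back from three shards) into a running total as they arrive, whereas a join would wait for all of them first:

from functools import reduce

def merge_counts(total, partial):
    # Optimistically fold one shard's partial word counts into the running total.
    for word, count in partial.items():
        total[word] = total.get(word, 0) + count
    return total

partial_counts = [{"a": 3, "b": 1}, {"a": 2}, {"b": 5, "c": 1}]  # from 3 shards
print(reduce(merge_counts, partial_counts, {}))  # {'a': 5, 'b': 6, 'c': 1}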

Database strategies for microservices

In my last blog I talked about the problem of database lockup, but how can we solve it?

Shared Tables: Shared tables can be an easy, quick-and-dirty solution that is very common, but be aware that it is high maintenance.
Using MySQL we can use the FEDERATED engine (http://dev.mysql.com/doc/refman/5.1/en/federated-storage-engine.html) to do this. We have to create a federated table based on the table at the remote location that we want to access.

CREATE TABLE remote_user (
  username varchar(20) NOT NULL,
  password varbinary(20) NOT NULL,
  PRIMARY KEY(username)
) ENGINE=FEDERATED DEFAULT CHARSET=utf8 CONNECTION='mysql://username:password@someip:port/db/user';

Database View: A database view is a comparatively better approach when the cases are simple, because it allows another, more suitable representation of the database model. The most amazing thing about database views is that they are supported by a wide range of databases. But for heavy use cases we can see performance issues. While considering a database view we must ensure that the two databases can connect to each other without any network or firewall issue. Most database views are read-only, and updating them according to need might get tricky.

CREATE TABLE federated_table (
    [column definitions go here]
)
ENGINE=FEDERATED
CONNECTION='mysql://username:password@someip:port/db/user';

Triggers:
Database triggers might come in handy where one database operation should trigger another database update. We can bind triggers to INSERT, UPDATE and DELETE events (for example BEFORE INSERT, as below, or AFTER INSERT, AFTER UPDATE and AFTER DELETE).

CREATE TRIGGER user_bi BEFORE INSERT ON user FOR EACH ROW
BEGIN
  INSERT INTO remote_user (username,password) VALUES (NEW.username,NEW.password);
END

Data Virtualization: When we are dealing with microservices, possibly some of our databases run on MySQL while other services use other DBMSs. In that case a data virtualization strategy is necessary. One open source data virtualization platform is Teiid. But when dealing with a data virtualization strategy we must know whether we are dealing with stale data or not, and it can have serious performance implications, as it adds another hop because the data is not being accessed directly from the database.

Event sourcing: Rather than making database operations directly, we can consider designing the workload as a stream of events that go one after another through a message broker. Then it does not matter how many users are accessing your database, it will never lock up, but it will take more time to process the data.
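A hedged sketch of what that can look like with kafka-python (the topic name, broker address and event shape are assumptions for illustration): instead of writing to the database directly, the service appends an event and a consumer applies it later at its own pace.

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",            # assumed broker address
    value_serializer=lambda event: json.dumps(event).encode("utf-8"),
)

def register_user(username, password_hash):
    # Append the fact that a user registered; a downstream consumer
    # materialises it into the user table at its own pace.
    producer.send("user-events", {"type": "UserRegistered",
                                  "username": username,
                                  "password": password_hash})
    producer.flush()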

Change Data Capture: Another approach is Change Data Capture (CDC), an integration strategy that captures the changes being made to data and makes them available as a sequence of events to the other databases and services that need to know about those changes. It can be implemented using Apache Kafka, Debezium and so on.
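A hedged sketch of the consuming side with kafka-python, assuming Debezium publishes row-level changes to a Kafka topic (the topic name below follows the usual Debezium naming convention but is an assumption here):

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "dbserver1.mydb.user",                         # assumed Debezium topic name
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda raw: json.loads(raw) if raw else None,
)

for message in consumer:
    change = message.value
    if change is None:                             # skip tombstone records
        continue
    # React to the captured change (op: c = insert, u = update, d = delete).
    print(change["payload"]["op"], change["payload"].get("after"))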

A simple trick that can help us achieve zero downtime when dealing with DB migration

Currently we are dealing with quite a few deployment processes. For a company that embraces DevOps culture, deployment happens many times a day. A tiny fraction of code change goes into each deployment, and because the change size is so small it is easier to spot a bug; if the bug is crucial, it may be time to roll back to an older version. To be able to roll back, we need a database that accepts rollback, and we have to do it with zero downtime so that users do not notice a thing. It is often not as easy as it sounds in principle.

Before describing a few key ideas to solve this common problem, let’s discuss a few of the most common deployment architectures.

A blue/green deployment architecture consists of two different versions of the application running concurrently; one of them can be the production stage and the other the development platform, but we need to note that both versions of the app must be able to handle 100% of the requests. We need to configure the proxy to stop forwarding requests to the blue deployment and start forwarding them to the green one in a manner that works on-the-fly, so that no incoming requests are lost during the switch from blue to green.

Canary deployment is a deployment architecture where, rather than forwarding all users to a new version, we migrate a small percentage of users or a specific group of users to the new version. Canary deployment is a little more complicated to implement because it requires smart routing; Netflix’s OSS Zuul is a tool that can help. Feature toggles can be implemented using FF4J or Togglz.

As we can see, most of these deployment processes require two versions of the application running at the same time, but a problem arises when a database migration is involved, because both versions of the application must be compatible with the same database. So the schema versions between consecutive releases must be mutually compatible.

Now how can we achieve zero downtime on these deployment strategies?

This means we can’t do database migrations that are destructive or can potentially cause us to lose data. In the rest of this post we will discuss how we can approach database migrations:

One of the most common problems we face during an UPDATE of a large table is that it locks up the database. We don’t control the amount of time an ALTER TABLE will take, but in most popular DBMSs issuing an ALTER TABLE ... ADD COLUMN statement won’t lead to locking. So, for example, if we want to change the type of a field, rather than changing the field type in place we can add a new column.

When adding the column we should not add a NOT NULL constraint at the very beginning of the migration, even if the model requires it, because the new column will only be consumed by the new version of the application, whereas the old version doesn’t provide any value for it, and the constraint would break the INSERT/UPDATE statements coming from the current version. We need to ensure that the new version reads values from the old column but writes to both; this guarantees that all new rows have both columns populated with correct values. Now that new rows are being populated the new way, it is time to deal with the old data: we need to copy the data from the old column to the new column so that all of the current rows also have both columns populated, but the locking problem arises if we try to do that with a single UPDATE.

Instead of issuing a single statement to achieve a column rename, we’ll need to get used to breaking these big changes into multiple smaller changes. One solution is to take baby steps like this:

ALTER TABLE customers ADD COLUMN correct VARCHAR(20);

UPDATE customers SET correct = wrong WHERE id BETWEEN 1 AND 100;
UPDATE customers SET correct = wrong WHERE id BETWEEN 101 AND 200;
-- ...and so on, batch by batch, until every row has been copied...

ALTER TABLE customers DROP COLUMN wrong;

When we are done populating the old column’s data into the new one, and we finally have enough confidence that we will never need the old version, we can delete the old column. As it is a destructive operation, the data will be lost and no longer recoverable.

As a precaution, we should delete it only after a quarantine period. After the quarantine period, when we are confident enough that we will no longer need the old version of the schema, or even a rollback that requires that version of the schema, we can stop populating the old column. If you decide to execute this step, make sure to drop any NOT NULL constraint on the old column, or else you will prevent your code from inserting new rows.

Running an on-premise local MySQL replica with an AWS RDS Aurora master

To solve our problem we are running a hybrid cloud. A few of our services are running in the cloud and some of our services are running on premise, locally in our country, where we have our users and where AWS does not provide service. To be able to do that we need a database replica with read capability.

First we need to create a replication user on the Aurora master:

CREATE USER 'replica'@'%' IDENTIFIED BY 'slavepass'; 
GRANT REPLICATION SLAVE ON *.* TO 'replica'@'%';

Then create a new DB cluster parameter group and set binlog_format to MIXED. Modify the Aurora cluster to use the custom parameter group and restart your DB to apply the changes. Now if you run the following command you will be able to see the binlog file name and position.

show master status

Now we need to dump the master data to an SQL dump so that we can feed our slave database.

mysqldump --single-transaction --routines --triggers --events -h XXX.azhxxxxxx2zkqxh3j.us-east-1.rds.amazonaws.com -u bhuvi --password='xxx' my_db_name > my_db_name.sql

It can be GB to TB of data depending on your database size. So it will take time to download.

Run the following to find your MySQL configuration file:

mysqld --help --verbose | grep my.cnf

For me it is /usr/local/etc/my.cnf

vi /usr/local/etc/my.cnf

and change server-id to:

[mysqld]
server-id = 2

Now let’s import this data into our MySQL instance:

mysql -u root --password='xxx' my_db_name < my_db_name.sql

Now we need to let our slave database know who is the master:

CHANGE MASTER TO  
MASTER_HOST = 'RDS END Point name',  
MASTER_PORT = 3306,  
MASTER_USER = '',  
MASTER_PASSWORD = '',  
MASTER_LOG_FILE='',  
MASTER_LOG_POS=;

Now we need to start the slave.

start slave;

Sample AWS CodeDeploy configuration for Django

AWS has its own continuous deployment tool known as CodeDeploy. Using a simple command you can deploy to multiple servers whenever you want to ship a change to the code base.

Installing the CodeDeploy agent on the instance

If the CodeDeploy agent is not installed on your instance, you need to install it:

sudo yum install -y ruby wget
cd /opt
wget https://aws-codedeploy-ap-south-1.s3.amazonaws.com/latest/install
chmod +x ./install
sudo ./install auto

Create CodeDeploy Application

You have to create a CodeDeploy application with the deployment type set to in-place deployment and the deployment configuration set to CodeDeployDefault.OneAtATime.
Under the EC2 configuration, give your Amazon EC2 instances a tag, say the name is “Code deploy instance”. Now you have to add the same tag to all of your CodeDeploy instances.

Set IAM Permissions

Now that we are done with the installation, we need to set up IAM permissions:
First create an IAM group called CodeDeployGroup. This group needs the AmazonS3FullAccess and AWSCodeDeployFullAccess permissions. Create a user and add it to this group. This user only needs programmatic access. Save the key and key ID somewhere safe.

Create a role whose trusted entity is ec2.amazonaws.com and whose attached policies are AWSCodeDeployRole and AmazonS3FullAccess.

Edit its trust relationship to the following:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
            "ec2.amazonaws.com",
            "codedeploy.ap-south-1.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

Create a new S3 bucket that the previously created IAM user and role can access.

CodeDeploy configuration

My code base structure is something like the following:

- src
  - <django project>
- scripts
   install_dependencies
   start_server
   stop_server
appspec.yml
codedeploy_deploy.py
deploy.sh

appspec.yml is the file that contains our hooks and configuration for CodeDeploy.

version: 0.0
os: linux
files:
  - source: src
    destination: /home/centos/proj_name
hooks:
  BeforeInstall:
    - location: scripts/install_dependencies
      timeout: 300
      runas: root
  ApplicationStop:
    - location: scripts/stop_server
      timeout: 300
      runas: root
  ApplicationStart:
    - location: scripts/start_server
      timeout: 300
      runas: root

For Django, scripts/install_dependencies may look like the following:

sudo yum install -y gcc openssl-devel bzip2-devel wget
sudo yum install -y make git
cd /opt
command -v python3.6 || {
    wget https://www.python.org/ftp/python/3.6.3/Python-3.6.3.tgz
    tar xzf Python-3.6.3.tgz
    cd Python-3.6.3
    sudo ./configure --enable-optimizations
    sudo make altinstall
}
sudo yum install -y mysql-devel

For scripts/start_server I have the following:

cd /home/centos/evaly
pip3.6 install -r requirements.txt
nohup uwsgi --http :80 --module evaly.wsgi > /dev/null 2>&1 &

For scripts/stop_server I have the following:

pkill uwsgi

I have borrowed a Python script from the Bitbucket team, which looks like the following:

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file
# except in compliance with the License. A copy of the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS"
# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under the License.
"""
A BitBucket Builds template for deploying an application revision to AWS CodeDeploy
narshiva@amazon.com
v1.0.0
"""
from __future__ import print_function
import os
import sys
from time import strftime, sleep
import boto3
from botocore.exceptions import ClientError

VERSION_LABEL = strftime("%Y%m%d%H%M%S")
BUCKET_KEY = os.getenv('APPLICATION_NAME') + '/' + VERSION_LABEL + \
    '-bitbucket_builds.zip'

def upload_to_s3(artifact):
    """
    Uploads an artifact to Amazon S3
    """
    try:
        client = boto3.client('s3')
    except ClientError as err:
        print("Failed to create boto3 client.\n" + str(err))
        return False
    try:
        client.put_object(
            Body=open(artifact, 'rb'),
            Bucket=os.getenv('S3_BUCKET'),
            Key=BUCKET_KEY
        )
    except ClientError as err:
        print("Failed to upload artifact to S3.\n" + str(err))
        return False
    except IOError as err:
        print("Failed to access artifact.zip in this directory.\n" + str(err))
        return False
    return True

def deploy_new_revision():
    """
    Deploy a new application revision to AWS CodeDeploy Deployment Group
    """
    try:
        client = boto3.client('codedeploy')
    except ClientError as err:
        print("Failed to create boto3 client.\n" + str(err))
        return False

    try:
        response = client.create_deployment(
            applicationName=str(os.getenv('APPLICATION_NAME')),
            deploymentGroupName=str(os.getenv('DEPLOYMENT_GROUP_NAME')),
            revision={
                'revisionType': 'S3',
                's3Location': {
                    'bucket': os.getenv('S3_BUCKET'),
                    'key': BUCKET_KEY,
                    'bundleType': 'zip'
                }
            },
            deploymentConfigName=str(os.getenv('DEPLOYMENT_CONFIG')),
            description='New deployment from BitBucket',
            ignoreApplicationStopFailures=True
        )
    except ClientError as err:
        print("Failed to deploy application revision.\n" + str(err))
        return False     
           
    """
    Wait for deployment to complete
    """
    while 1:
        try:
            deploymentResponse = client.get_deployment(
                deploymentId=str(response['deploymentId'])
            )
            deploymentStatus=deploymentResponse['deploymentInfo']['status']
            if deploymentStatus == 'Succeeded':
                print ("Deployment Succeeded")
                return True
            elif (deploymentStatus == 'Failed') or (deploymentStatus == 'Stopped') :
                print ("Deployment Failed")
                return False
            elif deploymentStatus in ('InProgress', 'Queued', 'Created'):
                sleep(10)  # wait before polling again instead of busy-looping on the API
                continue
        except ClientError as err:
            print("Failed to deploy application revision.\n" + str(err))
            return False      
    return True

def main():
    if not upload_to_s3('/Users/sadafnoor/Projects/evaly/artifact.zip'):
        sys.exit(1)
    if not deploy_new_revision():
        sys.exit(1)

if __name__ == "__main__":
    main()

I have written a script to zip up my source code so that the script above can upload it to S3; eventually all my EC2 instances will download that zip from S3.

export APPLICATION_NAME="CodeDeployApplicationName" 
export AWS_ACCESS_KEY_ID="IAMUserKeyId"
export AWS_DEFAULT_REGION="ap-south-1"

export AWS_SECRET_ACCESS_KEY="IAMUserSecretKey"
export DEPLOYMENT_CONFIG="CodeDeployDefault.OneAtATime"

export DEPLOYMENT_GROUP_NAME="CodeDeployDeploymentGroup"
export S3_BUCKET="S3BucketName"
zip -r ../artifact.zip src/* appspec.yml scripts/*
python codedeploy_deploy.py