Reflection on Designing Distributed Systems

Just finished reading “Designing Distributed Systems: Patterns and Paradigms for Scalable, Reliable Services” by Brendan Burns, and I enjoyed it. I think this book will add value for people who have recently gotten their hands on Docker and orchestration tools like Kubernetes; it empowers those developers, devops engineers and admins by helping them learn containerization design patterns. The book is only 150+ pages, and it discusses a wide range of tools and techniques with hands-on examples, but I would definitely recommend it for the theory and discussion rather than the practical side of it.

Containers establish boundaries that help provide separation of concerns around specific resources. Even when scaling is not a requirement and the application runs on a single node, containerising is still a better choice, because containers are useful not only for scaling but also for organising and maintaining the application.

Sometimes an application has tasks that run in the background and other tasks that serve end users. Often the tasks that serve end users are more important, and we can prioritise them by scaling their containers on a node appropriately.

When engineering teams start to scale, a team of developers usually comprises six to eight people for better engineering performance. As more engineers join, the number of teams usually grows rather than the size of each team. Better engineering performance, maintainability, accountability and ownership can be achieved by assigning the responsibility for a containerised application to a single team. Additionally, some of the components, if factored properly, become reusable modules that many teams can share.

Single node patterns:

  • Sidecar Pattern: Sidecar containers are a group of containers that run together and stay together on the same node, sharing processes, ports and so on. The pattern is very useful when dealing with legacy services or third-party, closed-source applications about which we have very little knowledge. In the sidecar pattern, unencrypted traffic is only sent via the local loopback adapter inside the container group, which keeps the data and the communication between the containers safe.
  • Ambassador Pattern: In the ambassador pattern, two containers are tightly linked in a symbiotic pairing and scheduled to a single machine. An ambassador can act as a proxy server; in a sharded system it can route requests to the right shard, and it can also act as a service broker.
  • Adapters: Although every application is different, they still have some things in common: they need to be logged, monitored, maintained, and so on. The adapter pattern takes advantage of this and is implemented as a sidecar: the common functionality is modularized and run as a sidecar container. Popular use cases include collecting the desired logs or monitoring performance by periodically executing a command in a container that runs alongside the application; see the sketch after this list.
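To make the adapter idea concrete, here is a minimal Python sketch of my own (not from the book) of an adapter-style container that periodically runs a health-check command and emits the result as normalized JSON log lines for a generic monitoring pipeline; the command and interval are assumptions.

import json
import subprocess
import time

# Hypothetical health-check command and interval; a real adapter would run as a
# sidecar container in the same pod as the application it observes.
COMMAND = ["redis-cli", "ping"]
INTERVAL_SECONDS = 30

while True:
    result = subprocess.run(COMMAND, capture_output=True, text=True)
    # Emit one normalized JSON line per check so any log collector can consume it.
    print(json.dumps({
        "timestamp": time.time(),
        "check": " ".join(COMMAND),
        "healthy": result.returncode == 0,
        "output": result.stdout.strip(),
    }), flush=True)
    time.sleep(INTERVAL_SECONDS)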

Serving Patterns:

Replicated Load-Balanced Services: In replicated load-balanced services, the same data is replicated across multiple servers and a service acts as a load balancer in front of them.

Sharded Services: When the data becomes too large for a single machine, it is split and distributed across multiple machines. A router is placed in front of the service to identify which shard to route to in order to fetch the data; usually a carefully chosen hash of a key determines where the data is located and which server to route to. For better performance and reliability, replication and sharding are used together.
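As a rough illustration (my own sketch, not from the book), a shard router in Python might hash the key and map it onto a fixed list of shard addresses; the shard hostnames here are made up.

import hashlib

# Hypothetical shard addresses; in practice these would come from service discovery.
SHARDS = ["shard-0.internal:6379", "shard-1.internal:6379", "shard-2.internal:6379"]

def shard_for(key: str) -> str:
    # A carefully chosen hash keeps the same key on the same shard across requests.
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return SHARDS[int(digest, 16) % len(SHARDS)]

print(shard_for("user:1234"))  # the router would forward the request to this shard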

Scatter/Gather: Scatter/gather is quite useful when a large amount of mostly independent processing is needed to handle a particular request. It can be seen as sharding the computation necessary to service the request, rather than sharding the data.

When there is more data than a single machine can hold, or more than it can process in time, it becomes necessary to introduce sharding by splitting the data across multiple machines. A query then has to be scattered to all of the leaf nodes; the partial responses from the leaves return to the root node, which merges them together to form a comprehensive response for the user. The limitation of this approach is that the response time depends on the slowest node, so increased parallelism doesn’t always speed things up because of per-node overhead and stragglers. Mathematically speaking, if an individual leaf has a 99th-percentile latency of, say, 2 seconds, then scattering to 5 leaves turns that into roughly a 95th-percentile latency for the complete scatter/gather system. And it only gets worse from there: if we scatter out to 100 leaves, we are more or less guaranteeing that the overall latency for almost all requests will be 2 seconds.
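Here is a small Python sketch of the idea (my own toy example, not from the book): the root fans the same query out to every leaf in parallel and merges the partial results; the in-memory shards stand in for real leaf servers.

from concurrent.futures import ThreadPoolExecutor

def search_leaf(shard, term):
    # Each leaf only searches its own shard of the documents.
    return [doc for doc in shard if term in doc]

def scatter_gather(shards, term):
    # Scatter: send the same query to every leaf in parallel.
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        partials = list(pool.map(lambda shard: search_leaf(shard, term), shards))
    # Gather: the root merges the partial responses into one answer.
    return [doc for partial in partials for doc in partial]

shards = [["alpha doc", "beta doc"], ["beta note"], ["gamma doc", "beta entry"]]
print(scatter_gather(shards, "beta"))  # results collected from all three leaves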

A single replica per shard means that if it fails, all scatter/gather requests will fail for as long as that shard is unavailable, because every request must be processed by all leaf nodes in the scatter/gather pattern. This risk is addressed by running multiple replicas of each leaf.

Functions and Event-Driven Processing 

Function-level decoupling: In the microservice approach we split our application into small parts, each managed by a team. FaaS takes this to the next level: it forces us to strongly decouple each function of the service.

There is a common saying that 80% of daily users use only 20% of the features. The remaining 80% of features, which are not used extensively, can be ideal candidates for FaaS, which charges per request: we only pay for the time when the service is actively serving requests, and we are not paying for processor cycles that sit around waiting for a user request.

Because a function in FaaS is dynamically spun up in response to a user request while the user is waiting, the need to load a lot of state may significantly impact the latency the user perceives while interacting with the service, and unlike a long-running server, this loading cost cannot be amortized across a large number of requests.

This approach also has limitations: functions communicate with each other over the network, and each function instance cannot keep much local memory. So while FaaS may be a good fit for responding to temporal events, it is still not sufficient infrastructure for generic background processing, and any required state needs to be stored in a storage service, which adds complexity to the code.

Because each function is so highly decoupled from the others, there is no real representation of the dependencies or interactions between different functions, which makes it difficult to find bugs. Also, the cost of a bug that causes an infinite loop of function calls is high.

Batch Computational Patterns: The preceding sections described patterns for reliable, long-running server applications. This section describes patterns for batch processing; in contrast to long-running applications, batch processes are expected to run only for a short period of time.

Work Queue Systems: In a containerised work queue there are two interfaces: the source container interface, which provides a stream of work items that need processing, and the worker container interface, which knows how to actually process a work item. The work processor can be implemented as a container that processes items from a shared work queue as a batch operation, or it can be implemented with the multi-worker pattern, which transforms a collection of different worker containers into a single unified container that implements the worker interface yet delegates the actual work to a collection of different, reusable containers.

Depending on the workload, shared-work-queue containers need to be scaled accordingly; keeping one or a few workers running when work arrives only rarely can be overkill. Depending on the workload for a task, we can also consider implementing the multi-worker pattern with Kubernetes Jobs as the work processor. We can easily write a dynamic job creator that spawns a new Job whenever there is a new item in the queue, as sketched below.
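As a rough sketch of that dynamic job creator (my own code, using the official kubernetes Python client; the queue, worker image, env var and namespace are assumptions), each new work item is turned into a one-off Kubernetes Job.

import queue

from kubernetes import client, config

config.load_incluster_config()  # use config.load_kube_config() when running outside the cluster
batch = client.BatchV1Api()

work_queue = queue.Queue()  # stand-in for a real queue such as a message broker

def create_job_for(item_id: str):
    # One short-lived Job per work item; the worker image and env var are hypothetical.
    job = client.V1Job(
        metadata=client.V1ObjectMeta(generate_name="work-item-"),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[client.V1Container(
                        name="worker",
                        image="example.com/worker:latest",
                        env=[client.V1EnvVar(name="WORK_ITEM", value=item_id)],
                    )],
                )
            )
        ),
    )
    batch.create_namespaced_job(namespace="default", body=job)

while True:
    create_job_for(work_queue.get())  # blocks until a new item arrives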

Event-Driven Batch Processing: Work queues are great for enabling individual transformations of one input to one output. However, there are a number of batch applications where you want to perform more than a single action, or where you need to generate multiple different outputs from a single data input. In these cases, you start to link work queues together so that the output of one work queue becomes the input to one or more other work queues, and so on. This forms a series of processing steps that respond to events, with each event being the completion of the preceding step.

In a complex event-driven system, an event often needs to go through many steps: data may need to be divided into several queues, and sometimes queues need to be merged to produce an output. The common building blocks are:

Copier: The job of a copier is to take a single stream of work items and duplicate it out into two or more identical streams. 

Filter: The role of a filter is to reduce a stream of work items to a smaller stream of work items by filtering out work items that don’t meet particular criteria. 

Splitter: The job of a splitter is to divide a single stream of work items into two separate work queues without dropping any of them.

Sharder: The job of a sharder is to divide a single queue into an evenly distributed collection of work queues based upon some sort of sharding function.

Merger: the job of a merger is to take two different work queues and turn them into a single work queue. 
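A toy Python sketch of two of these stages (my own illustration; real systems would place a durable queue between each stage, and the predicate and number of copies are arbitrary):

def copier(items, n_copies=2):
    # Duplicate a single stream of work items into n identical streams.
    streams = [[] for _ in range(n_copies)]
    for item in items:
        for stream in streams:
            stream.append(item)
    return streams

def filter_stage(items, predicate):
    # Keep only the work items that meet the given criteria.
    return [item for item in items if predicate(item)]

work = [{"id": 1, "priority": "high"}, {"id": 2, "priority": "low"}]
print(filter_stage(work, lambda item: item["priority"] == "high"))
thumbnail_stream, archive_stream = copier(work)  # two identical downstream streams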

Coordinated Batch Processing:

To achieve a more complex event-driven architecture, we are going to need more coordinated batch-processing techniques.

Join (or Barrier Synchronization): A merge does not ensure that a complete dataset is present before processing begins, but a join does not complete until all of the work items have been processed. This reduces the parallelism that is possible in the batch workflow and thus increases the overall latency of running the workflow.
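A minimal Python sketch of the barrier (my own example): the downstream step only runs once every upstream work item has finished, which is exactly what costs the extra latency.

from concurrent.futures import ThreadPoolExecutor, wait

def process(item):
    return item * 2  # stand-in for real per-item work

items = [1, 2, 3, 4]
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(process, item) for item in items]
    wait(futures)  # the barrier: block until the complete set has been processed
print(sum(f.result() for f in futures))  # the downstream step sees all items at once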

Reduce: Rather than waiting until all data has been processed, reduce optimistically merges the parallel data items into a single comprehensive representation of the full set. To produce a complete output, this process is repeated until all of the data is processed, but the ability to begin early means that the batch computation executes more quickly overall.
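A small sketch of the reduce step in Python (my own example, not from the book): partial word counts coming from independent shards are merged as they arrive rather than waiting for the whole dataset.

from functools import reduce

def merge_counts(total, partial):
    # Fold one shard's partial counts into the running combined result.
    for word, count in partial.items():
        total[word] = total.get(word, 0) + count
    return total

partial_counts = [{"error": 3, "warn": 1}, {"error": 2}, {"warn": 5}]
print(reduce(merge_counts, partial_counts, {}))  # {'error': 5, 'warn': 6}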

How to run a simple load test on Geth Node

In this blog post, I am going to share a simple code snippet that I have used to run a load test on a Geth node. To get things done, I am using the locust Python library, and to connect to the Geth node, I am using the web3 library. This piece of code finds 10 addresses at the initialization step; after initialization, it spawns a predefined number of users, each of which tries to fetch the balance of one of those 10 addresses.
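The full snippet is in the linked post; as a rough sketch of the shape of it (the endpoint URL and the reporting details are my assumptions, not the original code), it looks something like this:

import random
import time

from locust import User, task, between
from web3 import Web3

# Assumed local JSON-RPC endpoint of the Geth node.
w3 = Web3(Web3.HTTPProvider("http://localhost:8545"))

# Initialization step: grab 10 addresses known to the node.
ADDRESSES = w3.eth.accounts[:10]

class GethUser(User):
    wait_time = between(1, 2)

    @task
    def check_balance(self):
        address = random.choice(ADDRESSES)
        start = time.time()
        w3.eth.get_balance(address)
        # Report the call to locust so it shows up in the statistics.
        self.environment.events.request.fire(
            request_type="jsonrpc",
            name="eth_getBalance",
            response_time=(time.time() - start) * 1000,
            response_length=0,
            exception=None,
            context={},
        )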

Continue reading “How to run a simple load test on Geth Node”

A dirty patch to fix a Django annotate-related group by year/month/day bug

Recently I was working on a Django project with a team where we wanted to run some group-by queries for analytical data representation. As we already know, Django does not directly support group-by, but there are ways to achieve it using Django’s values and annotate functions.

from django.db.models import Count
from django.db.models.functions import ExtractYear

(Model.objects
     .annotate(year=ExtractYear('timestamp'))
     .values('year')
     .annotate(ycount=Count('id')))

It was supposed to return a QuerySet containing a count of the entries created in each year. Instead, it was returning a QuerySet containing the individual rows.

During the first step of my investigation, I logged the SQL query associated with this queryset, and it looked something like this:

SELECT EXTRACT(YEAR FROM `tablename`.`timestamp`) AS `year`, COUNT(`tablename`.`id`) AS `ycount` FROM `tablename` GROUP BY EXTRACT(YEAR FROM `tablename`.`timestamp`), `tablename`.`timestamp`

The SQL query that I wanted my ORM to create was:

SELECT EXTRACT(YEAR FROM `tablename`.`timestamp`) AS `year`, COUNT(`tablename`.`id`) AS `ycount` FROM `tablename` GROUP BY EXTRACT(YEAR FROM `tablename`.`timestamp`)

The difference is subtle: Django was grouping by two fields, and that was the reason behind the unintended result.

How could we bypass this apparent Django bug, given that we had no way to group the raw timestamps? The solution I had in mind was: while running the query, what if I could temporarily replace the value of timestamp at runtime? Since values() and F() do not allow replacing the value of a field, I had to rely on the extra() function that comes with Django.

(Model.objects
     .annotate(year=ExtractYear('timestamp'))
     .values('year')
     .extra(select={'timestamp': 'year'})
     .annotate(ycount=Count('id')))

This produced the following SQL:

SELECT DATE(`tablename`.`timestamp`) AS `date`, MAX(`tablename`.`quantity`) AS `count` FROM `tablename` GROUP BY DATE(`tablename`.`timestamp`), (date)

It is probably not the ideal solution, but it got things done until the Django team solves the problem. If you have a better solution in mind, I would love to hear about it and implement it.

Writing a k8s controller in GoLang: delete pods on secret change

The motivation behind writing this was to explore how custom controllers work in Kubernetes, and as I was doing it, I felt like building something that solves a real problem. Most of the deployments we use in practice have some form of secret mounted into them. It is common practice to rotate those secrets every now and then, but one problem we face is that after we change a Kubernetes secret, the change is not reflected in the pods immediately. That is not really a flaw: the idea is that the secrets change along with a new deployment of the application. But for people like me, who want to see the changes immediately, it can be annoying sometimes. One way to solve the problem is to kill the pods associated with the deployment one by one; as the pods are recreated, they pick up the latest secret instead of the old one. Usually developers use the kubectl command to delete the pods, but in this blog post I am going to write a custom controller in Go.
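The actual controller in this post is written in Go (linked below), but the core idea can be sketched in a few lines with the kubernetes Python client: watch a secret, and when it changes, delete the matching pods so their replacements mount the new value. The namespace, secret name and label selector here are hypothetical.

from kubernetes import client, config, watch

config.load_kube_config()  # or config.load_incluster_config() inside the cluster
core = client.CoreV1Api()

NAMESPACE = "default"
SECRET_NAME = "app-secret"       # hypothetical secret to watch
LABEL_SELECTOR = "app=myapp"     # hypothetical pods that mount the secret

for event in watch.Watch().stream(core.list_namespaced_secret, namespace=NAMESPACE):
    if event["type"] == "MODIFIED" and event["object"].metadata.name == SECRET_NAME:
        pods = core.list_namespaced_pod(NAMESPACE, label_selector=LABEL_SELECTOR)
        for pod in pods.items:
            # The deployment recreates each deleted pod, which then picks up the new secret.
            core.delete_namespaced_pod(pod.metadata.name, NAMESPACE)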

Continue reading “Writing a k8s controller in GoLang: delete pods on secret change”

Shallow diving k8s components: etcd

Etcd is a highly available key-value data store that holds all the data necessary for running a Kubernetes cluster. The first time I learned about etcd, I asked myself: why? There are so many production-ready key-value databases out there. Why did the Kubernetes team choose etcd? What am I missing? That led me to learn more about etcd. Etcd is perfect for Kubernetes for at least two reasons. One is that it is robust by nature: it makes sure the data is consistent across the cluster and that it is highly available. The other is a feature called watch, which allows an observer to subscribe to changes on a particular key. That goes perfectly with Kubernetes’ design paradigm.
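For a feel of that watch feature, here is a tiny sketch of my own using the python-etcd3 client (the endpoint and key are assumptions):

import etcd3

# Assumes an etcd endpoint on localhost:2379.
etcd = etcd3.client(host="localhost", port=2379)

# Subscribe to changes on a key; Kubernetes builds its controllers on this primitive.
events_iterator, cancel = etcd.watch("/registry/example")

etcd.put("/registry/example", "v1")  # this write shows up in the watch stream

for event in events_iterator:
    print(event.key, event.value)
    cancel()  # stop watching after the first change
    break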

Continue reading “Shallow diving k8s components: etcd”

Collecting Docker logs and syslogs using SSL-enabled Filebeat with OpenDistro ELK

docker-compose.yml

version: '3'

services:

  oelk-node1:
    image: amazon/opendistro-for-elasticsearch:0.9.0
    container_name: oelk-node1
    environment:
      - cluster.name=oelk-cluster
      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
      - opendistro_security.ssl.http.enabled=false
      - path.repo=/usr/share/elasticsearch/backup
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - oelk-data1:/usr/share/elasticsearch/data
      - /var/log/elasticsearchbkup:/usr/share/elasticsearch/backup
    ports:
      - 9200:9200
      - 9600:9600 # required for Performance Analyzer
    networks:
      - oelk-net

  oelk-node2:
    image: amazon/opendistro-for-elasticsearch:0.9.0
    container_name: oelk-node2
    environment:
      - cluster.name=oelk-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.zen.ping.unicast.hosts=oelk-node1
      - opendistro_security.ssl.http.enabled=false

    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - oelk-data2:/usr/share/elasticsearch/data
    networks:
      - oelk-net

  kibana:
    image: amazon/opendistro-for-elasticsearch-kibana:0.9.0
    container_name: oelk-kibana
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      ELASTICSEARCH_URL: http://oelk-node1:9200
      ELASTICSEARCH_HOSTS: https://oelk-node1:9200
    networks:
      - oelk-net

  logstash:
    image: docker.elastic.co/logstash/logstash:6.7.1
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
      - "./certs:/etc/certs"
    ports:
      - "5044:5044"
    environment:
      LS_JAVA_OPTS: "-Xmx256m -Xms256m"
    networks:
      - oelk-net
    depends_on:
      - oelk-node1
      - oelk-node2

  filebeat:
    hostname: filebeat
    build:
      context: filebeat
      dockerfile: Dockerfile
    volumes:
      - "/var/lib/docker/containers:/usr/share/dockerlogs/data:ro"
      - "/var/logs:/usr/share/syslogs:ro"
      - "/var/log/syslog:/var/log/syslog.log:ro"
      - "/var/run/docker.sock:/var/run/docker.sock"
      - "./certs:/etc/certs"
    networks:
      - oelk-net
    depends_on:
      - logstash

volumes:
  oelk-data1:
  oelk-data2:

networks:
  oelk-net:

pipeline/logstash.conf

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate_authorities => ["/etc/certs/ca.crt"]
    ssl_certificate => "/etc/certs/logstash.crt"
    ssl_key => "/etc/certs/logstash.key"
    ssl_verify_mode => "force_peer"
  }
  # http {
  #   port => 5044
  # }
}

filter {
  # if [docker][image] =~ /^logstash/ {
  #   drop { }
  # }
  mutate {
    rename => ["host", "server"]
    convert => {"server" => "string"} # this may not be necessary, but added just in case
  }
}

## Add your filters / logstash plugins configuration here

output {
  elasticsearch {
    hosts => "oelk-node1:9200"
    user => admin
    password => admin
  }
}

filebeat/Dockerfile

FROM docker.elastic.co/beats/filebeat:6.7.1
#FROM docker-logs-elk/filebeat:1.0.0
# Copy our custom configuration file
COPY config/filebeat.yml /usr/share/filebeat/filebeat.yml

USER root
# Create a directory to map volume with all docker log files
#RUN mkdir /usr/share/filebeat/dockerlogs
RUN chown -R root /usr/share/filebeat/filebeat.yml
RUN chmod -R go-w /usr/share/filebeat/filebeat.yml

filebeat.yml

filebeat.inputs:
- type: docker
  combine_partial: true
  containers:
    path: "/usr/share/dockerlogs/data"
    stream: "stdout"
    ids:
      - "*"
# - type: log

#   # Change to true to enable this input configuration.
#   enabled: true

#   # Paths that should be crawled and fetched. Glob based paths.
#   paths:
#     - /var/log/syslog.log

# filebeat.prospectors:
# - type: log
#   enabled: true
#   paths:
#    - '/usr/share/dockerlogs/data/*/*-json.log'
#   json.message_key: log
#   json.keys_under_root: true
#   processors:
#   - add_docker_metadata: ~

output:
  logstash:
    hosts: ["logstash:5044"]
    ssl.certificate_authorities: ["/etc/certs/ca.crt"]
    ssl.certificate: "/etc/certs/beat.crt"
    ssl.key: "/etc/certs/beat.key"

How to add flask-admin to a Blueprint?

Those who work only with closed-source tools won’t understand the freedom we have with open-source tools. In real life our requirements change over time, and the tools we use also grow and start to cover things their authors did not have in mind when they started the project. As developers, we want our tools to do different things, and from person to person and project to project we may have a different sense of beauty and a different philosophy of code organization. In computer science we always try to map our problem onto a known solution we have already solved before, so when you have access to the source code of your tool, you can easily dig into that code, extend or alter its functionality, and shape a solution that matches your situation.

For example, after working with the Python Flask framework for a couple of years, I realized how much it has grown over time: it does pretty much everything Django is capable of, and it is arguably even better because of its modularity and flexibility. For the project I am working on, I started using Flask, flask-admin for the administrative panel, and Flask blueprints to separate the different components of my project. Flask-admin is actually not very comfortable or easy to attach to a blueprint, which makes sense, because an admin panel is meant to be attached to the main app rather than a sub-app like a blueprint. But I had a different use case: I had to add my own custom views to the admin panel, and I didn’t want to put them in app.py; I wanted them in my controller. The other class architecture I had in mind would cause a circular dependency, which always sends me into a panic. I may not be a very neat and tidy person in my personal life, but I try to keep my code pretty and tidy, and that is what made me dig into the source code of these libraries during office hours to rewrite this. Enough talk; if Linus Torvalds ever visits my blog he is going to get really mad at me for talking too much. So here you go, the code I am using that satisfies my need:

# admin_blueprint.py

from flask import Blueprint
from flask_admin.contrib.sqla import ModelView
from flask_admin import Admin

class AdminBlueprint(Blueprint):
    views=None


    def __init__(self,*args, **kargs):
        self.views = []
        return super(AdminBlueprint, self).__init__('admin2', __name__,url_prefix='/admin2',static_folder='static', static_url_path='/static/admin')


    def add_view(self, view):
        self.views.append(view)

    def register(self,app, options, first_registration=False):
        print(app)
        admin = Admin(app, name='microblog', template_mode='adminlte')

        for v in self.views:
            admin.add_view(v)

        return super(AdminBlueprint, self).register(app, options, first_registration)

#app/admin/controllers.py
from flask_admin.contrib.sqla import ModelView

from admin_blueprint import AdminBlueprint
from common.models import MyModel, db

# Note: AdminBlueprint.__init__ above ignores these arguments and registers its own name and prefix.
app = AdminBlueprint('admin2', __name__, url_prefix='/admin2', static_folder='static', static_url_path='/static/admin')
app.add_view(ModelView(MyModel, db.session))
#app/__init__.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

# Define the WSGI application object
app = Flask(__name__, template_folder="../templates", static_folder="../templates")

from app.api.controllers import app as api
from app.frontend.controllers import app as frontend
from app.admin.controllers import app as admin



# Register blueprint(s)
app.register_blueprint(api)
app.register_blueprint(frontend)
app.register_blueprint(admin)

# replacing the following code that I had
#from flask_admin import Admin
#from flask_admin.contrib.sqla import ModelView
#from common.models import *

#admin = Admin(app, name='microblog', template_mode='adminlte')
#admin.add_view(ModelView(MyModel, db.session))


*.VMG to *.DOC Decoder

I have written a decoder that converts *.vmg files to *.doc.

I think this will help you guys decode *.VMG files to *.DOC files, since it has been reported that ABC Amber Nokia Converter sometimes fails to convert these types of files, and I have found no other software to do this kind of task. See if it helps or not!

Caution: I suspect it may not work 90% of the time, but the rest of the time IT WILL WORK FINE!

VMG To DOC Decoder .jar