RTB4FREE Service Integrations with Open Source Software

Last updated: April 18, 2017

Overview

The RTB4FREE architecture consists of distributed services that perform discrete tasks and can be scaled horizontally. We call this approach a "micro-services demand side platform (DSP)"; it is shown below. This section details how the RTB4FREE components integrate with other open source projects.

Kafka

For bidder logs to be published to Kafka, the bidder configuration file must point to the Kafka cluster. The Campaigns/payday.json file must contain the following directives.

"zeromq" : {
	"bidchannel" : "kafka://[$BROKERLIST]&topic=bids",
	"winchannel" : "kafka://[$BROKERLIST]&topic=wins",
	"requests" : "kafka://[$BROKERLIST]&topic=requests",
	"clicks" : "kafka://[$BROKERLIST]&topic=clicks",
	"pixels" : "kafka://[$BROKERLIST]&topic=pixels",
	"videoevents": "kafka://[$BROKERLIST]&topic=videoevents",
	"postbackevents": "kafka://[$BROKERLIST]&topic=postbackevents",
	"status" : "kafka://[$BROKERLIST]&topic=status",
	"reasons" : "kafka://[$BROKERLIST]&topic=reasons",
	  
	"commands": "tcp://$PUBSUB:6001&commands",
	"responses": "tcp://$PUBSUB:6000&responses",
	"xfrport": "6002",  
	"requeststrategy" : "$REQUESTSTRATEGY"
 },

Note the topic names if you want to tap into a channel. Also note $PUBSUB, $BROKERLIST, and $REQUESTSTRATEGY. These are environment variables used to override the standard configuration file entries. You can use your own environment variables alongside the "built-in" environment variables we define, such as $PUBSUB, $BROKERLIST, and $REQUESTSTRATEGY. You may also choose to hardcode these values in the file. $BROKERLIST is a comma-separated list of Kafka nodes; for example, "kafka1:9092,kafka2:9092,kafka3:9092" is a three-node cluster.
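For example, hardcoding that three-node broker list in place of [$BROKERLIST] (assuming the brackets are part of the substitution syntax and are replaced along with the variable), the bid channel entry would read:

	"bidchannel" : "kafka://kafka1:9092,kafka2:9092,kafka3:9092&topic=bids",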

Logstash is the RTB4FREE logging consumer that inserts these logs into the ELK stack, as described in the Elasticsearch/Logstash/Kibana section of this document.

Elasticsearch/Logstash/Kibana

Bidder logs are ingested into Elasticsearch via Logstash. The Logstash configuration file defines the Kafka cluster as an input, as shown below. Logstash also transforms the Kafka message JSON into appropriately mapped Elasticsearch fields so that Kibana visualizations can be created.


input {
      kafka {
        codec => "json"
        topics => ["status","requests","reasons","bids","wins","clicks","pixels","video","postbackevents","logs"]
        bootstrap_servers => "kafka:9092"
        group_id => "logstash1"
        consumer_threads => 1
        decorate_events => true
    }
}
filter {
 # Set the Elasticsearch index name to the kafka topic
  mutate {
      add_field => { "topic" => "%{[@metadata][kafka][topic]}" }
      add_field => { "index" => "%{topic}" }
      add_field => { "consumer_group" => "%{[@metadata][kafka][consumer_group]}" }
  }
  # Change the type field for these topics.
  if [topic]=="pixels" or [topic]=="clicks" or [topic]=="logs" {
      mutate {
          remove_field => [ "type" ]
     }
      mutate {
         add_field => { "type" => "%{topic}" }
      }
  }
  else {
     mutate {
        update => { "type" => "%{topic}" }
     }
  }

  if [type] == "requests" {
      date {
          match => ["[ext][timestamp]","UNIX_MS"]
      }
      mutate {
        add_field => { "document_id" => "%{id}" }
      }
      if [device][geo][lat] and [device][geo][lon] {
        mutate {
          add_field => {
              "[device][geo][location]" => "%{[device][geo][lat]},%{[device][geo][lon]}"
          }
        }
      }
  }
  else if [type] == "bids" {
      if !("" in [oidStr]) {
            drop { }
      }
      mutate {
           update => { "index" => "bids" }
           add_field => { "document_id" => "%{oidStr}" }
      }
      date {
       #     match => ["utc","UNIX_MS"]
            match => ["timestamp","UNIX_MS"]
      }
      if [cost] {
        mutate {
          convert => {
              "cost" => "float"
          }
        }
      }
      if [lat] and [lon] {
        mutate {
          add_field => {
              "location" => "%{lat},%{lon}"
          }
        }
      }
  }
  else if [type] == "wins" {
      if !("" in [hash]) {
            drop { }
      }
      mutate {
           update => { "index" => "wins" }
           add_field => { "document_id" => "%{hash}" }
      }
      date {
        #    match => ["utc","UNIX_MS"]
            match => ["timestamp","UNIX_MS"]
      }
      if [price] {
        mutate {
          convert => {
              "price" => "float"
          }
        }
      }
      if [lat] and [lon] {
        mutate {
          add_field => {
              "location" => "%{lat},%{lon}"
          }
        }
      }
  }
  else if [type] == "pixels" or [type] == "clicks" {
      if !("" in [bid_id]) {
            drop { }
      }
      mutate {
           update => { "index" => "%{type}" }
           add_field => { "document_id" => "%{bid_id}" }
      }
      date {
        #    match => ["time","UNIX_MS"]
            match => ["timestamp","UNIX_MS"]
      }
      if [payload] =~ /^\/pixel/ {
          mutate {
            replace => { "type" => "pixels" }
          }
      } else if [payload] =~ /^\/redirect/ {
          mutate {
            replace => { "type" => "clicks" }
          }
      }
      grok {
            match => { "payload" => "/site_domain=(?<site_domain>[\w\.\-]+)/?" }
      }
      grok {
            match => { "payload" => "/ip=(?<ip>[\d\.]+)/?" }
      }
      grok {
            match => { "payload" => "/app_name=(?<app_name>.+)" }
      }
      if [price] {
        mutate {
          convert => {
              "price" => "float"
          }
        }
      }
      if [lat] and [lon] {
        mutate {
          add_field => {
              "location" => "%{lat},%{lon}"
          }
        }
      }
  }
  else if [type] == "status" {
      mutate {
           update => { "type" => "stats" }
           update => { "index" => "stats" }
      }
      date {
        match => ["timestamp","UNIX_MS"]
      }
  }
  else if [type] == "reasons" {
      date {
          match => ["[timestamp]","UNIX_MS"]
      }
      mutate {
          rename => {
              "campaign" => "ad_id"
          }
          rename => {
              "creative" => "creative_id"
          }
      }
  }
  else if [type] == "logs" {
  	  # Change logs to rtblogs
      mutate {
           update => { "type" => "rtblogs" }
           update => { "index" => "rtblogs" }
      }
  }
  else if [type] == "fraud" {
      date {
          match => ["[timestamp]","UNIX_MS"]
      }
  }
}
output {
# Uncomment for debug
#    stdout { codec => rubydebug }
# Set the document id if defined.
# 
    if ("" in [document_id]) {
         elasticsearch {
            hosts => ["elastic1"]
            index => "%{index}-%{+YYYY.MM.dd}"
            document_id => "%{document_id}"
         }
    } else {
         elasticsearch {
            hosts => ["elastic1"]
            index => "%{index}-%{+YYYY.MM.dd}"
         }
    }
}

This link contains a list of pre-defined Kibana visualizations and dashboards that show bidder activity and system performance.
TBD

Campaign Manager to ELK

The campaign manager dashboard extracts its data from Elasticsearch. Define the Elasticsearch host:port in the Rails initializer file config/initializer/1_rtb4free.rb by modifying the following directives.


env_replace("ELASTICSEARCH_HOST",'elastic1:9200')
env_replace("ELASTICSEARCH_HOSTS", '{"US" => "elastic1:9200"}', "hash")

In this example, the Elasticsearch host uses the host name elastic1 and is in region "US". You can map multiple regions to different Elasticsearch hosts by defining them as a Ruby hash here.
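For example, a two-region deployment might map each region to its own cluster as follows (the EU host name is illustrative):

env_replace("ELASTICSEARCH_HOSTS", '{"US" => "elastic1:9200", "EU" => "elastic-eu1:9200"}', "hash")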

Secor Cloud Logging

If you do not wish to use the ELK stack for analytics, or you wish to add other data analytic tools, you may need to store raw bidder logs in your own data store. Cloud storage, such as Amazon S3 or Google Cloud Storage, is a good choice since capacity is virtually unlimited and it is cheaper than disk. You can use the Secor open source project to do this (https://github.com/pinterest/secor). You can then download the files as needed for offline processing. Secor templates are provided for Hive/Hadoop processing.
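A minimal Secor properties sketch for shipping the bidder topics to S3 might look like the following; the bucket name and credentials are placeholders, and the property names should be verified against the Secor documentation for your version:

# Kafka topics to back up (the bidder log topics)
secor.kafka.topic_filter=bids|wins|requests|clicks|pixels
# Destination S3 bucket and path prefix
secor.s3.bucket=my-rtb4free-logs
secor.s3.path=raw
# AWS credentials used by Secor to write to S3
aws.access.key=YOUR_ACCESS_KEY
aws.secret.key=YOUR_SECRET_KEY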

Golang Consumer

Kafka consumers can be used to process log data in real time for features such as customer dashboards, budget controls, dynamic campaign changes, feeding other data stores, performing custom data aggregation, and so on. A sample consumer in Golang which reads the Kafka topics is available at the link below. It can be used as a base for custom analytics.

https://github.com/jacamars
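For reference, here is a minimal sketch of such a consumer using the segmentio/kafka-go client; the broker address, group id, and topic are illustrative assumptions, and the sample in the repository above may use a different Kafka library.

package main

import (
	"context"
	"encoding/json"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Consume the "wins" topic from the Kafka cluster (broker address is illustrative).
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"kafka1:9092"},
		GroupID: "custom-analytics",
		Topic:   "wins",
	})
	defer r.Close()

	for {
		// Block until the next win notification arrives.
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		// Each record is a JSON document; decode it into a generic map and
		// hand it to whatever aggregation or storage the application needs.
		var win map[string]interface{}
		if err := json.Unmarshal(m.Value, &win); err != nil {
			log.Printf("skipping malformed record at offset %d: %v", m.Offset, err)
			continue
		}
		log.Printf("win: price=%v hash=%v", win["price"], win["hash"])
	}
}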

HAProxy

When deploying multiple bidders, a load balancer is used to distribute the traffic from the SSP exchanges. Depending on your environment, you can use a cloud service, such as AWS ELB, or run your own load balancer such as NGINX or HAProxy. The RTB4FREE preferred option is HAProxy in a Docker container. The HAProxy configuration file used is shown below.



#
# HAProxy running on 4 processor cores
# Monitor stats using Prometheus for each CPU
#
global
    maxconn 15000
    nbproc 4
    cpu-map 1 0
    cpu-map 2 1
    cpu-map 3 2
    cpu-map 4 3

    pidfile /var/run/haproxy.pid
    tune.ssl.default-dh-param 2048

    # disable sslv3, prefer modern ciphers
    ssl-default-bind-options no-sslv3
    ssl-default-bind-ciphers ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS

    ssl-default-server-options no-sslv3
    ssl-default-server-ciphers ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS

resolvers docker
    nameserver dns 127.0.0.11:53

defaults
    mode    http
    balance roundrobin
    option  dontlognull
    option  dontlog-normal
    option http-keep-alive
    option  forwardfor
    option  redispatch

    timeout connect 30s
    timeout client  30s
    timeout server  30s
    timeout queue   30s
    timeout http-request 5s
    timeout http-keep-alive 15s

listen stats1
    bind-process 1
    bind :9001
    mode http
    stats enable
    stats hide-version
    stats realm Haproxy
    stats uri /haproxy_stats
    stats auth admin:change_me

listen stats2
    bind-process 2
    bind :9002
    mode http
    stats enable
    stats hide-version
    stats realm Haproxy
    stats uri /haproxy_stats
    stats auth admin:change_me

listen stats3
    bind-process 3
    bind :9003
    mode http
    stats enable
    stats hide-version
    stats realm Haproxy
    stats uri /haproxy_stats
    stats auth admin:change_me

listen stats4
    bind-process 4
    bind :9004
    mode http
    stats enable
    stats hide-version
    stats realm Haproxy
    stats uri /haproxy_stats
    stats auth admin:change_me


frontend services
    bind *:80
    bind *:443 ssl crt /usr/local/etc/haproxy/bundle.pem
    mode http
    acl url_rtb_bidder18080_0 path_beg /
    use_backend rtb_bidder-be8080_0 if url_rtb_bidder18080_0

backend rtb_bidder-be8080_0
    mode http
    balance leastconn
    http-request add-header X-Forwarded-Proto https if { ssl_fc }
    server rtb_bidder1 rtb_bidder1:8080 maxconn 5000 weight 20 check
    server rtb_bidder2 rtb_bidder2:8080 maxconn 5000 weight 20 check