Adds data processing example

pull/97/head
Robert Leenders 10 years ago
parent 19d68d096f
commit 0f19bbf25f

@ -0,0 +1,278 @@
# Data Processing Example
Welcome to the data processing example. It shows you how to deploy a data processing application using Deployment Manager.
## Prerequisites
### Deployment Manager
First, make sure DM is installed in your Kubernetes cluster by following the instructions in the top-level
[README.md](../../README.md).
### Apache Storm
We assume you have Apache Storm installed locally; the `storm` command-line client is used below to submit the topology to the cluster.
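If you still need to install it, one way is to download the same Storm 0.9.5 release that the images in this example use (a sketch, assuming a Unix-like machine with `curl`):
```
# Download and unpack the Storm release used throughout this example
curl -O http://archive.apache.org/dist/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz
tar zxf apache-storm-0.9.5.tar.gz
# Make the storm client available on the PATH
export PATH="$PWD/apache-storm-0.9.5/bin:$PATH"
```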
### Google Cloud Resources
The data processing application makes use of a persistent disk and a BigQuery table, which we host on Google Cloud. To create these resources, we create a deployment using Google Cloud Deployment Manager:
```
gcloud deployment-manager deployments create data-processing-resources --config data-processing-resources.yaml
```
where `data-processing-resources.yaml` looks as follows:
```
imports:
- path: data-processing-resources.jinja

resources:
- name: data-processing-resources
  type: data-processing-resources.jinja
  properties:
    project: <YOUR PROJECT>
    db-fields:
    - name: param1
      type: STRING
      mode: REQUIRED
    - name: param2
      type: STRING
      mode: NULLABLE
    - name: param3
      type: STRING
      mode: REQUIRED
```
and `data-processing-resources.jinja` looks as follows:
```
{% set PROJECT = properties['project'] or 'dm-k8s-testing' %}
{% set DATASET = properties['dataset'] or 'messages_dataset' %}
{% set TABLE = properties['table'] or 'messages_dataset_table' %}
{% set DB_FIELDS = properties['db-fields'] %}
{% set RABBITMQ = properties['rabbitmq'] or {} %}
{% set RABBITMQ_DISK = RABBITMQ['disk'] or {} %}
{% set RABBITMQ_DISK_NAME = RABBITMQ_DISK['disk'] or 'rabbitmq-disk' %}
{% set RABBITMQ_DISK_SIZE = RABBITMQ_DISK['size'] or 200 %}

resources:
- name: {{ RABBITMQ_DISK_NAME }}
  type: compute.v1.disk
  properties:
    zone: us-central1-b
    sizeGb: {{ RABBITMQ_DISK_SIZE }}
- name: {{ DATASET }}
  type: bigquery.v2.dataset
  properties:
    datasetReference:
      projectId: {{ PROJECT }}
      datasetId: {{ DATASET }}
- name: {{ TABLE }}
  type: bigquery.v2.table
  properties:
    # Need to use a reference below so that the dataset gets created before the table
    datasetId: $(ref.{{ DATASET }}.datasetReference.datasetId)
    tableReference:
      projectId: {{ PROJECT }}
      datasetId: {{ DATASET }}
      tableId: {{ TABLE }}
    schema:
      fields: {{ DB_FIELDS }}
```
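Once the deployment succeeds, you can optionally verify that the disk, dataset, and table were created:
```
gcloud deployment-manager deployments describe data-processing-resources
```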
### Storm Topology
In `examples/data-processing/storm-topology/src/jvm/storm/dataprocessing/DataProcessingTopology.java` you need to change the `PROJECT_ID` constant (line 78) to your own project. You probably also need to change the RabbitMQ host constant (`RABBIT_HOST`) and the credentials so that they point to your RabbitMQ service.
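After editing, build the topology jar that we will submit to the cluster later. A sketch, assuming Maven is installed and you run it from the `storm-topology` directory:
```
# pom.xml binds the assembly plugin to the package phase, so this
# produces target/data-processing-0.9.5-jar-with-dependencies.jar
mvn clean package
```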
## Understanding the data processing example template
Let's take a closer look at the template used by the data processing example. The data processing application consists of three services: an http-frontend service, a RabbitMQ service (which can run inside Kubernetes or on an external server), and a Storm service, which itself consists of three microservices: a Zookeeper service, a Storm master (nimbus) service, and the Storm workers. The architecture looks as follows:
![Architecture](architecture.png)
### Variables
The template contains the following variables:
```
{% set PROJECT = properties['project'] %}
{% set RABBITMQ = properties['rabbitmq'] or {} %}
{% set RABBITMQ_USERNAME = RABBITMQ['username'] %}
{% set RABBITMQ_PASSWORD = RABBITMQ['password'] %}
```
### Http-frontend service
The http-frontend service is a replicated service with 2 replicas:
```
- name: http-frontend
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py
  properties:
    service_port: 80
    container_port: 80
    replicas: 2
    external_service: true
    image: gcr.io/dm-k8s-testing/http-frontend:latest
    env:
    - name: RABBITMQ_SERVER
      value: rabbitmq-service
    - name: RABBITMQ_USERNAME
      value: {{ RABBITMQ_USERNAME }}
    - name: RABBITMQ_PASSWORD
      value: {{ RABBITMQ_PASSWORD }}
```
The http-frontend image contains a simple Go service that accepts GET requests at `/send_data/` with the parameters `param1`, `param2`, and `param3`. It then publishes these parameters as a JSON message to a RabbitMQ queue.
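For example, assuming `$FRONTEND_IP` holds the external IP of the http-frontend service, a message can be enqueued with:
```
curl "http://$FRONTEND_IP/send_data/?param1=hello&param2=world&param3=storm"
```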
### RabbitMQ service
The RabbitMQ service either consists of a Service plus manually specified Endpoints that point at an external RabbitMQ server, or of a replicated service whose pod runs inside Kubernetes:
```
{% set RABBITMQ = properties or {} %}
{% set RABBITMQ_DISK = RABBITMQ['disk'] or 'rabbitmq-disk' %}
{% set RABBITMQ_FSTYPE = RABBITMQ['fstype'] or 'ext4' %}
{% set RABBITMQ_IPS = RABBITMQ['ips'] %}
{% set RABBITMQ_USERNAME = RABBITMQ['username'] %}
{% set RABBITMQ_PASSWORD = RABBITMQ['password'] %}

resources:
{% if RABBITMQ_IPS %}
- name: rabbitmq-service
  type: Service
  properties:
    kind: Service
    apiVersion: v1
    metadata:
      name: rabbitmq-service
    spec:
      ports:
      - protocol: TCP
        port: 5672
        targetPort: 5672
- name: rabbitmq-endpoints
  type: Endpoints
  properties:
    kind: Endpoints
    apiVersion: v1
    metadata:
      name: rabbitmq-service
    subsets:
    - addresses:
      {% for IP in RABBITMQ_IPS %}
      - ip: {{ IP }}
      {% endfor %}
      ports:
      - port: 5672
{% else %}
- name: rabbitmq
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py
  properties:
    service_port: 5672
    container_port: 5672
    replicas: 1
    image: gcr.io/dm-k8s-testing/rabbitmq:latest
    env:
    - name: RABBITMQ_DEFAULT_USER
      value: {{ RABBITMQ_USERNAME }}
    - name: RABBITMQ_DEFAULT_PASS
      value: {{ RABBITMQ_PASSWORD }}
    volumes:
    - mount_path: /var/lib/rabbitmq/
      gcePersistentDisk:
        pdName: {{ RABBITMQ_DISK }}
        fsType: {{ RABBITMQ_FSTYPE }}
{% endif %}
```
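In both cases the resulting Kubernetes service is named `rabbitmq-service`, so after deployment you can check what it resolves to:
```
kubectl get service rabbitmq-service
kubectl get endpoints rabbitmq-service
```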
### Storm service
#### Zookeeper service
The Zookeeper service consists of a single replicated pod:
```
- name: zookeeper
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py
  properties:
    service_name: zookeeper
    image: mattf/zookeeper
    service_port: 2181
    container_port: 2181
    replicas: 1
```
#### Storm nimbus/master service
The Storm nimbus/master service consists of a single replicated pod:
```
- name: storm-nimbus
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py
  properties:
    service_name: nimbus
    service_port: 6627
    container_port: 6627
    replicas: 1
    external_service: true
    image: gcr.io/dm-k8s-testing/storm-nimbus:latest
```
#### Storm workers
The Storm worker pods are created by a single replication controller. Because Storm coordinates its own cluster through Zookeeper and nimbus, we do not need a Kubernetes service for the Storm workers. The replication controller looks as follows:
```
- name: storm-workers
  type: ReplicationController
  properties:
    kind: ReplicationController
    apiVersion: v1
    metadata:
      name: storm-worker-rc
      labels:
        name: storm-worker
    spec:
      replicas: 2
      selector:
        name: storm-worker
      template:
        metadata:
          labels:
            name: storm-worker
            uses: nimbus
        spec:
          containers:
          - name: storm-worker
            image: gcr.io/{{ PROJECT }}/storm-worker:latest
            ports:
            - hostPort: 6700
              containerPort: 6700
            - hostPort: 6701
              containerPort: 6701
            - hostPort: 6702
              containerPort: 6702
            - hostPort: 6703
              containerPort: 6703
            resources:
              limits:
                cpu: 200m
```
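Once deployed, the worker pods can be found via their label (a quick sanity check):
```
kubectl get pods -l name=storm-worker
```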
#### Storm images
We use three Storm images: a common base image, a master/nimbus image, and a worker image. These images are similar to the images used in the [Kubernetes Storm example](https://github.com/kubernetes/kubernetes/tree/master/examples/storm). To allow the Storm workers to connect to BigQuery we need to provide authentication; you can either rely on the credentials GCE provides or supply a Google Application Credentials file. We use the latter method and add a JSON key file to the worker image. To run this example you need to generate a JSON key and save it as `image/storm-worker/google_application_credentials.json`. More information on how to generate a JSON key can be found [here](https://developers.google.com/identity/protocols/application-default-credentials#howtheywork).
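The images are built and pushed with the Makefiles next to each Dockerfile; each Makefile tags the image as `gcr.io/$(PROJECT)/<image>` and pushes it with `gcloud docker push`. A sketch, assuming the images live under `image/` (as suggested by the credentials path above) and `$PROJECT` holds your project ID:
```
# Build and push the Storm images (repeat for http-frontend and rabbitmq if needed)
for img in storm-base storm-nimbus storm-worker; do
  make -C image/$img push PROJECT=$PROJECT
done
```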
## Deploying the data processing application
We can now deploy the data processing application using:
```
dm deploy examples/data-processing/data-processing.yaml
```
where `data-processing.yaml` looks as follows:
```
imports:
- path: data-processing.jinja

resources:
- name: data-processing
  type: data-processing.jinja
  properties:
    project: <YOUR PROJECT>
    rabbitmq:
      # Replace with your information
      username: rabbit
      password: rabbit-password
      # ips:
      # - <YOUR IP>
```
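After the deployment finishes, you can verify that everything came up:
```
# Expect http-frontend, zookeeper, storm-nimbus, two storm-worker pods,
# and rabbitmq (unless you pointed it at an external server)
kubectl get pods
kubectl get services
```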
Once the application has been deployed, we need to launch a Storm topology on it. First acquire the IP address of the Storm master: `kubectl describe service nimbus | grep LoadBalancer`. Then either add this IP address to your local Storm configuration (i.e., set the `nimbus.host` property in `~/.storm/storm.yaml` to the IP address) or pass it on every command (i.e., `-c nimbus.host=<IP>`). We can then launch the Storm topology using `storm jar target/data-processing-0.9.5-jar-with-dependencies.jar storm.dataprocessing.DataProcessingTopology data-processing-topology`.
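To exercise the whole pipeline end to end, send a message through the frontend and look for it in BigQuery (a sketch; `$FRONTEND_IP` is the external IP of the http-frontend service, and the dataset and table names are the defaults from `data-processing-resources.jinja`):
```
curl "http://$FRONTEND_IP/send_data/?param1=a&param2=b&param3=c"
# Streaming inserts can take a moment to become visible
bq query "SELECT * FROM messages_dataset.messages_dataset_table LIMIT 10"
```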


@ -0,0 +1,32 @@
{% set PROJECT = properties['project'] or 'dm-k8s-testing' %}
{% set DATASET = properties['dataset'] or 'messages_dataset' %}
{% set TABLE = properties['table'] or 'messages_dataset_table' %}
{% set DB_FIELDS = properties['db-fields'] %}
{% set RABBITMQ = properties['rabbitmq'] or {} %}
{% set RABBITMQ_DISK = RABBITMQ['disk'] or {} %}
{% set RABBITMQ_DISK_NAME = RABBITMQ_DISK['disk'] or 'rabbitmq-disk' %}
{% set RABBITMQ_DISK_SIZE = RABBITMQ_DISK['size'] or 200 %}

resources:
- name: {{ RABBITMQ_DISK_NAME }}
  type: compute.v1.disk
  properties:
    zone: us-central1-b
    sizeGb: {{ RABBITMQ_DISK_SIZE }}
- name: {{ DATASET }}
  type: bigquery.v2.dataset
  properties:
    datasetReference:
      projectId: {{ PROJECT }}
      datasetId: {{ DATASET }}
- name: {{ TABLE }}
  type: bigquery.v2.table
  properties:
    # Need to use a reference below so that the dataset gets created before the table
    datasetId: $(ref.{{ DATASET }}.datasetReference.datasetId)
    tableReference:
      projectId: {{ PROJECT }}
      datasetId: {{ DATASET }}
      tableId: {{ TABLE }}
    schema:
      fields: {{ DB_FIELDS }}

@ -0,0 +1,18 @@
imports:
- path: data-processing-resources.jinja

resources:
- name: data-processing-resources
  type: data-processing-resources.jinja
  properties:
    project: dm-k8s-testing
    db-fields:
    - name: param1
      type: STRING
      mode: REQUIRED
    - name: param2
      type: STRING
      mode: NULLABLE
    - name: param3
      type: STRING
      mode: REQUIRED

@ -0,0 +1,31 @@
{% set PROJECT = properties['project'] %}
{% set RABBITMQ = properties['rabbitmq'] or {} %}
{% set RABBITMQ_USERNAME = RABBITMQ['username'] %}
{% set RABBITMQ_PASSWORD = RABBITMQ['password'] %}

resources:
- name: rabbitmq
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/rabbitmq/v1/rabbitmq.jinja
  properties:
    {{ RABBITMQ }}
- name: http-frontend
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py
  properties:
    service_port: 80
    container_port: 80
    replicas: 2
    external_service: true
    image: gcr.io/dm-k8s-testing/http-frontend:latest
    env:
    - name: RABBITMQ_SERVER
      value: rabbitmq-service
    - name: RABBITMQ_USERNAME
      value: {{ RABBITMQ_USERNAME }}
    - name: RABBITMQ_PASSWORD
      value: {{ RABBITMQ_PASSWORD }}
- name: storm
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/storm/v1/storm.jinja
  properties:
{% if PROJECT is defined %}
    project: {{ PROJECT }}
{% endif %}

@ -0,0 +1,31 @@
info:
  title: Data processing system
  description: Defines a system that takes messages as input, stores them into RabbitMQ clusters, processes them using Apache Storm, and then stores them into BigQuery.

properties:
  rabbitmq:
    type: object
    required:
    - username
    - password
    properties:
      disk:
        type: string
        default: rabbitmq-disk
        description: The name of the persistent disk the RabbitMQ service uses.
      fstype:
        type: string
        default: ext4
        description: The filesystem the disk of the RabbitMQ service uses.
      ips:
        type: array
        description: If any IP addresses are given we assume RabbitMQ runs on an external server (say GCE) and is accessible by these IP addresses.
        items:
          type: string
        uniqueItems: true
      username:
        type: string
        description: The username for the RabbitMQ service.
      password:
        type: string
        description: The password for the RabbitMQ service.

@ -0,0 +1,14 @@
imports:
- path: data-processing.jinja

resources:
- name: data-processing
  type: data-processing.jinja
  properties:
    project: dm-k8s-testing
    rabbitmq:
      # Replace with your information
      username: rabbit
      password: rabbit-password
      # ips:
      # - <YOUR IP>

@ -0,0 +1,23 @@
.PHONY: all build push clean

DOCKER_REGISTRY = gcr.io
PREFIX = $(DOCKER_REGISTRY)/$(PROJECT)
IMAGE = http-frontend
TAG = latest
DIR = .

all: build

build:
	docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR)

push: build
ifeq ($(DOCKER_REGISTRY),gcr.io)
	gcloud docker push $(PREFIX)/$(IMAGE):$(TAG)
else
	docker push $(PREFIX)/$(IMAGE):$(TAG)
endif

clean:
	docker rmi $(PREFIX)/$(IMAGE):$(TAG)

@ -0,0 +1,58 @@
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"

	"github.com/codegangsta/negroni"
	"github.com/gorilla/mux"
	"github.com/streadway/amqp"
)

var channel *amqp.Channel
var queue amqp.Queue

// SendMessageHandler reads param1..param3 from the query string and
// publishes them as a persistent JSON message to the RabbitMQ queue.
func SendMessageHandler(rw http.ResponseWriter, req *http.Request) {
	params := []string{"param1", "param2", "param3"}
	data := map[string]string{}
	get_variables := req.URL.Query()
	for _, param := range params {
		data[param] = get_variables.Get(param)
	}
	json_data, err := json.Marshal(data)
	HandleError(err, "Error while converting query data to JSON")
	err = channel.Publish("", queue.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent, ContentType: "application/json", Body: []byte(json_data)})
	HandleError(err, "Failed to publish a message")
	fmt.Fprintln(rw, "Success!")
}

// HandleError logs the error and exits; log.Fatalf already terminates
// the process, so no further handling is needed.
func HandleError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	broker := os.Getenv("RABBITMQ_SERVER")
	username := os.Getenv("RABBITMQ_USERNAME")
	password := os.Getenv("RABBITMQ_PASSWORD")
	conn, err := amqp.Dial("amqp://" + username + ":" + password + "@" + broker + ":5672/")
	HandleError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	channel, err = conn.Channel()
	HandleError(err, "Failed to open a channel")
	defer channel.Close()
	queue, err = channel.QueueDeclare("messages", true, false, false, false, nil)
	HandleError(err, "Failed to declare a queue")
	r := mux.NewRouter()
	r.Path("/send_data/").Methods("GET").HandlerFunc(SendMessageHandler)
	n := negroni.Classic()
	n.UseHandler(r)
	n.Run(":80")
}

@ -0,0 +1,11 @@
FROM gcr.io/dm-k8s-testing/storm-base
ADD start.sh /
EXPOSE 6700 6701 6702 6703
WORKDIR /opt/apache-storm
ADD google_application_credentials.json ./
ENV GOOGLE_APPLICATION_CREDENTIALS /opt/apache-storm/google_application_credentials.json
ENTRYPOINT ["/start.sh"]

@ -0,0 +1,23 @@
.PHONY: all build push clean

DOCKER_REGISTRY = gcr.io
PREFIX = $(DOCKER_REGISTRY)/$(PROJECT)
IMAGE = storm-worker
TAG = latest
DIR = .

all: build

build:
	docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR)

push: build
ifeq ($(DOCKER_REGISTRY),gcr.io)
	gcloud docker push $(PREFIX)/$(IMAGE):$(TAG)
else
	docker push $(PREFIX)/$(IMAGE):$(TAG)
endif

clean:
	docker rmi $(PREFIX)/$(IMAGE):$(TAG)

@ -0,0 +1,5 @@
#!/bin/sh
/configure.sh ${ZOOKEEPER_SERVICE_HOST:-$1} ${NIMBUS_SERVICE_HOST:-$2}
exec bin/storm supervisor

@ -0,0 +1,136 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>storm</artifactId>
    <groupId>org.apache.storm</groupId>
    <version>0.9.5</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <groupId>org.google.cloud</groupId>
  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>${project.version}</version>
      <!-- keep storm out of the jar-with-dependencies -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.5.6</version>
    </dependency>
    <dependency>
      <groupId>io.latent</groupId>
      <artifactId>storm-rabbitmq</artifactId>
      <version>0.6.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-bigquery</artifactId>
      <version>v2-rev158-1.19.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.oauth-client</groupId>
      <artifactId>google-oauth-client</artifactId>
      <version>1.19.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client-jackson2</artifactId>
      <version>1.19.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.oauth-client</groupId>
      <artifactId>google-oauth-client-jetty</artifactId>
      <version>1.19.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.3.1</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/jvm</sourceDirectory>
    <testSourceDirectory>test/jvm</testSourceDirectory>
    <plugins>
      <!--
        Bind the maven-assembly-plugin to the package phase
        this will create a jar file without the storm dependencies
        suitable for deployment to a cluster.
      -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass />
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>${storm.topology}</mainClass>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <artifactId>data-processing</artifactId>
  <name>data-processing</name>
</project>

@ -0,0 +1,237 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package storm.dataprocessing;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import io.latent.storm.rabbitmq.RabbitMQSpout;
import io.latent.storm.rabbitmq.config.ConnectionConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfigBuilder;

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.stream.JsonReader;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.Collection;
import java.io.StringReader;
import java.io.IOException;

/**
 * This is a basic example of a Storm topology.
 */
public class DataProcessingTopology {
  private static final String RABBIT_USERNAME = "rabbit";
  private static final String RABBIT_PASSWORD = "rabbit-password";
  private static final String RABBIT_HOST = "rabbitmq-service";
  private static final int RABBIT_PORT = 5672;

  // Replace with your project id
  private static final String PROJECT_ID = "dm-k8s-testing";
  private static final String DATASET_ID = "messages_dataset";
  private static final String TABLE_ID = "messages_dataset_table";

  private static Bigquery bigquery;

  /**
   * This class creates our Service to connect to Bigquery including auth.
   */
  public final static class BigqueryServiceFactory {

    /**
     * Private constructor to disable creation of this utility Factory class.
     */
    private BigqueryServiceFactory() {
    }

    /**
     * Singleton service used through the app.
     */
    private static Bigquery service = null;

    /**
     * Mutex used to create the singleton in a thread-safe fashion.
     */
    private static Object serviceLock = new Object();

    /**
     * Threadsafe Factory that provides an authorized Bigquery service.
     * @return The Bigquery service
     * @throws IOException Thrown if there is an error connecting to Bigquery.
     */
    public static Bigquery getService() throws IOException {
      if (service == null) {
        synchronized (serviceLock) {
          if (service == null) {
            service = createAuthorizedClient();
          }
        }
      }
      return service;
    }

    /**
     * Creates an authorized client to Google Bigquery.
     *
     * @return The BigQuery Service
     * @throws IOException Thrown if there is an error connecting
     */
    // [START get_service]
    private static Bigquery createAuthorizedClient() throws IOException {
      // Create the credential
      HttpTransport transport = new NetHttpTransport();
      JsonFactory jsonFactory = new JacksonFactory();
      GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
      // Depending on the environment that provides the default credentials (e.g. Compute Engine, App
      // Engine), the credentials may require us to specify the scopes we need explicitly.
      // Check for this case, and inject the Bigquery scope if required.
      if (credential.createScopedRequired()) {
        Collection<String> bigqueryScopes = BigqueryScopes.all();
        credential = credential.createScoped(bigqueryScopes);
      }
      return new Bigquery.Builder(transport, jsonFactory, credential)
          .setApplicationName("Data processing storm").build();
    }
    // [END get_service]
  }

  public static void submit_to_bigquery(JsonReader rows) throws IOException {
    final Gson gson = new Gson();
    Map<String, Object> rowData = gson.<Map<String, Object>>fromJson(rows, (new HashMap<String, Object>()).getClass());
    final TableDataInsertAllRequest.Rows row = new TableDataInsertAllRequest.Rows().setJson(rowData);
    bigquery.tabledata().insertAll(PROJECT_ID, DATASET_ID, TABLE_ID, new TableDataInsertAllRequest().setRows(Collections.singletonList(row))).execute();
  }

  public static class StoreToBigQueryBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
      try {
        bigquery = BigqueryServiceFactory.getService();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void execute(Tuple tuple) {
      String json = tuple.getString(0);
      // Do processing
      // Send it to bigquery
      try {
        submit_to_bigquery(new JsonReader(new StringReader(json)));
      } catch (IOException e) {
        e.printStackTrace();
      }
      // Pass it on
      // _collector.emit(tuple, new Values(json));
      // Acknowledge it
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
  }

  public static class MessageScheme implements Scheme {
    @Override
    public List<Object> deserialize(byte[] ser) {
      List<Object> objects = new ArrayList<Object>();
      objects.add(new String(ser));
      return objects;
    }

    @Override
    public Fields getOutputFields() {
      Fields fields = new Fields("json_string");
      return fields;
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    Scheme scheme = new MessageScheme();
    IRichSpout spout = new RabbitMQSpout(scheme);
    ConnectionConfig connectionConfig = new ConnectionConfig(RABBIT_HOST, RABBIT_PORT, RABBIT_USERNAME, RABBIT_PASSWORD, ConnectionFactory.DEFAULT_VHOST, 580); // host, port, username, password, virtualHost, heartBeat
    ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
        .queue("messages")
        .prefetch(200)
        .requeueOnFail()
        .build();

    builder.setSpout("rabbitmq", spout, 100)
        .setNumTasks(100)
        .addConfigurations(spoutConfig.asMap())
        .setMaxSpoutPending(200);
    builder.setBolt("process-message", new StoreToBigQueryBolt(), 100).shuffleGrouping("rabbitmq").setNumTasks(100);

    Config conf = new Config();
    if (args != null && args.length > 0) {
      conf.setDebug(false);
      conf.setNumWorkers(14);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    } else {
      conf.setDebug(true);
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

@ -0,0 +1,3 @@
FROM rabbitmq
COPY docker-entrypoint.sh /

@ -0,0 +1,23 @@
.PHONY: all build push clean

DOCKER_REGISTRY = gcr.io
PREFIX = $(DOCKER_REGISTRY)/$(PROJECT)
IMAGE = rabbitmq
TAG = latest
DIR = .

all: build

build:
	docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR)

push: build
ifeq ($(DOCKER_REGISTRY),gcr.io)
	gcloud docker push $(PREFIX)/$(IMAGE):$(TAG)
else
	docker push $(PREFIX)/$(IMAGE):$(TAG)
endif

clean:
	docker rmi $(PREFIX)/$(IMAGE):$(TAG)

@ -0,0 +1,87 @@
#!/bin/bash
set -e

if [ "$RABBITMQ_ERLANG_COOKIE" ]; then
	cookieFile='/var/lib/rabbitmq/.erlang.cookie'
	if [ -e "$cookieFile" ]; then
		if [ "$(cat "$cookieFile" 2>/dev/null)" != "$RABBITMQ_ERLANG_COOKIE" ]; then
			echo >&2
			echo >&2 "warning: $cookieFile contents do not match RABBITMQ_ERLANG_COOKIE"
			echo >&2
		fi
	else
		echo "$RABBITMQ_ERLANG_COOKIE" > "$cookieFile"
		chmod 600 "$cookieFile"
		chown rabbitmq "$cookieFile"
	fi
fi

if [ "$1" = 'rabbitmq-server' ]; then
	configs=(
		# https://www.rabbitmq.com/configure.html
		default_vhost
		default_user
		default_pass
	)

	haveConfig=
	for conf in "${configs[@]}"; do
		var="RABBITMQ_${conf^^}"
		val="${!var}"
		if [ "$val" ]; then
			haveConfig=1
			break
		fi
	done

	if [ "$haveConfig" ]; then
		cat > /etc/rabbitmq/rabbitmq.config <<-'EOH'
			[
			  {rabbit,
			    [
		EOH
		for conf in "${configs[@]}"; do
			var="RABBITMQ_${conf^^}"
			val="${!var}"
			[ "$val" ] || continue
			cat >> /etc/rabbitmq/rabbitmq.config <<-EOC
				      {$conf, <<"$val">>},
			EOC
		done
		cat >> /etc/rabbitmq/rabbitmq.config <<-'EOF'
			      {loopback_users, []}
			    ]
			  }
			].
		EOF
	fi

	chown -R rabbitmq /var/lib/rabbitmq
	set -- gosu rabbitmq "$@"
fi

mkdir -p /var/lib/rabbitmq/mnesia
cd /var/lib/rabbitmq/mnesia
if [ -e rabbit@hostname ]; then
	OLDHOSTNAME=$(readlink rabbit@hostname)
	# Copy old database over to current hostname
	# cp rabbit@hostname.pid rabbit@`hostname`.pid || :
	cd rabbit@hostname
	# Replace any references to the old hostname with the current one.
	# Append this to the command below to remove everything in front of the @
	# symbol (including the @): | grep -o '@.*' | cut -f2- -d'@'
	find . -type f -exec sed -i "s/$OLDHOSTNAME/rabbit@$(hostname)/g" {} +
	cd ..
	cp -r "$OLDHOSTNAME" rabbit@`hostname`
	cp -r "$OLDHOSTNAME-plugins-expand" rabbit@`hostname`-plugins-expand
	rm -fr "$OLDHOSTNAME"
	rm -fr "$OLDHOSTNAME-plugins-expand"
fi

# Link current database to the rabbit@hostname files
ln -sfn rabbit@`hostname` rabbit@hostname
# ln -sfn rabbit@`hostname`.pid rabbit@hostname.pid
ln -sfn rabbit@`hostname`-plugins-expand rabbit@hostname-plugins-expand
chown -R rabbitmq:rabbitmq /var/lib/rabbitmq

exec "$@"

@ -0,0 +1,54 @@
{% set RABBITMQ = properties or {} %}
{% set RABBITMQ_DISK = RABBITMQ['disk'] or 'rabbitmq-disk' %}
{% set RABBITMQ_FSTYPE = RABBITMQ['fstype'] or 'ext4' %}
{% set RABBITMQ_IPS = RABBITMQ['ips'] %}
{% set RABBITMQ_USERNAME = RABBITMQ['username'] %}
{% set RABBITMQ_PASSWORD = RABBITMQ['password'] %}

resources:
{% if RABBITMQ_IPS %}
- name: rabbitmq-service
  type: Service
  properties:
    kind: Service
    apiVersion: v1
    metadata:
      name: rabbitmq-service
    spec:
      ports:
      - protocol: TCP
        port: 5672
        targetPort: 5672
- name: rabbitmq-endpoints
  type: Endpoints
  properties:
    kind: Endpoints
    apiVersion: v1
    metadata:
      name: rabbitmq-service
    subsets:
    - addresses:
      {% for IP in RABBITMQ_IPS %}
      - ip: {{ IP }}
      {% endfor %}
      ports:
      - port: 5672
{% else %}
- name: rabbitmq
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py
  properties:
    service_port: 5672
    container_port: 5672
    replicas: 1
    image: gcr.io/dm-k8s-testing/rabbitmq:latest
    env:
    - name: RABBITMQ_DEFAULT_USER
      value: {{ RABBITMQ_USERNAME }}
    - name: RABBITMQ_DEFAULT_PASS
      value: {{ RABBITMQ_PASSWORD }}
    volumes:
    - mount_path: /var/lib/rabbitmq/
      gcePersistentDisk:
        pdName: {{ RABBITMQ_DISK }}
        fsType: {{ RABBITMQ_FSTYPE }}
{% endif %}

@ -0,0 +1,28 @@
info:
  title: RabbitMQ
  description: Defines a RabbitMQ service.

required:
- username
- password

properties:
  disk:
    type: string
    default: rabbitmq-disk
    description: The name of the persistent disk the RabbitMQ service uses.
  fstype:
    type: string
    default: ext4
    description: The filesystem the disk of the RabbitMQ service uses.
  ips:
    type: array
    description: If any IP addresses are given we assume RabbitMQ runs on an external server (say GCE) and is accessible by these IP addresses.
    items:
      type: string
    uniqueItems: true
  username:
    type: string
    description: The username for the RabbitMQ service.
  password:
    type: string
    description: The password for the RabbitMQ service.

@ -0,0 +1,8 @@
- name: rabbitmq
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/rabbitmq/v1/rabbitmq.jinja
  properties:
    # Replace with your information
    username: rabbit
    password: rabbit-password
    ips:
    - 10.240.0.8

@ -0,0 +1,9 @@
FROM ubuntu:latest
RUN apt-get update -y && apt-get install -y openjdk-7-jre tar dnsutils curl
RUN curl -s http://archive.apache.org/dist/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz | \
    tar zxf - -C /opt && \
    mv /opt/apache-storm-0.9.5 /opt/apache-storm
ADD configure.sh /

@ -0,0 +1,23 @@
.PHONY: all build push clean

DOCKER_REGISTRY = gcr.io
PREFIX = $(DOCKER_REGISTRY)/$(PROJECT)
IMAGE = storm-base
TAG = latest
DIR = .

all: build

build:
	docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR)

push: build
ifeq ($(DOCKER_REGISTRY),gcr.io)
	gcloud docker push $(PREFIX)/$(IMAGE):$(TAG)
else
	docker push $(PREFIX)/$(IMAGE):$(TAG)
endif

clean:
	docker rmi $(PREFIX)/$(IMAGE):$(TAG)

@ -0,0 +1,24 @@
#!/bin/sh

cat >> conf/storm.yaml <<EOF
storm.local.hostname: "$(hostname -I)"
EOF

cat >> conf/storm.yaml <<EOF
storm.local.dir: "/tmp"
EOF

if [ -n "$1" ]; then
  cat >> conf/storm.yaml <<EOF
storm.zookeeper.servers:
- "$1"
EOF
fi

if [ -n "$2" ]; then
  cat >> conf/storm.yaml <<EOF
nimbus.host: "$2"
EOF
fi

cat conf/storm.yaml

@ -0,0 +1,9 @@
FROM gcr.io/dm-k8s-testing/storm-base
ADD start.sh /
EXPOSE 3772 6627
WORKDIR /opt/apache-storm
ENTRYPOINT ["/start.sh"]

@ -0,0 +1,23 @@
.PHONY: all build push clean

DOCKER_REGISTRY = gcr.io
PREFIX = $(DOCKER_REGISTRY)/$(PROJECT)
IMAGE = storm-nimbus
TAG = latest
DIR = .

all: build

build:
	docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR)

push: build
ifeq ($(DOCKER_REGISTRY),gcr.io)
	gcloud docker push $(PREFIX)/$(IMAGE):$(TAG)
else
	docker push $(PREFIX)/$(IMAGE):$(TAG)
endif

clean:
	docker rmi $(PREFIX)/$(IMAGE):$(TAG)

@ -0,0 +1,5 @@
#!/bin/sh
/configure.sh ${ZOOKEEPER_SERVICE_HOST:-$1}
exec bin/storm nimbus

@ -0,0 +1,11 @@
FROM gcr.io/dm-k8s-testing/storm-base
ADD start.sh /
EXPOSE 6700 6701 6702 6703
WORKDIR /opt/apache-storm
ADD google_application_credentials.json ./
ENV GOOGLE_APPLICATION_CREDENTIALS /opt/apache-storm/google_application_credentials.json
ENTRYPOINT ["/start.sh"]

@ -0,0 +1,23 @@
.PHONY: all build push clean

DOCKER_REGISTRY = gcr.io
PREFIX = $(DOCKER_REGISTRY)/$(PROJECT)
IMAGE = storm-worker
TAG = latest
DIR = .

all: build

build:
	docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR)

push: build
ifeq ($(DOCKER_REGISTRY),gcr.io)
	gcloud docker push $(PREFIX)/$(IMAGE):$(TAG)
else
	docker push $(PREFIX)/$(IMAGE):$(TAG)
endif

clean:
	docker rmi $(PREFIX)/$(IMAGE):$(TAG)

@ -0,0 +1,5 @@
#!/bin/sh
/configure.sh ${ZOOKEEPER_SERVICE_HOST:-$1} ${NIMBUS_SERVICE_HOST:-$2}
exec bin/storm supervisor

@ -0,0 +1,58 @@
{% set PROPERTIES = properties or {} %}
{% set PROJECT = PROPERTIES['project'] or 'dm-k8s-testing' %}

imports:
- path: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py

resources:
- name: zookeeper
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py
  properties:
    service_name: zookeeper
    image: mattf/zookeeper
    service_port: 2181
    container_port: 2181
    replicas: 1
- name: storm-nimbus
  type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py
  properties:
    service_name: nimbus
    service_port: 6627
    container_port: 6627
    replicas: 1
    external_service: true
    image: gcr.io/dm-k8s-testing/storm-nimbus:latest
- name: storm-workers
  type: ReplicationController
  properties:
    kind: ReplicationController
    apiVersion: v1
    metadata:
      name: storm-worker-rc
      labels:
        name: storm-worker
    spec:
      replicas: 2
      selector:
        name: storm-worker
      template:
        metadata:
          labels:
            name: storm-worker
            uses: nimbus
        spec:
          containers:
          - name: storm-worker
            image: gcr.io/{{ PROJECT }}/storm-worker:latest
            ports:
            - hostPort: 6700
              containerPort: 6700
            - hostPort: 6701
              containerPort: 6701
            - hostPort: 6702
              containerPort: 6702
            - hostPort: 6703
              containerPort: 6703
            resources:
              limits:
                cpu: 200m

@ -0,0 +1,8 @@
info:
  title: Storm cluster
  description: Defines a Storm cluster consisting of a single-replica Zookeeper replicatedservice, a single-replica Storm nimbus replicatedservice, and a two-replica Storm worker replication controller.

properties:
  project:
    type: string
    description: The project to load the images from.

@ -0,0 +1,7 @@
imports:
- path: storm.jinja

resources:
- name: storm
  type: storm.jinja
  properties: null