diff --git a/examples/data-processing/README.md b/examples/data-processing/README.md new file mode 100644 index 000000000..f141cef97 --- /dev/null +++ b/examples/data-processing/README.md @@ -0,0 +1,278 @@ +# Data Processing Example +Welcome to the data processing example. It shows you how to deploy a data processing application using Deployment Manager. + +## Prerequisites +### Deployment Manager +First, make sure DM is installed in your Kubernetes cluster by following the instructions in the top-level +[README.md](../../README.md). + +### Apache Storm +We assume you have Apache Storm installed locally, so that you can submit topologies to the cluster with the `storm` command. + +### Google Cloud Resources +The data processing application makes use of a persistent disk and a BigQuery dataset and table, which we will host on Google Cloud. To create these resources, we create a deployment using Google Cloud Deployment Manager: + +``` +gcloud deployment-manager deployments create data-processing-resources --config data-processing-resources.yaml +``` + +where `data-processing-resources.yaml` looks as follows: + +``` +imports: +- path: data-processing-resources.jinja + +resources: +- name: data-processing-resources + type: data-processing-resources.jinja + properties: + project: + db-fields: + - name: param1 + type: STRING + mode: REQUIRED + - name: param2 + type: STRING + mode: NULLABLE + - name: param3 + type: STRING + mode: REQUIRED +``` + +and `data-processing-resources.jinja` looks as follows: + +``` +{% set PROJECT = properties['project'] or 'dm-k8s-testing' %} +{% set DATASET = properties['dataset'] or 'messages_dataset' %} +{% set TABLE = properties['table'] or 'messages_dataset_table' %} +{% set DB_FIELDS = properties['db-fields'] %} +{% set RABBITMQ = properties['rabbitmq'] or {} %} +{% set RABBITMQ_DISK = RABBITMQ['disk'] or {} %} +{% set RABBITMQ_DISK_NAME = RABBITMQ_DISK['disk'] or 'rabbitmq-disk' %} +{% set RABBITMQ_DISK_SIZE = RABBITMQ_DISK['size'] or 200 %} + +resources: +- name: {{ RABBITMQ_DISK_NAME }} + type: compute.v1.disk + properties: + zone: us-central1-b + sizeGb: {{ RABBITMQ_DISK_SIZE }} +- name: {{ DATASET }} + type: bigquery.v2.dataset + properties: + datasetReference: + projectId: {{ PROJECT }} + datasetId: {{ DATASET }} +- name: {{ TABLE }} + type: bigquery.v2.table + properties: + # Need to use a reference below so that the dataset gets created before the table + datasetId: $(ref.{{ DATASET }}.datasetReference.datasetId) + tableReference: + projectId: {{ PROJECT }} + datasetId: {{ DATASET }} + tableId: {{ TABLE }} + schema: + fields: {{ DB_FIELDS }} +``` + +### Storm Topology +In `examples/data-processing/storm-topology/src/jvm/storm/dataprocessing/DataProcessingTopology.java` you need to change the `PROJECT_ID` on line 78. If your RabbitMQ server runs outside the cluster, you also need to change `RABBIT_HOST` so that it points to your RabbitMQ service.
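+ +For reference, these constants sit near the top of `DataProcessingTopology.java`; the values below are the defaults checked into this repository: + +``` +private static final String RABBIT_USERNAME = "rabbit"; +private static final String RABBIT_PASSWORD = "rabbit-password"; +private static final String RABBIT_HOST = "rabbitmq-service"; +private static final int RABBIT_PORT = 5672; +// Replace with your project id +private static final String PROJECT_ID = "dm-k8s-testing"; +private static final String DATASET_ID = "messages_dataset"; +private static final String TABLE_ID = "messages_dataset_table"; +```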
+ +## Understanding the data processing example template +Let's take a closer look at the template used by the data processing example. The data processing application consists of three services: an http-frontend service, a RabbitMQ service (which can run either in Kubernetes or on an external server), and a Storm service, which itself consists of three microservices: a Zookeeper service, a Storm master service, and Storm workers. +The architecture looks as follows: + +![Architecture](architecture.png) + +### Variables +The template contains the following variables: + +``` +{% set PROJECT = properties['project'] %} +{% set RABBITMQ = properties['rabbitmq'] or {} %} +{% set RABBITMQ_USERNAME = RABBITMQ['username'] %} +{% set RABBITMQ_PASSWORD = RABBITMQ['password'] %} +``` + +### Http-frontend service +The http-frontend service is a replicated service with 2 replicas: + +``` +- name: http-frontend + type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py + properties: + service_port: 80 + container_port: 80 + replicas: 2 + external_service: true + image: gcr.io/dm-k8s-testing/http-frontend:latest + env: + - name: RABBITMQ_SERVER + value: rabbitmq-service + - name: RABBITMQ_USERNAME + value: {{ RABBITMQ_USERNAME }} + - name: RABBITMQ_PASSWORD + value: {{ RABBITMQ_PASSWORD }} +``` + +The http-frontend image contains a simple Go service that accepts GET requests at `/send_data/` with the parameters `param1`, `param2`, and `param3`, and publishes each request as a JSON message on a RabbitMQ queue.
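+ +Once the http-frontend service is up and has been assigned an external IP (the address below is a placeholder), you can enqueue a message with a plain GET request: + +``` +curl "http://<http-frontend-ip>/send_data/?param1=hello&param2=world&param3=storm" +```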
+ +### RabbitMQ service +The RabbitMQ service either points to an external RabbitMQ server through manually specified endpoints, or to a RabbitMQ pod running inside Kubernetes: + +``` +{% set RABBITMQ = properties or {} %} +{% set RABBITMQ_DISK = RABBITMQ['disk'] or 'rabbitmq-disk' %} +{% set RABBITMQ_FSTYPE = RABBITMQ['fstype'] or 'ext4' %} +{% set RABBITMQ_IPS = RABBITMQ['ips'] %} +{% set RABBITMQ_USERNAME = RABBITMQ['username'] %} +{% set RABBITMQ_PASSWORD = RABBITMQ['password'] %} + +resources: +{% if RABBITMQ_IPS %} +- name: rabbitmq-service + type: Service + properties: + kind: Service + apiVersion: v1 + metadata: + name: rabbitmq-service + spec: + ports: + - protocol: TCP + port: 5672 + targetPort: 5672 +- name: rabbitmq-endpoints + type: Endpoints + properties: + kind: Endpoints + apiVersion: v1 + metadata: + name: rabbitmq-service + subsets: + - addresses: + {% for IP in RABBITMQ_IPS %} + - IP: {{ IP }} + {% endfor %} + ports: + - port: 5672 +{% else %} +- name: rabbitmq + type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py + properties: + service_port: 5672 + container_port: 5672 + replicas: 1 + image: gcr.io/dm-k8s-testing/rabbitmq:latest + env: + - name: RABBITMQ_DEFAULT_USER + value: {{ RABBITMQ_USERNAME }} + - name: RABBITMQ_DEFAULT_PASS + value: {{ RABBITMQ_PASSWORD }} + volumes: + - mount_path: /var/lib/rabbitmq/ + gcePersistentDisk: + pdName: {{ RABBITMQ_DISK }} + fsType: {{ RABBITMQ_FSTYPE }} +{% endif %} +``` + +### Storm service +#### Zookeeper service +The Zookeeper service consists of a single replicated pod: + +``` +- name: zookeeper + type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py + properties: + service_name: zookeeper + image: mattf/zookeeper + service_port: 2181 + container_port: 2181 + replicas: 1 +``` + +#### Storm nimbus/master service +The Storm nimbus/master service consists of a single replicated pod: + +``` +- name: storm-nimbus + type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py + properties: + service_name: nimbus + service_port: 6627 + container_port: 6627 + replicas: 1 + external_service: true + image: gcr.io/dm-k8s-testing/storm-nimbus:latest +``` + +#### Storm workers +The Storm worker pods are created by a single replication controller. Because Storm manages its own cluster membership through Zookeeper, we do not need a Kubernetes service in front of the Storm workers. The replication controller looks as follows: + +``` +- name: storm-workers + type: ReplicationController + properties: + kind: ReplicationController + apiVersion: v1 + metadata: + name: storm-worker-rc + labels: + name: storm-worker + spec: + replicas: 2 + selector: + name: storm-worker + template: + metadata: + labels: + name: storm-worker + uses: nimbus + spec: + containers: + - name: storm-worker + image: gcr.io/{{ PROJECT }}/storm-worker:latest + ports: + - hostPort: 6700 + containerPort: 6700 + - hostPort: 6701 + containerPort: 6701 + - hostPort: 6702 + containerPort: 6702 + - hostPort: 6703 + containerPort: 6703 + resources: + limits: + cpu: 200m +``` + +#### Storm images +We use three Storm images: a common base image, a master/nimbus image, and a worker image. These images are similar to the images used in the [Kubernetes Storm example](https://github.com/kubernetes/kubernetes/tree/master/examples/storm). To allow Storm workers to connect to BigQuery, we need to provide authentication. You can rely on the default credentials within GCE, or you can supply a Google Application Credentials file. We use the latter method and add a JSON key file to the worker image. To run this example you need to generate a JSON key and save it in `images/storm-worker/google_application_credentials.json`. More information on how to generate a JSON key can be found [here](https://developers.google.com/identity/protocols/application-default-credentials#howtheywork). + +## Deploying the data processing application +We can now deploy the data processing application using: + +``` +dm deploy examples/data-processing/data-processing.yaml +``` + +where `data-processing.yaml` looks as follows: + +``` +imports: +- path: data-processing.jinja + +resources: +- name: data-processing + type: data-processing.jinja + properties: + project: + rabbitmq: + # Replace with your information + username: rabbit + password: rabbit-password + # ips: + # - +``` + +Once the application has been deployed, we need to launch a Storm topology on it. First acquire the IP address of the Storm master: `kubectl describe service nimbus | grep LoadBalancer`. Then either add this IP address to your local Storm configuration (i.e. set the `nimbus.host` property in `~/.storm/storm.yaml` to the IP address) or pass the config property with every command (i.e. `-c nimbus.host=`). We can then launch the Storm topology using `storm jar target/data-processing-0.9.5-jar-with-dependencies.jar storm.dataprocessing.DataProcessingTopology data-processing-topology`.
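+ +After a few messages have flowed through the pipeline, you can check that rows are arriving in BigQuery. Assuming the default dataset and table names and the `bq` command line tool, a quick sanity check looks like this: + +``` +bq query "SELECT param1, param2, param3 FROM messages_dataset.messages_dataset_table LIMIT 10" +```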
diff --git a/examples/data-processing/architecture.png b/examples/data-processing/architecture.png new file mode 100644 index 000000000..4334757c5 Binary files /dev/null and b/examples/data-processing/architecture.png differ diff --git a/examples/data-processing/data-processing-resources.jinja b/examples/data-processing/data-processing-resources.jinja new file mode 100644 index 000000000..85f1203f9 --- /dev/null +++ b/examples/data-processing/data-processing-resources.jinja @@ -0,0 +1,32 @@ +{% set PROJECT = properties['project'] or 'dm-k8s-testing' %} +{% set DATASET = properties['dataset'] or 'messages_dataset' %} +{% set TABLE = properties['table'] or 'messages_dataset_table' %} +{% set DB_FIELDS = properties['db-fields'] %} +{% set RABBITMQ = properties['rabbitmq'] or {} %} +{% set RABBITMQ_DISK = RABBITMQ['disk'] or {} %} +{% set RABBITMQ_DISK_NAME = RABBITMQ_DISK['disk'] or 'rabbitmq-disk' %} +{% set RABBITMQ_DISK_SIZE = RABBITMQ_DISK['size'] or 200 %} + +resources: +- name: {{ RABBITMQ_DISK_NAME }} + type: compute.v1.disk + properties: + zone: us-central1-b + sizeGb: {{ RABBITMQ_DISK_SIZE }} +- name: {{ DATASET }} + type: bigquery.v2.dataset + properties: + datasetReference: + projectId: {{ PROJECT }} + datasetId: {{ DATASET }} +- name: {{ TABLE }} + type: bigquery.v2.table + properties: + # Need to use a reference below so that the dataset gets created before the table + datasetId: $(ref.{{ DATASET }}.datasetReference.datasetId) + tableReference: + projectId: {{ PROJECT }} + datasetId: {{ DATASET }} + tableId: {{ TABLE }} + schema: + fields: {{ DB_FIELDS }} diff --git a/examples/data-processing/data-processing-resources.yaml b/examples/data-processing/data-processing-resources.yaml new file mode 100644 index 000000000..af09b9836 --- /dev/null +++ b/examples/data-processing/data-processing-resources.yaml @@ -0,0 +1,18 @@ +imports: +- path: data-processing-resources.jinja + +resources: +- name: data-processing-resources + type: data-processing-resources.jinja + properties: + project: dm-k8s-testing + db-fields: + - name: param1 + type: STRING + mode: REQUIRED + - name: param2 + type: STRING + mode: NULLABLE + - name: param3 + type: STRING + mode: REQUIRED diff --git a/examples/data-processing/data-processing.jinja b/examples/data-processing/data-processing.jinja new file mode 100644 index 000000000..62e8db6eb --- /dev/null +++ b/examples/data-processing/data-processing.jinja @@ -0,0 +1,31 @@ +{% set PROJECT = properties['project'] %} +{% set RABBITMQ = properties['rabbitmq'] or {} %} +{% set RABBITMQ_USERNAME = RABBITMQ['username'] %} +{% set RABBITMQ_PASSWORD = RABBITMQ['password'] %} + +resources: +- name: rabbitmq + type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/rabbitmq/v1/rabbitmq.jinja + properties: + {{ RABBITMQ }} +- name: http-frontend + type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py + properties: + service_port: 80 + container_port: 80 + replicas: 2 + external_service: true + image: gcr.io/dm-k8s-testing/http-frontend:latest + env: + - name: RABBITMQ_SERVER + value: rabbitmq-service + - name: RABBITMQ_USERNAME + value: {{ RABBITMQ_USERNAME }} + - name: RABBITMQ_PASSWORD + value: {{ RABBITMQ_PASSWORD }} +- name: storm + type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/storm/v1/storm.jinja + properties: +{% if PROJECT is defined %} + project: {{ PROJECT }} +{% endif %} diff --git 
a/examples/data-processing/data-processing.jinja.schema b/examples/data-processing/data-processing.jinja.schema new file mode 100644 index 000000000..014e8f27b --- /dev/null +++ b/examples/data-processing/data-processing.jinja.schema @@ -0,0 +1,31 @@ +info: + title: Data processing system + description: Defines a system that takes messages as input, stores them in a RabbitMQ queue, processes them using Apache Storm, and then stores the results in BigQuery. + +properties: + rabbitmq: + type: object + required: + - username + - password + properties: + disk: + type: string + default: rabbitmq-disk + description: The name of the persistent disk the RabbitMQ service uses. + fstype: + type: string + default: ext4 + description: The filesystem the disk of the RabbitMQ service uses. + ips: + type: array + description: If any IP addresses are given we assume RabbitMQ runs on an external server (say GCE) and is accessible by these IP addresses. + items: + type: string + uniqueItems: true + username: + type: string + description: The username for the RabbitMQ service. + password: + type: string + description: The password for the RabbitMQ service. diff --git a/examples/data-processing/data-processing.yaml b/examples/data-processing/data-processing.yaml new file mode 100644 index 000000000..9853c5320 --- /dev/null +++ b/examples/data-processing/data-processing.yaml @@ -0,0 +1,14 @@ +imports: +- path: data-processing.jinja + +resources: +- name: data-processing + type: data-processing.jinja + properties: + project: dm-k8s-testing + rabbitmq: + # Replace with your information + username: rabbit + password: rabbit-password + # ips: + # - diff --git a/examples/data-processing/images/http-frontend/Dockerfile b/examples/data-processing/images/http-frontend/Dockerfile new file mode 100644 index 000000000..3fa58ff7a --- /dev/null +++ b/examples/data-processing/images/http-frontend/Dockerfile @@ -0,0 +1,2 @@ +FROM golang:onbuild +EXPOSE 80 diff --git a/examples/data-processing/images/http-frontend/Makefile b/examples/data-processing/images/http-frontend/Makefile new file mode 100644 index 000000000..2ea87a4fc --- /dev/null +++ b/examples/data-processing/images/http-frontend/Makefile @@ -0,0 +1,23 @@ +.PHONY: all build push clean + +DOCKER_REGISTRY = gcr.io +PREFIX = $(DOCKER_REGISTRY)/$(PROJECT) +IMAGE = http-frontend +TAG = latest + +DIR = . + +all: build + +build: + docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR) + +push: build +ifeq ($(DOCKER_REGISTRY),gcr.io) + gcloud docker push $(PREFIX)/$(IMAGE):$(TAG) +else + docker push $(PREFIX)/$(IMAGE):$(TAG) +endif + +clean: + docker rmi $(PREFIX)/$(IMAGE):$(TAG)
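The image Makefiles read the target registry project from the `PROJECT` make variable. A typical build-and-push invocation (the project name below is a placeholder) looks like:

```
make push PROJECT=my-gcp-project
```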
diff --git a/examples/data-processing/images/http-frontend/main.go b/examples/data-processing/images/http-frontend/main.go new file mode 100644 index 000000000..0cbfab077 --- /dev/null +++ b/examples/data-processing/images/http-frontend/main.go @@ -0,0 +1,58 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "os" + + "github.com/codegangsta/negroni" + "github.com/gorilla/mux" + "github.com/streadway/amqp" +) + +var channel *amqp.Channel +var queue amqp.Queue + +// SendMessageHandler reads param1, param2 and param3 from the query string +// and publishes them as a persistent JSON message on the RabbitMQ queue. +func SendMessageHandler(rw http.ResponseWriter, req *http.Request) { + params := []string{"param1", "param2", "param3"} + data := map[string]string{} + queryValues := req.URL.Query() + for _, param := range params { + data[param] = queryValues.Get(param) + } + jsonData, err := json.Marshal(data) + HandleError(err, "Error while converting query data to JSON") + err = channel.Publish("", queue.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent, ContentType: "application/json", Body: jsonData}) + HandleError(err, "Failed to publish a message") + fmt.Fprintln(rw, "Success!") +} + +// HandleError logs the message and exits the process if err is non-nil. +func HandleError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + } +} + +func main() { + broker := os.Getenv("RABBITMQ_SERVER") + username := os.Getenv("RABBITMQ_USERNAME") + password := os.Getenv("RABBITMQ_PASSWORD") + conn, err := amqp.Dial("amqp://" + username + ":" + password + "@" + broker + ":5672/") + HandleError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + channel, err = conn.Channel() + HandleError(err, "Failed to open a channel") + defer channel.Close() + queue, err = channel.QueueDeclare("messages", true, false, false, false, nil) + HandleError(err, "Failed to declare a queue") + + r := mux.NewRouter() + r.Path("/send_data/").Methods("GET").HandlerFunc(SendMessageHandler) + + n := negroni.Classic() + n.UseHandler(r) + n.Run(":80") +} diff --git a/examples/data-processing/images/storm-worker/Dockerfile b/examples/data-processing/images/storm-worker/Dockerfile new file mode 100644 index 000000000..448ea3c13 --- /dev/null +++ b/examples/data-processing/images/storm-worker/Dockerfile @@ -0,0 +1,11 @@ +FROM gcr.io/dm-k8s-testing/storm-base + +ADD start.sh / + +EXPOSE 6700 6701 6702 6703 + +WORKDIR /opt/apache-storm +ADD google_application_credentials.json ./ +ENV GOOGLE_APPLICATION_CREDENTIALS /opt/apache-storm/google_application_credentials.json + +ENTRYPOINT ["/start.sh"] diff --git a/examples/data-processing/images/storm-worker/Makefile b/examples/data-processing/images/storm-worker/Makefile new file mode 100644 index 000000000..f7af1fc2d --- /dev/null +++ b/examples/data-processing/images/storm-worker/Makefile @@ -0,0 +1,23 @@ +.PHONY: all build push clean + +DOCKER_REGISTRY = gcr.io +PREFIX = $(DOCKER_REGISTRY)/$(PROJECT) +IMAGE = storm-worker +TAG = latest + +DIR = . + +all: build + +build: + docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR) + +push: build +ifeq ($(DOCKER_REGISTRY),gcr.io) + gcloud docker push $(PREFIX)/$(IMAGE):$(TAG) +else + docker push $(PREFIX)/$(IMAGE):$(TAG) +endif + +clean: + docker rmi $(PREFIX)/$(IMAGE):$(TAG)
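The worker image expects a `google_application_credentials.json` key file next to its Dockerfile; the repository only checks in an empty placeholder (next diff). One way to generate such a key, assuming a service account with BigQuery access already exists (the account and project names here are placeholders), is:

```
gcloud iam service-accounts keys create google_application_credentials.json \
    --iam-account storm-worker@my-gcp-project.iam.gserviceaccount.com
```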
diff --git a/examples/data-processing/images/storm-worker/google_application_credentials.json b/examples/data-processing/images/storm-worker/google_application_credentials.json new file mode 100644 index 000000000..e69de29bb diff --git a/examples/data-processing/images/storm-worker/start.sh b/examples/data-processing/images/storm-worker/start.sh new file mode 100755 index 000000000..224029777 --- /dev/null +++ b/examples/data-processing/images/storm-worker/start.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +/configure.sh ${ZOOKEEPER_SERVICE_HOST:-$1} ${NIMBUS_SERVICE_HOST:-$2} + +exec bin/storm supervisor diff --git a/examples/data-processing/storm-topology/pom.xml b/examples/data-processing/storm-topology/pom.xml new file mode 100644 index 000000000..7e117c9ea --- /dev/null +++ b/examples/data-processing/storm-topology/pom.xml @@ -0,0 +1,136 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>0.9.5</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.google.cloud</groupId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <!-- keep storm out of the jar-with-dependencies --> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + <version>3.2.1</version> + </dependency> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>3.5.6</version> + </dependency> + <dependency> + <groupId>io.latent</groupId> + <artifactId>storm-rabbitmq</artifactId> + <version>0.6.0</version> + </dependency> + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-bigquery</artifactId> + <version>v2-rev158-1.19.0</version> + </dependency> + <dependency> + <groupId>com.google.oauth-client</groupId> + <artifactId>google-oauth-client</artifactId> + <version>1.19.0</version> + </dependency> + <dependency> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client-jackson2</artifactId> + <version>1.19.0</version> + </dependency> + <dependency> + <groupId>com.google.oauth-client</groupId> + <artifactId>google-oauth-client-jetty</artifactId> + <version>1.19.0</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.3.1</version> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/jvm</sourceDirectory> + <testSourceDirectory>test/jvm</testSourceDirectory> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <archive> + <manifest> + <mainClass /> + </manifest> + </archive> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <executions> + <execution> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>java</executable> + <includeProjectDependencies>true</includeProjectDependencies> + <includePluginDependencies>false</includePluginDependencies> + <classpathScope>compile</classpathScope> + <mainClass>${storm.topology}</mainClass> + </configuration> + </plugin> + </plugins> + </build> + + <artifactId>data-processing</artifactId> + <name>data-processing</name> +</project>
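With this pom in place, the fat jar that the README submits with `storm jar` can be rebuilt from `examples/data-processing/storm-topology`, assuming Maven is installed and the parent Storm pom referenced above is resolvable:

```
mvn clean package
```

The assembly plugin writes the artifact to `target/data-processing-0.9.5-jar-with-dependencies.jar` (a prebuilt copy is checked in below).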
diff --git a/examples/data-processing/storm-topology/src/jvm/storm/dataprocessing/DataProcessingTopology.java b/examples/data-processing/storm-topology/src/jvm/storm/dataprocessing/DataProcessingTopology.java new file mode 100644 index 000000000..bcd40e54f --- /dev/null +++ b/examples/data-processing/storm-topology/src/jvm/storm/dataprocessing/DataProcessingTopology.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.dataprocessing; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.GetResponse; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.spout.Scheme; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import io.latent.storm.rabbitmq.RabbitMQSpout; +import io.latent.storm.rabbitmq.config.ConnectionConfig; +import io.latent.storm.rabbitmq.config.ConsumerConfig; +import io.latent.storm.rabbitmq.config.ConsumerConfigBuilder; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.BigqueryScopes; +import com.google.api.services.bigquery.model.TableDataInsertAllRequest; +import com.google.api.services.bigquery.model.TableDataInsertAllResponse; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import com.google.gson.stream.JsonReader; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Collections; +import java.util.Collection; +import java.io.StringReader; +import java.io.IOException; + +/** + * A Storm topology that reads messages from a RabbitMQ queue and stores them in BigQuery. + */ +public class DataProcessingTopology { + + private static final String RABBIT_USERNAME = "rabbit"; + private static final String RABBIT_PASSWORD = "rabbit-password"; + private static final String RABBIT_HOST = "rabbitmq-service"; + private static final int RABBIT_PORT = 5672; + // Replace with your project id + private static final String PROJECT_ID = "dm-k8s-testing"; + private static final String DATASET_ID = "messages_dataset"; + private static final String TABLE_ID = "messages_dataset_table"; + private static Bigquery bigquery; + + /** + * This class creates our service to connect to BigQuery, including auth. + */ + public final static class BigqueryServiceFactory { + + /** + * Private constructor to disable creation of this utility Factory class. + */ + private BigqueryServiceFactory() { + + } + + /** + * Singleton service used through the app. + */ + private static Bigquery service = null; + + /** + * Mutex used to create the singleton in a thread-safe fashion. + */ + private static Object serviceLock = new Object(); + + /** + * Thread-safe factory that provides an authorized Bigquery service. + * @return The Bigquery service + * @throws IOException Thrown if there is an error connecting to Bigquery.
+ */ + public static Bigquery getService() throws IOException { + if (service == null) { + synchronized (serviceLock) { + if (service == null) { + service = createAuthorizedClient(); + } + } + } + return service; + } + + /** + * Creates an authorized client to Google Bigquery. + * + * @return The BigQuery Service + * @throws IOException Thrown if there is an error connecting + */ + // [START get_service] + private static Bigquery createAuthorizedClient() throws IOException { + // Create the credential + HttpTransport transport = new NetHttpTransport(); + JsonFactory jsonFactory = new JacksonFactory(); + GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory); + + // Depending on the environment that provides the default credentials (e.g. Compute Engine, App + // Engine), the credentials may require us to specify the scopes we need explicitly. + // Check for this case, and inject the Bigquery scope if required. + if (credential.createScopedRequired()) { + Collection<String> bigqueryScopes = BigqueryScopes.all(); + credential = credential.createScoped(bigqueryScopes); + } + + return new Bigquery.Builder(transport, jsonFactory, credential) + .setApplicationName("Data processing storm").build(); + } + // [END get_service] + + } + + public static void submit_to_bigquery(JsonReader rows) throws IOException { + final Gson gson = new Gson(); + Map<String, Object> rowData = gson.<Map<String, Object>>fromJson(rows, (new HashMap<String, Object>()).getClass()); + final TableDataInsertAllRequest.Rows row = new TableDataInsertAllRequest.Rows().setJson(rowData); + bigquery.tabledata().insertAll(PROJECT_ID, DATASET_ID, TABLE_ID, new TableDataInsertAllRequest().setRows(Collections.singletonList(row))).execute(); + } + + public static class StoreToBigQueryBolt extends BaseRichBolt { + OutputCollector _collector; + + @Override + public void prepare(Map conf, TopologyContext context, OutputCollector collector) { + _collector = collector; + try { + bigquery = BigqueryServiceFactory.getService(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void execute(Tuple tuple) { + String json = tuple.getString(0); + // Do processing + // Send it to bigquery + try { + submit_to_bigquery(new JsonReader(new StringReader(json))); + } catch (IOException e) { + e.printStackTrace(); + } + // Pass it on + // _collector.emit(tuple, new Values(json)); + // Acknowledge it + _collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + } + + public static class MessageScheme implements Scheme { + @Override + public List<Object> deserialize(byte[] ser) { + List<Object> objects = new ArrayList<Object>(); + objects.add(new String(ser)); + return objects; + } + + @Override + public Fields getOutputFields() { + Fields fields = new Fields("json_string"); + return fields; + } + } + + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + Scheme scheme = new MessageScheme(); + IRichSpout spout = new RabbitMQSpout(scheme); + ConnectionConfig connectionConfig = new ConnectionConfig(RABBIT_HOST, RABBIT_PORT, RABBIT_USERNAME, RABBIT_PASSWORD, ConnectionFactory.DEFAULT_VHOST, 580); // host, port, username, password, virtualHost, heartBeat + ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig) + .queue("messages") + .prefetch(200) + .requeueOnFail() + .build(); + builder.setSpout("rabbitmq", spout, 100) + .setNumTasks(100) + .addConfigurations(spoutConfig.asMap()) +
.setMaxSpoutPending(200); + builder.setBolt("process-message", new StoreToBigQueryBolt(), 100).shuffleGrouping("rabbitmq").setNumTasks(100); + + Config conf = new Config(); + + if (args != null && args.length > 0) { + conf.setDebug(false); + conf.setNumWorkers(14); + StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); + } else { + conf.setDebug(true); + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", conf, builder.createTopology()); + Utils.sleep(10000); + cluster.killTopology("test"); + cluster.shutdown(); + } + } +} diff --git a/examples/data-processing/storm-topology/target/.plxarc b/examples/data-processing/storm-topology/target/.plxarc new file mode 100644 index 000000000..67ea6eea2 --- /dev/null +++ b/examples/data-processing/storm-topology/target/.plxarc @@ -0,0 +1 @@ +maven-shared-archive-resources \ No newline at end of file diff --git a/examples/data-processing/storm-topology/target/data-processing-0.9.5-jar-with-dependencies.jar b/examples/data-processing/storm-topology/target/data-processing-0.9.5-jar-with-dependencies.jar new file mode 100644 index 000000000..d54250176 Binary files /dev/null and b/examples/data-processing/storm-topology/target/data-processing-0.9.5-jar-with-dependencies.jar differ diff --git a/templates/rabbitmq/v1/images/rabbitmq/Dockerfile b/templates/rabbitmq/v1/images/rabbitmq/Dockerfile new file mode 100644 index 000000000..3fbc2eb07 --- /dev/null +++ b/templates/rabbitmq/v1/images/rabbitmq/Dockerfile @@ -0,0 +1,3 @@ +FROM rabbitmq + +COPY docker-entrypoint.sh / diff --git a/templates/rabbitmq/v1/images/rabbitmq/Makefile b/templates/rabbitmq/v1/images/rabbitmq/Makefile new file mode 100644 index 000000000..28892d62d --- /dev/null +++ b/templates/rabbitmq/v1/images/rabbitmq/Makefile @@ -0,0 +1,23 @@ +.PHONY: all build push clean + +DOCKER_REGISTRY = gcr.io +PREFIX = $(DOCKER_REGISTRY)/$(PROJECT) +IMAGE = rabbitmq +TAG = latest + +DIR = . 
+ +all: build + +build: + docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR) + +push: build +ifeq ($(DOCKER_REGISTRY),gcr.io) + gcloud docker push $(PREFIX)/$(IMAGE):$(TAG) +else + docker push $(PREFIX)/$(IMAGE):$(TAG) +endif + +clean: + docker rmi $(PREFIX)/$(IMAGE):$(TAG) diff --git a/templates/rabbitmq/v1/images/rabbitmq/docker-entrypoint.sh b/templates/rabbitmq/v1/images/rabbitmq/docker-entrypoint.sh new file mode 100755 index 000000000..9109c74be --- /dev/null +++ b/templates/rabbitmq/v1/images/rabbitmq/docker-entrypoint.sh @@ -0,0 +1,87 @@ +#!/bin/bash +set -e + +if [ "$RABBITMQ_ERLANG_COOKIE" ]; then + cookieFile='/var/lib/rabbitmq/.erlang.cookie' + if [ -e "$cookieFile" ]; then + if [ "$(cat "$cookieFile" 2>/dev/null)" != "$RABBITMQ_ERLANG_COOKIE" ]; then + echo >&2 + echo >&2 "warning: $cookieFile contents do not match RABBITMQ_ERLANG_COOKIE" + echo >&2 + fi + else + echo "$RABBITMQ_ERLANG_COOKIE" > "$cookieFile" + chmod 600 "$cookieFile" + chown rabbitmq "$cookieFile" + fi +fi + +if [ "$1" = 'rabbitmq-server' ]; then + configs=( + # https://www.rabbitmq.com/configure.html + default_vhost + default_user + default_pass + ) + + haveConfig= + for conf in "${configs[@]}"; do + var="RABBITMQ_${conf^^}" + val="${!var}" + if [ "$val" ]; then + haveConfig=1 + break + fi + done + + if [ "$haveConfig" ]; then + cat > /etc/rabbitmq/rabbitmq.config <<-'EOH' + [ + {rabbit, + [ + EOH + for conf in "${configs[@]}"; do + var="RABBITMQ_${conf^^}" + val="${!var}" + [ "$val" ] || continue + cat >> /etc/rabbitmq/rabbitmq.config <<-EOC + {$conf, <<"$val">>}, + EOC + done + cat >> /etc/rabbitmq/rabbitmq.config <<-'EOF' + {loopback_users, []} + ] + } + ]. + EOF + fi + + chown -R rabbitmq /var/lib/rabbitmq + set -- gosu rabbitmq "$@" +fi + +mkdir -p /var/lib/rabbitmq/mnesia +cd /var/lib/rabbitmq/mnesia +if [ -e rabbit@hostname ]; +then + OLDHOSTNAME=$(readlink rabbit@hostname) + # Copy old database over to current hostname + # cp rabbit@hostname.pid rabbit@`hostname`.pid || : + cd rabbit@hostname + # Replace any references to the old hostname to the current one + # Add this behind the above command to remove everything in front of the @ + # symbol (including the @) | grep -o '@.*' | cut -f2- -d'@' + find . -type f -exec sed -i "s/$OLDHOSTNAME/rabbit@$(hostname)/g" {} + + cd .. 
+ cp -r "$OLDHOSTNAME" rabbit@`hostname` + cp -r "$OLDHOSTNAME-plugins-expand" rabbit@`hostname`-plugins-expand + rm -fr "$OLDHOSTNAME" + rm -fr "$OLDHOSTNAME-plugins-expand" +fi +# Link current database to the rabbit@hostname files +ln -sfn rabbit@`hostname` rabbit@hostname +# ln -sfn rabbit@`hostname`.pid rabbit@hostname.pid +ln -sfn rabbit@`hostname`-plugins-expand rabbit@hostname-plugins-expand +chown -R rabbitmq:rabbitmq /var/lib/rabbitmq + +exec "$@" diff --git a/templates/rabbitmq/v1/rabbitmq.jinja b/templates/rabbitmq/v1/rabbitmq.jinja new file mode 100644 index 000000000..bc5f497a5 --- /dev/null +++ b/templates/rabbitmq/v1/rabbitmq.jinja @@ -0,0 +1,54 @@ +{% set RABBITMQ = properties or {} %} +{% set RABBITMQ_DISK = RABBITMQ['disk'] or 'rabbitmq-disk' %} +{% set RABBITMQ_FSTYPE = RABBITMQ['fstype'] or 'ext4' %} +{% set RABBITMQ_IPS = RABBITMQ['ips'] %} +{% set RABBITMQ_USERNAME = RABBITMQ['username'] %} +{% set RABBITMQ_PASSWORD = RABBITMQ['password'] %} + +resources: +{% if RABBITMQ_IPS %} +- name: rabbitmq-service + type: Service + properties: + kind: Service + apiVersion: v1 + metadata: + name: rabbitmq-service + spec: + ports: + - protocol: TCP + port: 5672 + targetPort: 5672 +- name: rabbitmq-endpoints + type: Endpoints + properties: + kind: Endpoints + apiVersion: v1 + metadata: + name: rabbitmq-service + subsets: + - addresses: + {% for IP in RABBITMQ_IPS %} + - IP: {{ IP }} + {% endfor %} + ports: + - port: 5672 +{% else %} +- name: rabbitmq + type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/replicatedservice/v2/replicatedservice.py + properties: + service_port: 5672 + container_port: 5672 + replicas: 1 + image: gcr.io/dm-k8s-testing/rabbitmq:latest + env: + - name: RABBITMQ_DEFAULT_USER + value: {{ RABBITMQ_USERNAME }} + - name: RABBITMQ_DEFAULT_PASS + value: {{ RABBITMQ_PASSWORD }} + volumes: + - mount_path: /var/lib/rabbitmq/ + gcePersistentDisk: + pdName: {{ RABBITMQ_DISK }} + fsType: {{ RABBITMQ_FSTYPE }} +{% endif %} diff --git a/templates/rabbitmq/v1/rabbitmq.jinja.schema b/templates/rabbitmq/v1/rabbitmq.jinja.schema new file mode 100644 index 000000000..38ca64c4a --- /dev/null +++ b/templates/rabbitmq/v1/rabbitmq.jinja.schema @@ -0,0 +1,28 @@ +info: + title: RabbitMQ + description: Defines a RabbitMQ service. + +required: +- username +- password +properties: + disk: + type: string + default: rabbitmq-disk + description: The name of the persistent disk the RabbitMQ service uses. + fstype: + type: string + default: ext4 + description: The filesystem the disk of the RabbitMQ service uses. + ips: + type: array + description: If any IP addresses are given we assume RabbitMQ runs on an external server (say GCE) and is accessible by these IP addresses. + items: + type: string + uniqueItems: true + username: + type: string + description: The username for the RabbitMQ service. + password: + type: string + description: The password for the RabbitMQ service. 
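The schema above requires `username` and `password`, while `disk`, `fstype`, and `ips` are optional. The packaged example configuration shown in the next diff can be deployed directly, for instance:

```
dm deploy templates/rabbitmq/v1/rabbitmq.yaml
```

Omit `ips` in the properties to run RabbitMQ inside the cluster instead of pointing at an external server.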
diff --git a/templates/rabbitmq/v1/rabbitmq.yaml b/templates/rabbitmq/v1/rabbitmq.yaml new file mode 100644 index 000000000..f821bde45 --- /dev/null +++ b/templates/rabbitmq/v1/rabbitmq.yaml @@ -0,0 +1,8 @@ +- name: rabbitmq + type: https://raw.githubusercontent.com/kubernetes/deployment-manager/master/templates/rabbitmq/v1/rabbitmq.jinja + properties: + # Replace with your information + username: rabbit + password: rabbit-password + ips: + - 10.240.0.8 diff --git a/templates/storm/v1/images/storm-base/Dockerfile b/templates/storm/v1/images/storm-base/Dockerfile new file mode 100644 index 000000000..8ed9bb178 --- /dev/null +++ b/templates/storm/v1/images/storm-base/Dockerfile @@ -0,0 +1,9 @@ +FROM ubuntu:latest + +RUN apt-get update -y && apt-get install -y openjdk-7-jre tar dnsutils curl + +RUN curl -s http://archive.apache.org/dist/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz | \ + tar zxf - -C /opt && \ + mv /opt/apache-storm-0.9.5 /opt/apache-storm + +ADD configure.sh / diff --git a/templates/storm/v1/images/storm-base/Makefile b/templates/storm/v1/images/storm-base/Makefile new file mode 100644 index 000000000..b35225b4c --- /dev/null +++ b/templates/storm/v1/images/storm-base/Makefile @@ -0,0 +1,23 @@ +.PHONY: all build push clean + +DOCKER_REGISTRY = gcr.io +PREFIX = $(DOCKER_REGISTRY)/$(PROJECT) +IMAGE = storm-base +TAG = latest + +DIR = . + +all: build + +build: + docker build -t $(PREFIX)/$(IMAGE):$(TAG) $(DIR) + +push: build +ifeq ($(DOCKER_REGISTRY),gcr.io) + gcloud docker push $(PREFIX)/$(IMAGE):$(TAG) +else + docker push $(PREFIX)/$(IMAGE):$(TAG) +endif + +clean: + docker rmi $(PREFIX)/$(IMAGE):$(TAG) diff --git a/templates/storm/v1/images/storm-base/configure.sh b/templates/storm/v1/images/storm-base/configure.sh new file mode 100755 index 000000000..728e310b9 --- /dev/null +++ b/templates/storm/v1/images/storm-base/configure.sh @@ -0,0 +1,24 @@ +#!/bin/sh + +cat >> conf/storm.yaml <<EOF +storm.zookeeper.servers: + - "$1" +EOF + +cat >> conf/storm.yaml <<EOF +nimbus.host: "$2" +EOF + +cat >> conf/storm.yaml <<EOF +storm.local.hostname: "$(hostname -i)" +EOF + +cat >> conf/storm.yaml <<EOF +supervisor.slots.ports: + - 6700 + - 6701 + - 6702 + - 6703 +EOF