Importing Kafka data into ElasticSearch
Environment: ElasticSearch 1.4.4, elasticsearch-river-kafka-1.2.1-plugin, Kafka 0.8.1
Install the Kafka river plugin for ElasticSearch
./bin/plugin -install kafka-river -url https://github.com/mariamhakobyan/elasticsearch-river-kafka/releases/download/v1.2.1/elasticsearch-river-kafka-1.2.1-plugin.zip
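A quick check that the plugin was registered (a sketch, assuming the standard plugin script shipped with ElasticSearch 1.x):
./bin/plugin --list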
Create the river metadata
curl -XPUT 'localhost:9200/_river/kafka-river/_meta' -d '
{
    "type" : "kafka",
    "kafka" : {
        "zookeeper.connect" : "xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181",
        "zookeeper.connection.timeout.ms" : 10000,
        "topic" : "flume-topic1",
        "message.type" : "json"
    },
    "index" : {
        "index" : "kafka-index",
        "type" : "status",
        "bulk.size" : 3,
        "concurrent.requests" : 1,
        "action.type" : "index",
        "flush.interval" : "12h"
    }
}'
Restart the ElasticSearch service
Check the river status (the last command deletes the river if it needs to be recreated)
curl -XGET 'http://localhost:9200/_river/kafka-river/_search?pretty'
curl -XGET 'http://localhost:9200/_river/kafka-index/_search?pretty'
curl -XDELETE 'localhost:9200/_river/kafka-river/'
Produce JSON data in Kafka
bin/kafka-console-producer.sh --topic flume-topic1 --broker-list xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092
{"id":"123", "name":"hq"}
{"id":"123", "name":"hq"}
{"id":"123", "name":"hq"}
{"id":"123", "name":"hq"}
Check the indexed data
curl -XGET 'http://localhost:9200/kafka-index/_search?pretty'
Configuring Flume to load data into Kafka and ElasticSearch
Environment: CentOS 6.3, Kafka 0.8.1, Flume 1.6, ElasticSearch 1.4.4
The configuration file is as follows:
[adadmin@s9 apache-flume-1.6.0-bin]$ vi conf/flume.conf
#define source, sink, channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/adadmin/.bash_history
# Describe the sink
#only test
#a1.sinks.k1.type = logger
#load to Kafka
#a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.k1.batchSize = 5
#a1.sinks.k1.brokerList = xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092
#a1.sinks.k1.topic = flume_topic1
#load to ElasticSearch
a1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
a1.sinks.k1.hostNames = xxx.xxx.xxx.xxx:9300
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 100
a1.sinks.k1.indexName = logstash
a1.sinks.k1.ttl = 5
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent
[adadmin@s9 apache-flume-1.6.0-bin]$ bin/flume-ng agent -c /home/adadmin/apache-flume-1.6.0-bin/conf -f /home/adadmin/apache-flume-1.6.0-bin/conf/flume.conf -n a1 -Dflume.root.logger=INFO,console
(Note: when loading into ElasticSearch, the ElasticSearch jars must first be copied into Flume's plugin directory, as follows:
[adadmin@s9 apache-flume-1.6.0-bin]$ mkdir -p plugins.d/elasticsearch/libext
[adadmin@s9 apache-flume-1.6.0-bin]$ cp /home/adadmin/elasticsearch-1.4.4/lib/*.jar plugins.d/elasticsearch/libext
)
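A quick end-to-end check (a sketch: it assumes ElasticSearch is reachable on localhost and that the sink writes date-suffixed indices such as logstash-yyyy-MM-dd, which is the ElasticSearchSink default; "flume-es-test" is just a marker string):
# append a test line to the file tailed by the exec source
echo "flume-es-test" >> /home/adadmin/.bash_history
# query all logstash-* indices for the marker
curl -XGET 'http://localhost:9200/logstash-*/_search?pretty&q=flume-es-test'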
Implementing year-over-year and period-over-period comparisons with Pentaho BI Server
Environment: Pentaho 5.3, PostgreSQL 9.3
I recently looked into whether Pentaho Report Designer or CDE offers built-in year-over-year / period-over-period comparisons, and unfortunately found nothing, so the problem has to be solved on the database side.
Suppose there are two tables, test and dim_date:
test:
20140818;4
20150817;40
20150818;10
20150819;55
20160817;30
20160818;50
dim_date:
20150104;"2015年01月04日";"2015年";"第01月";"2015-01-04";"第1周"
20150103;"2015年01月03日";"2015年";"第01月";"2015-01-03";"第1周"
20150102;"2015年01月02日";"2015年";"第01月";"2015-01-02";"第1周"
Using the OLAP window function lag, the query looks like this:
select * from (
select date_id, volume, lag(volume, 1) over (order by date_fmt) pre_volume, lag(volume, 2) over (order by date_fmt) pre_365_volume from (
select b.date_id, b.date_fmt, a.volume from test a, dim_date b where a.date_id = b.date_id and b.date_fmt in (to_date('2016-08-18', 'yyyy-mm-dd') - interval '1 day', '2016-08-18', to_date('2016-08-18', 'yyyy-mm-dd') - interval '1 year')
) m
) n where n.date_id = 20160818
This returns, alongside the query date, the value for the previous day and the value for the same date a year earlier, which is enough to compute the period-over-period and year-over-year figures.
Running multiple Kafka clusters on a single ZooKeeper cluster
1. First create a /kafka2 node (chroot) in ZooKeeper
bin/zkCli.sh -server xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181
create /kafka2 ''
2. In the second Kafka cluster, modify the server.properties configuration file
zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/kafka2
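After restarting the second cluster, a quick sanity check is to confirm its brokers registered under the chroot (Kafka keeps broker registrations under <chroot>/brokers/ids):
bin/zkCli.sh -server xxx.xxx.xxx.xxx:2181
ls /kafka2/brokers/ids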
The Azkaban scheduler
Azkaban consists of three key components:
Relational database: MySQL, used to store flows, permissions, job state, schedules, and other metadata.
AzkabanWebServer: the web UI for managing flows, schedules, permissions, and so on.
AzkabanExecutorServer: runs the jobs and writes their execution logs to MySQL; several executor servers can run at the same time, coordinating through the flow state kept in MySQL.
Installation steps
1. Create the azkaban database and load the schema (azkaban-sql-script-2.5.0.tar.gz)
CREATE DATABASE azkaban;
GRANT all privileges ON azkaban.* to 'hq'@'%';
mysql> source create-all-sql-2.5.0.sql
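A quick sanity check that the schema loaded, run in the same mysql session:
mysql> use azkaban;
mysql> show tables;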
2. Download and install azkaban-web-server-2.5.0.tar.gz
tar xvf azkaban-web-server-2.5.0.tar.gz
Create the SSL keystore:
keytool -keystore keystore -alias jetty -genkey -keyalg RSA
cp keystore azkaban-web-2.5.0/
cd azkaban-web-2.5.0
Edit the configuration:
vi conf/azkaban.properties
default.timezone.id=Asia/Shanghai
database.type=mysql
mysql.port=3306
mysql.host=xxx.xxx.xxx.xxx
mysql.database=azkaban
mysql.user=hq
mysql.password=xxxxxx
jetty.keystore=keystore
jetty.password=azkaban       # SSL keystore password
jetty.keypassword=azkaban    # SSL key password
jetty.truststore=keystore
jetty.trustpassword=azkaban  # SSL truststore password
3. Download and install azkaban-executor-server-2.5.0.tar.gz
tar xvf azkaban-executor-server-2.5.0.tar.gz
cd azkaban-executor-2.5.0
Edit the executor configuration:
vi conf/azkaban.properties
mysql.host=xxx.xxx.xxx.xxx
mysql.database=azkaban
mysql.user=hq
mysql.password=xxxxxx
4. Start the web and executor services
cd azkaban-web-2.5.0
bin/azkaban-web-start.sh
cd azkaban-executor-2.5.0
bin/azkaban-executor-start.sh
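If the defaults in azkaban.properties are unchanged, the web UI should be reachable over HTTPS on port 8443 (jetty.ssl.port defaults to 8443; adjust the port if you changed it):
# -k skips certificate verification, since the keystore above is self-signed
curl -k https://localhost:8443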
Upgrading gcc to 4.9
Environment: Ubuntu 14.04, gcc 4.8
Install gcc 4.9
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt-get install g++-4.9
Switch the default gcc version
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-4.9 150
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-4.8 100
sudo update-alternatives --config gcc
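Verify that the switch took effect:
gcc --version
sudo update-alternatives --display gcc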
Extracting data from a PostgreSQL backup file
Environment: CentOS 6.4, PostgreSQL 9.4
Today I accidentally truncated a table and my mind went blank for a moment. Fortunately there was a backup from a few days earlier.
The steps are as follows:
Find the line where the table's data starts to be loaded (the COPY statement):
cat alldb20150717.sql | grep -C number "COPY tab_stats"
Extract the 3000 lines that follow it:
cat alldb20150717.sql | grep -A3000 "COPY tab_stats"
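Since grep -A3000 only works if the table has fewer than 3000 rows, an alternative sketch: a plain-text pg_dump ends each COPY block with a line containing just "\.", so the whole block can be cut out with sed and replayed with psql (tab_stats.copy and yourdb are placeholder names):
# extract everything from the COPY line up to the terminating "\."
sed -n '/^COPY tab_stats/,/^\\\.$/p' alldb20150717.sql > tab_stats.copy
# load it back; the target table must already exist (and be empty)
psql -d yourdb -f tab_stats.copy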
Stress testing PostgreSQL with pgbench
Environment: CentOS 6.5, PostgreSQL 9.4.2
pgbench is an OLTP benchmarking tool that uses a TPC-B-like workload to generate load.
Usage:
./pgbench --help
pgbench is a benchmarking tool for PostgreSQL.
Usage:
pgbench [OPTION]… [DBNAME]
Initialization options:
-i, --initialize invokes initialization mode
-F, --fillfactor=NUM set fill factor
-n, --no-vacuum do not run VACUUM after initialization
-q, --quiet quiet logging (one message each 5 seconds)
-s, --scale=NUM scaling factor
--foreign-keys create foreign key constraints between tables
--index-tablespace=TABLESPACE
create indexes in the specified tablespace
--tablespace=TABLESPACE create tables in the specified tablespace
--unlogged-tables create tables as unlogged tables
Benchmarking options:
-c, --client=NUM number of concurrent database clients (default: 1)
-C, --connect establish new connection for each transaction
-D, --define=VARNAME=VALUE
define variable for use by custom script
-f, --file=FILENAME read transaction script from FILENAME
-j, --jobs=NUM number of threads (default: 1)
-l, --log write transaction times to log file
-M, --protocol=simple|extended|prepared
protocol for submitting queries (default: simple)
-n, --no-vacuum do not run VACUUM before tests
-N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches
-P, --progress=NUM show thread progress report every NUM seconds
-r, --report-latencies report average latency per command
-R, --rate=NUM target rate in transactions per second
-s, --scale=NUM report this scale factor in output
-S, --select-only perform SELECT-only transactions
-t, --transactions=NUM number of transactions each client runs (default: 10)
-T, --time=NUM duration of benchmark test in seconds
-v, --vacuum-all vacuum all four standard tables before tests
--aggregate-interval=NUM aggregate data over NUM seconds
--sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%)
Common options:
-d, --debug print debugging output
-h, --host=HOSTNAME database server host or socket directory
-p, --port=PORT database server port number
-U, --username=USERNAME connect as specified database user
-V, --version output version information, then exit
-?, --help show this help, then exit
Report bugs to <pgsql-bugs@postgresql.org>.
First create the test database benchdb, then initialize it and run a short benchmark:
./pgbench -i benchdb
./pgbench -c 50 -t 10 -r benchdb
starting vacuum…end.
transaction type: TPC-B (sort of)
scaling factor: 1
query mode: simple
number of clients: 50
number of threads: 1
number of transactions per client: 10
number of transactions actually processed: 500/500
latency average: 0.000 ms
tps = 119.220347 (including connections establishing)
tps = 124.124611 (excluding connections establishing)
statement latencies in milliseconds:
0.007204 \set nbranches 1 * :scale
0.001772 \set ntellers 10 * :scale
0.001308 \set naccounts 100000 * :scale
0.001818 \setrandom aid 1 :naccounts
0.001568 \setrandom bid 1 :nbranches
0.001390 \setrandom tid 1 :ntellers
0.001702 \setrandom delta -5000 5000
1.218338 BEGIN;
1.219992 UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
0.406070 SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
257.778830 UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
53.106822 UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
0.349952 INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
7.491730 END;
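A 10-transactions-per-client run finishes too quickly to be representative; a longer duration-based run is usually more meaningful (all options used here appear in the help text above, and the numbers are just an example):
# 50 clients, 2 worker threads, run for 60 seconds, progress every 5 seconds, per-statement latencies
./pgbench -c 50 -j 2 -T 60 -P 5 -r benchdb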
Identifying the browser from the User-Agent string
Environment: Ubuntu 14.04
One of the fields in our advertising logs is the user-agent. It is part of the HTTP request headers and identifies the browser type and version, the operating system and version, the browser engine, and so on.
I first tried to extract these fields with regular expressions but could not do it reliably; later I found a website that offers an API which identifies them accurately.
The code is as follows:
#!/usr/bin/python2.6
# coding: utf-8
import sys
import urllib
import urllib2
import json
import pprint
ua = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.4 (KHTML, like Gecko) Chrome/13.0.782.41 Safari/535.4"
ua_url = urllib.quote(ua)  # URL-encode the UA string
url = "http://www.useragentstring.com/?uas=%s&getJSON=all" % ua_url
req = urllib2.Request(url)
j = urllib2.urlopen(req).read()
j = json.loads(j)
print json.dumps(j, indent=4, sort_keys=True)
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(j)