Read, convert, and store sync log data to Parquet form per bug 1291340.
Conversion code is ported from the smt repo.
from datetime import datetime as dt, timedelta, date from moztelemetry.dataset import Dataset from os import environ # Determine run parameters source_bucket = 'net-mozaws-prod-us-west-2-pipeline-analysis' dest_bucket = source_bucket dest_s3_prefix = "s3://{}/mreid".format(dest_bucket) if "bucket" in os.environ: dest_bucket = environ["bucket"] dest_s3_prefix = "s3://{}".format(dest_bucket) yesterday = dt.strftime(dt.utcnow() - timedelta(1), "%Y%m%d") # Default to running for "yesterday" unless we've been given a # specific date via the environment. target_day = environ.get("date", yesterday) print "Running import for {}".format(target_day)
The sync data on S3 is stored in framed heka format, and is read using the Dataset
API.
# Read the source data schema = [] target_prefix = 'sync-metrics/data' sync = Dataset(source_bucket, schema, prefix=target_prefix) # The sync data on S3 does not have a proper "date" dimension, but the date is encoded # in the key names themselves. # Fetch the summaries and filter the list to the target day. summary_prefix = "{}/{}".format(target_prefix, target_day) sync_summaries = [ s for s in sync.summaries(sc) if s['key'].startswith(summary_prefix) ]
The standard heka decoder assumes (based on Telemetry data) that all fields whose names have a .
in them contain nested json strings. This is not true for sync log messages, which have fields such as syncstorage.storage.sql.db.execute
with simple scalar values.
import ssl from moztelemetry.heka.message_parser import unpack # Custom decoder for sync messages since we can have scalar fields with dots in their names. def sync_decoder(message): try: for record, total_bytes in unpack(message): result = {} result["meta"] = { "Timestamp": record.message.timestamp, "Type": record.message.type, "Hostname": record.message.hostname, } for field in record.message.fields: name = field.name value = field.value_string if field.value_type == 1: # TODO: handle bytes in a way that doesn't cause problems with JSON # value = field.value_bytes continue elif field.value_type == 2: value = field.value_integer elif field.value_type == 3: value = field.value_double elif field.value_type == 4: value = field.value_bool result[name] = value[0] if len(value) else "" yield result except ssl.SSLError: pass # https://github.com/boto/boto/issues/2830 sync_records = sync.records(sc, decode=sync_decoder, summaries=sync_summaries)
# What do the records look like? # Example heka message: #Timestamp: 2016-10-28 15:11:45.98653696 -0300 ADT #Type: mozsvc.metrics #Hostname: ip-172-31-39-11 #Pid: 11383 #UUID: 155866c8-cc58-4048-a58c-6226c620fc57 #Logger: Sync-1_5 #Payload: #EnvVersion: 1 #Severity: 7 #Fields: [name:"remoteAddressChain" representation:"" value_string:"" value_string:"" # name:"path" value_string:"https://host/ver/somenum/storage/tabs" # name:"fxa_uid" value_string:"some_id" # name:"user_agent_version" value_type:DOUBLE value_double:49 # name:"user_agent_os" value_string:"Windows 7" # name:"device_id" value_string:"some_device_id" # name:"method" value_string:"POST" # name:"user_agent_browser" value_string:"Firefox" # name:"name" value_string:"mozsvc.metrics" # name:"request_time" value_type:DOUBLE value_double:0.003030061721801758 # name:"code" value_type:DOUBLE value_double:200 # ] # Example record: #sync_records.first() # {u'code': 200.0, # u'device_id': u'some_device_id', # u'fxa_uid': u'some_id', # 'meta': {'Hostname': u'ip-172-31-39-11', # 'Timestamp': 1477678305976742912L, # 'Type': u'mozsvc.metrics'}, # u'method': u'GET', # u'name': u'mozsvc.metrics', # u'path': u'https://host/ver/somenum/storage/crypto/keys', # u'remoteAddressChain': u'', # u'request_time': 0.017612934112548828, # u'syncstorage.storage.sql.db.execute': 0.014925241470336914, # u'syncstorage.storage.sql.pool.get': 5.221366882324219e-05, # u'user_agent_browser': u'Firefox', # u'user_agent_os': u'Windows 7', # u'user_agent_version': 49.0}
# Convert data. Code ported from https://github.com/dannycoates/smt import re import hashlib import math from pyspark.sql import Row def sha_prefix(v): h = hashlib.sha256() h.update(v) return h.hexdigest()[0:32] path_uid = re.compile("(\d+)\/storage\/") path_bucket = re.compile("\d+\/storage\/(\w+)") def getUid(path): if path is None: return None match = re.search(path_uid, path) if match is not None: uid = match.group(1) return sha_prefix(uid) return None def deriveDeviceId(uid, agent): if uid is None: return None return sha_prefix("{}{}".format(uid, agent)) SyncRow = Row("uid", "s_uid", "dev", "s_dev", "ts", "method", "code", "bucket", "t", "ua_browser", "ua_version", "ua_os", "host") def convert(msg): bmatch = re.search(path_bucket, msg.get("path", "")) if bmatch is None: return None bucket = bmatch.group(1) uid = msg.get("fxa_uid") synth_uid = getUid(msg.get("path")) dev = msg.get("device_id") synth_dev = deriveDeviceId(synth_uid, "{}{}{}".format( msg.get("user_agent_browser", ""), msg.get("user_agent_version", ""), msg.get("user_agent_os", "")) ) code = 200 # support modern mozlog's use of errno for http status errno = msg.get("errno") if errno is not None: if errno == 0: # success code = 200 else: code = errno else: code = msg.get("code") if code is not None: code = int(code) t = msg.get("t", 0) if t == 0: t = math.floor(msg.get("request_time", 0) * 1000) if t is None: t = 0 converted = SyncRow( (uid or synth_uid), synth_uid, (dev or synth_dev), synth_dev, msg.get("meta").get("Timestamp"), msg.get("method"), code, bucket, t, msg.get("user_agent_browser"), msg.get("user_agent_version"), msg.get("user_agent_os"), msg.get("meta").get("Hostname"), ) return converted
converted = sync_records.map(lambda x: convert(x))
converted = converted.filter(lambda x: x is not None)
from pyspark.sql import SQLContext sync_df = sqlContext.createDataFrame(converted) sync_df.printSchema()
root |-- uid: string (nullable = true) |-- s_uid: string (nullable = true) |-- dev: string (nullable = true) |-- s_dev: string (nullable = true) |-- ts: long (nullable = true) |-- method: string (nullable = true) |-- code: long (nullable = true) |-- bucket: string (nullable = true) |-- t: double (nullable = true) |-- ua_browser: string (nullable = true) |-- ua_version: double (nullable = true) |-- ua_os: string (nullable = true) |-- host: string (nullable = true)
# Determine if we need to repartition. # A record is something like 112 bytes, so figure out how many partitions # we need to end up with reasonably-sized files. records_per_partition = 2500000 total_records = sync_df.count() print "Found {} sync records".format(total_records)
import math num_partitions = int(math.ceil(float(total_records) / records_per_partition)) if num_partitions != sync_df.rdd.getNumPartitions(): print "Repartitioning with {} partitions".format(num_partitions) sync_df = sync_df.repartition(num_partitions) # Store data sync_log_s3path = "{}/sync_log/v1/day={}".format(dest_s3_prefix, target_day) sync_df.write.parquet(sync_log_s3path, mode="overwrite")
# Transform, compute and store rollups sync_df.registerTempTable("sync") sql_transform = ''' select uid, dev, ts, t, case when substring(ua_os,0,7) in ('iPad', 'iPod', 'iPhone') then 'ios' when substring(ua_os,0,7) = 'Android' then 'android' when substring(ua_os,0,7) = 'Windows' then 'windows' when substring(ua_os,0,7) = 'Macinto' then 'mac' when substring(ua_os,0,7) = 'Linux' then 'linux' when ua_os is null then 'unknown' else 'other' end as ua_os, ua_browser, ua_version, case method when 'POST' then 1 end as posts, case method when 'GET' then 1 end as gets, case method when 'PUT' then 1 end as puts, case method when 'DELETE' then 1 end as dels, case when code < 300 then 1 end as aoks, case when code > 399 and code < 500 then 1 end as oops, case when code > 499 and code < 999 then 1 end as fups, case when bucket = 'clients' and method = 'GET' then 1 end as r_clients, case when bucket = 'crypto' and method = 'GET' then 1 end as r_crypto, case when bucket = 'forms' and method = 'GET' then 1 end as r_forms, case when bucket = 'history' and method = 'GET' then 1 end as r_history, case when bucket = 'keys' and method = 'GET' then 1 end as r_keys, case when bucket = 'meta' and method = 'GET' then 1 end as r_meta, case when bucket = 'bookmarks' and method = 'GET' then 1 end as r_bookmarks, case when bucket = 'prefs' and method = 'GET' then 1 end as r_prefs, case when bucket = 'tabs' and method = 'GET' then 1 end as r_tabs, case when bucket = 'passwords' and method = 'GET' then 1 end as r_passwords, case when bucket = 'addons' and method = 'GET' then 1 end as r_addons, case when bucket = 'clients' and method = 'POST' then 1 end as w_clients, case when bucket = 'crypto' and method = 'POST' then 1 end as w_crypto, case when bucket = 'forms' and method = 'POST' then 1 end as w_forms, case when bucket = 'history' and method = 'POST' then 1 end as w_history, case when bucket = 'keys' and method = 'POST' then 1 end as w_keys, case when bucket = 'meta' and method = 'POST' then 1 end as w_meta, case when bucket = 'bookmarks' and method = 'POST' then 1 end as w_bookmarks, case when bucket = 'prefs' and method = 'POST' then 1 end as w_prefs, case when bucket = 'tabs' and method = 'POST' then 1 end as w_tabs, case when bucket = 'passwords' and method = 'POST' then 1 end as w_passwords, case when bucket = 'addons' and method = 'POST' then 1 end as w_addons from sync ''' transformed = sqlContext.sql(sql_transform)
transformed.registerTempTable("tx") sql_device_activity = ''' select uid, dev, max(ua_os) as ua_os, max(ua_browser) as ua_browser, max(ua_version) as ua_version, min(t) as min_t, max(t) as max_t, sum(posts) as posts, sum(gets) as gets, sum(puts) as puts, sum(dels) as dels, sum(aoks) as aoks, sum(oops) as oops, sum(fups) as fups, sum(r_clients) as r_clients, sum(r_crypto) as r_crypto, sum(r_forms) as r_forms, sum(r_history) as r_history, sum(r_keys) as r_keys, sum(r_meta) as r_meta, sum(r_bookmarks) as r_bookmarks, sum(r_prefs) as r_prefs, sum(r_tabs) as r_tabs, sum(r_passwords) as r_passwords, sum(r_addons) as r_addons, sum(w_clients) as w_clients, sum(w_crypto) as w_crypto, sum(w_forms) as w_forms, sum(w_history) as w_history, sum(w_keys) as w_keys, sum(w_meta) as w_meta, sum(w_bookmarks) as w_bookmarks, sum(w_prefs) as w_prefs, sum(w_tabs) as w_tabs, sum(w_passwords) as w_passwords, sum(w_addons) as w_addons from tx group by uid, dev ''' rolled_up = sqlContext.sql(sql_device_activity)
# Store device activity rollups sync_log_device_activity_s3base = "{}/sync_log_device_activity/v1".format(dest_s3_prefix) sync_log_device_activity_s3path = "{}/day={}".format(sync_log_device_activity_s3base, target_day) # TODO: Do we need to repartition? rolled_up.repartition(5).write.parquet(sync_log_device_activity_s3path, mode="overwrite")
def compute_device_counts(device_activity, target_day): device_activity.registerTempTable("device_activity") df = "%Y%m%d" last_week_date = dt.strptime(target_day, df) - timedelta(7) last_week = dt.strftime(last_week_date, df) sql_device_counts = """ select uid, count(distinct dev) as devs from (select uid, dev from device_activity where uid in (select distinct(uid) from device_activity where day = '{}') and day > '{}' and day <= '{}') group by uid """.format(target_day, last_week, target_day) return sqlContext.sql(sql_device_counts)
# Compute and store device counts # Re-read device activity data from S3 so we can look at historic info device_activity = sqlContext.read.parquet(sync_log_device_activity_s3base) device_counts = compute_device_counts(device_activity, target_day) sync_log_device_counts_s3path = "{}/sync_log_device_counts/v1/day={}".format(dest_s3_prefix, target_day) device_counts.repartition(1).write.parquet(sync_log_device_counts_s3path, mode="overwrite")