Маршрут перемещения потока данных: загрузка в реляционную БД

Использование языка программирования R вместе со свободной реляционной системой управления базами данных PostgresSQL может значительно ускорить и упростить процесс загрузки данных в БД.

d6dc841f117a41ebb4ecc040f6aaafca.jpeg

Структурирование файлов


Перед началом загрузки данных в PostgreSQL, следует рассортировать файлы по типу в разные директории. R делает операции достаточно простыми на уровне ОС:

#### 1. Setting directory to FTP folder where files incoming from Adobe
## Has ~2000 files in it from 2 years of data
setwd("~/Downloads/datafeed/")

#### 2. Sort files into three separate folders
## Manifests - plain text files
if(!dir.exists("manifest")){
  dir.create("manifest")
  lapply(list.files(pattern = "*.txt"), function(x) file.rename(x, paste("manifest", x, sep = "/")) 
}

## Server calls tsv.gz
if(!dir.exists("servercalls")){
  dir.create("servercalls")
  lapply(list.files(pattern = "*.tsv.gz"), function(x) file.rename(x, paste("servercalls", x, sep = "/"))) 
}

## Lookup files .tar.gz
if(!dir.exists("lookup")){
  dir.create("lookup")
  lapply(list.files(pattern = "*.tar.gz"), function(x) file.rename(x, paste("lookup", x, sep = "/"))) 
}

Возможно, при большем количестве разнотипных файлов их лучше было бы упростить через функцию, вместо того, чтобы писать код для каждого типа. Но идея от этого не меняется — следует проверить наличие директории и в случае отсутствия, создать ее. После этого нужно переместить файлы в созданную директорию.

Соединение и загрузка данных в PostgreSQL через R


Как только файлы структурированы, можно начинать загружать их в PostgreSQL, используя RPostgreSQL пакет R. RPostgreSQL DBI-совместимый, поэтому строка подключения будет одинаковой для любого типа ядра базы данных; при загрузке части данных в БД практически гарантированно потребуется текстовый формат (используется colClasses = аргумент «character» в R). Такая необходимость возникает из-за постоянных изменений и модификаций Adobe Analytics (пакет решений корпоративного уровня для сбора статистики сайтов и работы с ней); текстовый формат столбца не допускает потери данных.

library(RPostgreSQL)

# Connect to database
conn = dbConnect(dbDriver("PostgreSQL"), 
                 user="postgres", 
                 password="", 
                 host="localhost", 
                 port=5432, 
                 dbname="adobe")

#Set directory to avoid having to use paste to build urls
setwd("~/Downloads/datafeed/servercalls")

#Set column headers for server calls
column_headers <- read.delim("~/Downloads/datafeed/lookup/column_headers.tsv", stringsAsFactors=FALSE)

#Loop over entire list of files
#Setting colClasses to character only way to guarantee all data loads
#File formats or implementations can change over time; fix schema in database after data loaded
for(file in list.files()){
  print(file)
  df <- read.csv2(file, sep = "\t", header = FALSE, stringsAsFactors = FALSE, colClasses = "character")
  dbWriteTable(conn, name = 'servercalls', value = df, row.names=FALSE, append = TRUE)
  rm(df)
}

#Run analyze in PostgreSQL so that query planner has accurate information
dbGetQuery(conn, "analyze servercalls")

При помощи небольшого количества кода была сгенерирована определенная структура таблицы, загрузились данные и PostgreSQL начал проводить анализ и сбор статистики для эффективных поисковых запросов.

Загрузка таблиц поиска в PostgreSQ


Вместе с данными вызова сервера, что загружены в БД, необходимо загрузить и таблицы поиска. Они сохраняют постоянный формат и RPostgreSQL получает корректные типы столбцов.

library(RPostgreSQL)

# Connect to database
conn = dbConnect(dbDriver("PostgreSQL"), 
                 user="postgres", 
                 password="", 
                 host="localhost", 
                 port=5432, 
                 dbname="adobe")

setwd("~/Downloads/datafeed/lookup/")

#Create function due to repetitiveness
#Since we're loading lookup tables with mostly same values each time, put source file in table
loadlookup <- function(tblname){ 
  df <- read.csv2(paste(tblname,".tsv", sep=""), sep = "\t", header = FALSE, stringsAsFactors = FALSE)
  df$file <- file
  dbWriteTable(conn, name = tblname, value = df, row.names=FALSE, append = TRUE)
}

#untar files, place in directory by day
for(file in list.files(pattern = "*.tar.gz")){
  print(file)
  untar(file)
  
  for(tbl in c("browser_type", "browser", "color_depth", "column_headers", 
               "connection_type", "country", "event", "javascript_version",
               "languages", "operating_systems", "plugins", "referrer_type", 
               "resolution", "search_engines")){
    
    loadlookup(tbl)
    
  }
}

© Habrahabr.ru