Apache Sqoop – Uma abordagem prática para a unificação de dados

32
4

Este artigo aborda a ferramenta Apache Sqoop, que desde 2012 é um dos projetos top-level da Apache Software Foundation. O Apache Sqoop – abreviação de “SQL para Hadoop” – tem como objetivo executar a transferência eficiente e bidirecional de dados entre o Hadoop e diversos serviços de armazenamento externo de dados estruturados.

O conteúdo explica em detalhes o processo de importação de dados de um banco relacional para o HDFS, ilustrando como o Sqoop faz esse processo internamente, além de exemplificar como fazer a sincronização das bases por meio da importação incremental de dados.

Também é descrito o processo de importação de dados para o Hive e, por fim, como o Sqoop realiza a exportação dos dados do Hadoop para um banco relacional. O Apache Sqoop é uma excelente ferramenta e deve ser levada em consideração em qualquer projeto de Big Data, uma vez que facilita a integração e utilização dos dados armazenados em bases relacionais ou mainframes no seu ecossistema Hadoop.

Introdução

Apache Hadoop é um framework projetado para o processamento massivo de dados de diversos tipos (estruturados, semiestruturados e não estruturados), no qual esses são armazenados no HDFS (Hadoop Distributed File System) e manipulados por tarefas MapReduce. As tarefas MapReduce são responsáveis por fazer o processamento, a combinação entre dados e, por fim, a produção de algum resultado a partir desse processamento.

Apesar de o Hadoop ser amplamente conhecido por sua capacidade de armazenamento e processamento de grandes quantidades de dados, muita informação ainda hoje está armazenada em bancos de dados relacionais. Assim, surgiu o Apache Sqoop cujo objetivo é executar a transferência eficiente e bidirecional de dados entre o Hadoop e diversos serviços de armazenamento externo de dados estruturados.

O Apache Sqoop pode ser útil (i) quando se deseja utilizar ferramentas de Big Data para o processamento de bases relacionais, (ii) para a integração de bases relacionais e mainframes com dados já presentes no Hadoop ou, ainda, (iii) para o arquivamento dos dados no Hadoop.

O Sqoop atualmente possui duas frentes de desenvolvimento. Uma delas, chamada de Sqoop 1, é a evolução natural do projeto inicial, encontra-se na versão 1.4.6 e foi a versão utilizada nesse artigo por ser a última versão estável para produção. O Sqoop 2 é um projeto totalmente novo e implementa o conhecido modelo de cliente e servidor, no qual o servidor é responsável por gerenciar o envio dos dados do banco relacional para o Hadoop e vice-versa.

Outras melhorias implementadas no Sqoop 2 são a clara definição das interfaces dos conectores, clientes mais simples de serem utilizados, uma API que permite a execução de funções do Sqoop a partir de código em Java, entre outras. Ambos os projetos podem ser obtidos por meio de sua página oficial.

Neste artigo veremos como realizar o processo de importação de um banco relacional para o HDFS. Esse processo será apoiado pelo Sqoop. Também veremos como realizar a importação para o Hive e como o Sqoop apoia a exportação de dados do Hadoop para um banco relacional.

Todos os exemplos de comandos Sqoop presentes neste artigo são reproduções ou readaptações – autorizadas pela O’Reilly Media – dos exemplos presentes no livro Apache Sqoop Cookbook Unlocking Hadoop for Your Relational Database, de 2013, escrito por Kathleen Ting e Jarek Jarcec Cecho.

Hadoop e Hive

Apache Hadoop surgiu na separação da parte de computação distribuída da ferramenta chamada Apache Nutch – um famoso web crawler – criado por Mike Cafarella e Doug Cutting, criador também do Lucene e Hadoop. O Hadoop é uma plataforma escalável e confiável para armazenamento e análise de grande volumes de dados. Ele foi projetado para ser executado em clusters compostos por computadores convencionais onde falhas de disco rígido, memória, comunicação, dentre outras são acontecimentos comuns.

Com esta premissa em mente, o sistema de arquivos Distribuídos Hadoop (HDFS ou, em inglês, Hadoop Distributed File System) armazena grande volume de dados de forma confiável – ou seja, sem perdas de informações – por meio da criação cópias destes dados em mais de um nó (computador) do cluster.

Além do HDFS, uma outra parte importante do Hadoop se chama MapReduce, um framework que abstrai a complexidade do processamento paralelo e distribuído. MapReduce são duas tarefas distintas – Map e Reduce –, em que a primeira processa os dados de entrada de forma paralela. O resultado deste processamento é então organizado e ordenado pelo framework para que seja enviado para as tarefas Reduce, que combinam os dados a fim de obter o resultado final.

Tipicamente, as tarefas de mapeamento são executadas no mesmo nó do cluster onde estão os dados a serem processados, reduzindo assim o tráfego de rede. Escrever tarefas MapReduce não é uma tarefa trivial. Contudo, uma vez feito, processar os dados de forma paralela e distribuída em centenas de nós do cluster se torna uma tarefa simples.

Uma das facilidades oferecidas pelo Sqoop é a capacidade de importação dos dados direto para o Hive. Hive é um dos projetos mais famosos do ecossistema Hadoop e foi iniciado pelo Facebook em 2007, mas atualmente é um projeto top-level da Apache Software Foundation, assim como o Sqoop, e recebe contribuição de outras grandes empresas.

O data warehouse do Facebook havia sido construído inicialmente com soluções comerciais de bancos de dados relacionais. Contudo, devido à necessidade de processamento e armazenamento de grandes volumes de dados exigida pelo Facebook, foi decidido utilizar o Hadoop, por permitir aumentar a capacidade de armazenamento com baixo custo e sem tempo de inatividade e por possuir um framework de processamento distribuído que reduziu consideravelmente o tempo de execução das consultas.

No entanto, criar tarefas MapReduce não era fácil e consumia muito tempo dos seus funcionários. Assim, eles decidiram criar o Hive com os conceitos já familiares dos bancos de dados.

Hive é um sistema de data warehouse escalável e tolerante a falhas, projetado para trabalhar com um grande volume de dados que crescem rapidamente. Ele é um sistema para processamento em batch e pode fazer consultas sobre um volume de dados na ordem de petabytes. O Hive fornece uma linguagem de consulta simples chamada HiveQL – baseado em SQL –, que permite aos usuários familiarizados com o SQL facilmente realizar consultas, sumarizações e análise de dados.

O Hive não foi projetado para ser um sistema de processamento de transações em tempo real. Ele foi construído sobre a infraestrutura do Hadoop o que lhe garante vários benefícios como escalabilidade, tolerância a falhas e excelente taxa de transferência. Contudo, o framework MapReduce costuma ter alta latência, causando lentidão no agendamento e execução de tarefas. Assim, o tempo para realizar consultas no Hive sobre um pequeno conjunto de dados é de segundos ou até minutos para terminar.

Bancos de dados suportados

Por padrão, o Sqoop utiliza JDBC (Java Database Connectivity) para se conectar aos bancos e, por esse motivo, acredita-se que ele é compatível com uma grande quantidade de bancos de dados, uma vez que os fornecedores implementam essa API. No entanto, o Sqoop não garante a compatibilidade e a performance com todos os bancos que possuem conectores JDBC devido às formas de implementação dessa API e a ligeiras diferenças que possam existir na sintaxe SQL de cada banco.

A Tabela 1 contém uma relação completa dos bancos e suas respectivas versões que foram testadas com o Sqoop. Pode ser necessário instalar os drivers de conexão mesmo com o banco de dados estando presente na lista.

Banco de dados Versão String de conexão
HSQLDB 1.8.0+ jdbc:hsqldb:*//
MySQL 5.0+ jdbc:mysql://
Oracle 10.2.0+ jdbc:oracle:*//
PostgreSQL 8.3+ jdbc:postgresql://
CUBRID 9.2+ jdbc:cubrid:*
Tabela 1. Lista de bancos de dados testados no Sqoop. Fonte: Sqoop User Guide (v1.4.6). Disponível em <http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html>. 21 de dezembro de 2015. Pode ser necessário instalar os drivers de conexão mesmo o banco de dados estando presente na lista.

Importação de dados

O Sqoop realiza a importação dos dados em arquivos de texto (–as-textfile), arquivos Sequence File (–as-sequencefile), Avro (–as-avrodatafile) ou Parquet (–as-parquetfile). Na importação para arquivos textuais, cada registro do banco se tornará uma linha no arquivo de destino onde cada coluna é delimitada por padrão pela vírgula ou outro separador definido pelo usuário por meio do argumento –fields-terminated-by. A Tabela 2 contém uma relação de outros argumentos que podem ser utilizados na importação para arquivos texto.

Argumentos Descrição
–enclosed-by <char> Define o caractere que iniciará e terminará cada campo.
–escaped-by <char> Define o caractere de escape.
–fields-terminated-by <char> Define o caractere separador dos campos.
–lines-terminated-by <char> Define o caractere para o fim de linha.
–mysql-delimiters Utiliza os delimitadores MySQL padrão. Barra invertida mais o caractere n ( n ) para linha, barra invertida ( ) como caractere de escape e aspas simples ( ‘ ) como caractere opcional para início e fim de cada campo.
–optionally-enclosed-by <char> Define o caractere opcional que iniciará e terminará cada campo. O caractere opcional será utilizado apenas quando o caractere delimitador aparecer no dado importado.
Tabela 2. Argumentos que podem ser utilizados na importação dos dados para arquivos texto.  Fonte: Sqoop User Guide (v1.4.6). Disponível em <http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html>. 21 de dezembro de 2015.

É importante observar os caracteres especiais que podem estar inseridos no conteúdo das tabelas. Por exemplo, o texto Apache Sqoop, uma abordagem prática para a unificação de dados pode causar problemas na importação caso o delimitador padrão (a vírgula) seja utilizado, porque o caractere vírgula faz parte do texto. O formato de texto delimitado é ideal quando não estão sendo importados dados binários. Além disso, esse tipo de formato é legível por humanos e fácil de manipular.

Para ilustrar a simplicidade do Sqoop, o exemplo ilustrado na Listagem 1 detalha o comando Sqoop utilizado na importação de uma tabela MySQL chamada Cidades, representada pela Tabela 3.

id país cidade
1 Estados Unidos Palo Alto
2 República Tcheca Brno
3 Brasil Belo Horizonte
Tabela 3. Tabela MySQL chamada Cidades

O comando Sqoop da Listagem 1 especifica – por meio do comando import – que o Sqoop realize a operação de importação de dados. Na linha seguinte, o argumento –connect recebe como parâmetro o endereço do servidor seguido pelo nome do banco de dados e o número da porta, quando necessário. Em seguida, é informado ao Sqoop o nome de usuário e senha para acesso ao banco de dados e, por último, o nome da tabela a ser importada.

Listagem 1. Comando Sqoop exemplificando a utilização do argumento –table para a importação completa da tabela cidades.
sqoop import

–connect jdbc:mysql://mysql.example.com/DBTeste:Porta

–username eduardo

–password 123456

–table cidades

A seguir, temos o arquivo CSV (Comma-separated values), resultado da importação da tabela MySQL Cidades:

1,Estados Unidos,Palo Alto

2,República Tcheca,Brno

3,Brasil,Belo Horizonte

Por padrão, o Sqoop importará os dados para o HDFS em um diretório de mesmo nome da tabela de origem. Esse diretório de destino pode ser alterado por meio do argumento –warehouse-dir, tendo como parâmetro um diretório qualquer.

Dentro do diretório especificado serão criados outros diretórios com o nome da tabela de origem contendo os arquivos com os respectivos dados importados. Uma outra opção é o argumento –target-dir seguido do caminho completo para o diretório de destino desejado, como apresentado no exemplo da Listagem 2.

Listagem 2.  Comando Sqoop ilustrando a utilização do argumento –target-dir que define o diretório de destino desejado.
sqoop import

–connect jdbc:mysql://mysql.example.com/DBTeste

–username eduardo

–password 123456

–table cidades

–target-dir /etl/input/cidades

No exemplo, a senha é passada por meio do argumento –password. Apesar de essa ser uma forma muito simples de autenticação com o banco relacional, ela é também a maneira menos segura. Outra opção seria por meio do argumento –password-file, que recebe como parâmetro um arquivo que contém a senha. Pode-se ainda restringir as permissões de acesso a esse arquivo concedendo restrições de leitura apenas para o proprietário do arquivo (chmod 400).

Uma terceira opção seria por meio do argumento -P, que solicitará a senha mediante o console a cada execução do Sqoop. Por último, no Hadoop 2.6.0 foi implementada a API Credential Provider, que se propõe a resolver o problema de armazenamento de senhas em arquivos de texto, arquivos de configuração ou inseridos em código. A API Credential Provider se responsabiliza por fazer o armazenamento e recuperação das senhas, em que para cada senha existe um apelido que pode ser utilizado em substituição. Esse apelido pode então ser utilizado nos arquivos de configuração, o que protege a senha original de ser descoberta.

O Sqoop permite a importação de toda uma tabela (conforme ilustrado anteriormente pela Listagem 1) ou de algumas colunas específicas, ordenadas a critério do usuário. Para a importação de algumas colunas específicas, basta utilizar o argumento –columns separando o nome das colunas por vírgula – por exemplo, –columns “id, cidade, pais”. Além disso, o Sqoop permite restringir as linhas a serem importadas através do argumento –where, como ilustrado na Listagem 3.

Listagem 3. Comando Sqoop exemplificando a utilização dos argumentos –columns–whereonde serão importados apenas registros com id maiores que 100.
sqoop import 

connect jdbc:mysql://mysql.example.com/BDTeste 

username eduardo 

password 123456 

table cidades 

columns “id,cidade,pais” 

where “id > 100” 

Opcionalmente, o usuário pode substituir os três argumentos –table, –columns e –where pelo argumento –query, onde se deve definir toda a consulta SQL. O exemplo da Listagem 4 importará exatamente os mesmos dados do exemplo da Listagem 3.

Listagem 4. Comando Sqoop exemplificando a utilização do argumento –query em substituição aos argumentos –table, –columns e –where.
sqoop import

–connect jdbc:mysql://mysql.example.com/BDTeste

–username eduardo

–password 123456

–query “SELECT id, cidade, pais FROM cidades WHERE $CONDITIONS AND id > 100”

–target-dir /etl/input/cidades

–split-by id

No caso de tabelas que possuem colunas do tipo binário, o Sqoop realiza a importação delas de duas formas. Quando esse tipo de coluna possui tamanho de até 16 MB, ou algum valor distinto, mas definido pelo argumento –inline-lob-limit <número de bytes>, então esse campo é inserido na mesma linha, junto às demais colunas desse registro. Porém, se o campo ultrapassar o valor padrão de 16 MB ou o valor definido pelo argumento –inline-lob-limit, o campo será armazenado em um novo arquivo  de até 263 bytes cada  em uma subpasta _lobs no diretório de destino.

Processo de importação

O processo de importação dos dados é dividido em duas etapas principais. Na primeira delas, o Sqoop faz a leitura dos metadados da tabela de origem (nome das colunas, tipos dos dados etc.) para que sejam convertidos em um tipo de dados Java durante a criação de uma classe que encapsulará um registro dessa tabela. Essa classe, além de possuir os atributos que representam as colunas da tabela, também possui métodos para a serialização e desserialização desses dados. Ela tambémé um subproduto do processo de importação e pode ser utilizada por qualquer outra tarefa de MapReduce.

Por padrão, a classe possui o nome da tabela de origem e é salva no diretório corrente de onde o Sqoop está sendo chamado. Após a compilação dessa classe, o código compilado é salvo na pasta /tmp. Tanto o nome da classe quanto os diretórios de destino da classe gerada e da classe compilada podem ser alterados por meio dos seguinte argumentos: –class-name–outdir–bindir, respectivamente.

Após a extração dos metadados e criação da classe, o Sqoop dá início ao processo de importação dos dados, que nada mais é que uma simples consulta SQL. Veja o código a seguir:

SELECT coluna1, coluna2, coluna3, … FROM tabela

O Sqoop, por padrão, utiliza o nível de isolamento de transação chamado read committed, que assegura a não ocorrência de leituras sujas – ou seja, que uma transação TA leia dados modificados mas ainda não confirmados por uma transação TB.

No entanto, enquanto o Sqoop faz a leitura dos registros, os dados podem ser modificados e confirmados por outras transações. É importante ter ciência deste fato no processo de importação dos dados. Caso isso seja um obstáculo para o projeto, uma opção é não fazer modificações no banco até que todo o processo de importação seja finalizado.

Uma das grandes vantagens do Sqoop é a capacidade de paralelizar o processo de importação, distribuindo a consulta SQL por meio dos nós do seu cluster, conforme exemplificado na Figura 1. Para isso, ele precisa saber qual coluna da tabela a ser importada deve ser utilizada para dividir os dados de origem e quantos processos deverão ser utilizados em paralelo.

O mais adequado é a utilização de uma coluna que possua valores uniformemente distribuídos e que possa ser passada para o Sqoop por meio do argumento –split-by. Quando não informada a coluna, o Sqoop tenta escolher por si só; e por padrão ele escolhe geralmente a chave primária da tabela.

Figura 1

Figura 1. Exemplo do processo de importação. A figura exemplifica o processo de importação em que o Sqoop faz inicialmente a leitura dos metadados da tabela de origem, que serão utilizados na criação de uma classe Java usada na serialização e desserialização dos dados, para em seguida paralelizar em quatro tarefas MapReduce a consulta na tabela e transferir os dados para o HDFS.

Outro parâmetro importante é o número de tarefas de mapeamento utilizado no paralelismo, que pode ser parametrizado por meio do argumento -m ou –num-mappers. Por padrão, o Sqoop paraleliza o processo de importação dos dados em quatro tarefas. Em seguida, ele obtém o maior e menor valor da coluna escolhida, para então fazer uma divisão simples, conforme apresenta a Figura 2.

Figura 2

Figura 2. Fórmula utilizada.

Considere uma coluna id hipotética em que o menor valor é 0, o maior valor é 1000 e a importação dos dados será paralelizada em quatro tarefas, assim como a Figura 1. Desse modo, o Sqoop dividirá em partes iguais ((1000 – 0 ÷ 4) = 250) a coluna especificada, dando para cada tarefa de mapeamento uma consulta SQL com um intervalo diferente, como exemplificado na Listagem 5.

Listagem 5. Exemplo de divisão de uma consulta SQL.
SELECT * FROM Tabela WHERE (id>= 0) AND (id < 250)

SELECT * FROM Tabela WHERE (id>= 250) AND (id < 500)

SELECT * FROM Tabela WHERE (id>= 500) AND (id < 750)

SELECT * FROM Tabela WHERE (id>= 750) AND (id <= 1000)

Se a coluna utilizada na divisão não for uniformemente distribuída, isso pode prejudicar a performance do processo de importação, visto que algumas tarefas podem ficar sobrecarregadas e outras com pouco ou nenhum trabalho. Suponha que a coluna id do exemplo anterior possua apenas um valor 0 e todos os demais valores estão entre 500 e 1000.

Dessa forma, uma das tarefas não terá nenhum dado para importar, já que não existem registros no intervalo entre 250 e 500. Uma segunda tarefa terá apenas um registro para importar, pois existe apenas um registro (id = 0) entre o intervalo 0 e 250. As duas tarefas restantes dividirão todos os demais registros.

Para o MySQL, PostgreSQL, Netezza e Oracle, o Sqoop oferece o argumento –direct, que utiliza ferramentas específicas desses bancos e que oferecem melhor performance, comparados ao conector JDBC. Por exemplo, o MySQL possui as ferramentas mysqldump para realização de backup lógico (em arquivo) e mysqlimport, que faz o processo inverso – leitura dos dados a partir de um arquivo.

Entretanto, o argumento –direct possui algumas limitações, como exportar os dados apenas para arquivos texto (–as-textfile), não realizar importação de views e não tratar colunas do tipo BLOB, CLOB e LONGVARBINARY.

Importação incremental

Em um trabalho de análise de dados, é importante manter toda a informação que será utilizada sempre atualizada com relação à sua base relacional. E, para isso, o Sqoop oferece o argumento –incremental para importação apenas dos novos dados presentes na tabela. Esse argumento possui dois modos (append e lastmodified), que permitem ao Sqoop saber qual tipo de importação incremental deverá ser utilizada.

O modo append é adequado quando os seus dados de origem não sofrem atualização (UPDATE) e apenas novos dados são inseridos. Em conjunto com o argumento e parâmetro –incremental append, deve-se informar os argumentos –check-column e –last-value, em que o primeiro define qual coluna será utilizada no filtro e o último informa para essa coluna específica qual é o último valor já importado.

Ou seja, os registros que possuírem, para a coluna especificada, valores maiores que o informado por meio do argumento –last-value são novos registros e devem ser importados. No modo append, a coluna escolhida deve ser incremental para que não haja perda de valores na importação dos dados. O exemplo da Listagem 6 realiza a importação apenas dos registros que possuírem id maior que 3.

Listagem 6. Comando Sqoop exemplificando a importação incremental no modo append apenas dos registros que possuírem id maior que 3.
sqoop import

–connect jdbc:mysql://mysql.example.com/BDTeste

–username eduardo

–password 123456

–table cidades

–incremental append

–check-column id

–last-value 3

O modo lastmodified é apropriado para os casos em que os dados de origem sofrem atualização. Entretanto, a sua tabela deve (i) possuir uma coluna do tipo data (DATE, TIME, DATETIME ou TIMESTAMP), que é atualizada sempre que um novo registro é adicionado ou um dado existente é modificado; (ii) possuir uma coluna composta por valores únicos, como a chave primária.

Atendendo a esses requisitos, os mesmos argumentos –check-column–last-value devem ser utilizados, em que o argumento –check-column recebe como parâmetro o nome da coluna do tipo data, enquanto o argumento –last-value receberá o último valor da coluna tipo data que já foi importado. Além disso, deve-se utilizar o argumento –merge-key, que recebe como parâmetro a coluna de valores únicos para serem utilizados na junção dos dados.

No modo lastmodified, o Sqoop faz a importação para um diretório temporário apenas dos novos registros – isto é, registros que, baseados na coluna do tipo data definida em –check-column, sejam mais recentes que o valor informado por meio do atributo –last-value. Para finalizar o processo de importação, o Sqoop fará a junção dos dados previamente importados (Dprev) com os novos dados que estão sendo considerados (D∆).

Se o registro presente em D∆ também existir em Dprev, então trata-se de um caso de atualização (UPDATE) desse registro na base de origem, e o registro presente em Dprev será substituído pelo registro presente em D∆ utilizando a coluna com valores únicos definida por meio do argumento merge-key. Caso o registro em D∆ não exista em Dprev, então trata-se de um novo registro na base de origem e ele será apenas inserido no conjunto de dados de destino Dprev. A Listagem 7 ilustra a importação incremental no modo lastmodified, em que serão importados apenas os registros que foram inseridos ou atualizados após 02 de Fevereiro de 1987.

Listagem 7. Comando Sqoop exemplificando a importação incremental.
sqoop import

–connect jdbc:mysql://mysql.example.com/BDTeste

username eduardo

password 123456

table visits

incremental lastmodified

check-column last_update_date

last-value “1987-02-02”

merge-key id

É importante notar que a coluna definida pelo argumento check-column não deve ser do tipo CHAR, NCHAR, VARCHAR, VARNCHAR, LONGVARCHAR ou LONGNVARCHAR. Caso contrário, os operadores de comparação maior (>) e menor (<) não poderiam ser utilizados na cláusula WHERE da consulta, que obtém apenas os registros atualizados ou novos.

A Listagem 8 ilustra um trecho da saída após a execução de uma importação incremental, em que o Sqoop imprime no console o último valor importado ao final de cada execução incremental. Isso é útil para futuras execuções da importação incremental, quando o usuário faz o controle manual do último valor importado.

Listagem 8. Exemplo de saída no console após a execução de uma importação incremental.
13/03/18 08:16:36 INFO tool.ImportTool: Incremental import complete! …

13/03/18 08:16:36 INFO tool.ImportTool: –incremental lastmodified

13/03/18 08:16:36 INFO tool.ImportTool: –check-column update_date

13/03/18 08:16:36 INFO tool.ImportTool: –last-value ‘1987-05-22 02:02:02’

O controle do valor do argumento –last-value pode ser feito por meio do Sqoop metastore, serviço que faz a armazenagem, de forma não segura, (i) de senhas, (ii) da definição das tarefas de importação e exportação com seus respectivos argumentos e parâmetros e (iii) também do parâmetro para o argumento –last-value.

Ao final da execução das tarefas de importação incremental que estão armazenadas no Sqoop metastore, o Sqoop automaticamente faz a atualização e armazenagem do último valor importado para que ele seja utilizado na próxima execução dessa tarefa.

Importando dados para o Hive

Na importação direta para o Hive, o Sqoop fará a extração dos metadados da tabela de origem para a criação da tabela equivalente no Hive, caso ela não exista. Na criação da nova tabela, o Sqoop se encarrega de converter os tipos de dados de origem para tipos de dados equivalentes na tabela de destino. Caso o dado de origem precise ser convertido para um tipo de dado menos preciso, uma mensagem de alerta será impressa no console, conforme representado a seguir:

15/12/07 13:22:25 WARN hive.TableDefWriter: Column NomeColuna had to be cast to a less precise type in Hive’

Uma possível solução para esse problema é a utilização do argumento –map-column-hive, que instrui o Sqoop sobre o tipo de dado correto em que a coluna deve ser mapeada. Um exemplo de utilização desse argumento seria: –map-column-hive colunaA=DECIMAL, colunaB=INT, que recebe como parâmetro o nome da coluna no banco de dados de origem, seguido do operador igual (=) e pelo tipo de dado dessa coluna no Hive.

Caso a tabela já exista no Hive, o Sqoop irá apenas adicionar os dados ao final dessa tabela já existente. Uma alternativa seria a utilização do argumento –hive-overwrite, que solicita ao Sqoop a remoção prévia de todos os dados existentes na tabela, como exemplificado na Listagem 9.

Listagem 9. Comando Sqoop exemplificando a utilização do argumento –hive-overwrite.
sqoop import

connect jdbc:mysql://mysql.example.com/DBTeste

username eduardo

password 123456

table cidades

hive-import

hive-table BancoDados.novoNomeTabela

hive-overwrite

hive-drop-import-delims

m 32

O exemplo da Listagem 9 realiza a importação de uma tabela inteira direto para o Hive. Por padrão, o nome da tabela de destino no Hive é o mesmo nome da tabela de origem. No entanto, essa definição pode ser alterada utilizando-se o argumento –hive-table, que define o nome da tabela de destino no Hive.

Alguns caracteres especiais podem estar presentes nos dados de origem, o que pode causar divergências no resultado da importação. Existe um outro argumento, hive-drop-import-delims, que remove os caracteres n, r e 1, que são delimitadores especiais utilizados pelo Hive.

Caso a remoção desses caracteres não seja uma opção, pode-se utilizar o argumento –hive-delims-replacement, que substitui os caracteres especiais por algum outro texto. Essa opção só deve ser utilizada caso o Hive esteja configurado para utilizar os delimitadores especiais padrão. A Listagem 10 ilustra a utilização do argumento –hive-delims-replacement.

Listagem 10. Comando Sqoop exemplificando a utilização do argumento –hive-delims-replacement.
sqoop import

connect jdbc:mysql://mysql.example.com/DBTeste

username eduardo

password 123456

table cidades

hive-import

hive-delims-replacement “ESPECIAL”

A grande maioria dos bancos de dados permite que as colunas possam simplesmente não ter valores (campos NULL). Contudo, por padrão, o Sqoop faz a conversão dos valores nulos para uma string “null”. Isso pode dificultar a manipulação desses dados no Hive, uma vez que ele utiliza N para representar valores nulos.

Para contornar esse problema, o Sqoop oferece dois argumentos, –null-string e –null-non-string, em que o primeiro trata os dados de origem que são do tipo textual (VARCHAR, TEXT, NCHAR, NVARCHAR, NTEXT etc) e possuem valor NULL, substituindo-os pelo parâmetro informado para esse argumento. Já o argumento –null-non-string substitui todos os demais tipos de dados que não possuem valor – ou seja, o campo é NULL –, substituindo-os pelo parâmetro informado para esse argumento. O exemplo da Listagem 11 substitui todo e qualquer campo NULL da tabela de origem por Nvalor que o Hive entende como NULL. A barra invertida () extra no exemplo da Listagem 11 é necessária por ser o caractere de escape em Java e que será interpretada pelo compilador.

Listagem 11. Comando Sqoop exemplificando a utilização dos argumentos –null-string–null-non-string para a substituição de todo e qualquer campo NULL da tabela de origem por N.

sqoop import

connect jdbc:mysql://mysql.example.com/DBTeste

username eduardo

password 123456

table cidades

–null-string ‘N’

–null-non-string ‘N’

Exportando dados do Hadoop

O processo de exportação dos dados é muito similar ao processo de importação. Um diferencial importante, no entanto, é que a tabela no banco de dados relacional (banco de destino) deve estar previamente criada. Isso ocorre porque, apesar de o Sqoop conseguir converter os tipos de dados de origem para os tipos de dados de destino – ou seja, do banco relacional para o HDFS no processo de importação de dados –, o contrário não é possível. Suponha um dado do tipo string: o Sqoop não saberá se esse dado de destino se tornará um CHAR(64) ou um VARCHAR(250) e, por isso, ele deixa essa decisão a critério do usuário.

Assim como a importação de dados, o processo de exportação pode ser feito em paralelo, especificando o número de tarefas ou deixando que o Sqoop decida a quantidade de tarefas. Durante o processo de exportação dos dados, a tabela de destino pode não estar consistente, uma vez que nesse processo algumas tarefas podem acabar antes de outras. Assim, é aconselhável que as aplicações que venham a consumir esses dados esperem pelo término da exportação.

Uma alternativa para isso é exportar os dados para uma tabela temporária, utilizando o argumento staging-table (Listagem 12), que deve possuir exatamente o mesmo esquema da tabela de destino oficial e estar vazia, ou que seja utilizado o argumentoclear-staging-table. Uma vez que a exportação termine com sucesso para a tabela temporária, o Sqoop se encarrega de fazer a transferência dos dados dela para a tabela oficial. Caso ocorra algum problema na transferência dos dados para a tabela temporária, o Sqoop não fará nenhuma mudança na tabela oficial de destino.

Esse processo é interessante, uma vez que somente no caso de sucesso da exportação para a tabela temporária é que a tabela de destino será modificada. Porém, esse processo é mais lento porque existem duas cópias do mesmo dado, do Hadoop para a tabela temporária e da tabela temporária para a tabela oficial. Ele também requer mais espaço no banco de dados de destino, já que ao final da exportação dos dados para a tabela temporária existirão duas tabelas no banco de destino, a temporária e a oficial a ser substituída.

Listagem 12. Comando Sqoop exemplificando a exportação dos dados do Hadoop para o banco DBTeste no MySQL.
sqoop export

connect jdbc:mysql://mysql.example.com/DBTeste

username eduardo

password 123456

table cidades

staging-table staging_cidades

A Listagem 13 exemplifica a utilização do argumento –call, que permite a utilização de stored procedures ao invés da inserção direta na tabela. Stored procedures são um conjunto de cláusulas SQL registradas por meio de um nome no servidor de banco de dados. Elas são vantajosas porque facilitam a manutenção, uma vez que a stored procedure está em um único lugar (no banco de dados) para que os usuários do banco possam utilizá-la. Além disso, a aplicação não precisa ter ciência do esquema do banco de dados ao realizar uma operação; basta apenas chamar a stored procedure desejada, informando os argumentos quando necessário.

Stored procedures podem também melhorar a performance, uma vez que algumas regras de negócio podem ser implementadas direto no banco de dados. Porém, o argumento –call deve ser utilizado com cautela pois, ao fazer a exportação dos dados em paralelo, pode-se gerar múltiplas chamadas simultâneas da stored procedure especificada, podendo eventualmente causar problemas de performance no banco de destino, devido à complexidade da stored procedure e ao alto número de chamadas.

Listagem 13. Comando Sqoop exemplificando a utilização do argumento –call, que permite a utilização de stored procedures ao invés da inserção direta na tabela.
sqoop export

connect jdbc:mysql://mysql.example.com/DBTeste

username eduardo

password 123456

call populate_cities

Conclusão

O Sqoop é uma excelente ferramenta e deve ser considerada em qualquer projeto de Big Data, uma vez que facilita a integração e utilização dos dados armazenados em bases relacionais ou mainframes no seu ecossistema Hadoop. O Sqoop possui outras excelentes funcionalidades que não foram abordadas nesse artigo, mas que merecem atenção.

Agradecimentos

Agradeço à O’Reilly Media por permitir a reutilização dos exemplos presentes no livro Apache Sqoop Cookbook Unlocking Hadoop for Your Relational Database, de Kathleen Ting e Jarek Jarcec Cecho.

Bibliografia

  • Kimball, Aaron. Sqoop. In Tom White. Hadoop: The Definitive Guide: Storage and Analysis at Internet Scale. O’Reilly Media, Março de 2015. 403-424.
  • Kathleen, Ting e Cecho, Jarek Jarcec. Apache Sqoop Cookbook.  O’Reilly Media, 2013. 94.
  • Edward, Capriolo , Wampler, Dean e Rutherglen, Jason. Programming Hive. O’Reilly Media, 2012. 350.
  • Apache Hive. Disponível em <https://cwiki.apache.org/confluence/display/Hive/Home>. 21 de dezembro de 2015.

 

Banner Simplicidata

Sobre o autor:

Eduardo Miranda é professor, consultor e palestrante sobre Big Data, Data Science e Internet das Coisas. Possui sólida experiência projetando e implementando soluções de Big Data aplicadas a diferentes indústrias. É formado em Ciência da Computação pela Universidade Federal de Viçosa e mestre em Ciência da Computação pela Universidade Estadual de Campinas, onde trabalhou com banco de dados de grafos e tecnologias da web semântica.

eduardo@emiranda.com.brhttp://emiranda.com.br/

  • Luciano Bonfim

    Excelente artigo, me ajudou muito aqui nos meus estudos iniciais! vou fazer uns testes importando umas bases que tenho de sistemas com mysql/sqlserver utilizando o sqoop mas fazendo de forma incremental…

  • Eduardo Morelli

    Ótimo artigo, Miranda!

    Fiquei com 3 dúvidas, no entanto:

    1) Em meus testes com Apache Hive (versão 2.1), tenho percebido uma recomendação de uso do TEZ, ao invés do MR. Isto também acontece com o Sqoop?
    2) Nas listagens 12 e 13, a primeira linha deveria ser “sqoop export”, certo?
    3) Voltando ao Hive, poderíamos usar o Sqoop ao invés do comando “load data” ?

    Abs
    Morelli (palestrei no Big Data Week aqui no Rio!)

    • Olá Morelli.

      Você foi a primeira pessoa a chegar no Big Data Week Rio. Me lembro de você com certeza !! Fico satisfeito que tenha gostado do artigo e espero que tenha sido útil.

      Respondendo suas perguntas, eu não estou muito familizariado com Apache Tez. Eu tenho utilizado apenas distribuições Cloudera e que não possui nativamente Apache Tez. Eu acredito que você esteja utilizando uma distribuição Hortonworks e encontrei esse artigo dando um pouco mais detalhes sobre o assunto: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.6/bk_HDP_RelNotes/content/errata_sqoop_with_tez.html. Mas sinceramente não sei no momento dizer se o Sqoop faria uma recomendação de uso do Tez, ao invés do MapReduce.

      Correto, a listagem 12 e 13 deveria ser Sqoop export. Vou solicitar a alteração. Obrigado pela correção !!

      O comando LOAD DATA no Hive faz a carga de dados em uma tabela no Hive com os dados disponíveis em arquivo no formato tabular tanto no sistema de arquivos local quanto no HDFS. No entanto, o comando LOAD DATA não faz a obtenção de dados a partir de um banco relacional, função qual o Sqoop foi criado. O origem de dados no Sqoop é apenas bancos de dados e não arquivos em formato tabular e desse modo o Sqoop não substitui o comando LOAD DATA do Hive.

      Espero ter ajudado. Fique super a vontade para enviar novas perguntas em caso de dúvidas.

      • Eduardo Morelli

        Perfeito, Miranda!

        De fato, comecei com uma distribuição Hortonworks, mas a versão do Hive oferecida no Sandbox era a 1.2, muito antiga. Se quisesse utilizar a versão mais nova (2.1), teria que partir para uma instalação Hadoop do zero. E foi o que fiz: criei uma VM com Hadoop 2.7. Mas, quando fui usar o Hive 2.1, ele me pediu o TEZ, o que me obrigou a também instalá-lo (vida dura!).

        Enfim, me baseei muito neste artigo:
        https://www.infoq.com/articles/apache-tez-saha-murthy

        Mais uma vez, parabéns pelo artigo!

        Abs
        Eduardo Morelli

Top