https://issues.apache.org/jira/browse/HIVE-4204

많은 컬럼들을 인자로 받는 UDF 를 사용하는 경우 컬럼 이름들을 모두 나열하는 것은 실수하기도 쉽고 보기에도 좋지 않다. 전체 컬럼을 인자로 한다면 star argument(*) 를 사용하면 되겠지만(HIVE-3490), 두번째 컬럼부터 끝까지.. 이런식의 UDF 라면 이것도 불가능 한데 ellipsis 를 지원하다면 좀 더 수월하게 의도를 표현할 수 있다.

즉 아래와 같은 표현을 

select some_udtf(a2,a3,a4,a5,a6) from table;

다음과 같이 표현 할 수 있다.

select some_udtf(a2...) from table;

일반적인 케이스는 아니지만 사용자가 구현한 UDTF 가 임의의 갯수의 컬럼을 반환 한다거나 쿼리 문자열을 프로그래밍적으로 생성하는 경우 등등에 많은 도움이 된다.

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-3286

일반적인 reduce-side join 에서 수행 시간에 가장 큰 영향을 끼치는 문제는 key 의 skewness 이다. reduce-side join 시 mapper 는 join key 의 hash 값을 기준으로 repartitioning 을 하게 되는데 이 값이 특정 reducer 에 몰리는 경우를 skew 가 발생했다고 하며, 해당 reducer 는 다른 reducer 들의 작업이 모두 끝난 후에도 한참을 더 돌아가는 것을 볼 수 있다. 이러한 현상은 실제 배치 처리에서 매우 빈번하게 발생한다.

이를 위해 Hive 는 skew join 이라는 최적화 기법을 지원한다(hive.optimize.skewjoin). 이 기능은 reducer 에서 grouping 된 key 들에 대해 바로 join 을 수행 하기 전에 사용자가 설정한 threshold(hive.skewjoin.key) 보다 더 많은 지를 검사하여 skewness 를 인식한다. skew 되지 않은 경우는 일반적인 join 을 수행하지만, skew 로 판정된 key 들은 join 을 하지 않고 임시로 HDFS 상에 저장해 두며 추가적인 MR 을 통해 join (mapjoin 도 가능) 을 처리한 다음 skew 되지 않은 값들의 join 결과와 merging 하는 방식이다. (자세한 내용은 디자인 문서를 참조한다. https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization)

문제는 이 방식이 skewness 를 알기 위해 전체 데이터에 대한 shuffling 과정을 거쳐야 하므로 성능적으로 얻을 수 있는 부분이 그리 크지 않다는 데에 있다. skewness 의 정도가 매우 심하지 않으면 skew join optimization 을 쓰지 않을때 보다 더 느려지는게 보통이다.

이 문제를 해결하기 위해 테이블의 skewness 를 테이블의 메타정보에 저정하는 방식이 있다. 이러한 skewness 가 지정된 테이블을 skewed table 이라 하며 list bucketing(aka. LB) 을 이용하여 구현한다. 그러나 skewed table 의 skew 키가 join 키와 일치하지 않는 경우가 많고 중간 테이블에 대한 skewness 을 지정할 수 없다는 문제가 있다. 또한 LB 를 사용하는 자체가 table 의 데이터 포맷(디렉토리 구조)에 많은 제약을 가하므로 별도의 hash bucketing 이나 insert into 등이 수행되지 않는 단점이 있다. LB 에 대한 자세한 내용은 https://cwiki.apache.org/confluence/display/Hive/ListBucketing 을 참조하도록 한다.

위 패치에서 제시하는 방식은 inline-skew join 이라 불리는 것으로 skewed table 이 아닌 일반 테이블의 skewness 를 사용자가 쿼리에 직접 지정할 수 있도록 한다. 예를 들면 다음과 같다.

select count(*) from src a JOIN src b ON a.key=b.key 

   SKEWED ON (a.key > 100 CLUSTER BY 20 PERCENT);

위의 쿼리는 a.key > 100 을 만족하는 경우에 대해 전체 reducer 갯수의 20% 를 할당하여 처리하며 그 이외의 경우는 (a.key <= 100) 는 나머지 80% 의 reducer 를 할당하여 처리한다. 즉 위의 MR job 에 10 개의 reducer slot 이 사용 된다면(이는 MR job 전체 인풋의 크기에 따라 결정된다) skew 조건을 만족하는 key/value 는 2개의 reducer 에 의해, 나머지는 8개의 reducer 에 의해 처리 된다. 

row 가 reducer operator 에 forwarding 되면 skew condition 을 evaluation 하여 skew 클러스터에 할당된다. skew 조건은 여러개 지정할 수 있으나, 모든 row 에 대해 순서대로 evaluation 되므로 성능에 영향을 줄 수 있으므로 너무 많이 지정하지 않는 것이 좋다. 각 skew 클러스터는 조건에 사용된 driving alias (클러스터 조건식에 등장하는 컬럼이 존재하는 alias, 위의 경우 a) 를 big alias 로 가정하여 skew 클러스터의 reducer 들에게 random forwarding 하며, 나머지 alias 들은 skew 클러스터의 모든 reducer 에게 broadcasting 한다. skew 되지 않은 row 들은 일반적인 hash reparitioning 을 한다. 두개의 조인으로 처리되는 일반적인 skew join 과는 달리 하나의 join task 로 skew 작업을 수행한다.

위의 쿼리는 대략 다음과 같이 처리된다.

a.key|b.key<=100 --> reducer-0~7 (hash repartitioning)

a.key=200 --> reducer-8  (random distribution) |  b.key=200 --> reducer-8,9 (broadcasting)

a.key=300 --> reducer-9  (random distribution) |  b.key=200 --> reducer-8,9 (broadcasting)


신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-6312

Authorization 등을 위해 Hive 의 doAs 옵션을 쓰는 경우(hive.server2.enable.doAs = true) 사용자의 subject 정보는 thrift call 에 embedding 되어 client 에서 server 로 전달된다. hiveserver 는 TUGIContainingProcessor 에서 이 정보를 이용하여 subject 를 새로 생성하여 doAS 로 하위 작업들을 호출하는데 이때의 login 정보가 Hive 의 authorization 과정이나 외부 FileSystem 에 접근할 때 사용된다.

문제는 hadoop 에서 제공하는 FileSystem cache 를 쓰는 경우 이 cache 키값이 uri + subject 정보이며, subject 는 identity 만을 비교한다. 즉 동일한 사용자라 하더라도 subject 객체가 다르므로 모든 call 에 대해 FileSystem 을 새로 만들게 되어 memory leak 이 발생한다. FileSystem cache 는 디폴트로 enable 되어 있으며 HDFS 를 위한 hadoop.io.DistributedFileSystem 의 경우 상당한 메모리를 점유하므로 아무리 hiveserver 에 많은 메모리를 할당 하더라도 수천~수만 call 을 못버티고 OOM 으로 죽게 된다. 

이 문제를 해결하는 가장 간단한 방법은 hadoop 의 FileSystem cache 를 disable 하는 것이지만 (fs.hdfs.impl.disable.cache = true), FileSystem 은 Hive 내에서 compile/optimizing 과정에 매우 빈번하게 사용되므로 약간의 성능 저하는 불가피하다. 무엇보다 Hive 는 FileSystem 을 close 하지 않으므로 또 다른 문제을 야기할 수 있다.

이 패치는 session 생성시에 subject 정보를 한번 만들어 저장하고 각 call 에 embedding 된 session ID 로 session 을 찾은 다음 session 내의 subject 로 doAs 를 호출하는 것으로 memory leakage 와 성능 문제를 동시에 해결한다. 

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-5799

hiveserver2 를 띄우고 JDBC 로 접근하는 경우 사용자 코드 불량이나 네트웍 등의 문제로 사용중이던 session 이나 operation 들이 hiveserver  에 계속 남게 되는 경우가 있는데 현재는 이를 close 할 수 있는 방법이 없다. session 이나 operation 들은 제법 덩치가 큰 객체로 메모리의 상당 부분을 차지하고 있게 된다. 

이 패치는 hiveserver 를 시작할 때 감시 쓰레드를 같이 띄워 오래동안 접근한 흔적이 없는 session 과 operation 을 찾아 close 해 주는 것으로 check 주기와 session threshold, operation threshold 를 지정하여 사용한다.

session/operation 검사 주기 msec : hive.server2.session.check.interval

session timeout threshold msec hive.server2.idle.session.timeout

operation  timeout threshold msec hive.server2.idle.operation.timeout

hiveserver 의 감시 쓰레드는 기본적으로 terminal 상태(FINISHED, CANCELED, CLOSED, ERROR)의 session/operation들만 검사하게 되어 있는데 이 값을 - 로 주면 상태에 관계없이 모든 session/operation 을 사한다. 예를 들어 hive.server2.idle.session.timeout=-1000 로 설정하면 1 초 동안 사용하지 않은 모든 session 을 close 하는 무서운 기능이 된다.

ps. msec 으로 설정하려니 큰 시간을 계산하기 힘들어서 시간 관련 postfix 를 지정할 수 있도록 했다. s :seconds, m : minutes, h : hours, d : days 이며 hive.server2.idle.session.timeout=7d 하면 일주일이 된다.

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-5962

Hive 의 모든 identifier 는 ascii 로 정의되어 있다. 가끔 테이블 이름이나 컬럼 이름을 한글로 쓰고 싶다는 괴상한 요구 사항을 받는 경우가 있는데, 이 패치는 그것을 위한 것이다. antlr 문법 파일을 수정해야 하므로 오작동 가능성 때문에 메인 브랜치에는 넣을 수 없고 꼭 필요한 싸이트에 대해서만 적용하여 릴리즈 한다. 

기본적으로 Lexer 룰을 고쳐 identifier 에 UTF 문자들을 수용하게 한 것인데, 아마도 mapping 테이블을 엄청나게 크게 만들기 때문에 메모리나 성능적인 문제가 있을 수도 있겠다. 그 외에 metastore 의 문자열 길이 문제 때문에 MYSQL 의 테이블 스키마를 수정한 정도.

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-5901

Driver.cancel() 명령 호출시 아직 수행되지 않은 Task 들은 cancel 되지만 running 상태의 Task 는 종료 될 때 까지 기다리게 된다. 이 패치는 running 상태의 Task 들을 찾아서 shudown() 을 호출해 주며 MRTask 의 경우 현재 진행중인 Job 을 취소 시켜 바로 리소스를 반환하게 한다. 

JDBC 의 cancel 명령어도 결과적으로 Driver.cancel() 을 수행하게 되어 있다.

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-1662

Hive 가 지원하는 VC 들 중에 인풋 파일명(INPUT__FILE__NAME)이 있는데 이를 런타임에 Filter OP 에서 evaluation 하지 않고 MR 시작 전에 InputSplit 레벨에서 미리 evaluation 하여 scan 을 줄이는 기능이다. 파티션 프루닝(PPR)과 비슷한 역할을 한다.

Hive 가 파티션을 지원하기는 하지만 Hive 에서 정해진 형식(pcol1=a/pcol2=b)으로 디렉토리 구조가 되어 있어야 하므로 임의의 방식으로 정리된 디렉토리에 대해서는 이를 바로 적용할 수가 없는 경우가 많다. 즉 Hive 에서 지원하는 디렉토리 형식에 맞추어 한번 더 ETL 하는 과정이 필요 한데 이 자체가 크게 성능에 영향을 주지 않는다 하더라도 이 프로세스에 대한 관리 수요가 커지는 것은 막을 수 없다. 게다가 아마존 S3 스토리지와 같이 파일 Move 의 비용을 무시할 수 없는 시스템을 사용하는 경우라면 더욱 큰 문제가 된다.

예를 들어 하나의 디렉토리가 있고 파일명이 날짜로 구성된 경우 Hive 의 파티션에 맞추기 위해서는 이를 각 날자별로 디렉토리를 만들어 다시 옮기고 각각에 대해 파티션을 만드는 다소 지루한 과정이 필요하지만 이 기능을 활용하는 경우 VC 에 대한 필터만 주면 파티션을 이용하는 것과 거의 동일한 성능적인 이점을 활용 할 수 있다는 장점이 있다.

srcbucket2 은 Hive 테스트에 많이 쓰이는 테이블들 중 하나인데, 이 테이블 안에는 key 로 bucketing 된 네개의 파일 (srcbucket20.txt, srcbucket21.txtsrcbucket22.txtsrcbucket23.txt) 이 들어 있다. 

select * from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt'

srcbucket2 에 대해 위와 같은 쿼리를 날리는 경우 원래는 네개의 파일을 다 읽어서 FILTER OP 에서 필터링을 하게 되지만, 이 패치를 적용하면 Split 을 만드는 과정에서 두개의 파일만 골라 인풋으로 사용하게 된다.

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-6329

민감한 컬럼 데이터에 대해 encoding 을 적용하고 권한이 있는 사람만 decoding 하여 실제 값을 볼 수 있게 제한하는 등의 요건이 많이 들어와서 구현한 것. HDFS 레벨의 블록 인코딩과 비슷한 방법을 사용하자는 의견도 있었지만 전체 데이터를 인코딩 해야 하므로 속도 문제도 예상이 되었고 무엇보다 사내에 Hadoop 수정이 가능한 인력이 없으므로 결국 Hive 레벨에서 구현하는 것으로 결정되었다. 

Hive 에서 실제 wire format 은 SerDe 에 의해 결정되는데 이 부분을 살짝 고쳐주면 되긴 하지만 이를 싸이트 마다 다르게 수정하는 것은 말도 안되므로 인터페이스를 하나 뽑아서 인코딩/디코딩 부분을 재활용 가능하게 만들었다. 현재는 LazySimpleSerDe 와 HBaseSerDe 만 지원한다. 이들은 FieldRewritable 이라는 마커 인터페이스를 붙여 놓았다.

기본 인터페이스는 다음과 같다. 

public interface FieldRewriter {

  void init(List<String> columnNames, List<TypeInfo> columnTypes, Properties properties) throws IOException;

  void encode(int index, ByteStream.Input input, ByteStream.Output output) throws IOException;

  void decode(int index, ByteStream.Input input, ByteStream.Output output) throws IOException;

}

아래는 테스트에 사용한 샘플 구현체. Base64 로 인코딩하여 저장하고, 필요시 디코딩하여 원래 값을 보여준다.

public class Base64Rewriter extends AbstractFieldRewriter {

  public void encode(int index, ByteStream.Input input, ByteStream.Output output) throws IOException {

    output.write(Base64.encodeBase64(input.toBytes()));

  }

  public void decode(int index, ByteStream.Input input, ByteStream.Output output) throws IOException {

    output.write(Base64.decodeBase64(input.toBytes()));

  }

}

이를 아래와 같이 사용한다. 테이블 생성시 SERDEPROPERTIES 로 column.encode.names 에 인코딩 컬럼을 나열하고 column.encode.classname 에 구현체 이름을 넣어주면 끝.

create table encode_test2(id INT, name STRING, phone STRING, address STRING)

  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES (

    'hbase.columns.mapping' = ':key,private:name,private:phone,private:address',

    'column.encode.names'='phone,address', 

    'column.encode.classname'='org.apache.hadoop.hive.serde2.Base64Rewriter');


신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-6411

Hive 에서 HBaseStorageHandler 를 사용할 때 HBase 의 row 를 composite key 로 구성하는 경우가 많은데 Hive 의 필드 DELIMITER 를 지정하는 방법 외에는 딱히 사용할 방법이 없다. 이 패치는 row 컬럼을 사용자가 원하는 형태의 오브젝트에 할당하는 것으로 read/write 포맷을 사용자 임의대로 사용할 수 있으며 row 에 대한 predicate 를 다양한 방법으로 처리 할 수 있게 해 준다.

사용자는 용도에 따라 두가지 인터페이스를 구현할 수 있다. HBaseKeyFactory 는 HBase 테이블에 대한 READ 를 가능하게 하며  HBaseWritableKeyFactory 는 READ 와 WRITE 를 가능하게 해준다. row 에 대한 predicate 를 처리하기 위해  HivePredicateAnalyzer 를 추가로 구현 할 수도 있다. 이를 이용하여 HBase 에서 제공하는 다양한 Filter 들을 적용하여 scan 속도를 올릴 수도 있다. 

public interface HBaseKeyFactory {

  void init(SerDeParameters parameters, Properties properties) throws SerDeException;

  ObjectInspector createObjectInspector(TypeInfo type) throws SerDeException;

  LazyObjectBase createObject(ObjectInspector inspector) throws SerDeException;

}

init 은 사용자가 구현한 Factory 를 초기화 하는 것으로 HBaseSerDe 생성 과정의 초기 단계에 호출되며, 뒤이은 row object 및 object inspector 를 만드는 과정에서 createObjectInspector/createObject 가 각각 호출 된다. Row Column 으로 사용될 Object 는 LazyObjectBase 를 구현해야 한다.

Write 를 커스터마이징 하기 위해서 (별도로 지정하지 않으면 JSON string 으로 저장됨) HBaseWritableKeyFactory 를 구현할 수도 있다. HBaseWritableKeyFactory 는 HBaseKeyFactory 의 서브 인터페이스이다.

public interface HBaseWritableKeyFactory extends HBaseKeyFactory {

  boolean serialize(Object object, ObjectInspector inspector, ByteStream.Output output) throws IOException;

}

포맷 구현시 row 의 sub element 들에 대해 고정 길이 컬럼을 사용하면 HivePredicateAnalyzer 구현시 partial predicate 에 대해서도 어느정도 필터를 사용할 수 있다.


아래는 사용 예이다. SERDEPROPERTIES 에 hbase.composite.key.factory 를 지정하면 된다.

CREATE TABLE hbase_ck_4(key struct<col1:string,col2:string,col3:string>, value string)

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

WITH SERDEPROPERTIES (

    "hbase.table.name" = "hbase_custom2",

    "hbase.columns.mapping" = ":key,cf:string",

    "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory2"); 


-- Insert

from src tablesample (5 rows)

insert into table hbase_ck_4 select

struct(

  cast(key as string),

  cast(cast(key + 1000 as int) as string),

  cast(cast(key + 2000 as int) as string)),

value;


-- Select

select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238';


신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-6470


tez guy Gunther 가 자기 취미대로 Explain 포맷을 고쳐놨는데, 가끔 indent 가 맞지 않는 경우가 있음.


원래 이랬는데, 


STAGE PLANS:

  Stage: Stage-1

    Map Reduce

      Alias -> Map Operator Tree:

        src 

          TableScan

            alias: src

            Select Operator

              expressions:

                    expr: key

                    type: string

              outputColumnNames: _col0

              File Output Operator

                compressed: false

                GlobalTableId: 0

                table:

                    input format: org.apache.hadoop.mapred.TextInputFormat

                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat


지금은 이렇게..


STAGE PLANS:

  Stage: Stage-1

    Map Reduce

      Map Operator Tree:

          TableScan

            alias: src

            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE

            Select Operator

              expressions: key (type: string)

              outputColumnNames: _col0

              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE

              File Output Operator

                compressed: false

                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE

                table:

                    input format: org.apache.hadoop.mapred.TextInputFormat

                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe


난 내용도 좀 맘에 안드는데가 TableScan 앞에 indent 가 두번 들어간게 볼수록 짜증이 남. 그래서 이거 고쳤음.


신고
Posted by navis94

카테고리

분류 전체보기 (31)
Apache Hive (29)

최근에 달린 댓글

최근에 받은 트랙백

태그목록

달력

«   2017/11   »
      1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30    

티스토리 툴바