Two possible solutions -


신고
Posted by navis94

hive PTF-WINDOWING

2015.01.22 11:12

까먹기 전에..

TABLE-DEF 는 WINDOW-DEF 와 PTF-DEF 로 나뉜다. WINDOW-DEF 는 일반적인 윈도우 선언을 사용하는 function 들이고 PTF-DEF 는 사용자가 임의의 테이블을 만들어 내는 function 으로 script operator 와 동일한 성격을 가진다고 보면 된다 (noop/noopwithmap/matchpath 등이 있는데 몇개 없음). OpConf 내의 정의와 실제 evaluation 되는 순서는 반대이므로 주의.

PartitionedTableFunctionDef : TABLE-DEF2 -> TABLE-DEF1 -> INPUT-TABLE

PTFInvocation : INPUT -> TABLE-DEF1 -> TABLE-DEF2

* 기본적으로 각 DEF 에 대해 start-partition -> process-row* -> finish-partition 순서로 진행

target partition 이 처리되는 방식에 따라 input streaming mode 와 non-input-streaming mode 로 나눌 수 있다. input streaming 은 partition 내의 row 를 하나씩 넣으면서 결과를 뽑아내는 방식이고 non-input-streaming 는 전체 파티션을 다 읽어 partition iterator 를 만든 후 이를 처리하는 방식이다. 이 중에 input streaming mode 가 좀 더 효율적이라 할 수 있다. non-input-streaming mode 는 결과값을 만들어 내는 방식에 따라 다시 iterator-out mode 와 partition-out mode 로 나뉜다.

우선 TABLE-DEF 들이 input streaming mode 가 가능 한지를 검사 (TableFunctionEvaluator.setCanAcceptInputAsStream()) 하여 모든 하위 function 들이 아래와 같은 범주에 속하는 경우에는 input streaming mode 를 적용할 수 있다.

1. The Function implements ISupportStreamingModeForWindowing 

2. Or returns a non null Object for the getWindowingEvaluator, that implements ISupportStreamingModeForWindowing. 

3. Is an invocation on a 'fixed' window.  So no Unbounded Preceding or Following.

non-input-streaming mode 의 경우 output 을 iterator 방식으로 줄 수 있는지를 조사한다. WINDOW-DEF 와 input-streaming 이 가능 한 모든 TABLE-DEF 들이 iterator 방식에 속한다. input 은 partition iterator 로 동일하지만 output 방식에 따라 호출되는 메쏘드가 다르다.

partition-out : execute(PTFPartitionIterator<Object> pItr, PTFPartition outP))

iterator-out : Iterator<Object> iterator(PTFPartitionIterator<Object> pItr)

최악의 경우 function * partition row 의 evaluation 이 일어나며 function * partition row ^2 의 컬럼 억세스가 발생한다. 


실제 값을 evaluation 하는 방법은 function 의 종류에 따라 달라지며, 아래와 같이 총 3 가지가 있다.

streaming function

non-streaming function : pivot + non-pivot

streaming function 이라도 외부 조건에 따라 non-streaming 방식으로 사용 될 수 있으며, 이 function 이 pivot 이 가능하다면 non-streaming pivot function 으로 간주된다. (non input-streaming mode에서 getRowsRemainingAfterTerminate() 가 0 이 아닌 경우. input streaming  mode 에서는 항상 사용 가능)

streaming function 의 경우 process-row 에서 들어오는 row 값으로 그때 그때 evaluation 한다. 이때 function 의 특성에 따라 결과가 바로 나올 수도 있고 안나올 수도 있는데 (window frame 이 following X 가 있는 경우), 결과값이 나오면 이 값을 다음 TABLE-DEF 로 넘겨서 계속 evaluation 을 진행한다. 이경우 마지막 finish-partition 이 호출될 때에 나머지 결과값들이 나오게 된다. input-streaming mode 에서는 이러한 방식이 가능하지만 non input-streaming mode 에서는 현재 불가능 하다.

non-streaming/pivot function 은 PTFPartitionIterator 에 대한 결과 값을 전체 LIST 형태로 반환하는 것으로 앞서 설명한 것 처럼 streaming 타입으로 처리하는 것이  불가능 한 경우에 사용된다. 전체 파티션에 대한 해당 컬럼의 결과는 한번 evaluation 되어 LIST 형태로 저장되며, 여기에서 값을 하나씩 꺼내 가는 방식으로 동작한다. pivot 타입이 아닌 경우 이는 전체 partition 에 대해 동일한 결과를 내는 함수로 것으로 간주되며, 한번 evaluation 된 값이 모든 row 에 동일하게 적용된다. 

non-streaming/non-pivot function 은 row index 가 증가할 때 마다 그에 맞는 범위의 partition iterator 를 생성하며, 이 범위의 모든 값을 iterate + terminate 한 결과값을 반환한다.

1. partition out 의 경우 조건상 streaming 을 사용할 수 없다. 

streaming 타입으로 처리하는 것이 가능한 경우는 결과 Iterator 내에서 next 가 호출될 때 값이 evaluation 된다. 




신고
Posted by navis94


hive-0.15.0 의 hadoop 버전이 2.6으로 올라가서 (귀찮게스리) 이번에 업그레이드를 했다. 하고나니 MR 이고 TEZ 고 다 안됨. 리소스 매니저님이 노드가 맘에 안드신다고 (http://localhost:8088/cluster/nodes/unhealthy) 하셔서 보니 

1/1 local-dirs are bad: /tmp/hadoop-navis/nm-local-dir; 1/1 log-dirs are bad: /home/navis/projects/hadoop-2.6.0/logs/userlogs

라고 써있다. 다시 node-manager 로그를 찾아 보니 

WARN org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection: Directory /home/navis/projects/hadoop-2.6.0/logs/userlogs error, used space above threshold of 90.0%, removing from list of valid directorie

라네. 120G 짜리 SSD 인데 벌써 다 찼나 보다. 게임도 없고 동영상도 없는데.. 여튼 HDFS 상의 데이터들을 좀 지워줘야 할 듯.


참고로 위의 90% 는 디폴트 값인데, 아래와 같은 긴 이름의 설정을 변경하여 무시할 수도 있겠다.

yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage=100


아예 다 끌수도 있다.

yarn.nodemanager.disk-health-checker.enable=false;


이 외에도 interval-ms / health-checker.interval-ms / min-healthy-disks / max-disk-utilization-per-disk-percentage / min-free-space-per-disk-mb 등등이 있음.


----

Hive 의 MiniTezTest 가 안되는 것도 동일한 문제에서 발생한 것으로 보임. 데이터 디렉토리 날리고 나니 잘 돌아감.

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-8669

Hive 는 SQL 외에 몇가지 특별한 command 를 제공한다. set/create function/compile 등등이 이에 속한다. 현재는 이게 static 한 factory 형태로 되어 있지만 사용자가 정의한 command 를 처리하는 구현체를 사용할 수 있도록 한다면 몇가지 재미있는 일들이 가능하다. 

현재 운영 정보를 반환한다던가 하는 심플한 커맨드를 구현하여 사용할 수 있겠지만, 인터페이스만 잘 정의하면 DSL 을 처리하는 파서 or rewrite 를 만드는 것이 가능하다. 잘만 하면 pig 를 hive 에 올릴 수도 있겠다 싶었는데, 잠깐 pig 문법을 살펴 보니 이동네도 장난이 아니라 완전 전환은 어렵지 싶다.

여튼 잘 구현되면 import 나 bulk loading 같이 단일 SQL 문으로 표현하기 힘든 작업들이 첫 대상이 되지 싶다.

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-2573

Hive 에서 UDF 를 관리하는 FunctionRegistry 클래스는 singleton 으로 하나만 생성되며 HiveServer 와 같이 클라이언트/서버 방식으로 사용하는 경우라면 결과적으로 모든 클라이언트가 하나의 registry 를 share 하게 된다. Hive 에는 UDF 를 등록/제거하는 API 를 제공하고 있는데, 이는 누군가가 UDF 를 재정의 하면 이것이 모든 클라이언트에게 반영된다는 것을 의미한다.

당연히 말도 안되는 문제이고 hive-0.9 시절부터 패치를 만들었지만, 그동안 아무도 관심을 안 가져서 묻혀있던 이슈의 하나이다. 2011 년 11월에 등록했으니 무려 3년 가까이 된..

기본적으로 system function 용 registry 와 session registry 를 따로 만들어 create/drop temporary function 는 session registry 를, create/drop function 은 system registry 를 수정하는 것으로 패치는 엄청 크지만 컨셉도 간단하고 명확하며 구현도 어려울 것이 없음. 

그러나 그동안 permanent function(HIVE-6047) 이라는 요상한 기능이 create/drop function 문법을 먼저 차지해 버리면서 이대로 커밋하는 것이 불가능하게 되었다. 이게 제대로 된 spec 도 없이 대충 만들어 지다 보니 만든 사람도 이게 뭔지 제대로 모르는 것 같음. permanent function 이라는 이름부터가 말도 안되는 것으로, Hortonworks 가 망친 것 중 하나로 꼽을 수 있을 듯.

여튼 이 똥같은 코드와 의미를 새로 정의하느라 일주일 정도 걸린 듯.

신고
Posted by navis94

output commit

2014.10.27 09:43

(Utilities.taskTmpPrefix : "_task_tmp."  , Utilities.tmpPrefix : "_tmp.")


spec_path : ~/_ext-10001

tmpPath : ~/_tmp._ext-10001/

final-paths : tmpPath/taskId 

                    : ~/_tmp._ext-10001/_tmp_~  

taskOutputTempPath : new Path(spec.getParent(), taskTmpPrefix + spec.getName() 
                                        : ~/_task_tmp._ext-10001/

output-paths : new Path(taskOutputTempPath, tmpPrefix + taskId) 

                         : ~/_task_tmp._ext-10001/_tmp_~  


output -> final 은 FileSinkOperator 가 close 될때, final -> spec 은 jobClose 가 호출될때 수행됨(Utilities.mvFileToFinalPath).

신고
Posted by navis94

예전에 구현한 pseudo-MR 이 몇몇 테스트에서 FAIL 을 발생시켜서 내용을 조사. 


기본적으로 기존 jobClose 는 parent operator 의 상태와 관계없이 child 에게 전부 전파하는 방식으로 구현되어 있었는데, 우리는 이걸 partially jobClose 를 할 필요가 있으므로 모든 parent 들이 jobClose 된 상태인 경우에만 전파하게 하였다. 8시간짜리 테스트를 돌려서 결과를 보니 auto_join, bucket_mapjoin 등의 결과가 누락되는 문제 발생. 웃긴건 실제 환경에서는 아무 문제가 없는데 qtest 에서만 문제가 생김.

1. mapper 가 수행되기 전에 기존 root-operator 들을 MapOperator 라는 runtime operator 의 child 로 등록을 하는데, in-memory 상황에서는 이것이 그대로 남다보니 jobClose 가 아예 전파되지 않음.
2. 몇몇 top operator 들은 MapWork 의 aliasToWork mapping 에 존재하지 않음. 이들에 대해서 jobClose 가 전파되지 않음.
3. reducer operator 의 parents 들의 children 은 null 로 setting 되는데, reducer operator 의 parent 는 null setting 되지 않고 남은 상태 이므로 reducer 에 대해 호출된 jobClose 가 전파되지 않음.

벼라별 삽질 끝에, ExecDriver 에서 jobClose 를 호출하기 전에 전체를 싹 다 뒤져서 top-operator 를 추출하고 순서대로 jobClose 를 호출하게 수정. 오늘밤 테스트 예정.

추가

4. MapOperator 에서는 필요한 input 에 해당하는 operator 들만 자신의 child 로 등록을 하는데, 여기에 해당하지 않는 operator 들의 parent 로도 자신을 등록을 하여 jobClose 가 되지 않는다. 

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-7377

ATSHook 은 hadoop-2 의 timeservice 에 쿼리 처리 시간과 같은 정보를 전달해 주는 Hook 으로 Pre/Post/FailHook 으로 등록하여 사용한다. (https://issues.apache.org/jira/browse/HIVE-7076)

hive.exec.pre.hooks=ql.src.java.org.apache.hadoop.hive.ql.hooks.ATSHook

hive.exec.post.hooks=ql.src.java.org.apache.hadoop.hive.ql.hooks.ATSHook

hive.exec.failure.hooks=ql.src.java.org.apache.hadoop.hive.ql.hooks.ATSHook

내부적으로는 쓰레드풀에 ExecutionContext 를 queue 에 넣으면 Executor 들이 여기에서 필요한 정보를 만들어 http 로 전달하는데 만약 timeservice 가 설정 되어 있지 않거나 네트웍이 문제로 정보 전송에 delay 가 생기는 경우 queue 에 계속 ExecutionContext 가 쌓이게 된다. 

ExecutionContext 는 Plan, Task 등등을 포함하는 비교적 큰 객체로, 이 상태가 유지되면 OOM 을 피할 수 없다. 이 패치는 사용자가 queue 크기를 지정할 수 있게 해 주며 쌓인 작업의 갯수가 이를 넘어서는 경우는 제일 오래된(먼저 들어온) 정보를 discard 하여 메모리 사용량을 일정 레벨 이하로 유지하게 해 준다.

신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-4867

RS operator 를 사용하게 되는 SQL 중에 GBY 를 제외한 나머지 경우에 대해서 발생하는 것으로 RS 의 KEY/VALUE 파트에 데이터가 중복으로 들어가는 문제가 있다. 예를 들어 아래와 같은 쿼리를 돌려보면,

explain select * from src order by key;

TableScan alias: src Select Operator expressions: key (type: string), value (type: string) Reduce Output Operator key expressions: _col0 (type: string) value expressions: _col0 (type: string), _col1 (type: string) Extract

RS 의 key expression 와 value expression 에 동일한 컬럼 _col0 이 각각 들어 있는 것을 확인 할 수 있다. 이는 JOIN 도 마찬가지이다.

explain select * from src a join src b on a.key=b.key;

TableScan alias: b Reduce Output Operator key expressions: key (type: string) value expressions: key (type: string), value (type: string) TableScan alias: a Reduce Output Operator key expressions: key (type: string) value expressions: key (type: string), value (type: string)


이 이슈의 목적은 이렇게 두번씩 중복되어 들어가는 column 을 (KEY 에) 한번만 사용하여 shuffling 데이터를 줄이는 것이다. 

ORDER-BY 는 원래 EXT operator 를 사용하여 VALUE 부분을 꺼내 포워딩 하였지만, 이를 SELECT operator 로 치환하여 KEY,VALUE 에서 값을 꺼내도록 수정하면 된다. 간단해 보이지만 하이브의 X 같은 코드에서는 이런것도 엄청 복잡하다. 아래는 수정된 코드의 일부분.

RowResolver selectRR = new RowResolver(); ArrayList<ExprNodeDesc> selCols = new ArrayList<ExprNodeDesc>(); ArrayList<String> selOutputCols = new ArrayList<String>(); Map<String, ExprNodeDesc> selColExprMap = new HashMap<String, ExprNodeDesc>(); for (int i = 0; i < index.length; i++) { ExprNodeColumnDesc desc; ColumnInfo prev = columnInfos.get(i); String[] nm = inputRR.reverseLookup(prev.getInternalName()); ColumnInfo info = new ColumnInfo(prev); String field; if (index[i] >= 0) { field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]); } else { field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1); } info.setInternalName(field); desc = new ExprNodeColumnDesc(info.getType(), info.getInternalName(), info.getTabAlias(), info.getIsVirtualCol()); selCols.add(desc); selectRR.put(nm[0], nm[1], info); selOutputCols.add(getColumnInternalName(i)); selColExprMap.put(getColumnInternalName(i), desc); } SelectDesc select = new SelectDesc(); select.setColList(selCols); select.setOutputColumnNames(selOutputCols); Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(select, new RowSchema(selectRR.getColumnInfos()), interim), selectRR); output.setColumnExprMap(selColExprMap);

아래는 이러한 삽질의 결과가 만들어 내는 새로운 plan 이다.EXT 대신 SELECT 가 사용되었다.

TableScan alias: src Select Operator expressions: key (type: string), value (type: string) Reduce Output Operator key expressions: _col0 (type: string) value expressions: col1 (type: string) Select Operator expressions: KEY.reducesinkkey0 (type: string), VALUE._col1 (type: string)

JOIN 은 아직 진행중.
------

SORT BY 의 경우 OP-RR-EXT 가 한 메쏘드 안에서 이루어 져서 상관이 없는데, JOIN 의 경우 각 tag 에 대해 RS 를 다 만든 다음 JOIN 을 만들게 되어 필요한 정보가 누락된다. 즉 KEY.xx 에 대한 column info 를 참조할 수가 없다. 이걸 RR 에 넣자니 schema 도 같이 변경되어 optimizer 에 영향을 줄 것 같고, 그냥 만들자니 alias 정보들이 없어지는데, 이걸 쓰는 데는 없겠지만 무척 찜찜하다.

여튼 고민이 많아지네.
------

parent operator 의 column info 를 참조하도록 수정. 그러나 column expr map 이 문제가 된다. JOIN 의 value 값은 condition expression 에 ExprNodeDesc 형태로 존재하는데, 예전에는 부조건 VALUE 부분을 참조하면 되었지만 이제 KEY 부분도 참조 해야 하므로 관련 코드를 왕창 수정해야 하는 일이 발생. 게다가 column RS 의 expr map 에 들어가는 column name 은 경우에 따라 필드 태그(KEY,VALUE) 가 붙기도 하고(PTF, GBY) 안 붙기도 하는데(JOIN, OBY), 이 column name 값으로 CP 를 수행하므로.. 이하 생략..

----
CP, PPD, Lineage 는 어떻게 해결했지만 (addMappingOnly 로 mapping 만 있고 schema 에는 포함되지 않는 정보를 RowResolver 에 포함시켰음), ReduceSinkDedup, CorrelationOptimizer, SubqueryRewrite 와 같은 무시무시한 부분들이 고장..

----
드디어 완료. 멋짐ㅋ

TableScan alias: b Reduce Output Operator key expressions: key (type: string) sort order: + Map-reduce partition columns: key (type: string) value expressions: value (type: string) TableScan alias: a Reduce Output Operator key expressions: key (type: string) sort order: + Map-reduce partition columns: key (type: string) value expressions: value (type: string) Join Operator condition map: Inner Join 0 to 1 condition expressions: 0 {KEY.reducesinkkey0} {VALUE._col0} 1 {KEY.reducesinkkey0} {VALUE._col0} outputColumnNames: _col0, _col1, _col4, _col5



신고
Posted by navis94

https://issues.apache.org/jira/browse/HIVE-7027

Field access 를 포함하는 쿼리를 뷰로 만들고 이를 select 했을때 MR 내에서 Operator 들을 초기화 하는 과정에 Expr 이 참조하는 컬럼 이름을 못 찾는 문제가 보고 되었음. 

java.lang.RuntimeException: cannot find field test_c from [0:_col0, 1:_col5]
        at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.getStandardStructFieldRef(ObjectInspectorUtils.java:415)
        at org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector.getStructFieldRef(StandardStructObjectInspector.java:150)
        at org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator.initialize(ExprNodeColumnEvaluator.java:55)
        at org.apache.hadoop.hive.ql.exec.ExprNodeFieldEvaluator.initialize(ExprNodeFieldEvaluator.java:53)
        at org.apache.hadoop.hive.ql.exec.ExprNodeFieldEvaluator.initialize(ExprNodeFieldEvaluator.java:53)
        at org.apache.hadoop.hive.ql.exec.Operator.initEvaluators(Operator.java:934)
        at org.apache.hadoop.hive.ql.exec.Operator.initEvaluatorsAndReturnStruct(Operator.java:960)

이런 문제는 RowResolver 와 ColExprMap 과 같이 Hive 의 젤 X 같은 코드 + CP/PPD 같은 젤 복잡한 코드가 합작해서 만들어 내는게 보통이라 전혀 쳐다보고 싶지 않았지만, 내가 최근에 커밋한 코드가 CP + limit pushdown 에 관련된 부분이라 도둑이 제발 저리듯 찾아보게 되었다.

두어시간의 삽질 끝에 FieldDesc 의 field (ColumnDesc) 의 alias 가 PPD 과정에 스리 슬쩍 바뀌는 현상을 찾아냄. 알고 보니 PPD 가 Expr mapping 을 과정에 ExprNodeDesc 를 clone 하는데, FieldDesc 의 field 는 clone 하지 않고 그냥 사용해서 생긴 버그였음. 즉,

   public ExprNodeDesc clone() {
-    return new ExprNodeFieldDesc(typeInfo, desc, fieldName, isList);
+    return new ExprNodeFieldDesc(typeInfo, desc.clone(), fieldName, isList);
   }

이게 다임. 끝.

신고
Posted by navis94

카테고리

분류 전체보기 (31)
Apache Hive (29)

최근에 달린 댓글

최근에 받은 트랙백

태그목록

달력

«   2017/11   »
      1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30    

티스토리 툴바