기타

[Spark] json->parquet 저장시 특수문자 해결

공부의 Sun 2021. 10. 7. 20:00

목적

  • parquet의 경우, attribute name에 " ,;{}()\n\t=" 문자가 들어가면 rename을 시켜주어야한다. 에러메시지는 아래와 같다.
    'Attribute name "my column" contains invalid character(s) among " ,;{}()\\n\\t=". Please use alias to rename it.;'
    ...
    pyspark.sql.utils.AnalysisException: 'Attribute name "some-ar ray" contains invalid character(s) among " ,;{}()\\n\\t=". Please use alias to rename it.;'​
  • json을 parquet로 저장 시 key값에 해당 문자들이 들어가 데이터가 저장이 제대로 되지 않는 문제가 있었다.
  • 해당 문제 해결 방법을 기록한다.

과정

  1. json 데이터를 textFile로 읽는다.
  2. 정규식, 재귀를 이용해서 key에 해당 문자가 들어갔다면 다른 문자로 치환하거나 삭제한다. 필자는 underscore(_)를 이용하였다.
  3. 코드는 아래와 같다.
import re, json

def remove_special_char(input_str):
    return re.sub('[ ,;\{\}\(\)\n\t=]','_',input_str)


def traverse_json_recursively(current_dict):    
    for key, value in current_dict.items():  
        if type(value) is dict:
            traverse_json_recursively(current_dict.get(key))
        elif type(value) is list:
            for element in value:
                traverse_json_recursively(element)
        
        new_key = remove_special_char(key)
        current_dict[new_key] = current_dict.pop(key)
    
    return current_dict
    
...

rdd = spark.sparkContext.textFile(${json_data_path})
df = spark.read.json(rdd.map(lambda x: json.dumps(traverse_json_recursively(json.loads(x))))
...
반응형