Table of Contents

  • 참조: appsflyer pull api 가이드

  • push api가 아닌 pull api를 사용하는 이유.

    • 앱스플라이어 이벤트를 많이 정의할수록 push api가 빈번하게 호출된다.
    • 이는 서비스 품질 저하로 이어질 수 있음. (완벽한 분석용 DB가 따로 구축되어 있다면 상관 없다.)
    • 그러므로 실시간 데이터 적재보다는 하루에 한번, 전일자 데이터를 적재하기로 결정했고 이런 용도에는 pull api가 적합하다.
  • product와 archive에 동시에 적재하는 이유.

    • 앱스플라이어에서 데이터를 무한정 제공하진 않는다.
    • 가입한 플랜에 따라 최근 x일자 데이터만 조회가 가능하다.
    • 그렇기에 미리미리 자체적으로 데이터를 보관해야 한다.
    • 이를 위해 앱스플라이어에서 직접 AWS s3로 데이터를 알아서 쌓아주는 data locker라는 서비스를 제공하지만 매우 비싸다.
    • 내부 db와의 join을 위해 product에 적재해야 하지만 계속 쌓아두기엔 용량의 압박이 있다.
    • 그러므로 동일한 데이터를 product와 archive에 동시에 쌓고 일정 시간이 지난 데이터는 product에서 삭제해서 용량 확보.
  • 주의점

  • 문제점

    • is_primary_attribution의 경우, True/False로 들어올때도 있고 0/1로 들어올때도 있었다. 데이터의 통일성을 위해, 그리고 예외처리를 위해 binary_to_boolean함수를 만들어서 처리함.
    • 테스트를 위해 event_value에 최대한 많은 데이터를 담아 전송했으나 실제 이만큼 많은 데이터를 적재할 필요는 없다. 다른 column들과 많이 중복되기도 하고. 그래서 필요한 부분만 가져오고 나머지는 None으로 만들어버렸다.

1. API 정리

2. 테이블 설계

api 제공 column 필요? 디비 column 명 datatype 비고
Attributed Touch Type O attributed_touch_type text
Attributed Touch Time O attributed_touch_time timestamp
Install Time O install_time timestamp
Event Time O event_time timestamp
Event Name O event_name text
Event Value O event_value text
Event Revenue X event_revenue
Event Revenue Currency X event_revenue_currency
Event Revenue USD X event_revenue_usd
Event Source X event_source
Is Receipt Validated X is_receipt_validated
Partner O partner text
Media Source O media_source text
Channel O channel text
Keywords O keywords text
Campaign O campaign text
Campaign ID O campaign_id text
Adset O adset text
Adset ID O adset_id text
Ad O ad text
Ad ID O ad_id text
Ad Type O ad_type text
Site ID O site_id text
Sub Site ID X sub_site_id text
Sub Param 1 X sub_param_1 text
Sub Param 2 X sub_param_2 text
Sub Param 3 X sub_param_3 text
Sub Param 4 X sub_param_4 text
Sub Param 5 X sub_param_5 text
Cost Model X cost_model
Cost Value X cost_value
Cost Currency X cost_currency
Contributor 1 Partner O contributor_1_partner text
Contributor 1 Media Source O contributor_1_media_source text
Contributor 1 Campaign O contributor_1_campaign text
Contributor 1 Touch Type O contributor_1_touch_type text
Contributor 1 Touch Time O contributor_1_touch_time timestamp
Contributor 2 Partner O contributor_2_partner text
Contributor 2 Media Source O contributor_2_media_source text
Contributor 2 Campaign O contributor_2_campaign text
Contributor 2 Touch Type O contributor_2_touch_type text
Contributor 2 Touch Time O contributor_2_touch_time timestamp
Contributor 3 Partner O contributor_3_partner text
Contributor 3 Media Source O contributor_3_media_source text
Contributor 3 Campaign O contributor_3_campaign text
Contributor 3 Touch Type O contributor_3_touch_type text
Contributor 3 Touch Time O contributor_3_touch_time timestamp
Region X region text
Country Code X country_code text
State X state text
City X city text
Postal Code X postal_code text
DMA X dma text
IP X ip text
WIFI O wifi boolean
Operator X operator text
Carrier X carrier text
Language X language text
AppsFlyer ID O appsflyer_id text
Advertising ID O advertising_id text
IDFA X idfa text advertising_id에 같이 적재
Android ID X android_id text
Customer User ID O customer_user_id integer
IMEI X imei text
IDFV X idfv text
Platform O platform text
Device Type X device_type
OS Version X os_version
App Version X app_version
SDK Version X sdk_version
App ID X app_id
App Name X app_name
Bundle ID X bundle_id
Is Retargeting O is_retargeting boolean
Retargeting Conversion Type O retargeting_conversion_type text
Attribution Lookback O attribution_lookback text
Reengagement Window O reengagement_window text
Is Primary Attribution O is_primary_attribution boolean
User Agent X user_agent
HTTP Referrer X http_referrer
Original URL X original_url

3. 테이블 생성

create table appsflyer_raw_data
(
    attributed_touch_type   text,
    attributed_touch_time   timestamp,
    install_time    timestamp,
    event_time  timestamp,
    event_name  text,
    event_value text,
    partner text,
    media_source    text,
    channel text,
    keywords    text,
    campaign    text,
    campaign_id text,
    adset   text,
    adset_id    text,
    ad  text,
    ad_id   text,
    ad_type text,
    site_id text,
    contributor_1_partner   text,
    contributor_1_media_source  text,
    contributor_1_campaign  text,
    contributor_1_touch_type    text,
    contributor_1_touch_time    timestamp,
    contributor_2_partner   text,
    contributor_2_media_source  text,
    contributor_2_campaign  text,
    contributor_2_touch_type    text,
    contributor_2_touch_time    timestamp,
    contributor_3_partner   text,
    contributor_3_media_source  text,
    contributor_3_campaign  text,
    contributor_3_touch_type    text,
    contributor_3_touch_time    timestamp,
    wifi    boolean,
    appsflyer_id    text,
    advertising_id  text,
    customer_user_id integer,  
    platform    text,
    is_retargeting  boolean,
    retargeting_conversion_type text,
    attribution_lookback    text,
    reengagement_window text,
    is_primary_attribution  boolean
)

4. 안드로이드&ios dataframe 합치기

  • 둘 다 동일한 format이라 문제 없음

5. temp테이블로 to_sql

  • temp_appsflyer_raw_data

6. temp테이블 -> 적재테이블로 insert

전체 스크립트

#-*-coding:utf-8
import datetime
import locale
import datetime
import time
import os
import pandas as pd
import sqlalchemy
import psycopg2
import numpy as np
import json
# from slacker import Slacker

# 작업이 완료되면 슬랙으로 noti를 주기위해 넣음. 다른 방식으로 작업 결과를 체크한다면 지워도 무방함.
# SLACK_CHANNEL = 'YOUR_SLACK_CHANNEL'
# SLACK_TOKEN = "YOUR_SLACK_BOT_TOKEN"

def api_calling(api_call_command, download_path):
    while True:
        os.system(api_call_command)
        # 용량이 0바이트인 csv가 다운로드 되었을 경우
        if os.path.getsize(download_path) == 0:
            #slack.chat.post_message(SLACK_CHANNEL, "[failed]" + download_path, as_user=True)
            os.remove(download_path)
            time.sleep(10*60)   # 10분 기다렸다가 다시 다운로드 시도해
            continue
        break

def binary_to_boolean(input_str):
    try:
        if int(input_str) == 1:
            return True
        elif int(input_str) == 0:
            return False
        else:
            return input_str
    except:
        return input_str

def connect(user, password, db, host, port=DEFAULT_PORT):
    '''Returns a connection and a metadata object'''
    url = 'postgresql://{}:{}@{}:{}/{}'
    url = url.format(user, password, host, port, db)

    # The return value of create_engine() is our connection object
    connection = sqlalchemy.create_engine(url, client_encoding='utf8')

    return connection


# product DB 연결 정보
product_user = ''
product_password = ''
product_host=''
product_port =
product_db = ''

# archive DB 연결 정보
archive_user = ''
archive_password = ''
archive_host=''
archive_port =
archive_db = ''


# 어제자 데이터를 가져와 적재하기 위해 어제 날자를 구함
today = datetime.datetime.now().date()
yesterday = today - datetime.timedelta(1)

# 어제 데이터만 조회하기 위함
from_date=to_date=yesterday


product_connection_string = "dbname={dbname} user={user} host={host} password={password} port={port}"\
                            .format(dbname=product_db,
                                    user=product_user,
                                    host=product_host,
                                    password=product_password,
                                    port=product_port)

archive_connection_string = "dbname={dbname} user={user} host={host} password={password} port={port}"\
                            .format(dbname=archive_db,
                                    user=archive_user,
                                    host=archive_host,
                                    password=archive_password,
                                    port=archive_port)


# 다운로드 경로& api url 지정
download_path_report = "WRITE_YOUR_PATH/{date}_{platform}_{type}_report.csv"

download_path_android_install_report = download_path_report.format(date=yesterday, platform='android', type='install')
download_path_android_in_app_events_report = download_path_report.format(date=yesterday, platform='android', type='in_app_events')

download_path_ios_install_report = download_path_report.format(date=yesterday, platform='ios', type='install')
download_path_ios_in_app_events_report = download_path_report.format(date=yesterday, platform='ios', type='in_app_events')

api_url_android_install_report = "https://PULL_API_URL_AND_API_KEY&from={from_date}&to={to_date}&timezone=Asia/Seoul"
api_url_android_in_app_events_report = "https://PULL_API_URL_AND_API_KEY&from={from_date}&to={to_date}&timezone=Asia/Seoul"

api_url_ios_install_report = "https://PULL_API_URL_AND_API_KEY&from={from_date}&to={to_date}&timezone=Asia/Seoul"
api_url_ios_in_app_events_report = "https://PULL_API_URL_AND_API_KEY&from={from_date}&to={to_date}&timezone=Asia/Seoul"

api_url_android_install_report = api_url_android_install_report.format(from_date=from_date
                                                                 , to_date=to_date)
api_url_android_in_app_events_report = api_url_android_in_app_events_report.format(from_date=from_date
                                                                                     , to_date=to_date)
api_url_ios_install_report = api_url_ios_install_report.format(from_date=from_date
                                                                 , to_date=to_date)
api_url_ios_in_app_events_report = api_url_ios_in_app_events_report.format(from_date=from_date
                                                                            , to_date=to_date)


api_call="wget -O '{download_path}' '{api_url}'"
api_call_android_install_report = api_call.format(download_path=download_path_android_install_report
                                               , api_url = api_url_android_install_report)
api_call_android_in_app_events_report = api_call.format(download_path=download_path_android_in_app_events_report
                                               , api_url = api_url_android_in_app_events_report)
api_call_ios_install_report = api_call.format(download_path=download_path_ios_install_report
                                               , api_url = api_url_ios_install_report)
api_call_ios_in_app_events_report = api_call.format(download_path=download_path_ios_in_app_events_report
                                               , api_url = api_url_ios_in_app_events_report)


# raw data report download
api_calling(api_call_android_install_report, download_path_android_install_report)
api_calling(api_call_ios_install_report, download_path_ios_install_report)
api_calling(api_call_android_in_app_events_report, download_path_android_in_app_events_report)
api_calling(api_call_ios_in_app_events_report, download_path_ios_in_app_events_report)


android_install_report = pd.read_csv(download_path_android_install_report)
android_in_app_events_report = pd.read_csv(download_path_android_in_app_events_report)
ios_install_report = pd.read_csv(download_path_ios_install_report)
ios_in_app_events_report = pd.read_csv(download_path_ios_in_app_events_report)

total_report = pd.concat([  android_install_report
                          , android_in_app_events_report
                          , ios_install_report
                          , ios_in_app_events_report], axis = 0)

# column명을 정제. 대문자를 소문자로, 공백문자를 underscore로
total_report.columns = [column.lower().replace(' ', '_') for column in total_report.columns]


# ios의 경우 idfa컬럼값을 advertising_id로 복사
total_report.loc[total_report.platform == 'ios', 'advertising_id'] = total_report.loc[total_report.platform == 'ios', 'idfa']

#db에 적재할 column만 가져오기
necessary_columns = [
    'attributed_touch_type', 'attributed_touch_time', 'install_time', 'event_time', 'event_name', 'event_value',
    'partner', 'media_source', 'channel', 'keywords', 'campaign', 'campaign_id', 'adset', 'adset_id', 'ad','ad_id', 'ad_type', 'site_id',
    'contributor_1_partner', 'contributor_1_media_source', 'contributor_1_campaign', 'contributor_1_touch_type', 'contributor_1_touch_time',
    'contributor_2_partner', 'contributor_2_media_source', 'contributor_2_campaign', 'contributor_2_touch_type', 'contributor_2_touch_time',
    'contributor_3_partner', 'contributor_3_media_source', 'contributor_3_campaign', 'contributor_3_touch_type', 'contributor_3_touch_time',
    'wifi', 'appsflyer_id', 'advertising_id', 'customer_user_id', 'platform', 'is_retargeting', 'retargeting_conversion_type', 'attribution_lookback', 'reengagement_window', 'is_primary_attribution']

sliced_report = total_report[necessary_columns]

#구매 이벤트일 경우에만 영수증 아이디를 남기고 나머지 경우에는 모두 삭제
sliced_report['event_value'] = sliced_report['event_value'].apply(lambda x: int(json.loads(x)['af_receipt_id'])
                                                                       if 'af_receipt_id' in str(x)
                                                                       else None)

# is_primary_attribution의 경우 True/False로 들어올때도 있고 0/1로 들어올 떄도 있다.
# Boolean으로 처리해주기 위해 데이터를 전처리해주자
sliced_report['is_primary_attribution'] = sliced_report['is_primary_attribution'].apply(lambda x: binary_to_boolean(x))

# 기존에 적재하는 테이블에 어제자 데이터를 insert하는 형식으로 하기보단 아래 순서대로 작업을 진행.
# 1. 다운받은 csv를 temp 테이블로 옮긴 뒤,
# 2. temp테이블의 모든 레코드를 기존 적재 테이블로 옮기고
# 3. temp테이블 drop
# 4. 다운받은 csv파일 삭제

#connect production DB with sqlalchemy
product = connect(product_user, product_password, product_db, product_host, product_port)


# 1. 다운받은 csv를 temp 테이블로 옮긴 뒤,
sliced_report.to_sql('temp_appsflyer_raw_data', product, index=False,
                    dtype={
                           'attributed_touch_type':sqlalchemy.types.Text(),
                           'attributed_touch_time':sqlalchemy.types.TIMESTAMP(),
                           'install_time':sqlalchemy.types.TIMESTAMP(),
                           'event_time':sqlalchemy.types.TIMESTAMP(),
                           'event_name':sqlalchemy.types.Text(),
                           'event_value':sqlalchemy.types.Text(),
                           'partner':sqlalchemy.types.Text(),
                           'media_source':sqlalchemy.types.Text(),
                           'channel':sqlalchemy.types.Text(),
                           'keywords':sqlalchemy.types.Text(),
                           'campaign':sqlalchemy.types.Text(),
                           'campaign_id':sqlalchemy.types.Text(),
                           'adset':sqlalchemy.types.Text(),
                           'adset_id':sqlalchemy.types.Text(),
                           'ad':sqlalchemy.types.Text(),
                           'ad_id':sqlalchemy.types.Text(),
                           'ad_type':sqlalchemy.types.Text(),
                           'site_id':sqlalchemy.types.Text(),
                           'contributor_1_partner':sqlalchemy.types.Text(),
                           'contributor_1_media_source':sqlalchemy.types.Text(),
                           'contributor_1_campaign':sqlalchemy.types.Text(),
                           'contributor_1_touch_type':sqlalchemy.types.Text(),
                           'contributor_1_touch_time':sqlalchemy.types.TIMESTAMP(),
                           'contributor_2_partner':sqlalchemy.types.Text(),
                           'contributor_2_media_source':sqlalchemy.types.Text(),
                           'contributor_2_campaign':sqlalchemy.types.Text(),
                           'contributor_2_touch_type':sqlalchemy.types.Text(),
                           'contributor_2_touch_time':sqlalchemy.types.TIMESTAMP(),
                           'contributor_3_partner':sqlalchemy.types.Text(),
                           'contributor_3_media_source':sqlalchemy.types.Text(),
                           'contributor_3_campaign':sqlalchemy.types.Text(),
                           'contributor_3_touch_type':sqlalchemy.types.Text(),
                           'contributor_3_touch_time':sqlalchemy.types.TIMESTAMP(),
                           'wifi':sqlalchemy.types.Boolean(),
                           'appsflyer_id':sqlalchemy.types.Text(),
                           'advertising_id':sqlalchemy.types.Text(),
                           'customer_user_id':sqlalchemy.types.Integer(),
                           'platform':sqlalchemy.types.Text(),
                           'is_retargeting':sqlalchemy.types.Boolean(),
                           'retargeting_conversion_type':sqlalchemy.types.Text(),
                           'attribution_lookback':sqlalchemy.types.Text(),
                           'reengagement_window':sqlalchemy.types.Text(),
                           'is_primary_attribution':sqlalchemy.types.Boolean()
                          }
                  )

# connect production DB wiht psycopg2
product_psycopg = psycopg2.connect(product_connection_string)
pc = product_psycopg.cursor()

# 2. temp테이블의 모든 레코드를 기존 적재 테이블로 옮기고
# 3. temp테이블 drop
sql_insert = """
insert into appsflyer_raw_data
select *
from temp_appsflyer_raw_data
"""
sql_drop = """drop table temp_appsflyer_raw_data"""

pc.execute(sql_insert)
pc.execute('commit')
pc.execute(sql_drop)
pc.execute('commit')
pc.close()

# 동일한 작업을 archive DB에서도 반복
#connect archive DB with sqlalchemy
archive = connect(archive_user, archive_password, archive_db, archive_host, archive_port)

#temp 테이블로 넣기
sliced_report.to_sql('temp_appsflyer_raw_data', archive, index=False,
                    dtype={
                           'attributed_touch_type':sqlalchemy.types.Text(),
                           'attributed_touch_time':sqlalchemy.types.TIMESTAMP(),
                           'install_time':sqlalchemy.types.TIMESTAMP(),
                           'event_time':sqlalchemy.types.TIMESTAMP(),
                           'event_name':sqlalchemy.types.Text(),
                           'event_value':sqlalchemy.types.Text(),
                           'partner':sqlalchemy.types.Text(),
                           'media_source':sqlalchemy.types.Text(),
                           'channel':sqlalchemy.types.Text(),
                           'keywords':sqlalchemy.types.Text(),
                           'campaign':sqlalchemy.types.Text(),
                           'campaign_id':sqlalchemy.types.Text(),
                           'adset':sqlalchemy.types.Text(),
                           'adset_id':sqlalchemy.types.Text(),
                           'ad':sqlalchemy.types.Text(),
                           'ad_id':sqlalchemy.types.Text(),
                           'ad_type':sqlalchemy.types.Text(),
                           'site_id':sqlalchemy.types.Text(),
                           'contributor_1_partner':sqlalchemy.types.Text(),
                           'contributor_1_media_source':sqlalchemy.types.Text(),
                           'contributor_1_campaign':sqlalchemy.types.Text(),
                           'contributor_1_touch_type':sqlalchemy.types.Text(),
                           'contributor_1_touch_time':sqlalchemy.types.TIMESTAMP(),
                           'contributor_2_partner':sqlalchemy.types.Text(),
                           'contributor_2_media_source':sqlalchemy.types.Text(),
                           'contributor_2_campaign':sqlalchemy.types.Text(),
                           'contributor_2_touch_type':sqlalchemy.types.Text(),
                           'contributor_2_touch_time':sqlalchemy.types.TIMESTAMP(),
                           'contributor_3_partner':sqlalchemy.types.Text(),
                           'contributor_3_media_source':sqlalchemy.types.Text(),
                           'contributor_3_campaign':sqlalchemy.types.Text(),
                           'contributor_3_touch_type':sqlalchemy.types.Text(),
                           'contributor_3_touch_time':sqlalchemy.types.TIMESTAMP(),
                           'wifi':sqlalchemy.types.Boolean(),
                           'appsflyer_id':sqlalchemy.types.Text(),
                           'advertising_id':sqlalchemy.types.Text(),
                           'customer_user_id':sqlalchemy.types.Integer(),
                           'platform':sqlalchemy.types.Text(),
                           'is_retargeting':sqlalchemy.types.Boolean(),
                           'retargeting_conversion_type':sqlalchemy.types.Text(),
                           'attribution_lookback':sqlalchemy.types.Text(),
                           'reengagement_window':sqlalchemy.types.Text(),
                           'is_primary_attribution':sqlalchemy.types.Boolean()
                          }
                  )

# connect archive DB wiht psycopg2
archive_psycopg = psycopg2.connect(archive_connection_string)
ac = archive_psycopg.cursor()

ac.execute(sql_insert)
ac.execute('commit')
ac.execute(sql_drop)
ac.execute('commit')
ac.close()


# 4. 다운받은 csv파일 삭제
os.remove(download_path_android_install_report)
os.remove(download_path_android_in_app_events_report)
os.remove(download_path_ios_install_report)
os.remove(download_path_ios_in_app_events_report)

# 작업이 완료되면 슬랙으로 noti
# slack = Slacker(SLACK_TOKEN)
# slack.chat.post_message(SLACK_CHANNEL, "appsflyer_raw_data.py done", as_user=True)