Resources
July 11, 2024

Getting Started with TDAG - How to Create a Portfolio

This example shows how to use TDAG to easily create a portfolio of crypto altcoins.


In this tutorial, we will show you how to use TDAG to create a simple market-weight strategy using eight of the largest crypto altcoins. The individual coin investments of our portfolio will be weighted by the coins' market capitalizations. To adapt to market shifts while keeping transaction fees low, we rebalance the portfolio with updated weights at the close of each quarter.
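As a toy illustration of market-cap weighting (the market caps below are made-up numbers, not real data):

caps = {"SOL": 80e9, "ADA": 15e9, "LINK": 10e9}  # hypothetical market caps in USD
total_cap = sum(caps.values())
weights = {symbol: cap / total_cap for symbol, cap in caps.items()}
# {'SOL': 0.762, 'ADA': 0.143, 'LINK': 0.095}  (rounded)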

If you have not installed TDAG yet, check out our previous post introducing it.

Setting up the Pipeline

The Main Sequence Ecosystem offers an efficient way to manage and use assets across different currencies and exchanges via VAM. In the current TDAG version, assets must be specified through VAM.


from assets_orm import CONSTANTS, Asset

assets, response = Asset.filter(
    symbol__in=["ADA", "BCH", "DOT", "LINK", "LTC", "MATIC", "SOL", "ATOM"],
    asset_type=CONSTANTS.ASSET_TYPE_CRYPTO_SPOT,
    currency="USDT",
    execution_venue__symbol=CONSTANTS.BINANCE_FUTURES_EV_SYMBOL,
)

A TDAG Pipeline consists of nodes, each handling a specific task. For this strategy, we create four nodes (their dependency structure is sketched after the list):

  • CoinPrices: Queries the last day of hourly price data from Binance for the assets.
  • MarketCaps: Retrieves the daily market capitalizations for the assets.
  • PortfolioWeights: Uses the market capitalizations to calculate the portfolio weights.
  • PortfolioBacktest: Computes the portfolio returns using the price data and portfolio weights.
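Conceptually, the nodes form a small dependency graph, with data flowing from left to right:

CoinPrices ─────────────────────────────┐
                                        ├──> PortfolioBacktest
MarketCaps ──> PortfolioWeights ────────┘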

CoinPrices Node

To create a new node, we extend the `TimeSerie` class with customized logic. We need to implement two functions: `__init__` and `update_series_from_source`.

The node queries prices for a specified lookback window, currently set via the `MAX_DAYS_LOOKBACK` constant to 1 day.


import ccxt
import pandas as pd
import pytz
from datetime import datetime, timedelta

# TimeSerie and ModelList are provided by the TDAG/VAM packages (imports omitted here)


class CoinPrices(TimeSerie):
    """
    Class to fetch and update coin prices from the Binance exchange using the ccxt library.
    """
    MAX_DAYS_LOOKBACK = 1

    @TimeSerie._post_init_routines()
    def __init__(self, asset_list: ModelList, query_frequency: str, *args, **kwargs):
        ...  # initializations

    def update_series_from_source(self, latest_value, **class_arguments):
        ...  # query and data-processing logic

We will explain both functions in more detail.

The node is initialized with the assets to query and the query frequency. We also create the Binance client that we will query data from.


@TimeSerie._post_init_routines()
def __init__(self, asset_list: ModelList, query_frequency: str, *args, **kwargs):
    """
    Initialize CoinPrices with a list of assets and query frequency.

    Parameters:
    asset_list (ModelList): List of assets to track.
    query_frequency (str): Frequency of querying the prices (e.g., '1h').
    """
    self.asset_list = asset_list
    self.asset_symbols_filter = [a.internal_id for a in asset_list]
    self.ex = ccxt.binance()
    self.query_frequency = query_frequency
    super().__init__(*args, **kwargs)

Now to the processing logic within `update_series_from_source`.

The function queries the price data of the assets and returns it in a pandas DataFrame, which is then automatically stored in the database.


def update_series_from_source(self, latest_value, **class_arguments):
    """
    Update the coin prices time series.

    Parameters:
    latest_value: The latest price time value in the database; None if the table is empty.

    Returns:
    pd.DataFrame: DataFrame containing the price data that gets stored in the database.
    """
    current_time = datetime.now(tz=pytz.utc)
    max_lookback_time = (current_time - timedelta(days=self.MAX_DAYS_LOOKBACK)).timestamp()

    if latest_value is not None:
        last_query_times_per_coin = self.metadata["sourcetableconfiguration"]["multi_index_stats"]["max_per_asset_symbol"]
    else:
        last_query_times_per_coin = {}

    prices_list = []
    for coin in self.asset_list:
        coin_start_time = max_lookback_time
        if coin.id in last_query_times_per_coin:
            # the coin has already been queried; add a small offset to avoid duplicates
            offset = pd.Timedelta("5ms")
            coin_start_time = (pd.to_datetime(last_query_times_per_coin[coin.id]) + offset).timestamp()

        ohlcv = self.ex.fetch_ohlcv(coin.symbol + "/USDT", self.query_frequency, since=int(coin_start_time * 1000))

        df = pd.DataFrame(ohlcv, columns=["date", "open", "high", "low", "close", "volume"])

        df["time_index"] = pd.to_datetime(df["date"], unit="ms").dt.tz_localize("UTC")
        df["asset_symbol"] = coin.id

        print(f"fetched {len(df)} new price data points for coin {coin.name} from {datetime.fromtimestamp(coin_start_time, tz=pytz.utc)} to {current_time}")
        prices_list.append(df)

    if len(prices_list):
        prices = pd.concat(prices_list, axis=0)
        prices = prices.set_index(["time_index", "asset_symbol"])
        return prices
    else:
        return pd.DataFrame()

The parameter `latest_value` is convenient: it contains the timestamp of the most recent entry in the database, or None if the table is empty. This lets us restrict the query to data newer than `latest_value`.

As this node processes data for multiple coins, we need to retrieve the latest values for all individual coins stored in the database. This is done by accessing the metadata information and table configuration.


last_query_times_per_coin = self.metadata["sourcetableconfiguration"]["multi_index_stats"]["max_per_asset_symbol"]

We then iterate over each coin and check if the coin has been queried before. If it has, we add a small offset to avoid querying the same value twice.


coin_start_time = (pd.to_datetime(last_query_times_per_coin[coin.id]) + offset).timestamp()

For this query, we use the `ccxt` library, which provides uniform access to data from various exchanges. To be stored in the database, the data must include two columns:

  • `time_index`: A UTC timezone-aware timestamp for the data point (converted from the exchange's millisecond timestamps).
  • `asset_symbol`: The coin ID as specified in VAM.

ohlcv = self.ex.fetch_ohlcv(coin.symbol + "/USDT", self.query_frequency, since=int(coin_start_time * 1000))

df = pd.DataFrame(ohlcv, columns=["date", "open", "high", "low", "close", "volume"])
df["time_index"] = pd.to_datetime(df["date"], unit="ms").dt.tz_localize("UTC")
df["asset_symbol"] = coin.id

Using CoinPrices Data in Other Nodes

The coin prices data can be readily used by downstream nodes. The CoinPrices node is initialized with the assets to track and the query frequency, and it also specifies which parameters may change while still writing to the same table.

This is done with the `local_kwargs_to_ignore` parameter, here set to ignore the asset list. If the node is instantiated with a different asset list, all data is still stored in the same table, which makes it easy to create new strategies and keep them maintainable.


self.prices = CoinPrices(
    asset_list=asset_list,
    query_frequency=price_query_frequency,
    local_kwargs_to_ignore=["asset_list"],
)
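For example, a second strategy tracking only a subset of the coins (`ltc_sol_assets` is a hypothetical smaller asset list) would read from and write to the very same price table:

# writes to / reads from the same table as the full portfolio,
# because `asset_list` is excluded from the storage hash
subset_prices = CoinPrices(
    asset_list=ModelList(ltc_sol_assets),  # hypothetical subset
    query_frequency="1h",
    local_kwargs_to_ignore=["asset_list"],
)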

Data can be accessed using pre-defined functions on the CoinPrices object. For example, to calculate the returns for our portfolio, we can use the function `self.prices.get_df_greater_than(latest_value)` to retrieve all price data after a certain time value. If `latest_value` is None, all the data will be returned. This method is helpful for obtaining only the data necessary for calculations, which significantly speeds up execution.
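For instance, a downstream node could compute per-coin hourly returns from the stored prices. This is only a sketch, assuming the (time_index, asset_symbol) multi-index layout shown above:

# only rows newer than latest_value are returned (all rows if it is None)
prices = self.prices.get_df_greater_than(latest_value)

# pivot the multi-index into one column per coin, indexed by time
close = prices["close"].unstack("asset_symbol")

# hourly simple returns per coin
hourly_returns = close.pct_change()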

MarketCaps Node

This node retrieves the market capitalization data used to calculate the asset weights within the portfolio. Similar to the price node, it queries the CoinGecko API for daily market capitalizations.
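The post does not show this node's code, so here is a rough sketch of what it could look like. It assumes CoinGecko's public `/coins/{id}/market_chart` endpoint and a hypothetical `coingecko_id` attribute that maps the VAM asset to CoinGecko's coin id:

import requests

class MarketCaps(TimeSerie):
    """Sketch: fetch daily market caps from the public CoinGecko API."""
    MAX_DAYS_LOOKBACK = 90

    @TimeSerie._post_init_routines()
    def __init__(self, asset_list: ModelList, *args, **kwargs):
        self.asset_list = asset_list
        super().__init__(*args, **kwargs)

    def update_series_from_source(self, latest_value, **class_arguments):
        frames = []
        for coin in self.asset_list:
            # `coin.coingecko_id` is a hypothetical mapping to CoinGecko's
            # coin id (e.g. "cardano" for ADA)
            url = f"https://api.coingecko.com/api/v3/coins/{coin.coingecko_id}/market_chart"
            payload = requests.get(url, params={"vs_currency": "usd", "days": self.MAX_DAYS_LOOKBACK, "interval": "daily"}).json()
            df = pd.DataFrame(payload["market_caps"], columns=["date", "market_cap"])
            df["time_index"] = pd.to_datetime(df["date"], unit="ms", utc=True)
            df["asset_symbol"] = coin.id
            frames.append(df.drop(columns=["date"]))
        # de-duplication against latest_value is omitted for brevity
        return pd.concat(frames).set_index(["time_index", "asset_symbol"])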

PortfolioWeights Node

This node calculates daily portfolio weights based on market capitalizations. It includes the MarketCaps node as an attribute to access the necessary data. We also apply the `local_kwargs_to_ignore` parameter to manage asset list changes.


self.base_feature = MarketCaps(
    asset_list=asset_list,
    local_kwargs_to_ignore=["asset_list"],
)

Within the `update_series_from_source` function, we compute new portfolio weights for all days since the last value:


def update_series_from_source(self, latest_value, **class_arguments):
    """
    Update the portfolio weights time series.

    Parameters:
    latest_value: The latest portfolio weight time value in the database; None if the table is empty.

    Returns:
    pd.DataFrame: DataFrame containing the portfolio weights data that gets stored in the database.
    """
    data = self.base_feature.get_df_greater_than(latest_value)

    # each asset's weight is its market cap divided by the total market cap on that day
    total_market_cap = data.groupby("time_index")["market_cap"].transform("sum")
    data["portfolio_weight"] = data["market_cap"] / total_market_cap

    print(f"calculated {len(data)} new portfolio weights")

    return data.drop(columns=["market_cap"])

PortfolioBacktest Node

This node integrates the previous computations to calculate the portfolio returns. It includes both the portfolio weights and the price data as attributes. Note that we do not use `local_kwargs_to_ignore` for the PortfolioWeights node, because each weight calculation depends on the entire asset list.


self.weights = PortfolioWeights(
    asset_list=asset_list,
)

self.prices = CoinPrices(
    asset_list=asset_list,
    query_frequency=price_query_frequency,
    local_kwargs_to_ignore=["asset_list"],
)

The `update_series_from_source` function retrieves both prices and weights. We shift `latest_value` back slightly so that at least two price points are included, which is the minimum needed to compute a return. The weights are taken from the last quarter-end, mirroring the quarterly rebalancing typical of ETFs.


def update_series_from_source(self, latest_value, **class_arguments):
    """
    Update the portfolio returns time series.

    Parameters:
    latest_value: The latest portfolio return time value in the database; None if the table is empty.

    Returns:
    pd.DataFrame: DataFrame containing the portfolio returns data that gets stored in the database.
    """
    if latest_value is not None:
        # shift latest_value back slightly to retrieve at least two prices
        latest_value = pd.Timestamp(latest_value) - pd.Timedelta("5ms")

    prices = self.prices.get_df_greater_than(latest_value)[["open"]].reset_index()
    prices = prices.pivot(index="time_index", columns="asset_symbol", values="open")

    if len(prices) < 2:
        print("0 new portfolio returns are calculated.")
        return pd.DataFrame()

    # simple returns per coin; the first row is NaN because of the shift
    returns = (prices / prices.shift(1)) - 1

    # weights from the most recent quarter end (midnight, UTC)
    end_last_quarter = (pd.Timestamp.now(tz="UTC") - pd.tseries.offsets.QuarterEnd()).normalize()
    portfolio_weights = self.weights.get_df_between_dates(end_last_quarter, end_last_quarter)
    portfolio_weights = portfolio_weights.reset_index().pivot(
        index="time_index", columns="asset_symbol", values="portfolio_weight"
    )
    assert len(portfolio_weights) == 1

    # portfolio return = weighted sum of the individual coin returns,
    # with the weight columns aligned to the return columns
    portfolio_returns = pd.DataFrame(
        (portfolio_weights[prices.columns].to_numpy() * returns).sum(axis=1)
    )

    print(f"{len(portfolio_returns)} new portfolio returns are calculated.")
    return portfolio_returns

Running the Pipeline

To run the pipeline, we first need to create the local pickle files for the individual nodes, which are then used by the scheduler. If the code changes without the signatures changing, the local pickle files must be deleted manually.
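For example, a stale pickle could be removed by hand. This is only a sketch, reusing `ogm.get_ts_pickle_path` and the `ts1` node from the run script below:

import os

# delete the node's local pickle so it is recreated on the next run
stale_path = ogm.get_ts_pickle_path(local_hash_id=ts1.local_hash_id)
if os.path.isfile(stale_path):
    os.remove(stale_path)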

The Scheduler executes the Pipeline by distributing the nodes to Ray tasks; nodes without mutual dependencies run in parallel.

For this example, however, we execute the pipeline in debug mode and run it in a single process without parallelization.


ts1 = PortfolioBacktest(
    asset_list=ModelList(assets),
    price_query_frequency="1h",
)

if not os.path.isfile(ogm.get_ts_pickle_path(local_hash_id=ts1.local_hash_id)):
    lpm = ts1.local_persist_manager  # access the persist manager to guarantee the time series exists
    pickle_path, ts = get_or_pickle_ts_from_sessions(
        local_hash_id=ts1.local_hash_id,
        remote_table_hashed_name=ts1.remote_table_hashed_name,
        set_dependencies_df=True,
        return_ts=True,
    )

DEBUG_RUN = True
SchedulerUpdater.debug_schedule_ts(
    time_serie_hash_id=ts1.local_hash_id,
    break_after_one_update=True,
    run_head_in_main_process=True,
    wait_for_update=False,
    force_update=True,
    debug=DEBUG_RUN,
    update_tree=True,
)

Conclusion

In this tutorial, we presented a simple way to use TDAG for a market-weight strategy with large crypto altcoins. The pipeline dynamically adjusts portfolio weights based on market caps and rebalances quarterly. This guide covered setting up the pipeline, defining node logic, and running the pipeline, giving you the building blocks for creating more advanced trading strategies.

With the flexibility of TDAG, you can now experiment with different assets, rebalance intervals, or trading rules. Should you have any questions or encounter issues, our community and documentation are readily available to assist.