marketflow package

Submodules

marketflow.ITCHbin module

This script is based on the ITCH v5.0 spec.

It assumes their squirrelly binary message format.

class marketflow.ITCHbin.ITCHv5(fname)

Bases: object

Convert an ITCH v5.0 file to reasonable records

Currently, we redundantly unpack and keep/print the record type indicator

Struct documented here: https://docs.python.org/3.4/library/struct.html#format-characters

ITCH v5.0 record formats detailed in section 4.1 here: http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/NQTVITCHspecification.pdf

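I’m using the p and s codes as equivalents to distinguish unevaluated bytes (p) from actual ASCII / “alpha” data (s). Note that struct treats p as a Pascal string whose first byte is a length, so the two codes are not strictly interchangeable when unpacking.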

The rec_types keys are integer byte values produced with ord (e.g. 65 for 'A'), which makes lookups more straightforward with bytes (not sure why rec[0] below doesn’t return a byte...)
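As a rough illustration of how one of these format strings is applied, the sketch below unpacks a hand-built message with the format listed under rec_types[83]. The message bytes and field values are made up for the example, and the timestamp is decoded from the raw slice as an assumption about how one might recover it; this is not the package's own code.

```python
import struct

# Format string copied from rec_types[83] (83 == ord('S')).
SYSTEM_EVENT_FMT = '>c2h6pc'

# A hand-built 12-byte message; all field values here are invented.
timestamp_ns = 34_200 * 10**9          # 09:30:00 as nanoseconds since midnight
raw = (
    b'S'                               # record type indicator
    + struct.pack('>2h', 0, 1)         # two big-endian 16-bit fields
    + timestamp_ns.to_bytes(6, 'big')  # 6-byte timestamp
    + b'O'                             # single-character payload
)

# '>' disables padding, so the format size is exactly the message size.
assert struct.calcsize(SYSTEM_EVENT_FMT) == len(raw) == 12

rec = struct.unpack(SYSTEM_EVENT_FMT, raw)
rec_type = ord(rec[0])                 # b'S' -> 83, usable as a rec_types key

# The 6p field is left unevaluated in rec, so the timestamp is read
# directly from the raw bytes instead.
timestamp = int.from_bytes(raw[5:11], 'big')
```

Here rec[0] is the one-byte bytes object b'S', and ord turns it into the integer key used by rec_types.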

base_fname()

Get the file name, excluding any filepath chars, file extensions, and version numbers

print_records()

This could be redefined to get what kind of output you like

rec_types = {65: '>c2h6pqci8sl', 66: '>c2h6pq', 67: '>c2h6pqlqcl', 68: '>c2h6pq', 69: '>c2h6pqlq', 70: '>c2h6pqci8sl4s', 72: '>c2h6p8s2c4s', 73: '>c2h6p2qc8s3l2c', 75: '>c2h6p8sicl', 76: '>c2h6p4s8s3c', 80: '>c2h6pqcl8slq', 81: '>c2h6pq8slqc', 82: '>c2h6p8s2ci2c2s5cic', 83: '>c2h6pc', 85: '>c2h6p2q2l', 86: '>c2h6p3q', 87: '>c2h6pc', 88: '>c2h6pql', 89: '>c2h6p8sc'}
records()

Generator that returns unpacked records from self.fname

Returns records as a list. Note that ascii strings are returned as bytes.

std_prefix = '>c2h6p'
to_fixed_width()

Output the records to a fixed-width text for each message type

to_string(b)

Try to decode b to ascii

This is why people don’t like Python 3
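A minimal sketch of what such a helper might look like, assuming it falls back to returning the input unchanged when decoding fails. The name to_string_sketch and the fallback behavior are assumptions, not the package's actual implementation.

```python
def to_string_sketch(b):
    """Return b decoded as ASCII if possible, otherwise return b unchanged."""
    try:
        return b.decode('ascii')
    except (AttributeError, UnicodeDecodeError):
        # Either b is not a bytes-like object, or it holds non-ASCII bytes.
        return b


symbol = to_string_sketch(b'AAPL    ')   # alpha field, padded with spaces
number = to_string_sketch(42)            # non-bytes values pass through
```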

marketflow.ITCHbin.main()

marketflow.clean_dsenames module

class marketflow.clean_dsenames.Permno_Map(dsefile='crsp/dsenames.csv')

Bases: object

  1. Reads in dsenames file from crsp
  2. Subsets
drop_dups(dsenames)

Consolidates multiple records for same ticker symbol into one by collapsing trading date range
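The idea can be sketched without pandas as follows. The column names (PERMNO, TICKER, NAMEDT, NAMEENDT) and the sample rows are assumptions made for illustration, and the real method may group on different keys.

```python
def collapse_date_ranges(rows):
    """Merge rows sharing (PERMNO, TICKER) into one row spanning all dates."""
    merged = {}
    for row in rows:
        key = (row['PERMNO'], row['TICKER'])
        if key not in merged:
            merged[key] = dict(row)   # copy so the input is not mutated
        else:
            kept = merged[key]
            kept['NAMEDT'] = min(kept['NAMEDT'], row['NAMEDT'])
            kept['NAMEENDT'] = max(kept['NAMEENDT'], row['NAMEENDT'])
    return list(merged.values())


# Made-up records: the first two share a ticker and have adjacent date ranges.
rows = [
    {'PERMNO': 10001, 'TICKER': 'ABC', 'NAMEDT': 20050101, 'NAMEENDT': 20071231},
    {'PERMNO': 10001, 'TICKER': 'ABC', 'NAMEDT': 20080101, 'NAMEENDT': 20101231},
    {'PERMNO': 10002, 'TICKER': 'XYZ', 'NAMEDT': 20090601, 'NAMEENDT': 20121231},
]
collapsed = collapse_date_ranges(rows)
```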

dse_rootsplit(dsenames)

Splits the root and the suffix into two separate variables, SYM_ROOT and SYM_SUFFIX, and flags suffix extraction cases.

FLAG index:

  =0 : base case, symbol has no suffix
  =1 : NASDAQ, share class
  =2 : NASDAQ, foreign shares or voting/non-voting shares
  =3 : NASDAQ, reverse stock split
  =4 : non-NASDAQ, share class suffix

Includes manual adjustments for idiosyncratic securities, which should be re-evaluated from time to time.

dse_subset(dsenames, date=20100101, regular=True, active=True, beneficial=False, when_issued=False)
Limit to our “good” set of securities.
Default settings include securities that are actively trading in normal fashion on some exchange
date : int
Not a true integer quantity, but the naive int conversion of a date string (e.g. '20100101' becomes 20100101).
regular : bool
Limit to “regular” stocks (i.e. the security is past the “When-Issued” stage and the company is not going through bankruptcy proceedings).
active : bool
Limit to entries for stocks that are actively trading
beneficial : bool
If =False, we exclude stocks that are “shares of beneficial interest”, which indicates that the stocks are not trading normally due to their inclusion in some sort of trust.
when_issued : bool
If =False, we exclude when_issued shares, which have been approved for trading but have not yet begun trading actively
get_permno(cd, root, date)

Get the permno for a given symbol root.

Remember, permno does not change with suffix.
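A hedged sketch of this kind of lookup, assuming each record carries SYM_ROOT, PERMNO, and an integer date range in NAMEDT/NAMEENDT. The helper name, the column names other than SYM_ROOT, and the sample rows are hypothetical, and the signature deliberately does not mirror get_permno.

```python
def lookup_permno(rows, root, date):
    """Return the PERMNO whose date range covers `date` for `root`, else None."""
    for row in rows:
        # Dates are YYYYMMDD integers, so ordinary comparison orders them correctly.
        if row['SYM_ROOT'] == root and row['NAMEDT'] <= date <= row['NAMEENDT']:
            return row['PERMNO']
    return None


# Made-up records for illustration only.
rows = [
    {'SYM_ROOT': 'ABC', 'PERMNO': 10001, 'NAMEDT': 20050101, 'NAMEENDT': 20101231},
    {'SYM_ROOT': 'XYZ', 'PERMNO': 10002, 'NAMEDT': 20090601, 'NAMEENDT': 20121231},
]
found = lookup_permno(rows, 'ABC', 20100101)
missing = lookup_permno(rows, 'ABC', 20150101)
```

Because the suffix is never consulted, the lookup depends only on the root and the date, in line with the note above.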

process(dsenames)

Run all processing steps in a reasonable order

marketflow.dsf_with_sic module

class marketflow.dsf_with_sic.DSF_SIC_Map(dsffile='crsp/dsf.csv', sicfile='sic_codes.txt')

Bases: object

docstring for SIC_Map

dsf_startdate(date=20100101)
dsf_subset(to_keep=['PERMNO', 'DATE', 'PRC', 'VOL', 'SHROUT', 'RET', 'HSICCD'])
process(day=20100101, columns=['PERMNO', 'DATE', 'PRC', 'VOL', 'SHROUT', 'RET', 'HSICCD'])
sic_merge()

marketflow.hdf5 module

Work with TAQ data and HDF5 files using pytables

We’re not sure this is the best way to go, but it’s a reasonable place to start for a standard binary format

class marketflow.hdf5.H5Writer(h5_fname, title=None, filters=None)

Bases: object

Set up an hdf5 file, write tables from numpy struct arrays.

If a table already exists, default behavior is to append.

append(path, name, data)

Put data in a table at path. Create the table if needed.

path : str
‘/’-separated path from the root
data : compatible array/buffer object
E.g., numpy structured array

XXX currently, we are not being very smart about chunkshape. We should revisit. If we get two chunks for the same location, but with different dtypes, this function will try to do an append that won’t work!

finalize_hdf5()
set_table_type(target_dtype)

Convert NumPy dtype to PyTables descriptor (adapted from blaze.pytables). E.g.:

>>> dt = np.dtype([('name', 'S7'), ('amount', 'i4'), ('time', 'M8[us]')])
>>> this.set_table_type(dt)  # doctest: +SKIP
{'amount': Int32Col(shape=(), dflt=0, pos=1),
 'name': StringCol(itemsize=7, shape=(), dflt='', pos=0)}
tb_desc = None
marketflow.hdf5.conv_to_hdf5(taq_name, h5_name)

Read raw bytes from TAQ, write to HDF5

marketflow.hdf5.taq2h5(overwrite=False)

Basic conversion from zip file to HDF5, use like this:

$ taq2h5 ../../local_data/EQY_US_ALL_BBO_201502*.zip

(It’s installed as a package script)

marketflow.processing module

Tools for operating on chunks of financial time-series data.

marketflow.processing.Downsample(iterator_in, p=0.001)

Return a random set of records for each chunk, with probability p for each record
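The sampling idea can be sketched with plain lists standing in for the numpy chunks. The function name and the rng parameter are assumptions added so the example is reproducible; the package's own function may sample differently.

```python
import random


def downsample_sketch(chunks, p=0.001, rng=None):
    """Yield, for each chunk, the records kept independently with probability p."""
    if rng is None:
        rng = random.Random()
    for chunk in chunks:
        # random() is in [0, 1), so p=1.0 keeps everything and p=0.0 keeps nothing.
        yield [rec for rec in chunk if rng.random() < p]


chunks = [[1, 2, 3], [4, 5]]
kept_all = list(downsample_sketch(chunks, p=1.0))
kept_none = list(downsample_sketch(chunks, p=0.0))
kept_some = list(downsample_sketch(chunks, p=0.5, rng=random.Random(0)))
```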

marketflow.processing.JoinedChunks(iterator_in, columns, row_limit=inf)

If a chunk matches the columns from a previous chunk, concatenate!

The logic only inspects the first record of each chunk. row_limit is provided to help stay within memory limits, but it is NOT a hard limit on records in memory (you can have about row_limit plus the size of the base chunks coming off disk).
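One way to picture the joining logic, using lists of dicts in place of numpy structured arrays. The function name and the data layout are assumptions; the sketch only aims to reproduce the behavior described above, including the soft nature of row_limit.

```python
from math import inf


def joined_chunks_sketch(chunks, columns, row_limit=inf):
    """Concatenate consecutive chunks whose first records agree on `columns`."""
    pending = None
    for chunk in chunks:
        if not chunk:
            continue
        if pending is None:
            pending = list(chunk)
            continue
        # Only the first record of each chunk is compared.
        same_key = all(chunk[0][c] == pending[0][c] for c in columns)
        # The limit is checked before extending, so pending can exceed it
        # by up to one incoming chunk.
        if same_key and len(pending) < row_limit:
            pending.extend(chunk)
        else:
            yield pending
            pending = list(chunk)
    if pending is not None:
        yield pending


chunks = [
    [{'sym': 'A', 'x': 1}],
    [{'sym': 'A', 'x': 2}],
    [{'sym': 'B', 'x': 3}],
]
joined = list(joined_chunks_sketch(chunks, ['sym']))
limited = list(joined_chunks_sketch(chunks, ['sym'], row_limit=1))
```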

class marketflow.processing.ProcessChunk(iterator_in, *args, **kwargs)

Bases: object

An abstract base class for processing chunks.

A class-based structure is unnecessary for the straightforward generator functions above, but once the processing needs a bit more structure, a class allows more flexibility.

Probably we should be using Dask or Blaze or something. Next step, maybe?
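A minimal sketch of the pattern, assuming subclasses override a single per-chunk hook. The class names, the process method, and the iterator protocol used here are illustrative assumptions and may not match the real ProcessChunk.

```python
from abc import ABC, abstractmethod


class ProcessChunkSketch(ABC):
    """Wrap an iterator of chunks and transform each chunk as it is consumed."""

    def __init__(self, iterator_in, *args, **kwargs):
        self.iterator = iter(iterator_in)
        self.args = args
        self.kwargs = kwargs

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration from the wrapped iterator ends this iterator too.
        return self.process(next(self.iterator))

    @abstractmethod
    def process(self, chunk):
        """Return the transformed chunk (hypothetical hook name)."""


class Doubler(ProcessChunkSketch):
    """Toy subclass used only to exercise the base class."""

    def process(self, chunk):
        return [2 * x for x in chunk]


doubled = list(Doubler([[1, 2], [3]]))
```

Because each instance is itself an iterator, processors built this way can be chained by passing one as the iterator_in of the next.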

class marketflow.processing.Sanitizer(iterator_in, *args, **kwargs)

Bases: marketflow.processing.ProcessChunk

Take a TAQ file and make it fake while preserving structure

ascii_bytes = b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
fake_symbol_replace(chunk, symbol_column='Symbol_root')

Make a new fake symbol if we don’t have it yet, and return it
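The mapping step might look roughly like the following, assuming a fake symbol is a random string of uppercase letters with the same length as the real one. The helper name and the uniqueness check are assumptions; the actual method operates on a column of a chunk rather than a single symbol.

```python
import random

ASCII_BYTES = b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'


def fake_symbol(real, symbol_map, rng=None):
    """Return the fake symbol for `real`, creating and caching one if needed."""
    if rng is None:
        rng = random.Random()
    if real not in symbol_map:
        used = set(symbol_map.values())
        while True:
            # Indexing bytes yields ints, so bytes() can rebuild the symbol.
            candidate = bytes(rng.choice(ASCII_BYTES) for _ in range(len(real)))
            if candidate not in used:
                break
        symbol_map[real] = candidate
    return symbol_map[real]


symbol_map = {}
rng = random.Random(0)
first = fake_symbol(b'AAPL', symbol_map, rng)
again = fake_symbol(b'AAPL', symbol_map, rng)
other = fake_symbol(b'MSFT', symbol_map, rng)
```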

fudge_columns = ['Bid_Price', 'Bid_Size', 'Ask_Price', 'Ask_Size']
fudge_up(chunk)

Increase each entry in column by some random increment.

Make sure the values stay monotonic, and don’t get bigger than max_value.
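One possible reading of this description, sketched on a plain list: add a running, non-negative random offset so that a non-decreasing sequence stays non-decreasing, then clip at max_value. The function name, the max_step parameter, and the cumulative-offset approach are all assumptions, not the package's method.

```python
import random


def fudge_up_sketch(values, max_value, max_step=5, rng=None):
    """Shift values upward by a growing random offset, capped at max_value."""
    if rng is None:
        rng = random.Random()
    fudged = []
    offset = 0
    for v in values:
        offset += rng.randint(0, max_step)   # the offset never decreases
        fudged.append(min(v + offset, max_value))
    return fudged


values = [1, 1, 2, 5, 9]
fudged = fudge_up_sketch(values, max_value=12, rng=random.Random(0))
```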

symbol_map = {}
class marketflow.processing.SplitChunks(iterator_in, *args, **kwargs)

Bases: marketflow.processing.ProcessChunk

return_format(chunk)

Return a chunk in the requested format

marketflow.raw_taq module

Basic, efficient interface to TAQ data using numpy

A central design goal is minimizing external dependencies

class marketflow.raw_taq.BytesSpec(bytes_per_line, computed_fields=None)

Bases: object

A description of the records in raw TAQ files

check_present_fields()

self.initial_dtype_info should be of the following form (we encode newline info here!):

[('Time', 9),
 ('Exchange', 1),
 ...
]

Assumption is that the last field is a newline field that is present in all versions of BBO
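To make the (name, width) layout concrete, the sketch below slices one fixed-width line into named fields. The first four widths are taken from initial_dtype_info; the sample line, the 'newline' field name, and its width of 2 are assumptions for the example.

```python
def split_fixed_width(line, dtype_info):
    """Slice a fixed-width bytes line into a dict of named byte fields."""
    fields = {}
    start = 0
    for name, width in dtype_info:
        fields[name] = line[start:start + width]
        start += width
    return fields


# Truncated layout: four leading fields plus a trailing newline field.
dtype_info = [('hour', 2), ('minute', 2), ('msec', 5), ('Exchange', 1), ('newline', 2)]
line = b'093000123N\r\n'

# The widths must add up to the number of bytes per line.
assert sum(width for _, width in dtype_info) == len(line)

fields = split_fixed_width(line, dtype_info)
```

Treating the newline as just another field keeps the sum of the widths equal to the number of bytes per line.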

convert_dict = {'Bid_Size': <class 'numpy.int32'>, 'hour': <class 'numpy.int8'>, 'Bid_Price': <class 'numpy.float64'>, 'Sequence_Number': <class 'numpy.int64'>, 'minute': <class 'numpy.int8'>, 'Ask_Price': <class 'numpy.float64'>, 'msec': <class 'numpy.uint16'>, 'Ask_Size': <class 'numpy.int32'>}
convert_dtype = [('hour', <class 'numpy.int8'>), ('minute', <class 'numpy.int8'>), ('msec', <class 'numpy.uint16'>), ('Bid_Price', <class 'numpy.float64'>), ('Bid_Size', <class 'numpy.int32'>), ('Ask_Price', <class 'numpy.float64'>), ('Ask_Size', <class 'numpy.int32'>), ('Sequence_Number', <class 'numpy.int64'>)]
initial_dtype_info = [('hour', 2), ('minute', 2), ('msec', 5), ('Exchange', 1), ('Symbol_root', 6), ('Symbol_suffix', 10), ('Bid_Price', 11), ('Bid_Size', 7), ('Ask_Price', 11), ('Ask_Size', 7), ('Quote_Condition', 1), ('Market_Maker', 4), ('Bid_Exchange', 1), ('Ask_Exchange', 1), ('Sequence_Number', 16), ('National_BBO_Ind', 1), ('NASDAQ_BBO_Ind', 1), ('Quote_Cancel_Correction', 1), ('Source_of_Quote', 1), ('Retail_Interest_Indicator_RPI', 1), ('Short_Sale_Restriction_Indicator', 1), ('LULD_BBO_Indicator_CQS', 1), ('LULD_BBO_Indicator_UTP', 1), ('FINRA_ADF_MPID_Indicator', 1), ('SIP_generated_Message_Identifier', 1), ('National_BBO_LULD_Indicator', 1)]
passthrough_strings = ['Exchange', 'Symbol_root', 'Symbol_suffix', 'Quote_Condition', 'Market_Maker', 'Bid_Exchange', 'Ask_Exchange', 'National_BBO_Ind', 'NASDAQ_BBO_Ind', 'Quote_Cancel_Correction', 'Source_of_Quote', 'Retail_Interest_Indicator_RPI', 'Short_Sale_Restriction_Indicator', 'LULD_BBO_Indicator_CQS', 'LULD_BBO_Indicator_UTP', 'FINRA_ADF_MPID_Indicator', 'SIP_generated_Message_Identifier', 'National_BBO_LULD_Indicator']
target_dtype

We’re being careful about operations on this value!

class marketflow.raw_taq.TAQ2Chunks(taq_fname, chunksize=None, do_process_chunk=True)

Bases: object

Read in raw TAQ BBO file, and return numpy chunks (cf. odo)

DEFAULT_CHUNKSIZE = 1000000
chunks(numlines, infile)

Do the conversion of bytes to numpy “chunks”

day = None
first_line = None
month = None
numlines = None
process_chunk(all_bytes)

Convert the structured ndarray all_bytes to the target_dtype

If you set do_process_chunk=False, you can run this yourself on the chunks that you get from iteration.

year = None

marketflow.utility module

A set of utilities for financial data analysis

marketflow.utility.timeit(method)

Return a function that behaves the same, except it prints timing stats.
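A decorator of this kind can be sketched as follows. The name timeit_sketch, the use of time.perf_counter, and the output format are assumptions and may differ from what the module prints.

```python
import functools
import time


def timeit_sketch(method):
    """Wrap `method` so each call prints how long it took."""

    @functools.wraps(method)
    def timed(*args, **kwargs):
        start = time.perf_counter()
        result = method(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f'{method.__name__} took {elapsed:.6f} s')
        return result

    return timed


@timeit_sketch
def add(a, b):
    return a + b


total = add(2, 3)
```

functools.wraps keeps the wrapped function's name and docstring, so the decorated function still looks like the original to introspection tools.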

The timeit implementation in this module is lightly modified from a version by Andreas Jung. The original is unlicensed, but simple enough that it should not be a license issue:

Module contents