rwson

rwson

一个前端开发

redux-logic源码阅读

在用ReactRedux做开发时, 都会用到异步的一些东西, 之前更多的用的是redux-thunk或者redux-saga之类的, 但是都有用的不顺的地方, 有一次突然发现redux-logic是一个很不错的解决方案, 用起来也感觉很顺手, 与市面上其他redux中间件不同的分析都在这里, 感兴趣的可以自己查看。

首先我们来看下redux-logic的基本用法:


//	logic/index.js
import { createLogic } from 'redux-logic';

const someLogic = createLogic({
	//	当前logic监听的actionType
	type: 'SOME_ACTION_TYPE',

	//	取消当前logic执行的actionType
	cancelType: 'CANCEL_TYPE',

	//	是否获取最后一个返回
	latest: true,

	//	当前actionType的业务逻辑
	async process({ getState, action, cancelled }, dispatch, done) {
		const res = await asyncFn();
		dispatch(newAction({
			...res
		}));
		done();
	}
});

export someLogic;

//	store/index.js
import { createStore, applyMiddleware } from 'redux';
import { createLogicMiddleware } from 'redux-logic';
import { routerMiddleware } from 'react-router-redux';

import reducer from '../reducers';
import history from '../history';
import { someLogic } from '../logic';

const configureStore = () => {
    const reduxRouteMiddleware = routerMiddleware(history),

		//	将所有的logic应用到中间件中
    	loggicMiddleware = createLogicMiddleware([someLogic]),
    	middlewares = process.env.NODE_ENV === 'development' ? [loggicMiddleware, reduxRouteMiddleware, logger] : [loggicMiddleware, reduxRouteMiddleware],
        createStoreWithMiddleware = applyMiddleware(...middlewares)(createStore),
        store = createStoreWithMiddleware(reducer);
    return store;
};

export default configureStore();

本文会逐步分析redux-logic的相关实现, 首先把redux-logic克隆到本地, 源码都位于redux-logic/src里面, 所以我们从src/index.js开始:

import createLogic, { configureLogic } from './createLogic';

import createLogicMiddleware from './createLogicMiddleware';

export {
  configureLogic,
  createLogic,
  createLogicMiddleware
};

export default {
  configureLogic,
  createLogic,
  createLogicMiddleware
};

从上面的代码中可以看到, 一共导出3个函数, configureLogic的功能就是对所有的logic进行配置(超时警告时间等), 我们主要看createLogiccreateLogicMiddleware:

  • createLogic

createLogic位于src/createLogic

/**
 * 创建一个Logic对象
 * @param  {Object} logicOptions
 *         @param  {String}     logicOptions.name           可选值, 主要用于Logic中的异常提示, 可选值
 *         @param  {String}     logicOptions.type           触发当前Logic的redux action type
 *         @param  {String}     logicOptions.cancelType     取消执行当前Logic的redux action type
 *         @param  {Boolean}    logicOptions.latest         是否只获取最后一次的结果,类似redux-saga中的takeLatest effect
 *         @param  {Number}     logicOptions.debounce       函数去抖配置, 单位为毫秒
 *         @param  {Object}     logicOptions.throttle       函数节流配置, 单位为毫秒
 *         @param  {Function}   logicOptions.validate       在执行process之前的一个钩子, 可以对当前action执行一些操作
 *         @param  {Function}   logicOptions.transform      validate的一个别名, validate和transform只需指定一个即可
 *         @param  {Function}   logicOptions.process        当前redux action type对应的处理逻辑(发起异步请求, 在异步请求返回成功之后触发新的redux action)
 *         @param  {Object}     logicOptions.processOptions process中需要的一些配置
 *         @param  {Number}     logicOptions.warnTimeout    超时警告时间, 默认60秒, 需要在process中手动调用done来终止这个Logic, 如果是一个持续性的Logic, warnTimeout需要设置成0
 * @return {Object}              创建出来的Logic
 */
export default function createLogic(logicOptions = {}) {
    //  无效配置项验证, 把无效的配置项键名放数组返回
    const invalidOptions = getInvalidOptions(logicOptions, allowedOptions);
    if (invalidOptions.length) {
        throw new Error(`unknown or misspelled option(s): ${invalidOptions}`);
    }

    //  name, type, cancelType, validate, transform都从传入的logicOptions里面获取
    //  如果其他配置项没有在logicOptions中声明, 就从默认配置中获取或者给一个默认值
    const {
        name,
        type,
        cancelType,
        warnTimeout = defaultOptions.warnTimeout,
        latest = defaultOptions.latest,
        debounce = defaultOptions.debounce,
        throttle = defaultOptions.throttle,
        validate,
        transform,
        process = emptyProcess,
        processOptions = {}
    } = logicOptions;

    //  type必传
    if (!type) {
        throw new Error('type is required, use \'*\' to match all actions');
    }

    //  validate和tranform只能同时指定一个
    if (validate && transform) {
        throw new Error('logic cannot define both the validate and transform hooks they are aliases');
    }

    //  warnTimeout需要放在processOptions同级
    if (typeof processOptions.warnTimeout !== 'undefined') {
        throw new Error('warnTimeout is a top level createLogic option, not a processOptions option');
    }

    //  获取processOptions中的无效配置项
    const invalidProcessOptions = getInvalidOptions(processOptions, allowedProcessOptions);
    if (invalidProcessOptions.length) {
        throw new Error(`unknown or misspelled processOption(s): ${invalidProcessOptions}`);
    }

    //  如果validate和transform都没传入,就用默认的, 否则就用传入的validate
    const validateDefaulted = (!validate && !transform) ?
        identityValidation :
        validate;

    //  如果在processOptions里面指定了dispatchMultiple, warnTimeout应该是0
    if (NODE_ENV !== 'production' &&
        typeof processOptions.dispatchMultiple !== 'undefined' &&
        warnTimeout !== 0) {
        console.error(`warning: in logic for type(s): ${type} - dispatchMultiple is always true in next version. For non-ending logic, set warnTimeout to 0`);
    }

    /**
        根据process.length可以获取传入的process对应的处理函数中的形参个数
        从而确定processOption中的一些默认值

        const fn = function(arg1, agr2) {};
        console.log(fn.length);  -> 2

        const fn = function(arg1, agr2, arg3) {};
        console.log(fn.length); -> 3
    **/
    switch (process.length) {
        //  如果没有或只有一个形参没有传入且dispatchReturn没有在processOptions传入, 就把它设置成true
        case 0:
        case 1:
            setIfUndefined(processOptions, 'dispatchReturn', true);
            break;

        //  两个形参(single-dispatch模式[已废弃])
        case 2:
            if (NODE_ENV !== 'production' &&
                !processOptions.dispatchMultiple &&
                warnTimeout !== 0) {
                console.error(`warning: in logic for type(s): ${type} - single-dispatch mode is deprecated, call done when finished dispatching. For non-ending logic, set warnTimeout: 0`);
            }
            break;

        /**
            3个形参及更多, 认为是multi-dispatch模式
            processOptions.dispatchMultiple = processOptions === undefined ? true : processOptions.dispatchMultiple
        **/
        case 3:
        default:
            setIfUndefined(processOptions, 'dispatchMultiple', true);
            break;
    }

    //  返回处理好的对象
    return {
        name: typeToStrFns(name),
        type: typeToStrFns(type),
        cancelType: typeToStrFns(cancelType),
        latest,
        debounce,
        throttle,
        validate: validateDefaulted,
        transform,
        process,
        processOptions,
        warnTimeout
    };
}

从上面的分析中我们可以得出: 在createLogic中做的主要是一些参数的验证和默认值的处理, 下面我们一起看看他里面的调用的一些的实现:

先来看下getInvalidOptions:

   function getInvalidOptions(options, validOptions) {
	  return Object.keys(options)
	    .filter(k => validOptions.indexOf(k) === -1);
   }

	/**	比如在createLogic里的第一行就调用了该方法

	export default function createLogic(logicOptions = {}) {
	  const invalidOptions = getInvalidOptions(logicOptions, allowedOptions);
	  //	...
	}

	
	const allowedOptions = [
	  'name',
	  'type',
	  'cancelType',
	  'latest',
	  'debounce',
	  'throttle',
	  'validate',
	  'transform',
	  'process',
	  'processOptions',
	  'warnTimeout'
	];
	
	**/

从上面的分析中我们可以看出, getInvalidOptions用来获取Object里的不合法的配置项, 并把非法的key作为数组的形式返回。

typeToStrFns

//	如果是数组形式就针对数组的每一项都调用typeToStrFns, 返回一个新数组
//	如果是函数形式就返回函数体的字符串形式
//	其它直接返回
function typeToStrFns(type) {
  if (Array.isArray(type)) { return type.map(x => typeToStrFns(x)); }
  return (typeof type === 'function') ?
    type.toString() :
    type;
}

setIfUndefined

function setIfUndefined(obj, propName, propValue) {
  if (typeof obj[propName] === 'undefined') {
    obj[propName] = propValue;
  }
}

没什么好说的😂

  • createLogicMiddleware createLogic分析完了, 下面一起看看createLogicMiddleware, createLogicMiddleware位于src/createLogicMiddleware里:
/**
	@param arrLogic  Array.<logic>
	@param deps 	 Object
**/
export default function createLogicMiddleware(arrLogic = [], deps = {}) {
  if (!Array.isArray(arrLogic)) {
    throw new Error('createLogicMiddleware needs to be called with an array of logic items');
  }

	//	判断是否有重复的logic
  const duplicateLogic = findDuplicates(arrLogic);
  if (duplicateLogic.length) {
    throw new Error(`duplicate logic, indexes: ${duplicateLogic}`);
  }

	/**
		因为redux-logic集成了rxjs
		所以Subject和BehaviorSubject都是rxjs里的主体
		BehaviorSubject作为Subject的一个变体, 和Subject不同的是它有一个初始值
		https://cn.rx.js.org/manual/overview.html#h15
	**/
  const actionSrc$ = new Subject();

	//	用来监视所有action
  const monitor$ = new Subject();
  const lastPending$ = new BehaviorSubject({ op: OP_INIT });

	//	对monitor$使用累加器函数,返回生成的中间值,可选的初始值
  monitor$
    .scan((acc, x) => { // 追加一个pending状态的计数器
      let pending = acc.pending || 0;
      switch (x.op) {
        case 'top' : 		// 当前action位于logic栈的顶级
        case 'begin' : 		// 开始触发一个action
          pending += 1;
          break;

		/**
			在createLogic里的process中调用了done, done的实现稍候分析;
		**/
        case 'end':
        case 'bottom': 			// action变成了一个被转换后的新action

		/**
			在createLogic里的validate中调用了allow, 且触发了一个新的action
			craeteLogic({
				...
				validate({ getState, action }, allow, reject) {
					allow(action);
				}
			})
		**/
        case 'nextDisp' :
        case 'filtered' : 		// 	当前action由于无效被过滤
        case 'dispatchError' : // 	派发action的时候异常(好像暂时没用到)

		/**
			在action拦截器(validate)里面执行reject
			craeteLogic({
				...
				validate({ getState, action }, allow, reject) {
					reject(action);
				}
			})
		**/
        case 'cancelled':
          pending -= 1;
          break;
      }
      return {
        ...x,
        pending
      };
    }, { pending: 0 })

	//	https://cn.rx.js.org/manual/usage.html
    .subscribe(lastPending$);

  let savedStore;
  let savedNext;
  let actionEnd$;
  let logicSub;
  let logicCount = 0;

	//	缓存传入的logic数组
  let savedLogicArr = arrLogic;

	//	调用完createLogicMiddleware后返回的redux中间件
  function mw(store) {
	//	多次调用了createLogicMiddleware, 并且传入了不同的store
    if (savedStore && savedStore !== store) {
      throw new Error('cannot assign logicMiddleware instance to multiple stores, create separate instance for each');
    }

	//	缓存本次调用传入的store
    savedStore = store;

    return next => {
      savedNext = next;

		//	从applyLogic返回中获取action$, sub, 把logicCount赋值给cnt
      const { action$, sub, logicCount: cnt } =
            applyLogic(arrLogic, savedStore, savedNext,
                       logicSub, actionSrc$, deps, logicCount,
                       monitor$);
      actionEnd$ = action$;
      logicSub = sub;
      logicCount = cnt;

      return action => {
        debug('starting off', action);
        monitor$.next({ action, op: 'top' });
        actionSrc$.next(action);
        return action;
      };
    };
  }

  //	给mw挂载一个指向monitor$的monitor$属性
  mw.monitor$ = monitor$;

    /**
     * 一个自定义钩子函数, 主要用于测试
     * @param  {Function} fn   可写入相关测试逻辑
     * @return {Objetc}
     */
    mw.whenComplete = function whenComplete(fn = identity) {
        return lastPending$
            //  只有当x.pending > 0是才继续往下走
            .takeWhile(x => x.pending)
            .map(( /* x */ ) => undefined)
            .toPromise()
            .then(fn);
    };

	/**
		给当前redux中间件动态添加依赖(相当于createLogicMiddleware第二个参数的拓展)
	**/
  mw.addDeps = function addDeps(additionalDeps) {
	//	所添加的依赖必须是一个对象类型
    if (typeof additionalDeps !== 'object') {
      throw new Error('addDeps should be called with an object');
    }

	//	遍历所添加的中间件
    Object.keys(additionalDeps).forEach(k => {
      const existing = deps[k];
      const newValue = additionalDeps[k];
		//	所添加的中间件不能和当前已有的重名(当前已经有的不能被覆盖)
      if (typeof existing !== 'undefined' &&
          existing !== newValue) {
        throw new Error(`addDeps cannot override an existing dep value: ${k}`);
      }
      deps[k] = newValue;
    });
  };

	/**
		给当前redux中间件动态添加新的logic
		@param arrNewLogic Array.<Logic>
	**/
  mw.addLogic = function addLogic(arrNewLogic) {
    if (!arrNewLogic.length) { return { logicCount }; }

	//	合并到当前已有的数组里面
    const combinedLogic = savedLogicArr.concat(arrNewLogic);
    const duplicateLogic = findDuplicates(combinedLogic);

	//	判断是否有重复
    if (duplicateLogic.length) {
      throw new Error(`duplicate logic, indexes: ${duplicateLogic}`);
    }
    const { action$, sub, logicCount: cnt } =
          applyLogic(arrNewLogic, savedStore, savedNext,
                     logicSub, actionEnd$, deps, logicCount, monitor$);
    actionEnd$ = action$;
    logicSub = sub;
    logicCount = cnt;
    savedLogicArr = combinedLogic;
    debug('added logic');
    return { logicCount: cnt };
  };

	/**
		给当前redux中间件合并新的logic
		@param arrNewLogic Array.<Logic>
	**/
  mw.mergeNewLogic = function mergeNewLogic(arrMergeLogic) {
    // 判断是否重名
    const duplicateLogic = findDuplicates(arrMergeLogic);
    if (duplicateLogic.length) {
      throw new Error(`duplicate logic, indexes: ${duplicateLogic}`);
    }

    // 过滤掉重复的
    const arrNewLogic = arrMergeLogic.filter(x =>
      savedLogicArr.indexOf(x) === -1);
    return mw.addLogic(arrNewLogic);
  };

  /**
	替换当前所有的logic变成新的logic
   **/
  mw.replaceLogic = function replaceLogic(arrRepLogic) {
    const duplicateLogic = findDuplicates(arrRepLogic);

	//	判断新的logic数组里是否有重复的logic
    if (duplicateLogic.length) {
      throw new Error(`duplicate logic, indexes: ${duplicateLogic}`);
    }
    const { action$, sub, logicCount: cnt } =
          applyLogic(arrRepLogic, savedStore, savedNext,
                     logicSub, actionSrc$, deps, 0, monitor$);
    actionEnd$ = action$;
    logicSub = sub;
    logicCount = cnt;
    savedLogicArr = arrRepLogic;
    debug('replaced logic');
    return { logicCount: cnt };
  };

  return mw;
}

从上面的分析中我们可以看出, createLogicMiddleware返回了一个中间件, 该中间件基于Rxjs实现, 返回的中间件中处理相关redux action对应的逻辑。

createLogicMiddleware中调用了一些其他函数, 逐个看下这些函数的实现:

findDuplicates, 该方法完成查找数组里面重复的Logic, 并记录重复的下标返回

/**
	@param arrLogic  Array.<Logic>   logic数组
**/
/**
 * @param  {Array.<Logic>}  arrLogic Logic数组
 * @return {Array.<Number>}          重复的Logic下标
 */
function findDuplicates(arrLogic) {
  return arrLogic.reduce((acc, x1, idx1) => {
    //  不是同一个下标, 且值相等的情况下就把下标放到acc里面
    if (arrLogic.some((x2, idx2) => (idx1 !== idx2 && x1 === x2))) {
      acc.push(idx1);
    }
    return acc;
  }, []);
}

再看看看applyLogic的实现:

/**
 * @param  {Array.<Logic>}   arrLogic        Logic数组
 * @param  {Object}         store            redux store
 * @param  {Function}       next             redux中间件中的第二层函数
 * @param  {Rx.Subject}     sub              当前action对应的
 * @param  {Rx.Subject}     actionIn$        当前action对应的可订阅对象
 * @param  {Object}         deps             createLogicMiddle的第二个参数
 * @param  {Number}         startLogicCount  用于命名
 * @param  {Rx.Subject}     monitor$         全局可订阅对象
 * @return {Object}
 */
function applyLogic(arrLogic, store, next, sub, actionIn$, deps, startLogicCount, monitor$) {
    if (!store || !next) { throw new Error('store is not defined'); }

    //  如果当前Logic已经是一个Rx.Subject(已经被订阅过了), 取消订阅
    if (sub) { sub.unsubscribe(); }

    //  对当前Logic数组进行操作(命名等), 返回一个新数组
    const wrappedLogic = arrLogic.map((logic, idx) => {
        //  给当前未指定name的Logic进行命名并且返回, naming稍候分析
        const namedLogic = naming(logic, idx + startLogicCount);

        //  包装命名后的Logic, wrapper稍候分析
        return wrapper(namedLogic, store, deps, monitor$);
    });

    const actionOut$ = wrappedLogic.reduce((acc$, wep) => wep(acc$), actionIn$);

    //  订阅新的Observable对象
    const newSub = actionOut$.subscribe(action => {
        debug('actionEnd$', action);
        try {
            const result = next(action);
            debug('result', result);
        } catch (err) {
            console.error('error in mw dispatch or next call, probably in middlware/reducer/render fn:', err);
            const msg = (err && err.message) ? err.message : err;
            monitor$.next({ action, err: msg, op: 'nextError' });
        }
        //  action变成了一个被转换后的新action
        monitor$.next({ nextAction: action, op: 'bottom' });
    });

    return {
        action$: actionOut$,
        sub: newSub,
        logicCount: startLogicCount + arrLogic.length
    };
}

从上面的分析中我们可以看出, applyLogic返回了一个新的Rx.Observable, 里面调用了namingwrapper, 下面我们逐个分析下。

先看naming的实现吧:

/**
 * 判断当前传入的Logic有没有name, 有就不做任何操作直接返回, 没有就给当前Logic添加一个name属性后返回
 * @param  {Object} logic 当前Logic
 * @param  {Number} idx   当前Logic在arrLogic中的下标地址
 * @return {Object}
 */
function naming(logic, idx) {
    if (logic.name) { return logic; }
    return {
        ...logic,
        name: `L(${logic.type.toString()})-${idx}`
    };
}

再看下wrapper的, wrapper是写在src/logicWrapper里的, wrapper返回的是一个Rx.Observable:

/**
 * Logic中相关处理
 * @param  {Object}     options.action   当前action
 * @param  {Object}     options.logic    当前logic
 * @param  {Object}     options.store    redux store
 * @param  {Object}     options.deps     createLogicMiddleware的第二个参数
 * @param  {Rx.Subject} options.cancel$  取消Logic执行的订阅对象
 * @param  {Rx.Subject} options.monitor$ 全局可订阅对象
 * @return {Rx.Observable}
 */
export default function createLogicAction$({ action, logic, store, deps, cancel$, monitor$ }) {

    //  reduxStore.getState()
    const { getState } = store;

    //  从当前logic中取得相关配置参数
    const {
        name,
        warnTimeout,
        process: processFn,
        processOptions: {
            dispatchReturn,
            dispatchMultiple,
            successType,
            failType
        }
    } = logic;

    //  当前Logic的拦截器
    const intercept = logic.validate || logic.transform;

    debug('createLogicAction$', name, action);

    //  开始本次action的执行
    monitor$.next({ action, name, op: 'begin' });

    /**
        1.当前action发生改变
        2.在validate/transform中调用allow
        3.无效的action type
        4.action被取消
        interceptComplete都会变成true, 标记拦截器处理完成
     **/
    let interceptComplete = false;

    //  https://cn.rx.js.org/class/es6/Observable.js~Observable.html#instance-method-take
    const logicAction$ = Observable.create(logicActionObs => {
            //  创建一个主题(只发出一个值), 用来订阅`取消Logic执行的订阅对象`, 在取消本次action后, 通知`取消Logic执行的订阅对象`
            const cancelled$ = (new Subject()).take(1);
            cancel$.subscribe(cancelled$);
            cancelled$
                .subscribe(
                    () => {
                        //  确保cancel不会被调用2次(在createLogicMiddle中追加的pending只会被减一次)
                        if (!interceptComplete) {
                            monitor$.next({ action, name, op: 'cancelled' });
                        } else {
                            monitor$.next({ action, name, op: 'dispCancelled' });
                        }
                    }
                );

            //  如果当前Logic不是一个持续性的, 且没有在warnTimeout / 1000秒内调用done(warnTimeout > 0), 就给出异常提示
            if (NODE_ENV !== 'production' && warnTimeout) {
                Observable.timer(warnTimeout)
                    //  https://cn.rx.js.org/class/es6/Observable.js~Observable.html#instance-method-takeUntil
                    //  https://cn.rx.js.org/class/es6/Observable.js~Observable.html#instance-method-defaultIfEmpty
                    .takeUntil(cancelled$.defaultIfEmpty(true))
                    .do(() => {
                        //	console.error(`warning: logic (${name}) is still running after ${warnTimeout / 1000}s, forget to call done()? For non-ending logic, set warnTimeout: 0`);
                    })
                    .subscribe();
            }

            const dispatch$ = (new Subject())
                .mergeAll()
                .takeUntil(cancel$);

            dispatch$
                /**
                    .do({
                        nextOrObserver: mapToActionAndDispatch,
                        error: mapErrorToActionAndDispatch
                    })
                    这里省略了nextOrObserver和error
                    https://cn.rx.js.org/class/es6/Observable.js~Observable.html#instance-method-do
                 **/
                .do(
                    mapToActionAndDispatch,
                    mapErrorToActionAndDispatch
                )
                .subscribe({
                    error: ( /* err */ ) => {
                        //  在发生异常后, 终止本次acion, 并且取消订阅cancelled$
                        monitor$.next({ action, name, op: 'end' });
                        cancelled$.complete();
                        cancelled$.unsubscribe();
                    },
                    complete: () => {
                        //  本次action处理完成
                        monitor$.next({ action, name, op: 'end' });
                        cancelled$.complete();
                        cancelled$.unsubscribe();
                    }
                });

            //  触发redux里面的action
            function storeDispatch(act) {
                monitor$.next({ action, dispAction: act, op: 'dispatch' });
                return store.dispatch(act);
            }

            /**
             * 适配不同情况的action, 组装后如果是一个有效的redux action, 就调用reduxStore.dispatch
             * @param  {Object} actionOrValue
             */
            function mapToActionAndDispatch(actionOrValue) {
                /**
                    let act;
                    if (isInterceptAction(actionOrValue)) {
                        act = unwrapInterceptAction(actionOrValue);
                    } else {
                        if (successType) {
                            act = mapToAction(successType, actionOrValue, false);
                        } else {
                            act = actionOrValue;
                        }
                    }
                    把下面的代码拆成上面的样子, 大概做了下面几件事情:
                    判断是不是一个拦截器, 如果是就把之前包装的拦截器解包
                    判断processOptions.successType是否存在, 存在就调用mapToAction, 拼出一个新的redux action, 调用reduxSrore.dispatch
                    否则就直接使用actionOrValue
                **/
                const act = (isInterceptAction(actionOrValue)) ? 
                unwrapInterceptAction(actionOrValue) : (successType) ? 
                mapToAction(successType, actionOrValue, false) : actionOrValue;

                if (act) {
                    storeDispatch(act);
                }
            }

            /**
             * 根据actionOrValue的类型来组装可以被reduxStore.dispatch调用的action
             * @param  {any} actionOrValue
             * @return {Object}
             */
            function mapErrorToActionAndDispatch(actionOrValue) {
                //  拦截器类型的直接调用触发__interceptAction
                if (isInterceptAction(actionOrValue)) {
                    const interceptAction = unwrapInterceptAction(actionOrValue);
                    return storeDispatch(interceptAction);
                }

                //  判断Logic中的processOptions里有没有failType
                if (failType) {
                    //  如果有failType, 组装一个新的redux action并触发
                    const act = mapToAction(failType, actionOrValue, true);
                    if (act) {
                        return storeDispatch(act);
                    }
                    return;
                }

                //  actionOrValue本身就是一个异常
                if (actionOrValue instanceof Error) {
                    const act =
                        //  actionOrValue本身包含type, 直接调用redux.dispatch(actionOrValue)
                        //  否则包装出一个redux action(type为UNHANDLED_LOGIC_ERROR), 在调用redux.dispatch
                        (actionOrValue.type) ? actionOrValue :
                        {
                            type: UNHANDLED_LOGIC_ERROR,
                            payload: actionOrValue,
                            error: true
                        };
                    return storeDispatch(act);
                }

                //  actionOrValue是一个plain object或一个函数(action creator)
                const typeOfValue = typeof actionOrValue;
                if (actionOrValue && (typeOfValue === 'object' || typeOfValue === 'function')) {
                    return storeDispatch(actionOrValue);
                }

                //  非异常/函数/plain object的情况
                storeDispatch({
                    type: UNHANDLED_LOGIC_ERROR,
                    payload: actionOrValue,
                    error: true
                });
            }

            /**
             * 组装出一个有效的redux action并返回
             * @param  {String|Function} type       redux action type
             * @param  {Object} payload             redux payload
             * @param  {Error|Unfdeined} err        error
             * @return {Object}
             */
            function mapToAction(type, payload, err) {
                //  action type本身是一个action creator, 直接执行type
                if (typeof type === 'function') {
                    return type(payload);
                }
                //  包装出一个有效的redux action
                const act = { type, payload };
                if (err) { act.error = true; }
                return act;
            }

            // allowMore is now deprecated in favor of variable process arity
            // which sets processOptions.dispatchMultiple = true then
            // expects done() cb to be called to end
            // Might still be needed for internal use so keeping it for now
            const DispatchDefaults = {
                allowMore: false
            };

            /**
             * 触发
             * @param  {[type]} act     [description]
             * @param  {[type]} options [description]
             * @return {[type]}         [description]
             */
            function dispatch(act, options = DispatchDefaults) {
                const { allowMore } = applyDispatchDefaults(options);
                //  action !== undefined
                if (typeof act !== 'undefined') {
                    /**
                        let action;
                        if (isObservable(act)) {
                            action = act;
                        } else if (isPromise(act)) {
                            action = Observable.fromPromise(act);
                        } else if (act instanceof Error) {
                            action = Observable.throw(act);
                        } else {
                            action = Observable.of(act);
                        }
                        dispatch$.next(action);

                        https://cn.rx.js.org/class/es6/MiscJSDoc.js~ObserverDoc.html#instance-method-next
                     **/
                    dispatch$.next(
                        (isObservable(act)) ? act :
                        (isPromise(act)) ? Observable.fromPromise(act) :
                        (act instanceof Error) ? Observable.throw(act) :
                        Observable.of(act)
                    );
                }
                if (!(dispatchMultiple || allowMore)) {
                   dispatch$.complete();
                }
                return act;
            }

            function applyDispatchDefaults(options) {
                return {
                    ...DispatchDefaults,
                    ...options
                };
            }

            //  拼装createLogic中相关钩子函数(validate/tranform/process)中的第一个参数
            const depObj = {
                ...deps,
                cancelled$,
                ctx: {}, // 在不同钩子中共享数据
                getState,
                action
            };

            function shouldDispatch(act, useDispatch) {
                //  新的action为空
                if (!act) { return false; }
                //  在触发另外一个action之前, 确保触发的是一个新的action
                if (useDispatch === 'auto') {
                    return (act.type !== action.type);
                }
                //  否则根据useDispatch是否为空, 返回
                return (useDispatch);
            }

            const AllowRejectNextDefaults = {
                useDispatch: 'auto'
            };

            function applyAllowRejectNextDefaults(options) {
                return {
                    ...AllowRejectNextDefaults,
                    ...options
                };
            }

            //  拦截器(validate/tranform)里的allow或next
            function allow(act, options = AllowRejectNextDefaults) {
                handleNextOrDispatch(true, act, options);
            }
            function reject(act, options = AllowRejectNextDefaults) {
                handleNextOrDispatch(false, act, options);
            }

            //  完成本次action, 在createLogic中的process最后调用
            function done() {
                dispatch$.complete();
            }

            /**
             * 对当前拦截器类型action(validate/transform)做一次包装, 方便后面判断
             * @param  {Object} act 当前action
             * @return {Object}
             */
            function wrapActionForIntercept(act) {
                if (!act) { return act; }
                return {
                    __interceptAction: act
                };
            }

            /**
             * 判断传入的action是否为拦截器类型的
             * @param  {Object}  act 当前action
             * @return {Boolean}
             */
            function isInterceptAction(act) {
                return act && act.__interceptAction;
            }

            /**
             * 对拦截器执行解包
             * @param  {Object}  act 当前action
             * @return {Object}      redux action
             */
            function unwrapInterceptAction(act) {
                return act.__interceptAction;
            }

            /**
             * 拦截器(validate/tranform)里的allow、reject实现, 触发新的redux action
             * @param  {Boolean} shouldProcess 是否执行process
             * @param  {Object} act            新的redux action
             * @param  {Object} options
             */
            function handleNextOrDispatch(shouldProcess, act, options) {
                const { useDispatch } = applyAllowRejectNextDefaults(options);
                //  判断是否应该触发传入的redux action
                if (shouldDispatch(act, useDispatch)) {
                    monitor$.next({ action, dispAction: act, name, shouldProcess, op: 'nextDisp' });
                    interceptComplete = true;
                    dispatch(wrapActionForIntercept(act), { allowMore: true }); // will be completed later
                    logicActionObs.complete(); // dispatched action, so no next(act)
                } else { // normal next
                    if (act) {
                        monitor$.next({ action, nextAction: act, name, shouldProcess, op: 'next' });
                    } else {
                        //  无效的action, 直接结束本次拦截器
                        monitor$.next({ action, name, shouldProcess, op: 'filtered' });
                        interceptComplete = true;
                    }
                    postIfDefinedOrComplete(act, logicActionObs);
                }

                //  执行Logic中的process回调
                if (shouldProcess) {
                    //  组织depObj的action参数
                    depObj.action = act || action;
                    try {
                        const retValue = processFn(depObj, dispatch, done);
                        /**
                            如果在createLogic指定了processOption.dispatchReturn为true, 并且prcess执行完之后返回有效的值
                            就再把返回值作为一个新的redux action进行触发
                            否则直接结束dispatch$这个Rx.Subject
                            
                            执行process, 并且接收返回值
                            判断processOption.dispatchReturn
                                成立: 判断返回值是否有效

                                    有效: dispatch -> mapToActionAndDispatch
                                        mapToActionAndDispatch返回一个有效的action:
                                            继续reduxStore.dispatch(mapToActionAndDispatch(retValue))

                                    无效: dispatch$.complete -> monitor$.next({ action, name, op: 'end' }); cancelled$.complete(); cancelled$.unsubscribe();
                        **/
                        if (dispatchReturn) {
                            if (typeof retValue === 'undefined') {
                                dispatch$.complete();
                            } else {
                                dispatch(retValue);
                            }
                        }
                    } catch (err) {
                        console.error(`unhandled exception in logic named: ${name}`, err);
                        //  执行process的过程中发生异常
                        dispatch(Observable.throw(err));
                    }
                } else {
                    //  传入的act是一个空值, 或者和当前的type相同, 或者useDispatch不成立
                    dispatch$.complete();
                }
            }

            /**
             * 在本次拦截器之后执行
             * @param  {Object} act       新的action
             * @param  {Rx.Subject} act$  当前action对应的Observable对象
             */
            function postIfDefinedOrComplete(act, act$) {
                //  如果新的action存在, 执行新的action
                if (act) {
                    act$.next(act);
                }
                interceptComplete = true;
                act$.complete();
            }

            //  开始本次action的执行
            function start() {
                intercept(depObj, allow, reject);
            }

            start();
        })
        .takeUntil(cancel$)
        //  规定logicAction$值发出一个值就完成
        .take(1);

    return logicAction$;
}